Source code for z5py.util
from __future__ import print_function
from itertools import product
from concurrent import futures
from contextlib import closing
from datetime import datetime
import numpy as np
from .file import File
# ND blocking generator
def blocking(shape, block_shape):
    """ Iterate over the blocks of a regular nd grid.

    Args:
        shape (tuple): extent of the full volume along each axis.
        block_shape (tuple): extent of a single block along each axis.

    Yields:
        tuple of slice: bounding box of one block, with one slice per axis.
        Blocks at the upper border are clipped to ``shape``. Blocks are
        produced in the order of ``itertools.product``, i.e. the last axis
        varies fastest.

    Raises:
        RuntimeError: if ``shape`` and ``block_shape`` differ in length.
    """
    if len(block_shape) != len(shape):
        raise RuntimeError("Invalid number of dimensions.")

    # number of blocks along each axis: ceil(extent / block_extent)
    blocks_per_axis = []
    for extent, block_extent in zip(shape, block_shape):
        n_full, remainder = divmod(extent, block_extent)
        blocks_per_axis.append(range(n_full + 1 if remainder else n_full))

    for grid_index in product(*blocks_per_axis):
        bounding_box = []
        for index, block_extent, extent in zip(grid_index, block_shape, shape):
            begin = index * block_extent
            # clip the last block of an axis to the volume border
            end = min(begin + block_extent, extent)
            bounding_box.append(slice(begin, end))
        yield tuple(bounding_box)
def rechunk(in_path,
            out_path,
            in_path_in_file,
            out_path_in_file,
            out_chunks,
            n_threads,
            out_blocks=None,
            dtype=None,
            use_zarr_format=None,
            **new_compression):
    """ Copy and rechunk a dataset.

    The input dataset will be copied to the output dataset block by block.
    Allows to change datatype, file format and compression as well.
    Blocks that contain only zeros are not written to the output dataset.
    The attributes of the input dataset are copied to the output dataset.

    Args:
        in_path (str): path to the input file.
        out_path (str): path to the output file.
        in_path_in_file (str): name of input dataset.
        out_path_in_file (str): name of output dataset.
        out_chunks (tuple): chunks of the output dataset.
        n_threads (int): number of threads used for copying.
        out_blocks (tuple): blocks used for copying. Must be a multiple
            of ``out_chunks``, which are used by default (default: None)
        dtype (str): datatype of the output dataset, default does not change datatype (default: None).
        use_zarr_format (bool): file format of the output file,
            default does not change format (default: None).
        **new_compression: compression library and options for output dataset. If not given,
            the same compression as in the input is used.
    """
    f_in = File(in_path)

    # check if the file format was specified
    # if not, keep the format of the input file
    # otherwise set the file format
    is_zarr = f_in.is_zarr if use_zarr_format is None else use_zarr_format
    f_out = File(out_path, use_zarr_format=is_zarr)

    # if we don't have out-blocks explicitly given,
    # we iterate over the out chunks
    if out_blocks is None:
        out_blocks = out_chunks

    ds_in = f_in[in_path_in_file]
    # if no out dtype was specified, use the original dtype
    if dtype is None:
        dtype = ds_in.dtype

    shape = ds_in.shape
    compression_opts = new_compression if new_compression else ds_in.compression_options
    ds_out = f_out.create_dataset(out_path_in_file,
                                  dtype=dtype,
                                  shape=shape,
                                  chunks=out_chunks,
                                  **compression_opts)

    def write_single_chunk(roi):
        data_in = ds_in[roi].astype(dtype, copy=False)
        # Skip blocks that hold only zeros. Use ``np.any`` instead of comparing
        # the sum to zero: non-zero values that cancel out (signed / float data
        # or integer wrap-around) would otherwise be dropped silently.
        # NOTE(review): skipping presumably relies on the output dataset
        # reading unwritten chunks back as 0 - confirm.
        if not np.any(data_in):
            return
        ds_out[roi] = data_in

    with futures.ThreadPoolExecutor(max_workers=n_threads) as tp:
        tasks = [tp.submit(write_single_chunk, roi)
                 for roi in blocking(shape, out_blocks)]
        # result() re-raises any exception that occurred in a worker
        for task in tasks:
            task.result()

    # copy attributes
    in_attrs = ds_in.attrs
    out_attrs = ds_out.attrs
    for key, val in in_attrs.items():
        out_attrs[key] = val
class Timer(object):
    """ Simple timer based on ``datetime.utcnow``.

    Call ``start()`` and ``stop()`` explicitly or use the timer as a
    context manager; the measured duration is available as ``elapsed``.
    """

    def __init__(self):
        # datetime of the last start() / stop() call, None until called
        self.start_time = None
        self.stop_time = None

    @property
    def elapsed(self):
        """ Seconds between the last ``start()`` and the last ``stop()``.

        Raises:
            RuntimeError: if the timer was not started or not stopped.
        """
        # Check the state explicitly instead of inspecting the message of a
        # TypeError: any unrelated TypeError used to be swallowed, which made
        # this property return None silently.
        if self.start_time is None or self.stop_time is None:
            raise RuntimeError("{} either not started, or not stopped".format(self))
        return (self.stop_time - self.start_time).total_seconds()

    def start(self):
        """ Record the current time as start time. """
        self.start_time = datetime.utcnow()

    def stop(self):
        """ Record the current time as stop time and return ``elapsed``. """
        self.stop_time = datetime.utcnow()
        return self.elapsed

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # returns None, so exceptions raised inside the with-block propagate
        self.stop()
def fetch_test_data_stent():
    """ Load the ``imageio:stent.npz`` volume and rescale it to uint8.

    The volume is divided by its maximum, multiplied by 255 and cast
    to ``np.uint8``.

    Returns:
        np.ndarray: the rescaled volume with dtype uint8.
    """
    from imageio import volread
    # NOTE(review): the name suggests int16 data - confirm against imageio.
    data_i16 = volread('imageio:stent.npz')
    # Use an explicit true division: without ``from __future__ import division``
    # the ``/`` operator floors integer arrays on Python 2, which would reduce
    # the normalised volume to 0 / 255. On Python 3 this is identical to ``/``.
    normalised = np.true_divide(data_i16, data_i16.max())
    return (normalised * 255).astype(np.uint8)
def fetch_test_data():
    """ Download the ImageJ ``t1-head-raw`` sample volume.

    Fetches the zip archive from imagej.nih.gov, extracts ``JeffT1_le.tif``
    in memory and reads it with ``imageio.volread``.

    Returns:
        np.ndarray: the volume converted to dtype uint8.

    Raises:
        RuntimeError: if the server answers with a status code other than 200.
    """
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen

    try:
        from io import BytesIO as Buffer
    except ImportError:
        from StringIO import StringIO as Buffer

    import zipfile
    from imageio import volread

    im_url = "https://imagej.nih.gov/ij/images/t1-head-raw.zip"

    with closing(urlopen(im_url)) as response:
        # Python 3 responses expose ``status``; the urllib2 response used by
        # the Python 2 fallback only provides ``getcode()``, so reading
        # ``response.status`` directly raised AttributeError there.
        status = getattr(response, "status", None)
        if status is None:
            status = response.getcode()
        if status != 200:
            raise RuntimeError("Test data could not be found at {}, status code {}".format(
                im_url, status
            ))
        zip_buffer = Buffer(response.read())

    with zipfile.ZipFile(zip_buffer) as zf:
        tif_buffer = Buffer(zf.read('JeffT1_le.tif'))
    return np.asarray(volread(tif_buffer, format='tif'), dtype=np.uint8)