Source code for jnormcorre.utils.lazy_array
from typing import *
from abc import ABC, abstractmethod
import numpy as np
[docs]
class lazy_data_loader(ABC):
"""
This captures the numpy array-like functionality that all data loaders for motion correction need to contain
Key: To implement support for a new file type, you just need to specify the key properties below (dtype, shape, ndim)
and then implement the function _compute_at_indices.
Adapted from mesmerize core: https://github.com/nel-lab/mesmerize-core/blob/master/mesmerize_core/arrays/_base.py
"""
@property
@abstractmethod
def dtype(self) -> str:
"""
data type
"""
pass
@property
@abstractmethod
def shape(self) -> Tuple[int, int, int]:
"""
Array shape (n_frames, dims_x, dims_y)
"""
pass
@property
def ndim(self) -> int:
"""
Number of dimensions
"""
return len(self.shape)
[docs]
def __getitem__(
self,
item: Union[int, list, np.ndarray, Tuple[Union[int, np.ndarray, slice, range]]],
):
# Step 1: index the frames (dimension 0)
if isinstance(item, tuple):
if len(item) > len(self.shape):
raise IndexError(
f"Cannot index more dimensions than exist in the array. "
f"You have tried to index with <{len(item)}> dimensions, "
f"only <{len(self.shape)}> dimensions exist in the array"
)
frame_indexer = item[0]
else:
frame_indexer = item
# Step 2: Do some basic error handling for frame_indexer before using it to slice
if isinstance(frame_indexer, np.ndarray):
frame_indexer = frame_indexer.tolist()
if isinstance(frame_indexer, list):
pass
elif isinstance(frame_indexer, int):
pass
# numpy int scaler
elif isinstance(frame_indexer, np.integer):
frame_indexer = frame_indexer.item()
# treat slice and range the same
elif isinstance(frame_indexer, (slice, range)):
start = frame_indexer.start
stop = frame_indexer.stop
step = frame_indexer.step
if start is not None:
if start > self.shape[0]:
raise IndexError(
f"Cannot index beyond `n_frames`.\n"
f"Desired frame start index of <{start}> "
f"lies beyond `n_frames` <{self.shape[0]}>"
)
if stop is not None:
if stop > self.shape[0]:
raise IndexError(
f"Cannot index beyond `n_frames`.\n"
f"Desired frame stop index of <{stop}> "
f"lies beyond `n_frames` <{self.shape[0]}>"
)
if step is None:
step = 1
# convert indexer to slice if it was a range, allows things like decord.VideoReader slicing
frame_indexer = slice(start, stop, step) # in case it was a range object
else:
raise IndexError(
f"Invalid indexing method, " f"you have passed a: <{type(item)}>"
)
# Step 3: Now slice the data with frame_indexer (careful: if the ndims has shrunk, add a dim)
frames = self._compute_at_indices(frame_indexer)
if len(frames.shape) < len(self.shape):
frames = np.expand_dims(frames, axis=0)
# Step 4: Deal with remaining indices after lazy computing the frame(s)
if isinstance(item, tuple):
if len(item) == 2:
frames = frames[:, item[1]]
elif len(item) == 3:
frames = frames[:, item[1], item[2]]
return frames.squeeze()
[docs]
@abstractmethod
def _compute_at_indices(self, indices: Union[list, int, slice]) -> np.ndarray:
"""
Lazy computation logic goes here to return frames. Slices the array over time (dimension 0) at the desired indices.
Parameters
----------
indices: Union[list, int, slice]
the user's desired way of picking frames, either an int, list of ints, or slice
i.e. slice object or int passed from `__getitem__()`
Returns
-------
np.ndarray
array at the indexed slice
"""
pass