
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : space_transformers.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date   : 11.08.2021


import  textwrap

import  numpy               as      np
from    scipy.interpolate   import  interp1d
from    scipy.spatial       import  KDTree

import  pept
from    pept                import  LineData, PointData, Voxels
from    pept.base           import  LineDataFilter, PointDataFilter, Reducer

from    pept.base.sampling_extensions import (
    samples_indices_adaptive_window_ext,
)





class Voxelize(LineDataFilter):
    '''Asynchronously voxelize samples of lines from a `pept.LineData`.

    Filter signature:

    ::

        LineData -> Voxelize.fit_sample -> Voxels

    This filter is much more memory-efficient than voxelizing all samples
    of LoRs at once - which often overflows the available memory. Most
    often this is used alongside voxel-based tracking algorithms, e.g.
    ``pept.tracking.FPI``:

    >>> from pept.tracking import *
    >>>
    >>> pipeline = pept.Pipeline([
    >>>     Voxelize((50, 50, 50)),
    >>>     FPI(3, 0.4),
    >>>     Stack(),
    >>> ])

    Parameters
    ----------
    number_of_voxels : 3-tuple
        A tuple-like containing exactly three integers specifying the
        number of voxels to be used in each dimension.

    xlim : (2,) list[float], optional
        The lower and upper boundaries of the voxellised volume in the
        x-dimension, formatted as [x_min, x_max]. If undefined, it is
        inferred from the bounding box of each sample of lines.

    ylim : (2,) list[float], optional
        The lower and upper boundaries of the voxellised volume in the
        y-dimension, formatted as [y_min, y_max]. If undefined, it is
        inferred from the bounding box of each sample of lines.

    zlim : (2,) list[float], optional
        The lower and upper boundaries of the voxellised volume in the
        z-dimension, formatted as [z_min, z_max]. If undefined, it is
        inferred from the bounding box of each sample of lines.

    set_lims : (N, 7) numpy.ndarray or pept.LineData, optional
        If defined, set the system limits upon creating the class to the
        bounding box of the lines in `set_lims`.
    '''

    def __init__(
        self,
        number_of_voxels,
        xlim = None,
        ylim = None,
        zlim = None,
        set_lims = None,
    ):
        # Type-checking inputs
        number_of_voxels = np.asarray(
            number_of_voxels,
            order = "C",
            dtype = int,
        )

        if number_of_voxels.ndim != 1 or len(number_of_voxels) != 3:
            raise ValueError(textwrap.fill((
                "The input `number_of_voxels` must be a list-like "
                "with exactly three values, corresponding to the "
                "number of voxels in the x-, y-, and z-dimension. "
                f"Received parameter with shape {number_of_voxels.shape}."
            )))

        if (number_of_voxels < 2).any():
            raise ValueError(textwrap.fill((
                "The input `number_of_voxels` must set at least two "
                "voxels in each dimension (i.e. all elements in "
                "`number_of_voxels` must be larger or equal to two). "
                f"Received `{number_of_voxels}`."
            )))

        # Keep track of which limits remain to be inferred from the data
        set_xlim = True
        set_ylim = True
        set_zlim = True

        if xlim is not None:
            set_xlim = False
            xlim = np.asarray(xlim, dtype = float)

            if xlim.ndim != 1 or len(xlim) != 2:
                raise ValueError(textwrap.fill((
                    "The input `xlim` parameter must be a list with exactly "
                    "two values, corresponding to the minimum and maximum "
                    "coordinates of the voxel space in the x-dimension. "
                    f"Received parameter with shape {xlim.shape}."
                )))

        if ylim is not None:
            set_ylim = False
            ylim = np.asarray(ylim, dtype = float)

            if ylim.ndim != 1 or len(ylim) != 2:
                raise ValueError(textwrap.fill((
                    "The input `ylim` parameter must be a list with exactly "
                    "two values, corresponding to the minimum and maximum "
                    "coordinates of the voxel space in the y-dimension. "
                    f"Received parameter with shape {ylim.shape}."
                )))

        if zlim is not None:
            set_zlim = False
            zlim = np.asarray(zlim, dtype = float)

            if zlim.ndim != 1 or len(zlim) != 2:
                raise ValueError(textwrap.fill((
                    "The input `zlim` parameter must be a list with exactly "
                    "two values, corresponding to the minimum and maximum "
                    "coordinates of the voxel space in the z-dimension. "
                    f"Received parameter with shape {zlim.shape}."
                )))

        # Setting class attributes
        self._number_of_voxels = tuple(number_of_voxels)
        self._xlim = xlim
        self._ylim = ylim
        self._zlim = zlim

        if set_lims is not None:
            if isinstance(set_lims, LineData):
                self.set_lims(set_lims.lines, set_xlim, set_ylim, set_zlim)
            else:
                self.set_lims(set_lims, set_xlim, set_ylim, set_zlim)

    def set_lims(
        self,
        lines,
        set_xlim = True,
        set_ylim = True,
        set_zlim = True,
    ):
        lines = np.asarray(lines, dtype = float, order = "C")

        # Only infer the limits that were not explicitly set by the user
        if set_xlim:
            self._xlim = Voxels.get_cutoff(lines[:, 1], lines[:, 4])

        if set_ylim:
            self._ylim = Voxels.get_cutoff(lines[:, 2], lines[:, 5])

        if set_zlim:
            self._zlim = Voxels.get_cutoff(lines[:, 3], lines[:, 6])

    @property
    def number_of_voxels(self):
        return self._number_of_voxels

    @property
    def xlim(self):
        return self._xlim

    @property
    def ylim(self):
        return self._ylim

    @property
    def zlim(self):
        return self._zlim

    def fit_sample(self, sample_lines):
        if not isinstance(sample_lines, LineData):
            sample_lines = LineData(sample_lines)

        vox = Voxels.from_lines(
            sample_lines.lines,
            self.number_of_voxels,
            self.xlim,
            self.ylim,
            self.zlim,
            verbose = False,
        )

        # Save the constituent LoRs as a hidden attribute
        vox.attrs["_lines"] = sample_lines
        return vox
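

# Usage sketch for `Voxelize.fit_sample` outside a pipeline. The `lors`
# dataset below is hypothetical - any `pept.LineData` with a `sample_size`
# set would do:
#
#     lors = pept.LineData(..., sample_size = 200)   # hypothetical data
#     voxelizer = Voxelize((50, 50, 50))
#     voxels = voxelizer.fit_sample(lors[0])         # -> pept.Voxels
#     lines = voxels.attrs["_lines"]                 # the constituent LoRs
#
# Because each sample is voxelized independently, peak memory is bounded by
# a single voxel grid rather than one grid per sample.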


class Interpolate(PointDataFilter):
    '''Interpolate between data points at a fixed sampling rate; useful for
    computing Eulerian fields.

    Filter signature:

    ::

        PointData -> Interpolate.fit_sample -> PointData

    By default, the linear interpolator `scipy.interpolate.interp1d` is
    used. You can specify a different interpolator, along with keyword
    arguments to be passed to it. E.g. nearest-neighbour interpolation:
    ``Interpolate(1., kind = "nearest")``, or cubic interpolation:
    ``Interpolate(1., kind = "cubic")``.

    All data columns except the timestamps are interpolated.
    '''

    def __init__(self, timestep, interpolator = interp1d, **kwargs):
        self.timestep = float(timestep)
        self.interpolator = interpolator
        self.kwargs = kwargs

    def fit_sample(self, sample):
        if not isinstance(sample, PointData):
            sample = PointData(sample)

        if not len(sample):
            return sample[0:0]

        # Create interpolators for each dimension (column) wrt time
        interps = [
            self.interpolator(
                sample.points[:, 0],
                sample.points[:, i],
                **self.kwargs,
            ) for i in range(1, sample.points.shape[1])
        ]

        # Sampling points, between the first and last timestamps
        sampling = np.arange(
            sample.points[0, 0],
            sample.points[-1, 0],
            self.timestep,
        )

        data = np.vstack(
            [sampling] + [interp(sampling) for interp in interps]
        )

        return sample.copy(data = data.T)
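

# Usage sketch: resample a trajectory onto a uniform 1 ms time grid. The
# `trajectory` object is hypothetical; timestamps are assumed to be in ms:
#
#     trajectory = pept.PointData(...)               # hypothetical data
#     resampled = Interpolate(1.).fit_sample(trajectory)
#     smoothed = Interpolate(1., kind = "cubic").fit_sample(trajectory)
#
# All columns beyond the timestamp (x, y, z and any extras such as `error`)
# are interpolated onto the new time grid.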


class Reorient(Reducer):
    '''Rotate a dataset such that it is oriented according to its principal
    axes.

    Reducer signature:

    ::

        PointData -> Reorient.fit -> PointData
        list[PointData] -> Reorient.fit -> PointData
        np.ndarray -> Reorient.fit -> PointData

    By default, this reducer reorients the points such that the axis along
    which they are most spread out (e.g. lengthwise in a pipe) becomes the
    X-axis. The input argument `dimensions` sets this - the default `"xyz"`
    can be changed to e.g. `"zyx"` so that the longest data axis becomes
    the Z-axis.

    The reducer also sets three attributes on the returned `PointData`:

    - `origin`: the origin relative to which the initial data was rotated.
    - `basis`: the principal components - or change of basis - 3x3 matrix.
    - `eigenvalues`: how spread out the data is in each initial dimension.

    If you'd like to reorient a second dataset to the same basis as a
    first one, set the `basis` and `origin` arguments.

    *New in pept-0.5.0*

    Examples
    --------
    Reorient a dataset by aligning its longest principal component (e.g.
    lengthwise in a pipe) to the X-axis:

    >>> import pept.tracking as pt
    >>>
    >>> data = PointData(...)
    >>> reoriented = pt.Reorient().fit(data)

    Reorient it such that the longest principal component (e.g. vertical
    in a mixer) becomes the Z-axis:

    >>> reoriented = pt.Reorient("zyx").fit(data)

    Reorient a second dataset to the same orientation basis as the first
    one:

    >>> reoriented2 = pt.Reorient(
    >>>     basis = reoriented.attrs["basis"],
    >>>     origin = reoriented.attrs["origin"],
    >>> ).fit(other_data)
    '''

    def __init__(self, dimensions = "xyz", basis = None, origin = None):
        # Type-checking inputs
        self.dimensions = np.array([0, 0, 0])

        if isinstance(dimensions, str):
            d = str(dimensions)
            if len(d) != 3 or d[0] not in "xyz" or d[1] not in "xyz" or \
                    d[2] not in "xyz":
                raise ValueError(textwrap.fill((
                    "The input `dimensions`, if given as a str, must have "
                    "exactly three characters containing 'x', 'y', or 'z'; "
                    f"e.g. 'xyz', 'zxy'. Received `{d}`."
                )))

            # Transform x -> 1, y -> 2, z -> 3 using ASCII integer `ord`er
            self.dimensions[0] = ord(d[0]) - ord('x') + 1
            self.dimensions[1] = ord(d[1]) - ord('x') + 1
            self.dimensions[2] = ord(d[2]) - ord('x') + 1
        else:
            d = np.asarray(dimensions, dtype = int)
            if d.ndim != 1 or len(d) != 3:
                raise ValueError(textwrap.fill((
                    "The input `dimensions`, if given as a list, must "
                    "contain exactly three integers representing the "
                    "column indices to use; e.g. `[1, 2, 3]` for xyz, "
                    f"`[3, 1, 2]` for zxy. Received `{d}`."
                )))

            self.dimensions[0] = d[0]
            self.dimensions[1] = d[1]
            self.dimensions[2] = d[2]

        if (basis is None and origin is not None) or \
                (basis is not None and origin is None):
            raise ValueError(textwrap.fill((
                "If a change of `basis` matrix is given, an `origin` is "
                "required too, to do the rotation relative to it."
            )))

        if basis is not None:
            basis = np.asarray(basis, dtype = float)
            if basis.ndim != 2 or basis.shape != (3, 3):
                raise ValueError(textwrap.fill((
                    "The input `basis`, if defined, must be a 3x3 matrix. "
                    f"Received `{basis.shape}` matrix."
                )))

        if origin is not None:
            origin = np.asarray(origin, dtype = float)
            if origin.ndim != 1 or origin.shape[0] != 3:
                raise ValueError(textwrap.fill((
                    "The input `origin`, if defined, must be a (3,) "
                    f"vector. Received array with shape `{origin.shape}`."
                )))

        self.basis = basis
        self.origin = origin

    def fit(self, samples):
        # Reduce / stack list of samples onto a single PointData / array
        samples = pept.tracking.Stack().fit(samples)
        if not isinstance(samples, PointData):
            samples = PointData(samples)

        points = samples.points[:, 1:4]

        # Compute principal components - i.e. the eigenvectors of the
        # covariance matrix
        if self.basis is None:
            points_mean = points.mean(axis = 0)
            points_centred = points - points_mean

            cov = np.cov(points_centred.T)
            evals, evecs = np.linalg.eig(cov)

            # Order eigenvectors such that the largest eigenvalue (i.e. the
            # dimension that's most spread out) corresponds to the first of
            # `self.dimensions`
            reorder = evals.argsort()[2 - self.dimensions.argsort()]
            eigenvalues = evals[reorder]
            basis = evecs[:, reorder]
        else:
            points_mean = self.origin
            points_centred = points - points_mean

            basis = self.basis
            eigenvalues = None

        # Rotate the centred points into the new basis, then shift them
        # back relative to the original mean / origin
        rotpoints = samples.points.copy()
        rotpoints[:, 1:4] = points_mean + points_centred @ basis

        return samples.copy(
            data = rotpoints,
            sample_size = samples.sample_size,
            overlap = samples.overlap,
            eigenvalues = eigenvalues,
            basis = basis,
            origin = points_mean,
        )
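

# Sanity-check sketch for the principal-axis rotation above, on synthetic
# points spread out mostly along Y; after `Reorient()`, the longest axis
# should become X. All names below are illustrative only:
#
#     import numpy as np
#
#     rng = np.random.default_rng(0)
#     xyz = rng.normal(0., [1., 10., 2.], (1000, 3))   # longest along Y
#     t = np.arange(1000.)[:, None]
#
#     data = pept.PointData(np.hstack((t, xyz)))
#     reoriented = Reorient().fit(data)
#     reoriented.points[:, 1:4].std(axis = 0)          # largest std now X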


class Center(pept.base.Reducer):
    '''Center a dataset around `origin` (default [0, 0, 0]).

    Reducer signature:

    ::

        PointData -> Center.fit -> PointData
        list[PointData] -> Center.fit -> PointData
        np.ndarray -> Center.fit -> PointData

    *New in pept-0.5.1*
    '''

    def __init__(self, origin = None):
        if origin is not None:
            self.origin = np.asarray(origin)
            if self.origin.ndim != 1 or len(self.origin) != 3:
                raise ValueError(textwrap.fill((
                    "The input `origin` must be a 1D vector-like with "
                    f"exactly 3 elements [x, y, z]. Received {self.origin}."
                )))
        else:
            self.origin = origin

    def fit(self, sample):
        sample = pept.tracking.Stack().fit(sample)
        if not isinstance(sample, pept.PointData):
            sample = pept.PointData(sample)

        # If no origin was given, use the median tracer position
        if self.origin is None:
            self.origin = np.nanmedian(sample.points[:, 1:4], axis = 0)

        centered = sample.points.copy()
        centered[:, 1:4] -= self.origin

        return sample.copy(
            data = centered,
            sample_size = sample.sample_size,
            overlap = sample.overlap,
        )
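

# Usage sketch: centre a dataset so its median position lands at the
# origin, or subtract an explicit origin. `trajectories` is hypothetical:
#
#     trajectories = pept.PointData(...)             # hypothetical data
#     centred = Center().fit(trajectories)           # median -> [0, 0, 0]
#     shifted = Center([100., 200., 300.]).fit(trajectories)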


class OutOfViewFilter(pept.base.Reducer):
    '''Remove tracer locations that are sparse *in time* - i.e. whose
    ``k``-th nearest detection in time is more than ``max_time`` away.

    Reducer signature:

    ::

        PointData -> OutOfViewFilter.fit -> PointData
        list[PointData] -> OutOfViewFilter.fit -> PointData
        numpy.ndarray -> OutOfViewFilter.fit -> PointData

    This reducer (i.e. it stacks all data samples, then processes them at
    once) is useful when the tracer goes out of the PEPT scanner's field
    of view, leaving a few sparse, noisy detections to remove.

    *New in pept-0.5.1*

    Examples
    --------
    Select only tracer locations whose nearby detections (by default the
    5th closest in time) are within 200 ms:

    >>> import pept
    >>> import pept.tracking as pt
    >>>
    >>> trajectories = pept.PointData(...)
    >>>
    >>> # Only keep points whose k-th nearest detection is within 200 ms
    >>> inview = pt.OutOfViewFilter(max_time = 200.).fit(trajectories)
    '''

    def __init__(self, max_time = 200., k = 5):
        self.max_time = float(max_time)
        self.k = int(k)

    def fit(self, samples):
        # Reduce / stack list of samples onto a single PointData / array
        samples = pept.tracking.Stack().fit(samples)
        if not isinstance(samples, pept.base.PEPTObject):
            samples = pept.PointData(samples)

        # Sort points by time
        points = samples.data[samples.data[:, 0].argsort()]

        # Select only points whose k-th nearest detection is sooner than
        # `max_time`. Query the (k + 1)-th neighbour, as the closest
        # neighbour of each point is the point itself, at distance zero
        times = points[:, 0][:, None]
        tree = KDTree(times)
        dt, _ = tree.query(times, [self.k + 1])

        nonsparse = points[dt[:, 0] < self.max_time]
        return samples.copy(data = nonsparse)
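

# The KDTree-on-timestamps trick above generalises; a minimal standalone
# sketch of the same idea (all names are illustrative):
#
#     import numpy as np
#     from scipy.spatial import KDTree
#
#     times = np.sort(np.random.uniform(0., 1000., 100))[:, None]
#     dt, _ = KDTree(times).query(times, [6])    # 5th neighbour + self
#     keep = dt[:, 0] < 200.                     # gap smaller than 200 ms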


class RemoveStatic(pept.base.Reducer):
    '''Remove parts of a `PointData` where the tracer remains static.

    Reducer signature:

    ::

        PointData -> RemoveStatic.fit -> PointData
        list[PointData] -> RemoveStatic.fit -> PointData
        numpy.ndarray -> RemoveStatic.fit -> PointData

    If there is a `time_window` in which the tracer does not move more
    than `max_distance`, it is removed. The distances moved are computed
    relative to the average position within each time window; to make the
    reducer more robust to noise, the given distance `quantile` is
    compared to `max_distance`.

    *New in pept-0.5.2*

    Examples
    --------
    Given some trajectories from e.g. a long experiment where the particle
    may have become stuck at some points, we can remove the static windows
    with:

    .. code-block:: python

        import pept
        import pept.tracking as pt

        trajectories = ...

        # Remove positions that spent more than 2 seconds without moving
        # more than 20 mm
        trajectories_nonstatic = pt.RemoveStatic(
            time_window = 2000,
            max_distance = 20,
        ).fit(trajectories)

    This reducer, like the rest in `pept.tracking`, can be chained into a
    pipeline, for example:

    .. code-block:: python

        import pept
        import pept.tracking as pt

        pipeline = pept.Pipeline([
            # Remove positions with high errors
            pt.Condition("error < 20"),

            # Remove tracers that got stuck
            pt.RemoveStatic(time_window = 2000, max_distance = 20),

            # Trajectory separation
            pt.Segregate(window = 20, cut_distance = 15),

            # Group each trajectory into its own sample, then stack them
            pt.GroupBy("label"),
            pt.Stack(),
        ])
    '''

    def __init__(self, time_window, max_distance, quantile = 0.9):
        self.time_window = float(time_window)
        self.max_distance = float(max_distance)
        self.quantile = float(quantile)

    def fit(self, samples):
        if not isinstance(samples, PointData):
            samples = PointData(samples)

        # Compute index ranges of points within each time window
        if self.time_window > samples.points[-1, 0] - samples.points[0, 0]:
            samples_indices = np.array([[0, len(samples.points)]])
        else:
            samples_indices = samples_indices_adaptive_window_ext(
                samples.points,
                self.time_window,
                0,
                np.iinfo(np.int32).max,
            )

        # Keep only the time windows in which the points moved far enough
        # relative to the window's mean position
        kept = []
        for i, irow in enumerate(samples_indices):
            coords = samples.points[irow[0]:irow[1], 1:4]
            dists = np.linalg.norm(coords - coords.mean(axis = 0), axis = 1)

            if np.quantile(dists, self.quantile) > self.max_distance:
                kept.append(i)

        kept = set(kept)

        # Extract points in windows whose neighbouring windows are also kept
        kept_points = []
        last = len(samples_indices) - 1

        if len(kept) == 1:
            irow = samples_indices[kept.pop()]
            kept_points.append(samples.points[irow[0]:irow[1]])
        else:
            for i, irow in enumerate(samples_indices):
                # First window: the next window must also be kept
                if i == 0 and i in kept and i + 1 in kept:
                    kept_points.append(samples.points[irow[0]:irow[1]])

                # Last window: the previous window must also be kept
                elif i == last and i in kept and i - 1 in kept:
                    kept_points.append(samples.points[irow[0]:irow[1]])

                # Middle windows: both neighbours must also be kept
                elif (
                    i != 0 and i != last and
                    i in kept and i - 1 in kept and i + 1 in kept
                ):
                    kept_points.append(samples.points[irow[0]:irow[1]])

        # Stack the remaining points
        if len(kept_points):
            kept_points = np.vstack(kept_points)
        else:
            kept_points = np.empty((0, samples.points.shape[1]))

        return samples.copy(
            data = kept_points,
            sample_size = None,
            overlap = None,
        )