# Source code for pept.processing.mixing

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : mixing.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date   : 24.08.2022


import  os
import  textwrap
from    numbers             import  Number


import  numpy               as      np
import  scipy
import  scipy.stats
from    scipy.interpolate   import  interp1d

from    tqdm                import  tqdm
from    joblib              import  Parallel, delayed

from    pept                import  PointData, Voxels
from    pept.base           import  Reducer
from    pept                import  plots

from    plotly.subplots     import  make_subplots
import  plotly.graph_objs   as      go




def enlarge(lim):
    '''Return a copy of a two-value limit (e.g. [xmin, xmax]) widened
    slightly outwards.

    Each bound is scaled by 0.01% away from zero and nudged by the float
    resolution, so values lying exactly on the original bounds fall strictly
    inside the enlarged interval. The input is not modified.
    '''
    eps = np.finfo(float).resolution

    lower, upper = lim[0], lim[1]
    widened = lim.copy()

    # A negative lower bound must grow in magnitude; a positive one shrink
    widened[0] = (lower * 1.0001 if lower < 0 else lower * 0.9999) - eps

    # Mirror logic for the upper bound
    widened[1] = (upper * 0.9999 if upper < 0 else upper * 1.0001) + eps

    return widened




def _check_point3d(**kwargs):
    # Extract first argument
    for name, point in kwargs.items():
        break

    if point.ndim != 1 or len(point) != 3:
        raise ValueError(textwrap.fill((
            f"The input point `{name}` should be a vector-like with exactly 3 "
            f"elements. Received `{point}`."
        )))




def _parallel(f, samples, executor, max_workers, desc = "", verbose = True):
    if executor == "sequential":
        if verbose:
            samples = tqdm(samples, desc = desc)

        return [f(s) for s in samples]

    elif executor == "joblib":
        if verbose:
            samples = tqdm(samples, desc = desc)

        # Joblib's `max_workers` behaviour is different than for Executors.
        # The latter simply sets the maximum number of threads available,
        # while the former uses a max_worker=1. Make behaviour consistent.
        if max_workers is None:
            max_workers = os.cpu_count()

        return Parallel(n_jobs = max_workers)(delayed(f)(s) for s in samples)

    else:
        # Otherwise assume `executor` is a `concurrent.futures.Executor`
        # subclass (e.g. ProcessPoolExecutor, MPIPoolExecutor).
        with executor(max_workers = max_workers) as exe:
            futures = [exe.submit(f, s) for s in samples]

            if verbose:
                futures = tqdm(futures, desc = desc)

            return [fut.result() for fut in futures]




class LaceyColors(Reducer):
    '''Compute a Lacey-like mixing image: streamlines crossing plane 1 are
    split into Red and Blue tracers, then evaluated into RGB pixels where
    they cross plane 2. Red and blue pixels contain unmixed streamlines,
    while purple pixels indicate mixing.

    Reducer signature:

    ::

        PointData -> LaceyColors.fit -> (height, width, 3) pept.Voxels
        list[PointData] -> LaceyColors.fit -> (height, width, 3) pept.Voxels
        list[np.ndarray] -> LaceyColors.fit -> (height, width, 3) pept.Voxels

    **Each sample in the input `PointData` is treated as a separate
    streamline / tracer pass. You can group passes using
    `Segregate + GroupBy("label")`**.

    Plane 1, where tracers are split into Red and Blue streamlines, is
    defined by a point `p1` and direction axis `ax1`. **The point `p1`
    should be the middle of the pipe**. Plane 2, where mixing is evaluated,
    is similarly defined by `p2` and `ax2`. If `ax1` and `ax2` are `None`,
    the tracers are assumed to follow a straight line between `p1` and `p2`.

    `max_distance` is the maximum distance allowed between a point and a
    plane for the point to be considered part of it. `resolution` is the
    number of pixels in the height and width of the resulting image.

    *New in pept-0.5.1*

    Examples
    --------
    Split pipe-flow trajectories into individual passes, then image the
    mixing between the pipe entrance and exit:

    >>> import pept
    >>> from pept.tracking import *
    >>>
    >>> split_pipe = pept.Pipeline([
    >>>     Segregate(window = 10, max_distance = 20),  # Appends label column
    >>>     GroupBy("label"),                           # Splits into samples
    >>>     Reorient(),                                 # Align with X axis
    >>>     Center(),                                   # Center points at 0
    >>>     Stack(),
    >>> ])
    >>> streamlines = split_pipe.fit(trajectories)

    >>> from pept.processing import LaceyColors
    >>> entrance = [-100, 0, 0]
    >>> exit = [100, 0, 0]
    >>> lacey_image = LaceyColors(entrance, exit).fit(streamlines)
    >>> print(lacey_image.voxels)       # RGB channels of image
    (8, 8, 3)

    >>> from pept.plots import PlotlyGrapher2D
    >>> PlotlyGrapher2D().add_image(lacey_image).show()
    '''

    def __init__(
        self,
        p1,
        p2,
        ax1 = None,
        ax2 = None,
        basis1 = None,
        basis2 = None,
        xlim = None,
        ylim = None,
        max_distance = 10,
        resolution = (8, 8),
    ):
        '''Store and validate the two plane definitions and imaging options.

        Parameters
        ----------
        p1, p2 : (3,) vector-like
            Points defining the splitting and evaluation planes.
        ax1, ax2 : (3,) vector-like, optional
            Plane normals; normalised here. Default: straight line P1-P2.
        basis1, basis2 : (3, 2) np.ndarray, optional
            In-plane projection bases; computed by PCA in `fit` if None.
        xlim, ylim : (2,) vector-like, optional
            Physical image limits in the projected plane.
        max_distance : float
            Maximum point-plane distance for plane membership.
        resolution : tuple[int, int]
            Image pixel dimensions.
        '''
        self.p1 = np.asarray(p1, dtype = float)
        _check_point3d(p1 = self.p1)

        if ax1 is not None:
            ax1 = np.asarray(ax1, dtype = float)
            ax1 = ax1 / np.linalg.norm(ax1)
            _check_point3d(ax1 = ax1)
        self.ax1 = ax1

        self.p2 = np.asarray(p2, dtype = float)
        _check_point3d(p2 = self.p2)

        if ax2 is not None:
            ax2 = np.asarray(ax2, dtype = float)
            ax2 = ax2 / np.linalg.norm(ax2)
            _check_point3d(ax2 = ax2)
        self.ax2 = ax2

        if basis1 is not None:
            basis1 = np.asarray(basis1, dtype = float)
            if basis1.ndim != 2 or len(basis1) != 3 or basis1.shape[1] != 2:
                raise ValueError(textwrap.fill((
                    "The input `basis1`, if defined, should be two column "
                    "vectors - i.e. a (3, 2) matrix. Received "
                    f"`basis1.shape={basis1.shape}`."
                )))
        self.basis1 = basis1

        if basis2 is not None:
            basis2 = np.asarray(basis2, dtype = float)
            if basis2.ndim != 2 or len(basis2) != 3 or basis2.shape[1] != 2:
                raise ValueError(textwrap.fill((
                    "The input `basis2`, if defined, should be two column "
                    "vectors - i.e. a (3, 2) matrix. Received "
                    f"`basis2.shape={basis2.shape}`."
                )))
        self.basis2 = basis2

        if xlim is not None:
            xlim = np.asarray(xlim, dtype = float)
            if xlim.ndim != 1 or len(xlim) != 2:
                raise ValueError(textwrap.fill((
                    "The input `xlim`, if given, should be a vector-like with "
                    f"two values [xmin, xmax]. Received `{xlim}`."
                )))
        self.xlim = xlim

        if ylim is not None:
            ylim = np.asarray(ylim, dtype = float)
            if ylim.ndim != 1 or len(ylim) != 2:
                raise ValueError(textwrap.fill((
                    "The input `ylim`, if given, should be a vector-like with "
                    f"two values [ymin, ymax]. Received `{ylim}`."
                )))
        self.ylim = ylim

        self.max_distance = float(max_distance)
        self.resolution = tuple(resolution)

    def fit(
        self,
        trajectories,
        executor = "joblib",
        max_workers = None,
        verbose = True,
    ):
        '''Compute the RGB mixing image from per-pass trajectories.

        Returns a `pept.Voxels` of shape (resolution[0], resolution[1], 3)
        holding the 0-255 RGB channels. Also sets `self.ax1`, `self.ax2`,
        `self.basis1`, `self.basis2`, `self.xlim`, `self.ylim` if they
        were None.

        NOTE(review): `executor`, `max_workers` and `verbose` are accepted
        for API consistency with other reducers but are not used here —
        the computation below is sequential.
        '''
        if not isinstance(trajectories, PointData):
            try:
                trajectories = PointData(trajectories)
            except (ValueError, TypeError):
                raise TypeError(textwrap.fill((
                    "The input `trajectories` must be pept.PointData-like. "
                    f"Received `{type(trajectories)}`."
                )))

        # Default plane normals: the straight line from P1 to P2
        if self.ax1 is None:
            self.ax1 = self.p2 - self.p1
        if self.ax2 is None:
            self.ax2 = self.p2 - self.p1

        self.ax1 = self.ax1 / np.linalg.norm(self.ax1)
        self.ax2 = self.ax2 / np.linalg.norm(self.ax2)

        # Aliases
        p1 = self.p1
        ax1 = self.ax1
        p2 = self.p2
        ax2 = self.ax2

        max_distance = self.max_distance
        resolution = self.resolution

        # For each pass, find the closest point to the start and end planes
        points_start = []
        points_end = []

        for traj in trajectories:
            # Columns 1:4 are [x, y, z]; drop rows with NaN / inf coordinates
            xyz = traj.points[:, 1:4]
            xyz = xyz[np.isfinite(xyz).all(axis = 1)]

            # Perpendicular (signed -> absolute) distances to each plane
            dists_start = np.abs(np.dot(xyz - p1, ax1))
            dists_end = np.abs(np.dot(xyz - p2, ax2))

            min_dist_start = np.min(dists_start)
            min_dist_end = np.min(dists_end)

            # Only keep passes that approach BOTH planes closely enough
            if min_dist_start < max_distance and min_dist_end < max_distance:
                points_start.append(xyz[np.argmin(dists_start)])
                points_end.append(xyz[np.argmin(dists_end)])

        points_start = np.array(points_start)
        points_end = np.array(points_end)

        # Split starting points into two species - Red and Blue - along the
        # first principal component of the inlet point cloud
        points_start_centred = points_start - p1
        if self.basis1 is None:
            cov = np.cov(points_start_centred.T)
            evals, evecs = np.linalg.eig(cov)

            # Keep the two eigenvectors with the largest eigenvalues
            evals_argsorted = evals.argsort()[::-1]
            evals = evals[evals_argsorted]
            self.basis1 = evecs[:, evals_argsorted][:, :2]

        points_start2d = points_start_centred @ self.basis1

        # Project ending points onto 2D plane by the Principal Components
        points_end_centred = points_end - p2
        if self.basis2 is None:
            cov = np.cov(points_end_centred.T)
            evals, evecs = np.linalg.eig(cov)

            evals_argsorted = evals.argsort()[::-1]
            evals = evals[evals_argsorted]
            self.basis2 = evecs[:, evals_argsorted][:, :2]

        points_end2d = points_end_centred @ self.basis2

        # Create RGB image with the R and B channels == the number of passes.
        # Red = negative side of the first principal axis, Blue = positive
        ired = points_start2d[:, 0] < 0
        iblu = points_start2d[:, 0] >= 0

        # Compute image physical limits
        limits = np.c_[points_end2d.min(axis = 0), points_end2d.max(axis = 0)]
        if self.xlim is None:
            self.xlim = enlarge(limits[0, :])
        if self.ylim is None:
            self.ylim = enlarge(limits[1, :])

        xlim = self.xlim
        ylim = self.ylim

        xsize = (xlim[1] - xlim[0]) / resolution[0]
        ysize = (ylim[1] - ylim[0]) / resolution[1]

        red_mix = np.zeros(resolution)
        blu_mix = np.zeros(resolution)

        # Find indices within pixels
        red_ix = ((points_end2d[ired, 0] - xlim[0]) / xsize).astype(int)
        red_iy = ((points_end2d[ired, 1] - ylim[0]) / ysize).astype(int)

        # Ensure all red pixel indices are within bounds
        mask = (
            (red_ix >= 0) & (red_ix < resolution[0]) &
            (red_iy >= 0) & (red_iy < resolution[1])
        )
        red_ix = red_ix[mask]
        red_iy = red_iy[mask]

        for i in range(len(red_ix)):
            red_mix[red_ix[i], red_iy[i]] += 1

        blu_ix = ((points_end2d[iblu, 0] - xlim[0]) / xsize).astype(int)
        blu_iy = ((points_end2d[iblu, 1] - ylim[0]) / ysize).astype(int)

        # Ensure all blue pixel indices are within bounds
        mask = (
            (blu_ix >= 0) & (blu_ix < resolution[0]) &
            (blu_iy >= 0) & (blu_iy < resolution[1])
        )
        blu_ix = blu_ix[mask]
        blu_iy = blu_iy[mask]

        for i in range(len(blu_ix)):
            blu_mix[blu_ix[i], blu_iy[i]] += 1

        # Stack RGB channels, normalised to 0-255.
        # NOTE(review): if no red (or no blue) passes landed in the image,
        # `max()` is 0 and this divides by zero — confirm upstream guarantees
        red_mix *= 255 / red_mix.max()
        blu_mix *= 255 / blu_mix.max()

        mix_img = np.zeros((resolution[0], resolution[1], 3), dtype = int)
        mix_img[:, :, 0] = red_mix.astype(int)
        mix_img[:, :, 2] = blu_mix.astype(int)

        # zlim is a dummy 2-voxel depth so the image fits the Voxels container
        return Voxels(
            mix_img,
            xlim = xlim,
            ylim = ylim,
            zlim = [0, 2],
        )
class LaceyColorsLinear(Reducer):
    '''Apply the `LaceyColors` mixing algorithm at `num_divisions`
    equidistant points between `p1` and `p2`, saving images at each step in
    `directory`.

    Reducer signature:

    ::

        PointData -> LaceyColors.fit -> (height, width, 3) np.ndarray
        list[PointData] -> LaceyColors.fit -> (height, width, 3) np.ndarray
        list[np.ndarray] -> LaceyColors.fit -> (height, width, 3) np.ndarray

    For details about the mixing algorithm itself, check the `LaceyColors`
    documentation. The generated images (saved in `directory` with
    `height` x `width` pixels) can be stitched into a video using
    `pept.plots.make_video`.

    *New in pept-0.5.1*

    Examples
    --------
    Split pipe-flow trajectories into individual passes (see `LaceyColors`),
    then save mixing images along the pipe:

    >>> from pept.processing import LaceyColorsLinear
    >>> entrance = [-100, 0, 0]     # Pipe data was aligned with X and centred
    >>> exit = [100, 0, 0]
    >>> LaceyColorsLinear(
    >>>     directory = "lacey",    # Creates directory and saves images there
    >>>     p1 = entrance,
    >>>     p2 = exit,
    >>> ).fit(streamlines)

    The images are saved as "lacey/frame0000.png", "frame0001.png", etc. and
    can be stitched into a video:

    >>> import pept
    >>> pept.plots.make_video("lacey/frame*.png", output = "lacey/video.avi")
    '''

    def __init__(
        self,
        directory,
        p1,
        p2,
        xlim = None,
        ylim = None,
        num_divisions = 50,
        max_distance = 10,
        resolution = (8, 8),
        height = 1000,
        width = 1000,
        prefix = "frame",
    ):
        '''Store the output directory, plane endpoints and imaging options.

        Raises
        ------
        ValueError
            If `num_divisions` is smaller than 2.
        '''
        self.directory = directory
        self.num_divisions = int(num_divisions)

        if self.num_divisions <= 1:
            raise ValueError(textwrap.fill((
                "The input `num_divisions` should be an integer >= 2. "
                f"Received `{self.num_divisions}`."
            )))

        self.p1 = np.asarray(p1, dtype = float)
        self.p2 = np.asarray(p2, dtype = float)

        self.xlim = xlim
        self.ylim = ylim

        self.max_distance = float(max_distance)
        self.resolution = tuple(resolution)
        self.height = int(height)
        self.width = int(width)
        self.prefix = prefix

    def fit(
        self,
        trajectories,
        executor = "joblib",
        max_workers = None,
        verbose = True,
    ):
        '''Run `LaceyColors` at each division point and save one PNG per
        division into `self.directory` (created if missing).

        The first division is only used to compute shared projection bases
        and image limits, so images are saved for divisions 1 onwards.
        '''
        if not os.path.isdir(self.directory):
            os.mkdir(self.directory)

        # Equidistant points on the straight segment P1 -> P2
        divisions = np.c_[
            np.linspace(self.p1[0], self.p2[0], self.num_divisions),
            np.linspace(self.p1[1], self.p2[1], self.num_divisions),
            np.linspace(self.p1[2], self.p2[2], self.num_divisions),
        ]
        axis = self.p2 - self.p1

        # Compute basis vectors for splitting flow; reused for every frame so
        # that all images share the same projection and limits
        lacey = LaceyColors(
            p1 = divisions[0],
            p2 = divisions[1],
            ax1 = axis,
            ax2 = axis,
            max_distance = self.max_distance,
            resolution = self.resolution,
        )
        lacey.fit(trajectories)

        basis1 = lacey.basis1
        basis2 = lacey.basis2

        if self.xlim is None:
            self.xlim = lacey.xlim
        if self.ylim is None:
            self.ylim = lacey.ylim

        def compute_save_lacey(idiv):
            # Render the mixing image between P1 and division `idiv`, then
            # save it as a numbered PNG frame
            image = LaceyColors(
                p1 = self.p1,
                p2 = divisions[idiv],
                ax1 = axis,
                ax2 = axis,
                basis1 = basis1,
                basis2 = basis2,
                xlim = self.xlim,
                ylim = self.ylim,
                max_distance = self.max_distance,
                resolution = self.resolution,
            ).fit(trajectories)

            travelled = np.linalg.norm(divisions[idiv] - self.p1)
            grapher = plots.PlotlyGrapher2D(subplot_titles = [
                f"Travelled {travelled} mm"
            ])
            grapher.add_image(image)
            grapher.fig.write_image(
                f"{self.directory}/{self.prefix}{idiv:0>4}.png",
                height = self.height,
                width = self.width,
            )

        _parallel(
            compute_save_lacey,
            range(1, self.num_divisions),
            executor = executor,
            max_workers = max_workers,
            desc = "LaceyColorsLinear :",
            verbose = verbose,
        )
class RelativeDeviations(Reducer):
    '''Compute a Lagrangian mixing measure - the changes in tracer distances
    to a point P1 as they pass through an "inlet" plane and another point P2
    when reaching an "outlet" plane.

    A deviation is computed for each tracer trajectory, yielding a range of
    deviations that can e.g. be histogrammed (default). Intuitively, mixing
    is stronger if this distribution of deviations is wider.

    Reducer signature:

    ::

        If `histogram = True` (default)
        PointData -> RelativeDeviations.fit -> plotly.graph_objs.Figure
        list[PointData] -> RelativeDeviations.fit -> plotly.graph_objs.Figure
        list[np.ndarray] -> RelativeDeviations.fit -> plotly.graph_objs.Figure

        If `histogram = False` (return deviations)
        PointData -> RelativeDeviations.fit -> (N,) np.ndarray
        list[PointData] -> RelativeDeviations.fit -> (N,) np.ndarray
        list[np.ndarray] -> RelativeDeviations.fit -> (N,) np.ndarray

    **Each sample in the input `PointData` is treated as a separate
    streamline / tracer pass. You can group passes using
    `Segregate + GroupBy("label")`**.

    The inlet plane is defined by point `p1` and direction `ax1`; the outlet
    plane by `p2` and `ax2`. If `ax1` / `ax2` are `None`, the tracers are
    assumed to follow a straight line between `p1` and `p2`. `max_distance`
    is the maximum distance allowed between a point and a plane for the
    point to be considered part of it.

    The extra keyword arguments ``**kwargs`` are passed to the histogram
    creation routine `pept.plots.histogram`, e.g. `ylim = [0, 20]`.

    *New in pept-0.5.1*

    Attributes
    ----------
    points1 : pept.PointData
        The tracer points selected at the inlet around `p1`.
    points2 : pept.PointData
        The tracer points selected at the outlet around `p2`.
    deviations : (N,) np.ndarray
        The tracer deviations for each pass in `points1` and `points2`.

    Examples
    --------
    Split pipe-flow trajectories into individual passes (see `LaceyColors`),
    then histogram the tracer deviations due to mixing:

    >>> from pept.processing import RelativeDeviations
    >>> entrance = [-100, 0, 0]     # Pipe data was aligned with X and centred
    >>> exit = [100, 0, 0]
    >>> fig = RelativeDeviations(entrance, exit).fit(streamlines)
    >>> fig.show()

    The deviations themselves can be extracted directly:

    >>> mixing_algorithm = RelativeDeviations(entrance, exit, histogram=False)
    >>> mixing_algorithm.fit(streamlines)
    >>> deviations = mixing_algorithm.deviations
    >>> inlet_points = mixing_algorithm.points1
    >>> outlet_points = mixing_algorithm.points2
    '''

    def __init__(
        self,
        p1,
        p2,
        ax1 = None,
        ax2 = None,
        max_distance = 10,
        histogram = True,
        **kwargs,
    ):
        '''Validate and store the plane definitions and histogram options.

        Parameters
        ----------
        p1, p2 : (3,) vector-like
            Inlet and outlet plane points.
        ax1, ax2 : (3,) vector-like, optional
            Plane normals; normalised here. Default: straight line P1-P2.
        max_distance : float
            Maximum point-plane distance for plane membership.
        histogram : bool
            If True, `fit` returns a Plotly histogram figure; otherwise the
            raw deviations vector.
        '''
        self.p1 = np.asarray(p1, dtype = float)
        _check_point3d(p1 = self.p1)

        # BUGFIX: previously `self.ax1` / `self.ax2` were only assigned
        # inside the `is not None` branches, so default construction (as in
        # the class example above) crashed with AttributeError when `fit`
        # read them. Assign unconditionally, matching LaceyColors.__init__.
        if ax1 is not None:
            ax1 = np.asarray(ax1, dtype = float)
            ax1 = ax1 / np.linalg.norm(ax1)
            _check_point3d(ax1 = ax1)
        self.ax1 = ax1

        self.p2 = np.asarray(p2, dtype = float)
        _check_point3d(p2 = self.p2)

        if ax2 is not None:
            ax2 = np.asarray(ax2, dtype = float)
            ax2 = ax2 / np.linalg.norm(ax2)
            _check_point3d(ax2 = ax2)
        self.ax2 = ax2

        self.max_distance = float(max_distance)
        self.histogram = bool(histogram)
        self.kwargs = kwargs

        # Will be set in `fit`
        self.points1 = None
        self.points2 = None
        self.deviations = None

    def fit(
        self,
        trajectories,
        executor = "joblib",
        max_workers = None,
        verbose = True,
    ):
        '''Compute per-pass deviations; return a histogram figure or the raw
        deviations vector depending on `self.histogram`.

        Passes that do not approach both planes within `max_distance` are
        recorded as NaN rows, so `deviations` keeps one entry per sample.

        NOTE(review): `executor`, `max_workers` and `verbose` are accepted
        for API consistency with other reducers but are not used here.
        '''
        if not isinstance(trajectories, PointData):
            try:
                trajectories = PointData(trajectories)
            except (ValueError, TypeError):
                raise TypeError(textwrap.fill((
                    "The input `trajectories` must be pept.PointData-like. "
                    f"Received `{type(trajectories)}`."
                )))

        # Default plane normals: the straight line from P1 to P2
        if self.ax1 is None:
            self.ax1 = self.p2 - self.p1
        if self.ax2 is None:
            self.ax2 = self.p2 - self.p1

        self.ax1 = self.ax1 / np.linalg.norm(self.ax1)
        self.ax2 = self.ax2 / np.linalg.norm(self.ax2)

        # Aliases
        p1 = self.p1
        ax1 = self.ax1
        p2 = self.p2
        ax2 = self.ax2

        max_distance = self.max_distance

        # For each pass, find the closest point to the start and end planes
        points_start = []
        points_end = []

        for traj in trajectories:
            xyz = traj.points[:, 1:4]
            xyz = xyz[np.isfinite(xyz).all(axis = 1)]

            dists_start = np.abs(np.dot(xyz - p1, ax1))
            dists_end = np.abs(np.dot(xyz - p2, ax2))

            min_dist_start = np.min(dists_start)
            min_dist_end = np.min(dists_end)

            if min_dist_start < max_distance and min_dist_end < max_distance:
                # Keep the full point rows (time + extra columns)
                points_start.append(traj.points[np.argmin(dists_start)])
                points_end.append(traj.points[np.argmin(dists_end)])
            else:
                # Keep alignment between samples with NaN placeholder rows
                points_start.append(np.full(traj.points.shape[1], np.nan))
                points_end.append(np.full(traj.points.shape[1], np.nan))

        # TODO: check empty trajectories
        self.points1 = trajectories.copy(data = points_start)
        self.points2 = trajectories.copy(data = points_end)

        # Distances to P1 and P2 and relative deviations
        d1 = np.linalg.norm(self.points1.points[:, 1:4] - self.p1, axis = 1)
        d2 = np.linalg.norm(self.points2.points[:, 1:4] - self.p2, axis = 1)
        self.deviations = np.abs(d2 - d1)

        # Return histogram of deviations
        if self.histogram:
            return plots.histogram(
                self.deviations,
                xaxis_title = "Relative Deviation (mm)",
                yaxis_title = "Probability (%)",
                **self.kwargs,
            )
        else:
            return self.deviations
class RelativeDeviationsLinear(Reducer):
    '''Apply the `RelativeDeviations` mixing algorithm at `num_divisions`
    equidistant points between `p1` and `p2`, saving histogram images at
    each step in `directory`.

    Reducer signature:

    ::

        PointData -> RelativeDeviationsLinear.fit -> plotly Figure
        list[PointData] -> RelativeDeviationsLinear.fit -> plotly Figure
        list[np.ndarray] -> RelativeDeviationsLinear.fit -> plotly Figure

    For details about the mixing algorithm itself, check the
    `RelativeDeviations` documentation. This algorithm saves a rich set of
    data:

    - Individual histograms for each point along P1-P2 are saved in the
      given `directory`.
    - A Plotly figure of computed statistics is returned, including the
      deviations' mean, standard deviation, skewness and kurtosis.
    - The raw data is saved as object attributes (see below).

    The generated images (saved in `directory` with `height` x `width`
    pixels) can be stitched into a video using `pept.plots.make_video`. The
    extra keyword arguments ``**kwargs`` are passed to `pept.plots.histogram`,
    e.g. `ylim = [0, 20]`.

    *New in pept-0.5.1*

    Attributes
    ----------
    deviations : list[(N,) np.ndarray]
        Deviations computed by `RelativeDeviations` at each point P1-P2.
    mean : (N,) np.ndarray
        Mean tracer deviation at each point between P1 and P2.
    std : (N,) np.ndarray
        Standard deviation of tracer deviations at each point.
    skew : (N,) np.ndarray
        Adjusted skewness at each point; 0 for a normal distribution,
        positive for a longer right tail, negative for a heavier left tail.
    kurtosis : (N,) np.ndarray
        Fisher kurtosis at each point; 0 for a normal distribution, positive
        for a "thin" distribution, negative for a heavy, wide one.

    Examples
    --------
    Split pipe-flow trajectories into individual passes (see `LaceyColors`),
    then save deviation histograms along the pipe:

    >>> from pept.processing import RelativeDeviationsLinear
    >>> entrance = [-100, 0, 0]     # Pipe data was aligned with X and centred
    >>> exit = [100, 0, 0]
    >>> summary_fig = RelativeDeviationsLinear(
    >>>     directory = "deviations",   # Creates directory to save images
    >>>     p1 = entrance,
    >>>     p2 = exit,
    >>> ).fit(streamlines)
    >>> summary_fig.show()              # Summary statistics: mean, std, etc.

    Histograms are saved as "deviations/frame0000.png", etc. and can be
    stitched into a video with `pept.plots.make_video`. The raw deviations
    and statistics are available as the attributes documented above.
    '''

    def __init__(
        self,
        directory,
        p1,
        p2,
        num_divisions = 50,
        max_distance = 10,
        height = 1000,
        width = 2000,
        prefix = "frame",
        **kwargs,
    ):
        '''Store the output directory, endpoints and histogram options.'''
        self.directory = directory
        self.num_divisions = int(num_divisions)

        self.p1 = np.asarray(p1, dtype = float)
        self.p2 = np.asarray(p2, dtype = float)

        self.max_distance = float(max_distance)
        self.height = int(height)
        self.width = int(width)
        self.prefix = prefix
        self.kwargs = kwargs

        # Will be set in `fit`
        self.deviations = None
        self.mean = None
        self.std = None
        self.skew = None
        self.kurtosis = None

    def fit(
        self,
        trajectories,
        executor = "joblib",
        max_workers = None,
        verbose = True,
    ):
        '''Compute deviations at every division point, save one histogram
        PNG per division into `self.directory` (created if missing), set the
        summary-statistics attributes, and return a Plotly summary figure.
        '''
        if not os.path.isdir(self.directory):
            os.mkdir(self.directory)

        # Equidistant points on the straight segment P1 -> P2
        divisions = np.c_[
            np.linspace(self.p1[0], self.p2[0], self.num_divisions),
            np.linspace(self.p1[1], self.p2[1], self.num_divisions),
            np.linspace(self.p1[2], self.p2[2], self.num_divisions),
        ]
        axis = self.p2 - self.p1

        # Save vectors of deviations so that we can set the XAxis range
        def compute_deviations(division):
            return RelativeDeviations(
                p1 = self.p1,
                p2 = division,
                ax1 = axis,
                ax2 = axis,
                max_distance = self.max_distance,
                histogram = False,
            ).fit(trajectories)

        self.deviations = _parallel(
            compute_deviations,
            divisions,
            executor = executor,
            max_workers = max_workers,
            desc = "RelativeDeviationsLinear 1 / 3 :",
            verbose = verbose,
        )

        # Save histograms to disk; shared x-range across all frames
        xlim = [0, np.nanmax([np.nanmax(d) for d in self.deviations])]

        def save_histograms(idiv):
            fig = plots.histogram(
                self.deviations[idiv],
                xlim = xlim,
                **self.kwargs,
            )

            travelled = np.linalg.norm(divisions[idiv] - self.p1)
            fig.update_layout(
                title = f"Travelled {travelled:4.4f} mm",
                xaxis_title = "Deviation (mm)",
                yaxis_title = "Probability (%)",
            )
            fig.write_image(
                f"{self.directory}/{self.prefix}{idiv:0>4}.png",
                height = self.height,
                width = self.width,
            )

        _parallel(
            save_histograms,
            range(self.num_divisions),
            executor = executor,
            max_workers = max_workers,
            desc = "RelativeDeviationsLinear 2 / 3 :",
            verbose = verbose,
        )

        # Compute summarised statistics about distributions
        def compute_stats(idiv):
            dev = self.deviations[idiv]
            dev = dev[np.isfinite(dev)]     # drop NaN placeholder passes
            return [
                np.mean(dev),
                np.std(dev),
                scipy.stats.skew(dev),
                scipy.stats.kurtosis(dev),
            ]

        stats = _parallel(
            compute_stats,
            range(self.num_divisions),
            executor = executor,
            max_workers = max_workers,
            desc = "RelativeDeviationsLinear 3 / 3 :",
            verbose = verbose,
        )

        # Column-major so the per-statistic column slices below are contiguous
        stats = np.array(stats, order = "F")
        self.mean = stats[:, 0]
        self.std = stats[:, 1]
        self.skew = stats[:, 2]
        self.kurtosis = stats[:, 3]

        # Plot summarised statistics against distance travelled along P1-P2
        distance = np.linspace(
            0,
            np.linalg.norm(self.p2 - self.p1),
            self.num_divisions,
        )

        fig = make_subplots(rows = 2, cols = 1, subplot_titles = [
            "Mean Tracer Deviation Along P1-P2 Axis",
            "Distribution Skewness & Kurtosis (How Many Outliers?)",
        ])

        fig.add_trace(
            go.Scatter(
                x = distance,
                y = self.mean,
                mode = "lines",
                name = "Mean Deviation",
            ),
            row = 1, col = 1,
        )

        # Shaded +/- one standard deviation band around the mean
        fig.add_trace(
            go.Scatter(
                x = distance,
                y = self.mean - self.std,
                mode = 'lines',
                marker_color = "#444",
                line_width = 0,
                showlegend = False,
            ),
            row = 1, col = 1,
        )
        fig.add_trace(
            go.Scatter(
                x = distance,
                y = self.mean + self.std,
                mode = 'lines',
                marker_color = "#444",
                line_width = 0,
                fillcolor = 'rgba(68, 68, 68, 0.3)',
                fill = 'tonexty',
                name = "Standard Deviation",
            ),
            row = 1, col = 1,
        )

        # Plot skewness and kurtosis
        fig.add_trace(
            go.Scatter(
                x = distance,
                y = self.skew,
                name = "Skewness",
            ),
            row = 2, col = 1,
        )
        fig.add_trace(
            go.Scatter(
                x = distance,
                y = self.kurtosis,
                name = "Kurtosis",
            ),
            row = 2, col = 1,
        )

        plots.format_fig(fig)
        fig.update_xaxes(title = "Length Along P1-P2 Axis (mm)")
        fig.update_layout(yaxis_title = "Deviation (mm)")
        return fig
class AutoCorrelation(Reducer):
    r'''Compute autocorrelation of multiple measures (eg YZ velocities) as a
    function of a lagging variable (eg time).

    Reducer signature:

    ::

              PointData -> AutoCorrelation.fit -> PlotlyGrapher2D
        list[PointData] -> AutoCorrelation.fit -> PlotlyGrapher2D
       list[np.ndarray] -> AutoCorrelation.fit -> PlotlyGrapher2D

    **Each sample in the input `PointData` is treated as a separate
    streamline / tracer pass. You can group passes using `Segregate +
    GroupBy("label")`**.

    Autocorrelation and autocovariance each refer to about 3 different
    things in each field. The formula used here, inspired by the VACF in
    molecular dynamics and generalised for arbitrary measures, is:

    .. math::

       C(L_i) = \frac{ \sum_{N} V(L_0) \cdot V(L_i) }{N}

    i.e. the autocorrelation C at a lag of Li is the average of the dot
    products of quantities V for all N tracers. For example, the velocity
    autocorrelation function with respect to time would be the average of
    `vx(0) vx(t) + vy(0) vy(t) + vz(0) vz(t)` at a given time `t`.

    The input `lag` defines the column used as a lagging variable; it can be
    given as a named column string (e.g. `"t"`) or index (e.g. `0`).

    The input `signals` define the quantities for which the autocorrelation
    is computed, given as a list of column names (e.g. `["vy", "vz"]`) or
    indices (e.g. `[5, 6]`).

    The input `span`, if defined, is the minimum and maximum values for the
    `lag` (e.g. start and end times) for which the autocorrelation will be
    computed. By default it is automatically computed as the range of
    values.

    The input `num_divisions` is the number of lag points between `span[0]`
    and `span[1]` for which the autocorrelation will be computed.

    The `max_distance` parameter defines the maximum distance allowed
    between a lag value and the closest trajectory value for it to be
    considered.

    If `normalize` is True, then the formula used becomes:

    .. math::

       C(L_i) = \frac{ \sum_{N} \frac{ V(L_0) \cdot V(L_i) }
                                     { V(L_0) \cdot V(L_0) } }{N}

    If `preprocess` is True, then the times of each tracer pass are taken
    relative to its start; only relevant if using time as the lagging
    variable.

    The extra keyword arguments ``**kwargs`` are passed to
    `plotly.graph_objs.Scatter`. You can e.g. set a different colorscheme
    with "marker_colorscale = 'Viridis'".

    *New in pept-0.5.1*

    Examples
    --------
    Consider a pipe-flow experiment, with tracers moving from side to side
    in multiple passes / streamlines. First locate the tracers, then split
    their trajectories into each individual pass:

    >>> import pept
    >>> from pept.tracking import *
    >>>
    >>> split_pipe = pept.Pipeline([
    >>>     Segregate(window = 10, max_distance = 20),  # Appends label column
    >>>     GroupBy("label"),                           # Splits into samples
    >>>     Reorient(),                                 # Align with X axis
    >>>     Center(),                                   # Center points at 0
    >>>     Velocity(7),                                # Compute vx, vy, vz
    >>>     Stack(),
    >>> ])
    >>> streamlines = split_pipe.fit(trajectories)

    Now each sample in `streamlines` corresponds to a single tracer pass,
    e.g. `streamlines[0]` is the first pass, `streamlines[1]` is the second.
    The passes were reoriented and centred such that the pipe is aligned
    with the X axis.

    Now the `AutoCorrelation` algorithm can be used to compute the VACF:

    >>> from pept.processing import AutoCorrelation
    >>> fig = AutoCorrelation("t", ["vx", "vy", "vz"]).fit(streamlines)
    >>> fig.show()

    The radial velocity autocorrelation can be computed as a function of the
    pipe length (X axis as it was reoriented):

    >>> entrance = -100
    >>> exit = 100
    >>> ac = AutoCorrelation("x", ["vy", "vz"], span = [entrance, exit])
    >>> ac.fit(streamlines).show()

    The raw lags and autocorrelations plotted can be accessed directly:

    >>> ac.lags
    >>> ac.correlation

    The radial location can be autocorrelated with time, then normalised to
    show periodic movements (e.g. due to a mixer):

    >>> ac = AutoCorrelation("t", ["y", "z"], normalize = True)
    >>> ac.fit(streamlines).show()
    '''

    def __init__(
        self,
        lag = "t",
        signals = ("vx", "vy", "vz"),
        span = None,
        num_divisions = 500,
        max_distance = 10,
        normalize = False,
        preprocess = True,
        **kwargs,
    ):
        '''Save the autocorrelation settings. See the class docstring for
        the meaning of each parameter.
        '''
        self.lag = lag

        # Accept a single column name / index as well as a sequence thereof.
        # Keep a list copy so the (immutable) default is never shared.
        if isinstance(signals, (str, Number)):
            signals = [signals]
        self.signals = list(signals)

        if span is not None:
            span = tuple(span)
            if len(span) != 2:
                raise ValueError(textwrap.fill((
                    "The input `span`, if defined, must contain two "
                    f"values [lag_start, lag_end]. Received `{span}`."
                )))
        self.span = span

        self.num_divisions = int(num_divisions)
        self.max_distance = float(max_distance)
        self.normalize = bool(normalize)
        self.preprocess = bool(preprocess)
        self.kwargs = kwargs

        # Will be set in `fit`
        self.lags = None
        self.correlation = None

    def fit(
        self,
        trajectories,
        executor = "joblib",
        max_workers = None,
        verbose = True,
    ):
        '''Compute the autocorrelation over all tracer passes in
        `trajectories` and return a `PlotlyGrapher2D` with the resulting
        plot. Also sets `self.lags` and `self.correlation`.
        '''
        if not isinstance(trajectories, PointData):
            try:
                trajectories = PointData(trajectories)
            except (ValueError, TypeError) as exc:
                raise TypeError(textwrap.fill((
                    "The input `trajectories` must be pept.PointData-like. "
                    f"Received `{type(trajectories)}`."
                ))) from exc

        # Extract relevant columns' indices
        if isinstance(self.lag, str):
            ilag = trajectories.columns.index(self.lag)
        else:
            ilag = int(self.lag)

        isignals = []
        for sig in self.signals:
            if isinstance(sig, str):
                isignals.append(trajectories.columns.index(sig))
            else:
                isignals.append(int(sig))

        # If preprocessing is enabled, make times relative to the start of
        # each trajectory pass
        if self.preprocess and ilag == 0:
            trajectories = trajectories.copy()
            for traj in trajectories:
                traj.points[:, 0] -= traj.points[0, 0]

        # Set span if undefined
        if self.span is None:
            self.span = (
                np.min(trajectories.points[:, ilag]),
                np.max(trajectories.points[:, ilag]),
            )

        # For each tracer pass, find closest value to the lag in `divisions`
        divisions = np.linspace(self.span[0], self.span[1],
                                self.num_divisions)
        correlation = np.zeros(self.num_divisions)

        # Save correlations as class attributes
        self.lags = divisions
        self.correlation = correlation

        def closest_signal(traj, lag):
            # Return the signal columns of the trajectory point whose lag
            # value is closest to `lag`; NaNs if none is close enough.
            lag_values = traj.points[:, ilag]

            lag_distances = np.abs(lag_values - lag)
            ilag_closest = np.argmin(lag_distances)

            # If no lag value was close enough, return NaNs
            if lag_distances[ilag_closest] > self.max_distance:
                return np.full(len(isignals), np.nan)
            else:
                return traj.points[ilag_closest, isignals]

        # Compute signals at lag=0
        signals0 = [
            closest_signal(traj, divisions[0])
            for traj in trajectories
        ]

        def compute_correlation(lag):
            # Find signal values close to this lag value
            signal_values = [
                closest_signal(traj, lag)
                for traj in trajectories
            ]

            # Compute correlation as dot product with signals at lag=0
            if self.normalize:
                corr = [
                    np.dot(signals0[i], signal_values[i]) /
                    np.dot(signals0[i], signals0[i])
                    for i in range(len(signal_values))
                ]
            else:
                corr = [
                    np.dot(signals0[i], signal_values[i])
                    for i in range(len(signal_values))
                ]

            # Return ensemble average, ignoring NaNs (i.e. when no signal
            # was found close enough to the given lag value)
            finites = np.isfinite(corr)
            if not len(corr) or np.all(~finites):
                return np.nan, 0

            return np.nanmean(corr), finites.sum()

        # Compute correlation at each lag in `divisions`
        stats = _parallel(
            compute_correlation,
            divisions,
            executor = executor,
            max_workers = max_workers,
            desc = "AutoCorrelation :",
            verbose = verbose,
        )

        correlation[:] = [s[0] for s in stats]
        finites = np.array([s[1] for s in stats])

        # Make points denser for colorbars
        dense_lags = np.linspace(
            self.span[0], self.span[1],
            50 * self.num_divisions,
        )

        dense_correlation = interp1d(self.lags, self.correlation)(dense_lags)
        dense_finites = interp1d(self.lags, finites)(dense_lags)

        # Plot correlation as a function of lag
        if len(self.correlation):
            ylim = [np.nanmin(self.correlation), np.nanmax(self.correlation)]
        else:
            ylim = None

        # Default plot styling, only set if the user did not override it
        if "marker_size" not in self.kwargs.keys():
            self.kwargs["marker_size"] = 5
        if "marker_colorscale" not in self.kwargs.keys():
            self.kwargs["marker_colorscale"] = "Magma_r"
        if "line_color" not in self.kwargs.keys():
            self.kwargs["line_color"] = "black"

        grapher = plots.PlotlyGrapher2D(ylim = ylim)
        grapher.add_trace(go.Scatter(
            x = dense_lags,
            y = dense_correlation,
            mode = "markers+lines",
            marker = dict(
                colorbar_title = "N",
                color = dense_finites,
            ),
            **self.kwargs,
        ))

        norm = "Normalised " if self.normalize else ""
        grapher.fig.update_layout(
            xaxis_title = f"Lag (column `{self.lag}`)",
            yaxis_title = f"{norm}AutoCorrelation (columns {self.signals})",
        )

        # Set scaleratios to be automatically determined
        for i in range(grapher._rows):
            for j in range(grapher._cols):
                index = i * grapher._cols + j + 1
                yaxis = f"yaxis{index}" if index != 1 else "yaxis"
                grapher._fig["layout"][yaxis].update(
                    scaleanchor = None,
                    scaleratio = None,
                )

        # If the span is inversed, reverse the X-axis
        if self.span[1] < self.span[0]:
            grapher.fig.update_layout(xaxis_autorange = "reversed")

        return grapher
class SpatialProjections(Reducer):
    '''Project multiple tracer passes onto a moving 2D plane along a given
    `direction` between `start` and `end` coordinates, saving each frame in
    `directory`.

    Reducer signature:

    ::

              PointData -> SpatialProjections.fit -> None
        list[PointData] -> SpatialProjections.fit -> None
       list[np.ndarray] -> SpatialProjections.fit -> None

    **Each sample in the input `PointData` is treated as a separate
    streamline / tracer pass. You can group passes using `Segregate +
    GroupBy("label")`**.

    The generated images (saved in `directory` with `height` x `width`
    pixels) can be stitched into a video using `pept.plots.make_video`.

    The extra keyword arguments ``**kwargs`` are passed to the histogram
    creation routine `pept.plots.histogram`. You can e.g. set the YAxis
    limits by adding `ylim = [0, 20]`.

    *New in pept-0.5.1*

    Attributes
    ----------
    projections : list[(N, 5), np.ndarray]
        A list of frames for each division between `start` and `end`, with
        each frame saving 5 columns [t, x, y, z, colorbar_col].

    Examples
    --------
    Consider a pipe-flow experiment, with tracers moving from side to side
    in multiple passes / streamlines. First locate the tracers, then split
    their trajectories into each individual pass:

    >>> import pept
    >>> from pept.tracking import *
    >>>
    >>> split_pipe = pept.Pipeline([
    >>>     Segregate(window = 10, max_distance = 20),  # Appends label column
    >>>     GroupBy("label"),                           # Splits into samples
    >>>     Reorient(),                                 # Align with X axis
    >>>     Center(),                                   # Center points at 0
    >>>     Stack(),
    >>> ])
    >>> streamlines = split_pipe.fit(trajectories)

    Now each sample in `streamlines` corresponds to a single tracer pass,
    e.g. `streamlines[0]` is the first pass, `streamlines[1]` is the second.
    The passes were reoriented and centred such that the pipe is aligned
    with the X axis.

    Now the `SpatialProjections` reducer can be used to create images of the
    mixing between the pipe entrance and exit:

    >>> from pept.processing import SpatialProjections
    >>> entrance_x = -100    # Pipe data was aligned with X
    >>> exit_x = 100
    >>> SpatialProjections(
    >>>     directory = "projections",  # Creates directory to save images
    >>>     start = entrance_x,
    >>>     end = exit_x,
    >>> ).fit(streamlines)

    Now the directory "projections" was created inside your current working
    folder, and each projected frame was saved there as "frame0000.png",
    "frame0001.png", etc. You can stitch all images together into a video
    using `pept.plots.make_video`:

    >>> import pept
    >>> pept.plots.make_video(
    >>>     "projections/frame*.png",
    >>>     output = "projections/video.avi"
    >>> )

    The raw projections can also be extracted directly:

    >>> sp = SpatialProjections(
    >>>     directory = "projections",  # Creates directory to save images
    >>>     start = entrance_x,
    >>>     end = exit_x,
    >>> )
    >>> sp.fit(streamlines)
    >>> sp.projections
    '''

    def __init__(
        self,
        directory,
        start,
        end,
        dimension = "x",
        num_divisions = 500,
        max_distance = 10,
        colorbar_col = -1,
        height = 1000,
        width = 1000,
        prefix = "frame",
        **kwargs,
    ):
        '''Save the projection settings. See the class docstring for the
        meaning of each parameter.
        '''
        self.directory = directory
        self.start = float(start)
        self.end = float(end)

        # Transform x -> 1, y -> 2, z -> 3 using ASCII integer `ord`er
        if isinstance(dimension, str):
            if dimension != "x" and dimension != "y" and dimension != "z":
                raise ValueError(textwrap.fill((
                    "The input `dimension` must be either 'x', 'y' or 'z'. "
                    f"Received `{dimension}`."
                )))
            self.dimension = ord(dimension) - ord('x') + 1
        else:
            self.dimension = int(dimension)
            if self.dimension < 1 or self.dimension > 3:
                raise ValueError(textwrap.fill((
                    "The input `dimension`, if given as an index, must be "
                    f"between 1 and 3 (inclusive). Received `{dimension}`."
                )))

        self.num_divisions = int(num_divisions)
        self.max_distance = float(max_distance)
        self.colorbar_col = colorbar_col
        self.height = int(height)
        self.width = int(width)
        self.prefix = prefix
        self.kwargs = kwargs

        # Will be set in `fit`
        self.projections = None

    def fit(
        self,
        trajectories,
        executor = "joblib",
        max_workers = None,
        verbose = True,
    ):
        '''Project all tracer passes onto `self.num_divisions` planes
        between `self.start` and `self.end`, save one image per plane in
        `self.directory` and store the raw frames in `self.projections`.
        '''
        if not isinstance(trajectories, PointData):
            try:
                trajectories = PointData(trajectories)
            except (ValueError, TypeError) as exc:
                raise TypeError(textwrap.fill((
                    "The input `trajectories` must be pept.PointData-like. "
                    f"Received `{type(trajectories)}`."
                ))) from exc

        # Find colorbar column's index
        if isinstance(self.colorbar_col, str):
            icolorbar_col = trajectories.columns.index(self.colorbar_col)
        else:
            icolorbar_col = int(self.colorbar_col)

        # Create directory for saving projection plots; `exist_ok` avoids a
        # race between the `isdir` check and the creation
        os.makedirs(self.directory, exist_ok = True)

        # Points at which to project the trajectories
        self.divisions = np.linspace(self.start, self.end,
                                     self.num_divisions)

        # For each point along the start-end axis, save the closest point's
        # coordinates and colorbar column
        def project_division(idiv):
            division = self.divisions[idiv]
            points = []
            for traj in trajectories:
                coords = traj.points[:, self.dimension]
                distances = np.abs(coords - division)

                imin_dist = np.argmin(distances)
                min_dist = distances[imin_dist]

                if min_dist < self.max_distance:
                    points.append(traj.points[
                        imin_dist,
                        [0, 1, 2, 3, icolorbar_col],
                    ])

            if not len(points):
                return np.empty((0, 5))
            return np.array(points)

        self.projections = _parallel(
            project_division,
            range(self.num_divisions),
            executor = executor,
            max_workers = max_workers,
            desc = "SpatialProjections 1 / 2 :",
            verbose = verbose,
        )

        # Other remaining dimensions
        other = list({1, 2, 3} - {self.dimension})
        ix = other[0]
        iy = other[1]

        # Find plot limits
        if "xlim" not in self.kwargs.keys():
            points = np.concatenate([p[:, ix] for p in self.projections])
            xlim = [points.min(), points.max()]
            extra = 0.05 * (xlim[1] - xlim[0])
            self.kwargs["xlim"] = [xlim[0] - extra, xlim[1] + extra]

        if "ylim" not in self.kwargs.keys():
            points = np.concatenate([p[:, iy] for p in self.projections])
            ylim = [points.min(), points.max()]
            extra = 0.05 * (ylim[1] - ylim[0])
            self.kwargs["ylim"] = [ylim[0] - extra, ylim[1] + extra]

        if "size" not in self.kwargs.keys():
            self.kwargs["size"] = 15

        colors = np.concatenate([p[:, -1] for p in self.projections])
        if "marker_cmin" not in self.kwargs.keys():
            self.kwargs["marker_cmin"] = colors.min()
        if "marker_cmax" not in self.kwargs.keys():
            self.kwargs["marker_cmax"] = colors.max()

        # Check the same keys that are set, so user-supplied values are
        # never silently overwritten
        if "marker_colorscale" not in self.kwargs.keys():
            self.kwargs["marker_colorscale"] = "Magma"
        if "marker_colorbar_title" not in self.kwargs.keys():
            self.kwargs["marker_colorbar_title"] = \
                trajectories.columns[icolorbar_col]

        def plot_save_projections(idiv):
            # Render one frame and write it to
            # `{directory}/{prefix}{idiv:0>4}.png`
            grapher = plots.PlotlyGrapher2D(
                xlim = self.kwargs["xlim"],
                ylim = self.kwargs["ylim"],
            )

            kwargs = self.kwargs.copy()
            del kwargs["xlim"]
            del kwargs["ylim"]

            grapher.add_points(
                self.projections[idiv][:, [0, ix, iy]],
                color = self.projections[idiv][:, -1],
                **kwargs,
            )

            # Set axis labels
            grapher.fig.update_layout(
                xaxis_title = f"{'_XYZ'[other[0]]} (mm)",
                yaxis_title = f"{'_XYZ'[other[1]]} (mm)",
            )

            grapher.fig.write_image(
                f"{self.directory}/{self.prefix}{idiv:0>4}.png",
                height = self.height,
                width = self.width,
            )

        # Do NOT assign the result here: `plot_save_projections` returns
        # None, and assigning would clobber the raw frames stored in
        # `self.projections` above
        _parallel(
            plot_save_projections,
            range(self.num_divisions),
            executor = executor,
            max_workers = max_workers,
            desc = "SpatialProjections 2 / 2 :",
            verbose = verbose,
        )