Source code for pept.tracking.trajectory_separation.trajectory_separation

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


#    pept is a Python library that unifies Positron Emission Particle
#    Tracking (PEPT) research, including tracking, simulation, data analysis
#    and visualisation tools.
#
#    If you used this codebase or any software making use of it in a scientific
#    publication, you must cite the following paper:
#        Nicuşan AL, Windows-Yule CR. Positron emission particle tracking
#        using machine learning. Review of Scientific Instruments.
#        2020 Jan 1;91(1):013329.
#        https://doi.org/10.1063/1.5129251
#
#    Copyright (C) 2019-2021 the pept developers
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.


# File   : trajectory_separation.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date   : 23.08.2019


import  os
import  warnings

import  numpy                       as      np
from    scipy.spatial               import  cKDTree
from    scipy.sparse.csgraph        import  minimum_spanning_tree

import  pept
import  hdbscan

from    .distance_matrix_reachable  import  distance_matrix_reachable




def trajectory_errors(
    true_positions,
    tracked_positions,
    averages = True,
    stds = False,
    errors = False,
    max_workers = None
):

    # true_positions, tracked_positions cols: [t, x, y, z, ...]
    true_positions = np.asarray(true_positions, dtype = float)
    tracked_positions = np.asarray(tracked_positions, dtype = float)

    # Construct k-d tree for fast nearest neighbour lookup.
    tree = cKDTree(true_positions[:, 1:4])

    # errors cols: [err, err_x, err_y, err_z]
    errors = np.zeros((len(tracked_positions), 4))

    if max_workers is None:
        max_workers = os.cpu_count()

    # Find closest true point to each tracked point.
    for i, pos in enumerate(tracked_positions[:, 1:4]):
        dist, index = tree.query(pos, k = 1, n_jobs = max_workers)

        errors[i, 0] = np.linalg.norm(pos - true_positions[index])
        errors[i, 1:4] = np.abs(pos - true_positions[index, 0:3])
        # errors[i, 1] = np.abs(pos[0] - true_positions[index, 0])
        # errors[i, 2] = np.abs(pos[1] - true_positions[index, 1])
        # errors[i, 3] = np.abs(pos[2] - true_positions[index, 2])

    # Append and return the results that were asked for.
    results = []

    if averages:
        results.append(np.average(errors, axis = 0))
    if stds:
        results.append(np.std(errors, axis = 0))
    if errors:
        results.append(errors)

    # If a single result was asked for, return it directly; otherwise, use a
    # tuple.
    if len(results) == 1:
        return results[0]
    else:
        return tuple(results)




[docs]class Segregate(pept.base.Reducer): '''Segregate the intertwined points from multiple trajectories into individual paths. Reducer signature: :: pept.PointData -> Segregate.fit -> pept.PointData list[pept.PointData] -> Segregate.fit -> pept.PointData numpy.ndarray -> Segregate.fit -> pept.PointData The points in `point_data` (a numpy array or `pept.PointData`) are used to construct a minimum spanning tree in which every point can only be connected to `points_window` points around it - this "window" refers to the points in the initial data array, sorted based on the time column; therefore, only points within a certain timeframe can be connected. All edges (or "connections") in the minimum spanning tree that are larger than `trajectory_cut_distance` are removed (or "cut") and the remaining connected "clusters" are deemed individual trajectories if they contain more than `min_trajectory_size` points. The trajectory indices (or labels) are appended to `point_data`. That is, for each data point (i.e. row) in `point_data`, a label will be appended starting from 0 for the corresponding trajectory; a label of -1 represents noise. If `point_data` is a numpy array, a new numpy array is returned; if it is a `pept.PointData` instance, a new instance is returned. This function uses single linkage clustering with a custom metric for spatio-temporal data to segregate trajectory points. The single linkage clustering was optimised for this use-case: points are only connected if they are within a certain `points_window` in the time-sorted input array. Sparse matrices are also used for minimising the memory footprint. Attributes ---------- window : int Two points are "reachable" (i.e. they can be connected) if and only if they are within `points_window` in the time-sorted input `point_data`. As the points from different trajectories are intertwined (e.g. for two tracers A and B, the `point_data` array might have two entries for A, followed by three entries for B, then one entry for A, etc.), this should optimally be the largest number of points in the input array between two consecutive points on the same trajectory. If `points_window` is too small, all points in the dataset will be unreachable. Naturally, a larger `time_window` correponds to more pairs needing to be checked (and the function will take a longer to complete). cut_distance : float Once all the closest points are connected (i.e. the minimum spanning tree is constructed), separate all trajectories that are further apart than `trajectory_cut_distance`. min_trajectory_size : float, default 5 After the trajectories have been cut, declare all trajectories with fewer points than `min_trajectory_size` as noise. max_time_interval : float, default np.finfo(float).max Only connect points if the time difference between their timestamps is smaller than `max_time_interval`. *Setting added in pept-0.5.2*. See Also -------- Reconnect : Connect segregated trajectories based on tracer signatures. PlotlyGrapher : Easy, publication-ready plotting of PEPT-oriented data. Examples -------- A typical workflow would involve transforming LoRs into points using some tracking algorithm. These points include all tracers moving through the system, being intertwined (e.g. for two tracers A and B, the `point_data` array might have two entries for A, followed by three entries for B, then one entry for A, etc.). They can be segregated based on position alone using this function; take for example two tracers that go downwards (below, 'x' is the position, and in parens is the array index at which that point is found). :: `points`, numpy.ndarray, shape (10, 4), columns [time, x, y, z]: x (1) x (2) x (3) x (4) x (5) x (7) x (6) x (9) x (8) x (10) >>> import pept.tracking.trajectory_separation as tsp >>> points_window = 10 >>> trajectory_cut_distance = 15 # mm >>> segregated_trajectories = tsp.segregate_trajectories( >>> points, points_window, trajectory_cut_distance >>> ) :: `segregated_trajectories`, numpy.ndarray, shape (10, 5), columns [time, x, y, z, trajectory_label]: x (1, label = 0) x (2, label = 1) x (3, label = 0) x (4, label = 1) x (5, label = 0) x (7, label = 1) x (6, label = 0) x (9, label = 1) x (8, label = 0) x (10, label = 1) '''
[docs] def __init__( self, window, cut_distance, min_trajectory_size = 5, max_time_interval = np.finfo(np.float64).max, ): self.window = int(window) self.cut_distance = float(cut_distance) self.min_trajectory_size = int(min_trajectory_size) self.max_time_interval = float(max_time_interval)
[docs] def fit(self, points): # Stack the input points into a single PointData if not isinstance(points, pept.PointData): points = pept.PointData(points) if len(points.points) == 0: return points.copy( data = points.points[0:0], columns = points.columns + ["label"], ) pts = points.points # Sort pts based on the time column (col 0) and create a C-ordered copy # to send to Cython. pts = np.asarray(pts[pts[:, 0].argsort()], dtype = float, order = "C") # Calculate the sparse distance matrix between reachable points. This # is an optimised Cython function returning a sparse CSR matrix. distance_matrix = distance_matrix_reachable( pts, self.window, self.max_time_interval, ) # Construct the minimum spanning tree from the sparse distance matrix. # Note that `mst` is also a sparse CSR matrix. mst = minimum_spanning_tree(distance_matrix) # Get the minimum spanning tree edges into the [vertex 1, vertex 2, # edge distance] format, then sort it based on the edge distance. mst = mst.tocoo() mst_edges = np.vstack((mst.row, mst.col, mst.data)).T mst_edges = mst_edges[mst_edges[:, 2].argsort()] # Ignore deprecation warning from HDBSCAN's use of `np.bool` with warnings.catch_warnings(): warnings.simplefilter("ignore", category = DeprecationWarning) # Create the single linkage tree from the minimum spanning tree # edges using internal hdbscan methods (because they're damn fast). # This should be a fairly quick step. linkage_tree = hdbscan._hdbscan_linkage.label(mst_edges) linkage_tree = hdbscan.plots.SingleLinkageTree(linkage_tree) # Cut the single linkage tree at `trajectory_cut_distance` and get # the cluster labels, setting clusters smaller than # `min_trajectory_size` to -1 (i.e. noise). labels = linkage_tree.get_clusters( self.cut_distance, self.min_trajectory_size, ) # Append the labels to `pts`. return points.copy( data = np.c_[pts, labels], columns = points.columns + ["label"], )
[docs]class Reconnect(pept.base.Reducer): '''Best-fit trajectory segment reconstruction based on time, distance and arbitrary tracer signatures. Reducer signature: :: pept.PointData -> Segregate.fit -> pept.PointData list[pept.PointData] -> Segregate.fit -> pept.PointData numpy.ndarray -> Segregate.fit -> pept.PointData After a trajectory segregation step (e.g. using ``Segregate``), you may be left with multiple smaller trajectory segments. Some trajectories can be reconstructed even when losing the tracers for a bit. When a tracer is lost for less than `tmax` time and `dmax` distance, its trajectory segments are reconnected; if multiple condidates are possible, the best fit is used. Multiple tracer signatures can be used to improve the reconnection step; supply them as data column names and difference thresholds, e.g. an extra keyword argument ``v = 1`` will join trajectories whose difference in velocity is smaller than 1 m/s. The last `num_points` points on a segment are averaged before they are connected with the first `num_points` on another segment. *New in pept-0.4.2* Examples -------- Reconnect segments that are closer than 1 second in time and 0.1 m apart: >>> from pept.tracking import * >>> trajectories = Reconnect(tmax = 1000, dmax = 100).fit(segments) You can use the `cluster_size` (set by the ``Centroids`` filter) as a tracer signature; allow segments to be reconnected if the difference in their cluster size is < 100: >>> trajectories = Reconnect(1000, 100, cluster_size = 100).fit(segments) And a velocity `v` difference < 0.1: >>> Reconnect(1000, 100, cluster_size = 100, v = 0.1).fit(segments) '''
[docs] def __init__( self, tmax, dmax, column = "label", num_points = 10, **signatures, ): self.column = column self.num_points = int(num_points) self.tmax = float(tmax) self.dmax = float(dmax) self.signatures = dict() for sig, thresh in signatures.items(): self.signatures[sig] = float(thresh)
[docs] def fit(self, points): points = pept.tracking.Stack().fit(points) if not isinstance(points, pept.PointData): points = pept.PointData(points) # Columns corresponding to the signatures sig_cols = [points.columns.index(sn) for sn in self.signatures.keys()] trajs = pept.tracking.SplitAll(self.column).fit(points) trajs.sort(key = lambda traj: traj["t"][0]) # List of connections to do, list[tuple[int, int]] connections = [] # Try to forward-connect the end of trajs[i] to the start of trajs[j] start_times = np.array([t["t"][0] for t in trajs]) for i in range(len(trajs)): # Select all future trajectories whose start time is within tmax cur_traj = trajs[i] indices = np.argwhere( (start_times > cur_traj["t"][-1]) & (start_times - cur_traj["t"][-1] < self.tmax), ).flatten() # If no feasible times were found, carry on if not indices.any(): continue # Compute connection costs between trajectory ends costs = [] for j in indices: e2 = trajs[i].points[-self.num_points:].mean(axis = 0) e1 = trajs[j].points[:self.num_points].mean(axis = 0) # The first cost is the distance between traj ends; the rest # are the signature differences cost = [np.linalg.norm(e2[1:4] - e1[1:4])] for sc in sig_cols: cost.append(np.abs(e2[sc] - e1[sc])) costs.append(cost) # Keep track of trajectory indices and associated costs costs = np.c_[indices, np.array(costs)] # Remove condidate connections that have costs larger than threshs selection = costs[:, 1] < self.dmax for i, sthresh in enumerate(self.signatures.values()): selection = selection & (costs[:, 2 + i] < sthresh) costs = costs[selection] # If no feasible connection was found, carry on if not len(costs): continue # Otherwise, establish connection with minimum overall cost best = costs[:, 1:].mean(axis = 1).argmin() connection_index = int(costs[best, 0]) connections.append((i, connection_index)) # Set connected labels if isinstance(self.column, str): label_col = points.columns.index(self.column) else: label_col = self.column for i1, i2 in connections: trajs[i2].points[:, label_col] = trajs[i1].points[0, label_col] # Stack trajectories and map labels from [0, 2, 2, 3, 0] to # [0, 1, 1, 2, 0] trajs = pept.tracking.Stack().fit(trajs) labels = trajs.points[:, label_col] _, ordered = np.unique(labels, return_inverse = True) trajs.points[:, label_col] = ordered return trajs