#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# pept is a Python library that unifies Positron Emission Particle
# Tracking (PEPT) research, including tracking, simulation, data analysis
# and visualisation tools.
#
# If you used this codebase or any software making use of it in a scientific
# publication, you must cite the following paper:
# Nicuşan AL, Windows-Yule CR. Positron emission particle tracking
# using machine learning. Review of Scientific Instruments.
# 2020 Jan 1;91(1):013329.
# https://doi.org/10.1063/1.5129251
#
# Copyright (C) 2019-2021 the pept developers
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# File : trajectory_separation.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date : 23.08.2019
import os
import sys
import warnings
from beartype import beartype
if sys.version_info.minor >= 9:
# Python 3.9
from collections.abc import Iterable
else:
from typing import Iterable
import numpy as np
from scipy.spatial import cKDTree
from scipy.sparse.csgraph import minimum_spanning_tree
import pept
import hdbscan
from .tco import with_continuations
from .distance_matrix_reachable import distance_matrix_reachable
def trajectory_errors(
true_positions,
tracked_positions,
averages = True,
stds = False,
errors = False,
max_workers = None
):
# true_positions, tracked_positions cols: [t, x, y, z, ...]
true_positions = np.asarray(true_positions, dtype = float)
tracked_positions = np.asarray(tracked_positions, dtype = float)
# Construct k-d tree for fast nearest neighbour lookup.
tree = cKDTree(true_positions[:, 1:4])
# errors cols: [err, err_x, err_y, err_z]
errors = np.zeros((len(tracked_positions), 4))
if max_workers is None:
max_workers = os.cpu_count()
# Find closest true point to each tracked point.
for i, pos in enumerate(tracked_positions[:, 1:4]):
dist, index = tree.query(pos, k = 1, n_jobs = max_workers)
errors[i, 0] = np.linalg.norm(pos - true_positions[index])
errors[i, 1:4] = np.abs(pos - true_positions[index, 0:3])
# errors[i, 1] = np.abs(pos[0] - true_positions[index, 0])
# errors[i, 2] = np.abs(pos[1] - true_positions[index, 1])
# errors[i, 3] = np.abs(pos[2] - true_positions[index, 2])
# Append and return the results that were asked for.
results = []
if averages:
results.append(np.average(errors, axis = 0))
if stds:
results.append(np.std(errors, axis = 0))
if errors:
results.append(errors)
# If a single result was asked for, return it directly; otherwise, use a
# tuple.
if len(results) == 1:
return results[0]
else:
return tuple(results)
[docs]class Segregate(pept.base.Reducer):
'''Segregate the intertwined points from multiple trajectories into
individual paths.
Reducer signature:
::
pept.PointData -> Segregate.fit_sample -> pept.PointData
list[pept.PointData] -> Segregate.fit_sample -> pept.PointData
The points in `point_data` (a numpy array or `pept.PointData`) are used
to construct a minimum spanning tree in which every point can only be
connected to `points_window` points around it - this "window" refers to
the points in the initial data array, sorted based on the time column;
therefore, only points within a certain timeframe can be connected. All
edges (or "connections") in the minimum spanning tree that are larger
than `trajectory_cut_distance` are removed (or "cut") and the remaining
connected "clusters" are deemed individual trajectories if they contain
more than `min_trajectory_size` points.
The trajectory indices (or labels) are appended to `point_data`. That
is, for each data point (i.e. row) in `point_data`, a label will be
appended starting from 0 for the corresponding trajectory; a label of
-1 represents noise. If `point_data` is a numpy array, a new numpy
array is returned; if it is a `pept.PointData` instance, a new instance
is returned.
This function uses single linkage clustering with a custom metric for
spatio-temporal data to segregate trajectory points. The single linkage
clustering was optimised for this use-case: points are only connected
if they are within a certain `points_window` in the time-sorted input
array. Sparse matrices are also used for minimising the memory
footprint.
Attributes
----------
window : int
Two points are "reachable" (i.e. they can be connected) if and only
if they are within `points_window` in the time-sorted input
`point_data`. As the points from different trajectories are
intertwined (e.g. for two tracers A and B, the `point_data` array
might have two entries for A, followed by three entries for B, then
one entry for A, etc.), this should optimally be the largest number
of points in the input array between two consecutive points on the
same trajectory. If `points_window` is too small, all points in the
dataset will be unreachable. Naturally, a larger `time_window`
correponds to more pairs needing to be checked (and the function
will take a longer to complete).
cut_distance : float
Once all the closest points are connected (i.e. the minimum
spanning tree is constructed), separate all trajectories that are
further apart than `trajectory_cut_distance`.
min_trajectory_size : float, default 5
After the trajectories have been cut, declare all trajectories with
fewer points than `min_trajectory_size` as noise.
Examples
--------
A typical workflow would involve transforming LoRs into points using some
tracking algorithm. These points include all tracers moving through the
system, being intertwined (e.g. for two tracers A and B, the `point_data`
array might have two entries for A, followed by three entries for B, then
one entry for A, etc.). They can be segregated based on position alone
using this function; take for example two tracers that go downwards (below,
'x' is the position, and in parens is the array index at which that point
is found).
::
`points`, numpy.ndarray, shape (10, 4), columns [time, x, y, z]:
x (1) x (2)
x (3) x (4)
x (5) x (7)
x (6) x (9)
x (8) x (10)
>>> import pept.tracking.trajectory_separation as tsp
>>> points_window = 10
>>> trajectory_cut_distance = 15 # mm
>>> segregated_trajectories = tsp.segregate_trajectories(
>>> points, points_window, trajectory_cut_distance
>>> )
::
`segregated_trajectories`, numpy.ndarray, shape (10, 5),
columns [time, x, y, z, trajectory_label]:
x (1, label = 0) x (2, label = 1)
x (3, label = 0) x (4, label = 1)
x (5, label = 0) x (7, label = 1)
x (6, label = 0) x (9, label = 1)
x (8, label = 0) x (10, label = 1)
See Also
--------
connect_trajectories : Connect segregated trajectories based on tracer
signatures.
PlotlyGrapher : Easy, publication-ready plotting of PEPT-oriented data.
'''
[docs] def __init__(
self,
window,
cut_distance,
min_trajectory_size = 5,
):
self.window = int(window)
self.cut_distance = float(cut_distance)
self.min_trajectory_size = int(min_trajectory_size)
[docs] @beartype
def fit(self, points: Iterable[pept.PointData]):
# Stack the input points into a single PointData
if not isinstance(points, pept.PointData):
points = pept.PointData(points)
if len(points.points) == 0:
return points.copy(
data = points.points[0:0],
columns = points.columns + ["label"],
)
pts = points.points
# Sort pts based on the time column (col 0) and create a C-ordered copy
# to send to Cython.
pts = np.asarray(pts[pts[:, 0].argsort()], dtype = float, order = "C")
# Calculate the sparse distance matrix between reachable points. This
# is an optimised Cython function returning a sparse CSR matrix.
distance_matrix = distance_matrix_reachable(pts, self.window)
# Construct the minimum spanning tree from the sparse distance matrix.
# Note that `mst` is also a sparse CSR matrix.
mst = minimum_spanning_tree(distance_matrix)
# Get the minimum spanning tree edges into the [vertex 1, vertex 2,
# edge distance] format, then sort it based on the edge distance.
mst = mst.tocoo()
mst_edges = np.vstack((mst.row, mst.col, mst.data)).T
mst_edges = mst_edges[mst_edges[:, 2].argsort()]
# Ignore deprecation warning from HDBSCAN's use of `np.bool`
with warnings.catch_warnings():
warnings.simplefilter("ignore", category = DeprecationWarning)
# Create the single linkage tree from the minimum spanning tree
# edges using internal hdbscan methods (because they're damn fast).
# This should be a fairly quick step.
linkage_tree = hdbscan._hdbscan_linkage.label(mst_edges)
linkage_tree = hdbscan.plots.SingleLinkageTree(linkage_tree)
# Cut the single linkage tree at `trajectory_cut_distance` and get
# the cluster labels, setting clusters smaller than
# `min_trajectory_size` to -1 (i.e. noise).
labels = linkage_tree.get_clusters(
self.cut_distance,
self.min_trajectory_size,
)
# Append the labels to `pts`.
return points.copy(
data = np.c_[pts, labels],
columns = points.columns + ["label"],
)
def connect_trajectories(
trajectories_points,
max_time_difference,
max_signature_difference,
points_to_check = 50,
signature_col = 4,
label_col = -1,
as_list = False
):
'''Connect segregated trajectories based on tracer signatures.
A pair of trajectories in `trajectories_points` will be connected if their
ends have a timestamp difference that is smaller than `max_time_difference`
and the difference between the signature averages of the closest
`points_to_check` points is smaller than `max_signature_difference`.
The `trajectories_points` are distinguished based on the trajectory
indices in the data column `label_col`. This can be achieved using the
`segregate_trajectories` function, which appends the labels to the data
points.
Because the tracer signature (e.g. cluster size in PEPT-ML) varies with the
tracer position in the system, an average of `points_to_check` points
is used for connecting pairs of trajectories.
Parameters
----------
trajectories_points : (M, N>=6) numpy.ndarray or pept.PointData
A numpy array of points that have a timestamp, spatial coordinates,
a tracer signature (such as cluster size in PEPT-ML) and a trajectory
index (or label). The data columns in `trajectories_points` are then
[time, x, y, z, ..., signature, ..., label, ...]. Note that the
timestamps and spatial coordinates must be the first 4 columns, while
the signature and label columns may be anywhere and are pointed at
by `signature_col` and `label_col`.
max_time_difference : float
Only try to connect trajectories whose ends have a timestamp difference
smaller than `max_time_difference`.
max_signature_difference : float
Connect two trajectories if the difference between the signature
averages of the closest `points_to_check` is smaller than this.
points_to_check : int, default 50
The number of points used when computing the average tracer signature
in one trajectory.
signature_col : int, default 4
The column in `trajectories_points` that contains the tracer
signatures. The default is 4 (i.e. the signature comes right after
the spatial coordinates).
label_col : int, default -1
The column in `trajectories_points` that contains the trajectory
indices (labels). The default is -1 (i.e. the last column).
as_list : bool, default False
If True, return a list of arrays, where each array contains the points
in a single trajectory. In other words, return separate, single
trajectories in a list. If False, return a single array of all points
(if `trajectories_points` was a `numpy.ndarray`) or a `pept.PointData`
(if `trajectories_points` was a `pept.PointData` instance), but with
labels changed to reflect the connected trajectories.
Returns
-------
numpy.ndarray or pept.PointData or list of numpy.ndarray
If `as_list` is True, return separate, single trajectories in a list.
If `as_list` is False, return a single array of all points
(if `trajectories_points` was a `numpy.ndarray`) or a `pept.PointData`
(if `trajectories_points` was a `pept.PointData` instance), but with
labels changed to reflect the connected trajectories.
Raises
------
ValueError
If `point_data` is a numpy array with fewer than 6 columns.
Notes
-----
The labels are changed in-place to reflect the connected trajectories. For
example, if there are 3 trajectories with labels 0, 1, 2 and the first two
are connected, then all points which previously had the label 1 will be
changed to label 0; the last trajectory's label remains unchanged, 2.
Examples
--------
[TODO] - add full tutorial page on Bham PIC GitHub page for this.
See Also
--------
segregate_trajectories : Segregate the intertwined points from multiple
trajectories into individual paths.
PlotlyGrapher : Easy, publication-ready plotting of PEPT-oriented data.
'''
# Check `point_data` is a numpy array or pept.PointData
if isinstance(trajectories_points, pept.PointData):
trajs = trajectories_points.points
else:
trajs = np.asarray(trajectories_points, dtype = float, order = "C")
if trajs.ndim != 2 or trajs.shape[1] < 6:
raise ValueError((
"\n[ERROR]: `trajectories_points` should have dimensions "
f"(M, N), where N >= 6. Received {trajs.shape}.\n"
))
# Type-check the input parameters
max_time_difference = float(max_time_difference)
max_signature_difference = float(max_signature_difference)
points_to_check = int(points_to_check)
signature_col = int(signature_col)
label_col = int(label_col)
as_list = bool(as_list)
# Separate the trajs array into a list of individual trajectories based on
# the `label_col`.
trajectory_list = pept.utilities.group_by_column(trajs.copy(), label_col)
trajectory_list = _connect_trajectories(
trajectory_list,
max_time_difference,
max_signature_difference,
points_to_check,
signature_col,
label_col
)
if as_list:
return trajectory_list
elif isinstance(trajectories_points, pept.PointData):
trajectories_points_connected = pept.PointData(
np.vstack(trajectory_list),
sample_size = trajectories_points.sample_size,
overlap = trajectories_points.overlap,
verbose = False
)
return trajectories_points_connected
else:
return np.vstack(trajectory_list)
@with_continuations() # Use tail-call optimisation from tco.py
def _connect_trajectories(
trajectory_list,
max_time_difference,
max_signature_difference,
points_to_check,
signature_col = 4,
label_col = -1,
self = None
):
number_of_trajectories = len(trajectory_list)
# Check all pairs of trajectories. Each trajectory has two ends:
# Traj1: [End11, End12]
# Traj2: [End21, End22]
for i in range(number_of_trajectories - 1):
for j in range(i + 1, number_of_trajectories):
t1 = trajectory_list[i] # Traj1
t2 = trajectory_list[j] # Traj2
# Try to connect End11 with End22
# Check the time difference between End11 and End22 is small enough
if time_difference(t1, t2) < max_time_difference:
# Check the signature difference of the last `points_to_check`
# points is small enough
if signature_difference(
t1, t2, points_to_check, signature_col
) < max_signature_difference:
# Assimilate the column labels
t1[:, label_col] = t2[0, label_col]
# Merge the two trajectories
connected_trajectory = np.append(t2, t1, axis = 0)
# Delete the individual trajectories and append the merged
# trajectories to `trajectory_list`
del trajectory_list[i], trajectory_list[j - 1]
trajectory_list.append(connected_trajectory)
# Call the function again (to reinitialise
# `number_of_trajectories`) using tail call optimisation
# from tco.py
return self(
trajectory_list,
max_time_difference,
max_signature_difference,
points_to_check,
signature_col,
label_col
)
# Try to connect End12 with End21
elif time_difference(t2, t1) < max_time_difference:
if signature_difference(
t2, t1, points_to_check, signature_col
) < max_signature_difference:
t2[:, label_col] = t1[0, label_col]
connected_trajectory = np.append(t1, t2, axis = 0)
del trajectory_list[i], trajectory_list[j - 1]
trajectory_list.append(connected_trajectory)
return self(
trajectory_list,
max_time_difference,
max_signature_difference,
points_to_check,
signature_col,
label_col
)
return trajectory_list
def signature_difference(
traj1,
traj2,
points_to_check,
signature_col
):
return np.abs(
np.average(traj1[:points_to_check, signature_col]) -
np.average(traj2[-points_to_check:, signature_col])
)
def time_difference(traj1, traj2):
return np.abs(traj1[0, 0] - traj2[-1, 0])