#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : base.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date : 29.09.2021
import textwrap
import numpy as np
from scipy.optimize import minimize
from pept import LineData, PointData
from pept.base import LineDataFilter, Filter
from ..peptml import get_cutoffs
from .cutpoints_tof import find_cutpoints_tof
class TimeOfFlight(LineDataFilter):
    '''Compute the positron annihilation location of each LoR from the Time
    Of Flight (ToF) data given by the two LoR timestamps.

    Filter signature:

    ::

        LineData -> TimeOfFlight.fit_sample -> LineData (points = False)
        LineData -> TimeOfFlight.fit_sample -> PointData (points = True)

    The input LineData must have at least 8 columns, formatted as
    [t1, x1, y1, z1, x2, y2, z2, t2] - note the two different timestamps,
    one per LoR end.

    If `points = False` (default), the computed ToF points are attached to
    the input LineData as an extra attribute "tof"; otherwise they are
    returned directly as a PointData.

    The `temporal_resolution` should be the FWHM of the temporal resolution
    of the LoR timestamps, in self-consistent units of measure (e.g. m / s
    or mm / ms, but not mm / s). When it is given, the
    "temporal_resolution" and "spatial_resolution" extra attributes are set
    on the returned ToF points.

    *New in pept-0.4.2*

    Examples
    --------
    Create LoRs with the 8-column ToF format, then compute the annihilation
    locations; be careful to use self-consistent units (here mm and ms):

    >>> import numpy as np
    >>> import pept
    >>> from pept.tracking import *

    >>> rng = np.random.default_rng(0)
    >>> lors = pept.LineData(
    >>>     rng.uniform(-100, 100, (10, 8)),
    >>>     columns = ["t1", "x1", "y1", "z1", "x2", "y2", "z2", "t2"],
    >>> )

    >>> temporal_resolution = 100e-12 * 1000    # 100 ps in ms
    >>> lors_tof = TimeOfFlight(temporal_resolution).fit_sample(lors)
    >>> lors_tof.attrs["tof"]                   # ToF PointData

    Alternatively, extract only the ToF points directly:

    >>> tof = TimeOfFlight(temporal_resolution, points = True).fit_sample(lors)
    '''

    def __init__(self, temporal_resolution = None, points = False):
        # `temporal_resolution` is optional; coerce to float when given
        self.temporal_resolution = (
            None if temporal_resolution is None
            else float(temporal_resolution)
        )
        self.points = bool(points)

    def fit_sample(self, sample: LineData):
        if not isinstance(sample, LineData):
            sample = LineData(sample)

        if sample.lines.shape[1] < 8:
            raise ValueError(textwrap.fill((
                "The input `sample` of LineData must have at least 8 columns, "
                "formatted as [t1, x1, y1, z1, x2, y2, z2, t2]. Received "
                f"sample with `sample.lines.shape = {sample.lines.shape}`."
            )))

        lines = sample.lines

        # Timestamps and 3D coordinates of the two LoR ends
        time1 = lines[:, 0]
        end1 = lines[:, 1:4]
        end2 = lines[:, 4:7]
        time2 = lines[:, 7]

        # Speed of light (mm / ms)
        c = 299792458.

        # Fractional position of the annihilation point along the segment
        # end1 -> end2, inferred from the timestamp difference
        lor_lengths = np.linalg.norm(end2 - end1, axis = 1)
        fractions = 0.5 - 0.5 / lor_lengths * c * (time2 - time1)

        # `fractions[:, np.newaxis]` broadcasts the per-row fraction over
        # the three XYZ columns
        tof_locations = end1 + (end2 - end1) * fractions[:, np.newaxis]

        # Emission time: the photon arriving at end1 left the annihilation
        # point one light-travel-time before time1
        tof_times = time1 - np.linalg.norm(
            tof_locations - end1, axis = 1,
        ) / c

        # Encapsulate the ToF points; save the resolutions as attributes
        # when the temporal resolution is known
        tof_points = PointData(np.c_[tof_times, tof_locations])
        if self.temporal_resolution is not None:
            tof_points.attrs["temporal_resolution"] = self.temporal_resolution
            tof_points.attrs["spatial_resolution"] = (
                self.temporal_resolution * c
            )

        if self.points:
            return tof_points

        sample.attrs["tof"] = tof_points
        return sample
class CutpointsToF(LineDataFilter):
    '''Compute cutpoints from all pairs of lines whose Time Of
    Flight-predicted locations are closer than `max_distance`.

    Filter signature:

    ::

        LineData -> CutpointsToF.fit_sample -> PointData

    If the ``TimeOfFlight`` filter was used and a temporal resolution was
    specified (as a FWHM), then `max_distance` is automatically inferred as
    the minimum between 2 * "spatial_resolution" and the dimension-wise
    standard deviation of the input points.

    The `cutoffs` parameter may be set as [xmin, xmax, ymin, ymax, zmin,
    zmax] for a minimum bounding box outside of which cutpoints are
    discarded; by default it is the minimum bounding box containing all
    input LoRs.

    If `append_indices = True`, two extra columns "line_index1" and
    "line_index2" hold the indices of the LoRs that produced each cutpoint,
    and an extra attribute "_lines" is set to the input `LineData`.

    If `cutpoints_only = False` (default), the Time Of Flight-predicted
    positron annihilation locations are also appended to the returned
    points.

    *New in pept-0.4.2*

    See Also
    --------
    pept.LineData : Encapsulate LoRs for ease of iteration and plotting.
    pept.tracking.HDBSCAN : Efficient, HDBSCAN-based clustering of (cut)points.
    pept.read_csv : Fast CSV file reading into numpy arrays.

    Examples
    --------
    Use the ``TimeOfFlight`` filter first to compute the ToF annihilation
    locations; with a temporal resolution, `max_distance` is automatic:

    >>> import pept
    >>> from pept.tracking import *

    >>> line_data = pept.LineData(example_tof_data)
    >>> line_data_tof = TimeOfFlight(100e-9).fit_sample(line_data)
    >>> cutpoints_tof = CutpointsToF().fit_sample(line_data_tof)

    Alternatively, set `max_distance` yourself:

    >>> line_data = pept.LineData(example_tof_data)
    >>> line_data_tof = TimeOfFlight().fit_sample(line_data)
    >>> cutpoints_tof = CutpointsToF(5.0).fit_sample(line_data_tof)
    '''

    def __init__(
        self,
        max_distance = None,
        cutoffs = None,
        append_indices = False,
        cutpoints_only = False,
    ):
        # The property setters below perform the type-checking
        self.cutoffs = cutoffs
        self.append_indices = append_indices
        self.max_distance = max_distance
        self.cutpoints_only = bool(cutpoints_only)

    @property
    def max_distance(self):
        return self._max_distance

    @max_distance.setter
    def max_distance(self, max_distance):
        # Optional float
        self._max_distance = (
            None if max_distance is None else float(max_distance)
        )

    @property
    def cutoffs(self):
        return self._cutoffs

    @cutoffs.setter
    def cutoffs(self, cutoffs):
        if cutoffs is None:
            self._cutoffs = None
            return

        cutoffs = np.asarray(cutoffs, order = 'C', dtype = float)
        if cutoffs.ndim != 1 or len(cutoffs) != 6:
            raise ValueError((
                "\n[ERROR]: cutoffs should be a one-dimensional array "
                "with values [min_x, max_x, min_y, max_y, min_z, max_z]. "
                f"Received {cutoffs}.\n"
            ))
        self._cutoffs = cutoffs

    @property
    def append_indices(self):
        return self._append_indices

    @append_indices.setter
    def append_indices(self, append_indices):
        self._append_indices = bool(append_indices)

    def fit_sample(self, sample_lines: LineData):
        if not isinstance(sample_lines, LineData):
            sample_lines = LineData(sample_lines)

        # The ToF points must have been computed beforehand
        if "tof" not in sample_lines.attrs:
            raise AttributeError(textwrap.fill((
                "The input `sample_lines` must have an attribute 'tof' "
                "containing the Time of Flight predicted annihilation point "
                "for each LoR. This is normally set by "
                "`pept.tracking.TimeOfFlight`."
            )))

        tofpoints = sample_lines.attrs["tof"]
        tofattrs = tofpoints.attrs

        # Fall back to the FWHM-derived spatial resolution when
        # max_distance was not set explicitly
        max_distance = self.max_distance
        if max_distance is None:
            spread = tofpoints.points[:, 1:4].std(axis = 0).max()
            if "spatial_resolution" in tofattrs:
                max_distance = min(tofattrs["spatial_resolution"] * 2, spread)
            else:
                max_distance = spread

        # Compute cutoffs automatically unless given
        cutoffs = (
            self.cutoffs if self.cutoffs is not None
            else get_cutoffs(sample_lines.lines)
        )

        # Cutpoints require at least two LoRs; otherwise return an empty
        # array with the correct number of columns
        if len(sample_lines.lines) < 2:
            sample_cutpoints = np.empty((0, 6 if self.append_indices else 4))
        else:
            sample_cutpoints = find_cutpoints_tof(
                sample_lines.lines,
                tofpoints.points,
                max_distance,
                cutoffs,
                append_indices = self.append_indices,
            )

        # Column names
        columns = ["t", "x", "y", "z"]
        if self.append_indices:
            columns += ["line_index1", "line_index2"]

        # Unless cutpoints_only, stack the tofpoints under the cutpoints
        if self.cutpoints_only:
            points = PointData(sample_cutpoints, columns = columns, **tofattrs)
        else:
            tofs = tofpoints.points
            if self.append_indices:
                # Each ToF point was produced by its own LoR, so both
                # index columns are the row index
                indices = np.arange(len(tofs))
                tofs = np.c_[tofs, indices, indices]
            points = PointData(
                np.vstack((sample_cutpoints, tofs)),
                columns = columns,
                **tofattrs,
            )

        # Optional metadata; the underscore prefix stops propagation when
        # new objects are constructed from these points
        points.attrs["_max_distance"] = max_distance
        points.attrs["_cutoffs"] = cutoffs
        if self.append_indices:
            points.attrs["_lines"] = sample_lines

        return points
def _gaussian_probs(x, points, sigma):
'''Probabilities from x to Gaussian-distributed points.'''
dists = np.sum((x[:3] - points)**2, axis = 1)
return np.exp(-0.5 * dists / sigma**2)
class GaussianDensity(Filter):
    '''Append weights according to the Gaussian distribution that best fits
    the samples of points.

    Filter signature:

    ::

        PointData -> GaussianDensity.fit_sample -> PointData
        numpy.ndarray -> GaussianDensity.fit_sample -> PointData
        list[PointData] -> GaussianDensity.fit_sample -> list[PointData]

    This is treated as an optimisation problem: find the 3D location that
    maximises the sum of Probability Distributions (PDF) centered at each
    point.

    ::

        Given N points p_1, p_2, ..., p_N:

                      N
        maximise     sum( exp( -0.5 * |x - p_i|^2 / sigma^2 ) )
            x         i

    Each point is then assigned a weight corresponding to its PDF - i.e.
    the exponential part - saved in the `weight` column.

    Sigma controls the standard deviation of the Gaussian distribution
    centred at each point; this corresponds to the relative uncertainty in
    each point's location. For ``TimeOfFlight`` data, leave `sigma = None`
    and it will be computed from the "spatial_resolution" attribute
    (a FWHM) of the input points.

    You can use ``Centroids`` afterwards to compute the weighted centroid,
    i.e. where the tracer is. For multiple particle tracking (or just more
    robustness to noise) you can use `HDBSCAN + SplitLabels` beforehand.

    *New in pept-0.4.2*
    '''

    def __init__(self, sigma = None):
        # Optional standard deviation; when None, it is derived per call
        # from the points' "spatial_resolution" attribute
        self.sigma = None if sigma is None else float(sigma)

    def _add_weights(self, point_data, sigma):
        '''Return `point_data.points` with an appended PDF-weight column.'''
        # Extract the inner NumPy array of points
        points = point_data.points

        # If no points were given, still return an empty array with the
        # correct number of columns
        if len(points) == 0:
            return np.empty((0, points.shape[1] + 1))

        # Find the 3D point that maximises the sum of PDFs, starting the
        # search from the median location (robust to outliers)
        xyz = np.array(points[:, 1:4])
        res = minimize(
            lambda x: -np.sum(_gaussian_probs(x, xyz, sigma)),
            np.median(xyz, axis = 0),
            method = "SLSQP",
        )

        # The PDF of each point acts as a weight
        weights = _gaussian_probs(res.x, xyz, sigma)
        return np.c_[points, weights]

    def fit_sample(self, points):
        # Type-checking inputs: normalise everything to a list of PointData
        return_list = False
        if isinstance(points, PointData):
            list_points = [points]
        elif isinstance(points, np.ndarray):
            list_points = [PointData(points)]
        else:
            return_list = True
            list_points = [
                p if isinstance(p, PointData) else PointData(p)
                for p in list(points)
            ]

        if not len(list_points):
            raise ValueError("Must receive at least one PointData.")

        # Use `self.sigma` when given; otherwise derive it from the points'
        # "spatial_resolution" attribute (set by `pept.tracking.TimeOfFlight`
        # when a `temporal_resolution` is supplied). Previously a
        # user-supplied sigma was overwritten here and `self.sigma` was
        # mutated; keep `sigma` local so the filter stays reusable.
        if self.sigma is not None:
            sigma = self.sigma
        elif all(
            "spatial_resolution" in lp.attrs for lp in list_points
        ):
            # "spatial_resolution" is a FWHM; convert to a standard
            # deviation via FWHM = 2 * sqrt(2 * ln 2) * sigma
            fwhm_to_sigma = 2 * np.sqrt(2 * np.log(2))
            sigma = list_points[0].attrs["spatial_resolution"] / fwhm_to_sigma
        else:
            raise AttributeError(textwrap.fill((
                "If `sigma` is not set, `points.attrs['spatial_resolution']` "
                "is used. However, it was not found (it is normally set by "
                "`pept.tracking.TimeOfFlight` if `temporal_resolution` is "
                "given)."
            )))

        # Add "weight" column to each PointData
        list_weighted = [self._add_weights(p, sigma) for p in list_points]
        columns = list_points[0].columns + ["weight"]

        if return_list:
            return [
                points.copy(data = weighted, columns = columns)
                for points, weighted in zip(list_points, list_weighted)
            ]

        return PointData(
            np.vstack(list_weighted),
            columns = columns,
            **list_points[0].attrs,
        )