#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : transformers.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date : 08.08.2021
import re
import warnings
from numbers import Number
import textwrap
import numpy as np
import pandas as pd
from scipy.optimize import minimize_scalar
from pept.base import PointData, LineData
from pept.base import Filter, Reducer, IterableSamples
from pept.base import PEPTObject, AdaptiveWindow
[docs]class Stack(Reducer):
'''Stack iterables - for example a ``list[pept.LineData]`` into a single
``pept.LineData``, a ``list[list]`` into a flattened ``list``.
Reducer signature:
::
list[LineData] -> Stack.fit -> LineData
list[PointData] -> Stack.fit -> PointData
list[list[Any]] -> Stack.fit -> list[Any]
list[numpy.ndarray] -> Stack.fit -> numpy.ndarray
other -> Stack.fit -> other
Can optionally set a given `sample_size` and `overlap`. This is useful
when collecting a list of processed samples back into a single object.
'''
[docs] def __init__(self, sample_size = None, overlap = None):
self.sample_size = sample_size
self.overlap = overlap
[docs] def fit(self, samples):
# If it's a LineData / PointData, the `samples` are already stacked.
# Simply set the sample_size and overlap if required and return them
if not hasattr(samples, "__iter__"):
raise ValueError(textwrap.fill((
"The input `samples` must be an iterable (e.g. list, tuple, "
f"PointData, LineData). Received type=`{type(samples)}`."
)))
if isinstance(samples, IterableSamples):
if self.sample_size is not None:
samples.sample_size = self.sample_size
if self.sample_size is not None:
samples.overlap = self.overlap
return samples
# If it's an empty iterator, we don't have anything to stack
if len(samples) == 0:
return samples
# Stack Lines into LineData
elif isinstance(samples[0], LineData):
samples = LineData(samples)
# Stack Points into PointData
elif isinstance(samples[0], PointData):
samples = PointData(samples)
# Flatten list of lists
elif isinstance(samples[0], list):
samples = [item for sublist in samples for item in sublist]
# Vertically stack list of NumPy arrays
elif isinstance(samples[0], np.ndarray):
samples = np.vstack(samples)
# Set new sample_size and overlap if required
if self.sample_size is not None:
samples.sample_size = self.sample_size
if self.sample_size is not None:
samples.overlap = self.overlap
return samples
[docs]class SplitLabels(Filter):
'''Split a sample of data into unique ``label`` values, optionally removing
noise and extracting `_lines` attributes.
Filter signature:
::
# `extract_lines` = False (default)
LineData -> SplitLabels.fit_sample -> list[LineData]
PointData -> SplitLabels.fit_sample -> list[PointData]
# `extract_lines` = True and PointData.attrs["_lines"] exists
PointData -> SplitLabels.fit_sample -> list[LineData]
The sample of data must have a column named exactly "label". If
``remove_label = True`` (default), the "label" column is removed.
'''
[docs] def __init__(
self,
remove_labels = True,
extract_lines = False,
noise = False,
):
self.remove_labels = bool(remove_labels)
self.extract_lines = bool(extract_lines)
self.noise = bool(noise)
def _get_cluster(self, sample, labels_mask, lines_cols = None):
# Extract the labels column
cluster_data = sample.data[labels_mask]
if lines_cols is not None:
line_indices = np.unique(cluster_data[:, lines_cols])
lines = sample.attrs["_lines"].lines
cluster_lines = lines[line_indices.astype(int)]
if self.extract_lines:
return sample.attrs["_lines"].copy(data = cluster_lines)
cluster = sample.copy(data = cluster_data)
if lines_cols is not None:
cluster.attrs["_lines"] = sample.attrs["_lines"].copy(
data = cluster_lines,
)
return cluster
def _empty_cluster(self, sample, lines_cols = None):
if self.extract_lines:
# Return empty LineData
return sample.attrs["_lines"].copy(
data = sample.attrs["_lines"][0:0],
)
cluster = sample.copy(data = sample[0:0])
if lines_cols is not None:
cluster.attrs["_lines"] = sample.attrs["_lines"].copy(
data = sample.attrs["_lines"][0:0],
)
return cluster
[docs] def fit_sample(self, sample: IterableSamples):
if not isinstance(sample, IterableSamples):
raise TypeError(textwrap.fill((
"The input `sample` must be a subclass of `IterableSamples` "
f"(e.g. PointData, LineData). Received type=`{type(sample)}`."
)))
# Extract the labels column
col_idx = sample.columns.index("label")
labels = sample.data[:, col_idx]
# Check if there is a `._lines` attribute with `line_index` columns
lines_cols = None
if "_lines" in sample.attrs:
lines_cols = [
i for i, c in enumerate(sample.columns)
if c.startswith("line_index")
]
if len(lines_cols) == 0:
warnings.warn((
"A `_lines` attribute was found, but no lines can "
"be extracted without columns `line_index<N>`."
), RuntimeWarning)
lines_cols = None
self.extract_lines = False
elif self.extract_lines:
raise ValueError(textwrap.fill((
"If `extract_lines` is True, then the input `sample` must "
"contain a `_lines` attribute."
)))
# If noise is requested, also include the noise cluster
if self.noise:
labels_unique = np.unique(labels)
else:
labels_unique = np.unique(labels[labels != -1])
# For each unique label, create a new PointData / LineData cluster that
# maintains / propagates all attributes (which needs a copy)
clusters = [
self._get_cluster(sample, labels == label, lines_cols)
for label in labels_unique
]
# If no valid cluster was found, return at least a single empty cluster
if not len(clusters):
clusters.append(self._empty_cluster(sample, lines_cols))
# Remove the "label" column if needed
if self.remove_labels and not self.extract_lines:
for i in range(len(clusters)):
clusters[i] = clusters[i].copy(
data = np.delete(clusters[i].data, col_idx, axis = 1),
columns = (clusters[i].columns[:col_idx] +
clusters[i].columns[col_idx + 1:]),
)
return clusters
def _wstd(x, w):
'''Weighted standard deviation'''
avg = np.average(x, weights = w)
return np.sqrt(np.average((x - avg)**2, weights = w))
[docs]class Centroids(Filter):
'''Compute the geometric centroids of a list of samples of points.
Filter signature:
::
PointData -> Centroids.fit_sample -> PointData
list[PointData] -> Centroids.fit_sample -> PointData
numpy.ndarray -> Centroids.fit_sample -> PointData
This filter can be used right after ``pept.tracking.SplitLabels``, e.g.:
>>> (SplitLabels() + Centroids()).fit(points)
If `error = True`, append a measure of error on the computed centroid as
the standard deviation in distances from centroid to all points. It is
saved in an extra column "error".
If `cluster_size = True`, append the number of points used for each
centroid in an extra column "cluster_size" - unless `weight = True`, in
which case it is the sum of weights.
If `weight = True` and there is a column "weight" in the PointData, compute
weighted centroids and standard deviations (if `error = True`) and the sum
of weights (if `cluster_size = True`). The "weight" column is removed in
the output centroid.
'''
[docs] def __init__(self, error = False, cluster_size = False, weight = True):
self.error = bool(error)
self.cluster_size = bool(cluster_size)
self.weight = bool(weight)
def _empty_centroid(self, points, weighted: bool):
# Return an empty centroid with the correct number of columns
ncols = points.shape[1]
if self.error:
ncols += 1
if self.cluster_size:
ncols += 1
if weighted:
ncols -= 1
return np.empty((0, ncols))
def _centroid(self, point_data, weighted: bool):
# Extract the NumPy array of points
points = point_data.points
if len(points) == 0:
return self._empty_centroid(points, weighted)
if weighted:
weightcol = point_data.columns.index("weight")
weights = point_data.points[:, weightcol]
# If all weights are zero, no cluster had been found
if weights.sum() == 0.:
return self._empty_centroid(points, weighted)
c = np.average(points, weights = weights, axis = 0)
c = np.delete(c, weightcol)
else:
c = points.mean(axis = 0)
# If error is requested, compute std-dev of distances from centroid
if self.error:
d = np.linalg.norm(points[:, 1:4] - c[1:4], axis = 1)
c = np.r_[c, _wstd(d, weights) if weighted else d.std()]
# If cluster_size is requested, also append the number of points
if self.cluster_size:
c = np.r_[c, weights.sum() if weighted else len(points)]
return c
[docs] def fit_sample(self, points):
# Type-checking inputs
if isinstance(points, PointData):
list_points = [points]
elif isinstance(points, np.ndarray):
list_points = [PointData(points)]
else:
list_points = [
p if isinstance(p, PointData) else PointData(p)
for p in list(points)
]
if not len(list_points):
raise ValueError("Must receive at least one PointData.")
# If self.weight and there is a `weight` column, compute weighted
# centroids
weigh = (self.weight and "weight" in list_points[0].columns)
# Compute centroid for each PointData and stack centroid arrays
centroids = np.vstack([self._centroid(p, weigh) for p in list_points])
attributes = list_points[0].extra_attrs()
# If error or cluster_size are requested, append those columns
columns = list_points[0].columns
# Omit the `weight` column if weigh
if weigh:
weightcol = list_points[0].columns.index("weight")
columns = columns[:weightcol] + columns[weightcol + 1:]
if self.error:
columns.append("error")
if self.cluster_size:
columns.append("cluster_size")
return PointData(centroids, columns = columns, **attributes)
[docs]class LinesCentroids(Filter):
'''Compute the minimum distance point of some ``pept.LineData`` while
iteratively removing a fraction of the furthest lines.
Filter signature:
::
list[LineData] -> LinesCentroids.fit_sample -> PointData
LineData -> LinesCentroids.fit_sample -> PointData
numpy.ndarray -> LinesCentroids.fit_sample -> PointData
The code below is adapted from the PEPT-EM algorithm developed by Antoine
Renaud and Sam Manger.
'''
[docs] def __init__(self, remove = 0.1, iterations = 6):
self.remove = float(remove)
self.iterations = int(iterations)
[docs] @staticmethod
def centroid(lors):
nx = np.newaxis
m = np.identity(3)[nx, :, :] - lors[:, nx, 4:7] * lors[:, 4:7, nx]
n = np.sum(m, axis = 0)
v = np.sum(np.sum(m * lors[:, nx, 1:4], axis=-1), axis=0)
return np.matmul(np.linalg.inv(n), v)
[docs] @staticmethod
def distance_matrix(x, lors):
y = x[np.newaxis, :3] - lors[:, 1:4]
return np.sum(y**2, axis=-1) - np.sum(y * lors[:, 4:7], axis=-1)**2
[docs] def predict(self, lines):
# Rewrite LoRs in the vectorial form y(x) = position + x * direction
lors = lines[:, :7].copy(order = "C")
lors[:, 4:7] = lors[:, 4:7] - lors[:, 1:4]
lors[:, 4:7] /= np.linalg.norm(lors[:, 3:], axis = -1)[:, np.newaxis]
# Begin with equal weights for all LoRs
weights = np.ones(len(lors))
x = LinesCentroids.centroid(lors)
# Iteratively remove the furthest LoRs and recompute centroid
for i in range(self.iterations):
d2 = LinesCentroids.distance_matrix(x, lors)
k = int(len(d2) * (1 - self.remove * (i + 1)))
part = np.argpartition(d2, k)
weights[part[k:]] = 0
x = LinesCentroids.centroid(lors)
# Add timestamp as the mean LoRs' time
return np.hstack((lors[:, 0].mean(), x))
[docs] def fit_sample(self, lines):
# Type-checking inputs
if isinstance(lines, LineData):
list_lines = [lines]
elif isinstance(lines, np.ndarray):
list_lines = [LineData(lines)]
else:
list_lines = list(lines)
centroids = [self.predict(lines.lines) for lines in list_lines]
return PointData(np.vstack(centroids), **list_lines[0].extra_attrs())
[docs]class Condition(Filter):
'''Select only data satisfying multiple conditions, given as a string, a
function or list thereof; e.g. ``Condition("error < 15")`` selects all
points whose "error" column value is smaller than 15.
Filter signature:
::
PointData -> Condition.fit_sample -> PointData
LineData -> Condition.fit_sample -> LineData
In the simplest case, a column name is specified, plus a comparison, e.g.
``Condition("error < 15, y > 100")``; multiple conditions may be
concatenated using a comma.
More complex conditions - where the column name is not the first operand -
can be constructed using single quotes, e.g. using NumPy functions in
``Condition("np.isfinite('x')")`` to filter out NaNs and Infs. Quotes can
be used to index columns too: ``Condition("'0' < 150")`` selects all rows
whose first column is smaller than 150.
Generally, you can use any function returning a boolean mask, either as a
string of code ``Condition("np.isclose('x', 3)")`` or a user-defined
function receiving a NumPy array ``Condition(lambda x: x[:, 0] < 10)``.
Finally, multiple such conditions may be supplied separately:
``Condition(lambda x: x[:, -1] > 10, "'t' < 50")``.
'''
[docs] def __init__(self, *conditions):
# Calls the conditions setter which does parsing
self.conditions = conditions
@property
def conditions(self):
return self._conditions
@conditions.setter
def conditions(self, conditions):
if isinstance(conditions, str):
self._conditions = Condition._parse_condition(conditions)
elif callable(conditions):
self._conditions = [conditions]
else:
cs = []
for cond in conditions:
cs.extend(Condition._parse_condition(cond))
self._conditions = cs
@staticmethod
def _parse_condition(cond):
if callable(cond):
return [cond]
conditions = str(cond).replace(" ", "").split(",")
# Compile regex object to find quoted strings
finder = re.compile(r"'\w+'")
for i in range(len(conditions)):
# Replace single-quoted column numbers / names
if "'" in conditions[i]:
conditions[i] = finder.sub(
Condition._replace_quoted,
conditions[i],
)
continue
# If condition is a simple comparison, allow using non-quoted
# column names
op = None
if "<" in conditions[i]:
op = "<"
elif ">" in conditions[i]:
op = ">"
elif "!" in conditions[i]:
op = "!"
elif "==" in conditions[i]:
op = "=="
if op is not None:
cs = conditions[i].split(op)
cs[0] = Condition._replace_term(cs[0])
conditions[i] = op.join(cs)
else:
raise ValueError(textwrap.fill((
f"The input `conditions[i] = {conditions[i]}` did not "
"contain an operator or single-quoted terms."
)))
return conditions
@staticmethod
def _replace_term(term: str):
return f"data[:, sample.columns.index('{term}')]"
@staticmethod
def _replace_quoted(term):
# Remove single quotes
if isinstance(term, re.Match):
term = term.group()
term = term.split("'")[1]
try:
index = int(term)
return f"data[:, {index}]"
except ValueError:
return f"data[:, sample.columns.index('{term}')]"
[docs] def fit_sample(self, sample: IterableSamples):
if not isinstance(sample, IterableSamples):
raise TypeError(textwrap.fill((
"The input `sample` must be a subclass of `IterableSamples` "
f"(e.g. PointData, LineData). Received type=`{type(sample)}`."
)))
data = sample.data
for cond in self.conditions:
if callable(cond):
data = data[cond(data)]
else:
data = data[eval(cond, globals(), locals())]
return sample.copy(data = data)
[docs]class SamplesCondition(Reducer):
'''Select only *samples* satisfying multiple conditions, given as a string,
a function or list thereof; e.g. ``Condition("sample_size > 30")`` selects
all samples with a sample size larger than 30.
Filter signature:
::
PointData -> SamplesCondition.fit_sample -> PointData
LineData -> SamplesCondition.fit_sample -> LineData
This is different to a `Condition`, which selects individual points; for
`SamplesCondition`, each sample will be passed through the conditions.
Conditions can be defined as Python code using the following variables:
- `sample` - this is the full PointData or LineData, e.g. only keep samples
with more than 30 points with "len(sample.points) > 30".
- `data` - this is the raw NumPy array of data wrapped by a PointData or
LineData, e.g. only keep samples which have all X coordinates beyond 100
with `SamplesCondition("np.all(data[:, 1] > 100)")`.
- `sample_size` - this is a shorthand for the number of data points, e.g.
only keep samples with more than 30 points with "sample_size > 30".
Conditions can also be Python functions:
>>> def high_velocity_filter(sample):
>>> return np.all(sample["v"] > 5)
>>> from pept.tracking import SamplesCondition
>>> filtered = SamplesCondition(high_velocity_filter).fit(point_data)
'''
[docs] def __init__(self, *conditions):
# Calls the conditions setter which does parsing
self.conditions = conditions
@property
def conditions(self):
return self._conditions
@conditions.setter
def conditions(self, conditions):
if isinstance(conditions, str):
self._conditions = SamplesCondition._parse_condition(conditions)
elif callable(conditions):
self._conditions = [conditions]
else:
cs = []
for cond in conditions:
cs.extend(SamplesCondition._parse_condition(cond))
self._conditions = cs
@staticmethod
def _parse_condition(cond):
if callable(cond):
return [cond]
conditions = str(cond).replace(" ", "").split(",")
# Compile regex object to find quoted strings
finder = re.compile(r"'\w+'")
for i in range(len(conditions)):
# Replace single-quoted column numbers / names
if "'" in conditions[i]:
conditions[i] = finder.sub(
SamplesCondition._replace_quoted,
conditions[i],
)
continue
return conditions
@staticmethod
def _replace_quoted(term):
# Remove single quotes
if isinstance(term, re.Match):
term = term.group()
term = term.split("'")[1]
try:
index = int(term)
return f"data[:, {index}]"
except ValueError:
return f"data[:, sample.columns.index('{term}')]"
[docs] def fit(self, samples):
# Filtered samples
collected = []
for i, sample in enumerate(samples):
# Defining variables to be accessed inside conditions
if isinstance(sample, IterableSamples):
data = sample.data
else:
data = sample
sample_size = len(data)
# Check all conditions are true
keep = True
for cond in self.conditions:
if callable(cond):
keep = keep and cond(sample)
else:
keep = keep and eval(cond, globals(), locals())
if keep:
collected.append(sample)
elif len(collected) == 0 and i == len(samples) - 1:
# If no sample was retained, save an empty sample
collected.append(sample[0:0])
return collected
[docs]class Remove(Filter):
'''Remove columns (either column names or indices) from `pept.LineData` or
`pept.PointData`.
Filter signature:
::
pept.LineData -> Remove.fit_sample -> pept.LineData
pept.PointData -> Remove.fit_sample -> pept.PointData
Examples
--------
To remove a single column named "line_index":
>>> import pept
>>> from pept.tracking import *
>>> points = pept.PointData(...) # Some dummy data
>>> rem = Remove("line_index")
>>> points_without = rem.fit_sample(points)
Remove all columns starting with "line_index" using a glob operator (*):
>>> points_without = Remove("line_index*").fit_sample(points)
Remove the first column based on its index:
>>> points_without = Remove(0).fit_sample(points)
Finally, multiple removals may be chained into a list:
>>> points_without = Remove(["line_index*", -1]).fit_sample(points)
'''
[docs] def __init__(self, *columns):
self._indices = []
self._filters = []
# Calls the `columns` setter which does parsing
self.columns = columns
@property
def columns(self):
return self._columns
@columns.setter
def columns(self, columns):
self._columns = [Remove._parse(col) for col in columns]
# Split the removers into regex strings and column indices
for c in self._columns:
if isinstance(c, str):
self._filters.append(c)
else:
self._indices.append(c)
@staticmethod
def _parse(col):
if isinstance(col, str):
return col.replace("*", r"\w*")
elif isinstance(col, Number):
return int(col)
else:
raise ValueError(textwrap.fill((
"Each input argument in `columns` must be a string or an "
f"integer. One of them was `type(col) = {type(col)}`."
)))
[docs] def fit_sample(self, sample: IterableSamples):
if not isinstance(sample, IterableSamples):
raise TypeError(textwrap.fill((
"The input `sample` must be a subclass of `IterableSamples` "
f"(e.g. PointData, LineData). Received type=`{type(sample)}`."
)))
# Extract the relevant `sample` attributes
columns = sample.columns
ncols = len(columns)
# The regex filters to use and column numbers to remove
filters = self._filters
indices = self._indices
# Column indices to remove and remaining column names
removed = set()
columns_filtered = []
for i, c in enumerate(columns):
# Also handle negative indices
if any((re.fullmatch(r, c) for r in filters)) or \
any((i == ind or i == ind + ncols for ind in indices)):
removed.add(i)
else:
columns_filtered.append(c)
indices_filtered = [i for i in range(len(columns)) if i not in removed]
data = sample.data[:, indices_filtered]
return sample.copy(data = data, columns = columns_filtered)
[docs]class GroupBy(Reducer):
'''Stack all samples and split them into a list according to a named /
numeric column index.
Reducer signature:
::
LineData -> SplitAll.fit -> list[LineData]
list[LineData] -> SplitAll.fit -> list[LineData]
PointData -> SplitAll.fit -> list[PointData]
list[PointData] -> SplitAll.fit -> list[PointData]
numpy.ndarray -> SplitAll.fit -> list[numpy.ndarray]
list[numpy.ndarray] -> SplitAll.fit -> list[numpy.ndarray]
If using a LineData / PointData, you can use a columns name as a string,
e.g. ``SplitAll("label")`` or a number ``SplitAll(4)``. If using a NumPy
array, only numeric indices are accepted.
'''
[docs] def __init__(self, column):
try:
self.column_index = int(column)
self.column_name = None
except ValueError:
self.column_name = str(column)
self.column_index = None
[docs] def fit(self, samples):
if not hasattr(samples, "__iter__"):
raise TypeError(textwrap.fill((
"The input `samples` must be an iterable (e.g. list, tuple, "
f"PointData, LineData). Received type=`{type(samples)}`."
)))
# Reduce / stack list of samples onto a single IterableSamples / array
samples = Stack().fit(samples)
if isinstance(samples, np.ndarray):
return self._split_numpy(samples)
elif isinstance(samples, IterableSamples):
return self._split_iterable_samples(samples)
else:
raise TypeError(textwrap.fill((
"The input samples must be NumPy arrays, PointData / LineData "
f"or lists thereof. Received `type(samples) = {type(samples)}`"
)))
def _split_numpy(self, samples):
if self.column_index is None:
raise TypeError(textwrap.fill((
"If the samples are NumPy arrays, you must use a numeric "
f"column index; used a named column: `{self.column_name}`."
)))
col_data = samples[:, self.column_index]
labels = np.unique(col_data)
# If no labels exist, return a list with an empty sample
if not len(labels):
return [samples[0:0]]
return [samples[col_data == label] for label in labels]
def _split_iterable_samples(self, samples):
if self.column_index is not None:
col_data = samples.data[:, self.column_index]
else:
col_data = samples.data[:, samples.columns.index(self.column_name)]
labels = np.unique(col_data)
# If no labels exist, return a list with an empty sample
if not len(labels):
return [samples[0:0]]
return [
samples.copy(data = samples.data[col_data == label])
for label in labels
]
# Use standard names, Leonard...
SplitAll = GroupBy
[docs]class Swap(Filter):
'''Swap two columns in a LineData or PointData.
Filter signature:
::
LineData -> Swap.fit_sample -> LineData
PointData -> Swap.fit_sample -> PointData
For example, swap the Y and Z axes: ``Swap("y, z").fit_sample(points)``.
Add multiple swaps as separate arguments: ``Swap("y, z", "label, x")``.
You can also swap columns at numerical indices by single-quoting them:
``Swap("'0', '1'")``.
*New in pept-0.4.3*
'''
[docs] def __init__(self, *swaps, inplace = True):
# Calls swaps.setter which does type-checking
self.swaps = swaps
self.inplace = bool(inplace)
@property
def swaps(self):
return self._swaps
@swaps.setter
def swaps(self, swaps):
commands = []
# Compile regex object to find quoted strings
finder = re.compile(r"'\w+'")
for i in range(len(swaps)):
s = swaps[i].replace(" ", "").split(",")
if len(s) != 2:
raise ValueError(textwrap.fill((
f"The input `swaps[{i}] = {swaps[i]}` does not have a "
'single comma. It must be formatted as "col1, col2".'
)))
# Replace single-quoted column numbers / names
for j in range(2):
if "'" in s[j]:
s[j] = finder.sub(Swap._replace_quoted, s[j])
else:
s[j] = Swap._replace_term(s[j])
commands.append(
f"aux = {s[0]}.copy() ; {s[0]} = {s[1]} ; {s[1]} = aux;\n"
)
self._swaps = tuple(commands)
@staticmethod
def _replace_term(term: str):
return f"data[:, sample.columns.index('{term}')]"
@staticmethod
def _replace_quoted(term):
# Remove single quotes
if isinstance(term, re.Match):
term = term.group()
term = term.split("'")[1]
try:
index = int(term)
return f"data[:, {index}]"
except ValueError:
return f"data[:, sample.columns.index('{term}')]"
[docs] def fit_sample(self, sample: IterableSamples):
if not isinstance(sample, IterableSamples):
raise TypeError(textwrap.fill((
"The input `sample` must be a subclass of `IterableSamples` "
f"(e.g. PointData, LineData). Received type=`{type(sample)}`."
)))
if not self.inplace:
sample = sample.copy()
data = sample.data
for s in self.swaps:
exec(s, locals(), globals())
return sample
[docs]class OptimizeWindow(Reducer):
'''Automatically determine optimum adaptive time window to have an ideal
number of elements per sample.
Reducer signature:
::
LineData -> OptimizeWindow.fit -> LineData
list[LineData] -> OptimizeWindow.fit -> LineData
PointData -> OptimizeWindow.fit -> PointData
list[PointData] -> OptimizeWindow.fit -> PointData
numpy.ndarray -> OptimizeWindow.fit -> PointData
The adaptive time window approach combines the advantages of fixed sample
sizes and time windowing:
- Time windows are robust to tracers moving in and out of the field of
view, as they simply ignore the time slices where almost no LoRs are
recorded.
- Fixed sample sizes effectively adapt their spatio-temporal resolution,
allowing for higher accuracy when tracers are passing through more
active scanner regions.
All samples with more than `ideal_elems` are shortened, such that time
windows are shrinked when the tracer activity permits. There exists an
ideal time window such that most samples will have roughly `ideal_elems`,
with a few higher activity ones that are shortened; ``OptimizeWindow``
finds this ideal time window for ``pept.AdaptiveWindow``.
*New in pept-0.5.1*
Examples
--------
Find an adaptive time window that is ideal for about 200 LoRs per sample:
>>> import pept
>>> import pept.tracking as pt
>>> lors = pept.LineData(...)
>>> lors = pt.OptimizeWindow(ideal_elems = 200).fit(lors)
`OptimizeWindow` can be used at the start of a pipeline; an optional
`overlap` parameter can be used to define an overlap as a ratio to the
ideal time window found. For example, if the ideal time window found is
100 ms, an overlap of 0.5 will result in an overlapping time interval of
50 ms:
>>> pipeline = pept.Pipeline([
>>> pt.OptimizeWindow(200, overlap = 0.5),
>>> pt.BirminghamMethod(0.5),
>>> pt.Stack(),
>>> ])
'''
[docs] def __init__(self, ideal_elems, overlap = 0., low = 0.3, high = 3):
self.ideal_elems = int(ideal_elems)
overlap = float(overlap)
if overlap >= 1 or overlap < 0:
raise ValueError((
"\n[ERROR]: If `overlap` is defined, it must be the ratio "
"relative to the ideal time window that will be found, "
f"and hence between [0, 1). Received {overlap}."
))
self.overlap = overlap
self.low = float(low)
self.high = float(high)
[docs] def fit(self, data):
# Stack all data into LineData or PointData (default)
data = Stack().fit(data)
if not isinstance(data, PEPTObject):
data = PointData(data)
self.data = data
# Compute bounds
times = data.data[:, 0]
dt = times[1:] - times[:-1]
estimate = self.ideal_elems * np.nanmedian(dt)
# In extreme cases, median(dt) is 0
if estimate == 0.:
for q in [0.8, 0.9, 0.95, 0.99, 0.999]:
estimate = self.ideal_elems * np.nanquantile(dt, q)
if estimate != 0.:
break
else:
raise ValueError("Ideal time window could not be estimated.")
# Find time window that yields median LoR counts per sample as close to
# the `ideal_elems`
res = minimize_scalar(
self.evaluate,
bracket = [0.5 * estimate, estimate],
)
# Set sample size and overlap (if requested) to ideal values found
data.sample_size = AdaptiveWindow(res.x, self.ideal_elems)
data.overlap = AdaptiveWindow(res.x * self.overlap)
return data
[docs] def evaluate(self, window):
# Window cannot be negative; return maximum error
if window < 0:
return np.inf
# Set adaptive window to compute samples indices
self.data.sample_size = AdaptiveWindow(window)
# Compute number of counts per sample
si = self.data.samples_indices
counts = np.array(si[:, 1] - si[:, 0], dtype = float)
# Ignore samples where the tracer is outside the system (very low
# counts) or is extremely active / static
counts[counts < self.low * self.ideal_elems] = np.nan
counts[counts > self.high * self.ideal_elems] = np.nan
return (np.nanmedian(counts) - self.ideal_elems) ** 2
[docs]class Debug(Reducer):
'''Print types and statistics about the objects being processed in a
``pept.Pipeline``.
Reducer signature:
::
PointData -> Debug.fit -> PointData
LineData -> Debug.fit -> LineData
list[PointData] -> Debug.fit -> list[PointData]
list[LineData] -> Debug.fit -> list[LineData]
np.ndarray -> Debug.fit -> np.ndarray
Any -> Debug.fit -> Any
This is a reducer, so it will collect all samples processed up to the
point of use, print them, and return them unchanged.
*New in pept-0.5.1*
Examples
--------
A ``Debug`` is normally added in a ``Pipeline``:
>>> import pept
>>> import pept.tracking as pt
>>>
>>> pept.Pipeline([
>>> # First pass of clustering
>>> pt.Cutpoints(max_distance = 0.2),
>>> pt.HDBSCAN(true_fraction = 0.15),
>>> pt.SplitLabels() + pt.Centroids(cluster_size = True, error = True),
>>>
>>> pt.Debug(),
>>> pt.Stack(),
>>> ])
'''
[docs] def __init__(self, verbose = 5, max_samples = 10):
self.verbose = int(verbose)
self.max_samples = int(max_samples)
def _print_stats_pla(self, samples):
# Printing statistics for PointData, LineData, np.ndarray
if isinstance(samples, np.ndarray):
data = samples
columns = None
else:
data = samples.data
columns = samples.columns
if self.verbose >= 1:
print(samples)
# Create summary statistics using pandas; add new summary columns
if self.verbose >= 2:
df = pd.DataFrame(data, columns = columns)
desc = df.describe(include = "all")
desc.loc['dtype'] = df.dtypes
desc.loc['NaN'] = df.isnull().sum()
print(desc)
[docs] def fit(self, samples):
# Lines for printing
over = "=" * 80
under = "-" * 80
print("\n" + over + "\nDebug Start\n" + under)
# Special-cased types
pla = (PointData, LineData, np.ndarray)
if isinstance(samples, list):
if len(samples) == 0:
print("No samples given, received empty list!")
else:
# Print unique types in list
unique_types = set(type(s) for s in samples)
print(f"Processing {len(samples)} samples of:")
for ut in unique_types:
print(" ", ut)
# If list contains PointData, LineData or simple NumPy arrays,
# stack them and print statistics via pandas
if len(unique_types) == 1 and isinstance(samples[0], pla):
print("Stacking data to print statistics...\n")
# Reduce / stack list of samples onto a single object
stacked = Stack().fit(samples)
self._print_stats_pla(stacked)
# Unknown types in list
else:
print(f"Printing the first {self.max_samples} samples")
for i, s in samples:
print(f"\nSample {i}:\n{under}")
print(s)
elif isinstance(samples, pla):
print(f"Processing a single {type(samples)}:")
self._print_stats_pla(samples)
else:
print(f"Processing {type(samples)}:")
print(samples)
print("\n" + under + "\nDebug End\n" + over)
return samples