Source code for pept.tracking.transformers

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : transformers.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date   : 08.08.2021


import  re
import  sys
import  warnings
from    typing          import  Union
from    numbers         import  Number

if sys.version_info.minor >= 9:
    # Python 3.9
    from collections.abc import  Iterable
else:
    from typing         import  Iterable

import  textwrap

import  numpy           as      np

from    beartype        import  beartype

from    pept.base       import  PointData, LineData
from    pept.base       import  Filter, Reducer, IterableSamples


# Type alias: either of the two sample containers imported from `pept.base`
PointsOrLines = Union[PointData, LineData]




class Stack(Reducer):
    '''Stack iterables - for example a ``list[pept.LineData]`` into a single
    ``pept.LineData``, a ``list[list]`` into a flattened ``list``.

    Reducer signature:

    ::

        list[LineData] -> Stack.fit -> LineData
        list[PointData] -> Stack.fit -> PointData
        list[list[Any]] -> Stack.fit -> list[Any]
        list[numpy.ndarray] -> Stack.fit -> numpy.ndarray
        other -> Stack.fit -> other

    Can optionally set a given `sample_size` and `overlap`. This is useful
    when collecting a list of processed samples back into a single object.

    Parameters
    ----------
    sample_size : optional
        If not None, assigned to the ``sample_size`` attribute of the stacked
        object.

    overlap : optional
        If not None, assigned to the ``overlap`` attribute of the stacked
        object, independently of whether `sample_size` was given.
    '''

    def __init__(self, sample_size = None, overlap = None):
        self.sample_size = sample_size
        self.overlap = overlap


    def _set_sampling(self, samples):
        # Apply each requested setting on its own: `overlap` must not depend
        # on `sample_size` having been given too. The assignment is attempted
        # on whatever object was produced; plain lists and NumPy arrays do not
        # accept new attributes, so requesting a setting for them raises an
        # AttributeError.
        if self.sample_size is not None:
            samples.sample_size = self.sample_size
        if self.overlap is not None:
            samples.overlap = self.overlap

        return samples


    @beartype
    def fit(self, samples: Iterable):
        '''Stack `samples` into a single object and return it.

        An input that is already an ``IterableSamples`` is returned as-is
        (after applying the optional `sample_size` / `overlap`); an empty
        input is returned untouched. Otherwise the type of the first element
        selects how the elements are combined.
        '''
        # If it's a LineData / PointData, the `samples` are already stacked.
        # Simply set the sample_size and overlap if required and return them
        if isinstance(samples, IterableSamples):
            return self._set_sampling(samples)

        # If it's an empty iterator, we don't have anything to stack
        if len(samples) == 0:
            return samples

        # Stack Lines into LineData
        elif isinstance(samples[0], LineData):
            samples = LineData(samples)

        # Stack Points into PointData
        elif isinstance(samples[0], PointData):
            samples = PointData(samples)

        # Flatten list of lists
        elif isinstance(samples[0], list):
            samples = [item for sublist in samples for item in sublist]

        # Vertically stack list of NumPy arrays
        elif isinstance(samples[0], np.ndarray):
            samples = np.vstack(samples)

        # Set new sample_size and overlap if required
        return self._set_sampling(samples)
class SplitLabels(Filter):
    '''Split a sample of data into unique ``label`` values, optionally
    removing noise and extracting `_lines` attributes.

    Filter signature:

    ::

        # `extract_lines` = False (default)
        LineData -> SplitLabels.fit_sample -> list[LineData]
        PointData -> SplitLabels.fit_sample -> list[PointData]

        # `extract_lines` = True and PointData.lines exists
        PointData -> SplitLabels.fit_sample -> list[LineData]

    The sample of data must have a column named exactly "label". The filter
    normally removes the "label" column in the output (if
    ``remove_labels = True``).

    Parameters
    ----------
    remove_labels : bool, default True
        Drop the "label" column from every returned cluster. Ignored when
        lines are extracted.

    extract_lines : bool, default False
        Return the lines stored in the sample's ``_lines`` attribute (selected
        through the ``line_index<N>`` columns) instead of the clusters
        themselves.

    noise : bool, default False
        If True, rows labelled ``-1`` are returned as a cluster too; otherwise
        they are discarded.
    '''

    def __init__(
        self,
        remove_labels = True,
        extract_lines = False,
        noise = False,
    ):
        self.remove_labels = bool(remove_labels)
        self.extract_lines = bool(extract_lines)
        self.noise = bool(noise)


    def _get_cluster(
        self,
        sample,
        labels_mask,
        lines_cols = None,
        extract_lines = None,
    ):
        # `extract_lines` lets the caller override the configured setting for
        # a single sample; None falls back to the filter's own setting
        if extract_lines is None:
            extract_lines = self.extract_lines

        # Select the rows belonging to this cluster
        cluster_data = sample.data[labels_mask]

        # Gather the lines referenced by the cluster's `line_index` columns
        if lines_cols is not None:
            line_indices = np.unique(cluster_data[:, lines_cols])
            lines = sample.attrs["_lines"].lines
            cluster_lines = lines[line_indices.astype(int)]

        if extract_lines:
            return sample.attrs["_lines"].copy(data = cluster_lines)

        cluster = sample.copy(data = cluster_data)
        if lines_cols is not None:
            cluster.attrs["_lines"] = sample.attrs["_lines"].copy(
                data = cluster_lines,
            )

        return cluster


    def _empty_cluster(self, sample, lines_cols = None, extract_lines = None):
        # Same per-sample override as in `_get_cluster`
        if extract_lines is None:
            extract_lines = self.extract_lines

        if extract_lines:
            # Return empty LineData
            return sample.attrs["_lines"].copy(
                data = sample.attrs["_lines"][0:0],
            )

        cluster = sample.copy(data = sample[0:0])
        if lines_cols is not None:
            cluster.attrs["_lines"] = sample.attrs["_lines"].copy(
                data = sample.attrs["_lines"][0:0],
            )

        return cluster


    @beartype
    def fit_sample(self, sample: IterableSamples):
        '''Split `sample` into one cluster per unique value in its "label"
        column and return them as a list.

        At least one (possibly empty) cluster is always returned.

        Raises
        ------
        ValueError
            If `extract_lines` is True but `sample` has no ``_lines``
            attribute, or if there is no column named "label".
        '''
        # Extract the labels column
        col_idx = sample.columns.index("label")
        labels = sample.data[:, col_idx]

        # Whether lines are extracted for *this* sample. Kept local so that a
        # sample lacking `line_index` columns does not permanently switch off
        # the user's `extract_lines` setting for the samples that follow.
        extract_lines = self.extract_lines

        # Check if there is a `._lines` attribute with `line_index` columns
        lines_cols = None
        if "_lines" in sample.attrs:
            lines_cols = [
                i for i, c in enumerate(sample.columns)
                if c.startswith("line_index")
            ]

            if len(lines_cols) == 0:
                warnings.warn((
                    "A `_lines` attribute was found, but no lines can "
                    "be extracted without columns `line_index<N>`."
                ), RuntimeWarning)
                lines_cols = None
                extract_lines = False

        elif extract_lines:
            raise ValueError(textwrap.fill((
                "If `extract_lines` is True, then the input `sample` must "
                "contain a `_lines` attribute."
            )))

        # If noise is requested, also include the noise cluster
        if self.noise:
            labels_unique = np.unique(labels)
        else:
            labels_unique = np.unique(labels[labels != -1])

        # For each unique label, create a new PointData / LineData cluster that
        # maintains / propagates all attributes (which needs a copy)
        clusters = [
            self._get_cluster(sample, labels == label, lines_cols, extract_lines)
            for label in labels_unique
        ]

        # If no valid cluster was found, return at least a single empty cluster
        if not len(clusters):
            clusters.append(
                self._empty_cluster(sample, lines_cols, extract_lines)
            )

        # Remove the "label" column if needed
        if self.remove_labels and not extract_lines:
            clusters = [
                cluster.copy(
                    data = np.delete(cluster.data, col_idx, axis = 1),
                    columns = (cluster.columns[:col_idx] +
                               cluster.columns[col_idx + 1:]),
                )
                for cluster in clusters
            ]

        return clusters
class Centroids(Filter):
    '''Compute the geometric centroids of a list of samples of points.

    Filter signature:

    ::

        PointData -> Centroids.fit_sample -> PointData
        list[PointData] -> Centroids.fit_sample -> PointData
        numpy.ndarray -> Centroids.fit_sample -> PointData

    This filter can be used right after ``pept.tracking.SplitLabels``, e.g.:

    >>> (SplitLabels() + Centroids()).fit(points)

    Parameters
    ----------
    error : bool, default False
        Append an "error" column: the standard deviation of the distances
        between each row and the centroid.

    cluster_size : bool, default False
        Append a "cluster_size" column: the number of rows averaged.
    '''

    def __init__(self, error = False, cluster_size = False):
        self.error = bool(error)
        self.cluster_size = bool(cluster_size)


    def _empty_centroid(self, points):
        # Return an empty centroid with the correct number of columns
        ncols = points.shape[1]
        if self.error:
            ncols += 1
        if self.cluster_size:
            ncols += 1

        return np.empty((0, ncols))


    def _centroid(self, points):
        # No rows to average: contribute zero rows of the right width
        if len(points) == 0:
            return self._empty_centroid(points)

        c = points.mean(axis = 0)

        # If error is requested, compute std-dev of distances from centroid
        if self.error:
            err = np.linalg.norm(points - c, axis = 1).std()
            c = np.r_[c, err]

        # If cluster_size is requested, also append the number of points
        if self.cluster_size:
            c = np.r_[c, len(points)]

        return c


    def fit_sample(self, points):
        '''Return a single PointData holding one centroid row per input
        sample; empty samples contribute no row.

        The extra attributes and column names are taken from the first sample.
        '''
        # Type-checking inputs
        if isinstance(points, PointData):
            list_points = [points]
        elif isinstance(points, np.ndarray):
            list_points = [PointData(points)]
        else:
            list_points = list(points)

        # Compute centroid for each PointData and stack centroid arrays
        centroids = np.vstack([self._centroid(p.points) for p in list_points])
        attributes = list_points[0].extra_attrs()

        # If error or cluster_size are requested, append those columns. Work
        # on a copy so the input sample's own column names are never modified
        columns = list(list_points[0].columns)
        if self.error:
            columns.append("error")
        if self.cluster_size:
            columns.append("cluster_size")

        return PointData(centroids, columns = columns, **attributes)
class LinesCentroids(Filter):
    '''Compute the minimum distance point of some ``pept.LineData`` while
    iteratively removing a fraction of the furthest lines.

    Filter signature:

    ::

        list[LineData] -> LinesCentroids.fit_sample -> PointData
        LineData -> LinesCentroids.fit_sample -> PointData
        numpy.ndarray -> LinesCentroids.fit_sample -> PointData

    The code below is adapted from the PEPT-EM algorithm developed by Antoine
    Renaud and Sam Manger

    Parameters
    ----------
    remove : float, default 0.1
        Fraction of the lines discarded per iteration; at iteration ``i``
        (starting from 0) only the ``1 - remove * (i + 1)`` fraction of lines
        closest to the current estimate may be kept.

    iterations : int, default 6
        Maximum number of removal iterations.
    '''

    def __init__(self, remove = 0.1, iterations = 6):
        self.remove = float(remove)
        self.iterations = int(iterations)


    @staticmethod
    def centroid(lors, weights = None):
        '''Return the point minimising the (optionally weighted) sum of
        squared distances to all lines.

        Each row of `lors` is read as `[t, position(3), unit direction(3)]`.
        `weights`, if given, holds one non-negative factor per line; a zero
        weight excludes that line.
        '''
        nx = np.newaxis

        # Per-line projector onto the plane perpendicular to the direction
        m = np.identity(3)[nx, :, :] - lors[:, nx, 4:7] * lors[:, 4:7, nx]
        if weights is not None:
            m = m * np.asarray(weights, dtype = float)[:, nx, nx]

        n = np.sum(m, axis = 0)
        v = np.sum(np.sum(m * lors[:, nx, 1:4], axis=-1), axis=0)
        return np.matmul(np.linalg.inv(n), v)


    @staticmethod
    def distance_matrix(x, lors):
        '''Return the squared distance from point `x` to every line in `lors`
        (rows laid out as in `centroid`, with unit directions).
        '''
        y = x[np.newaxis, :3] - lors[:, 1:4]
        return np.sum(y**2, axis=-1) - np.sum(y * lors[:, 4:7], axis=-1)**2


    def predict(self, lines):
        '''Return `[t, x, y, z]` for one array of lines, where `t` is the mean
        of the first column and the position is the minimum distance point
        after trimming the furthest lines.

        An empty input yields an empty `(0, 4)` array.
        '''
        # Nothing to locate: contribute zero rows (as `Centroids` does)
        if len(lines) == 0:
            return np.empty((0, 4))

        # Rewrite LoRs in the vectorial form y(x) = position + x * direction
        lors = lines[:, :7].copy(order = "C")
        lors[:, 4:7] = lors[:, 4:7] - lors[:, 1:4]

        # Normalise using the direction columns only, so they are unit vectors
        lors[:, 4:7] /= np.linalg.norm(lors[:, 4:7], axis = -1)[:, np.newaxis]

        # Begin with equal weights for all LoRs
        nlors = len(lors)
        weights = np.ones(nlors)
        x = LinesCentroids.centroid(lors, weights)

        # Iteratively remove the furthest LoRs and recompute centroid
        for i in range(self.iterations):
            d2 = LinesCentroids.distance_matrix(x, lors)
            k = int(nlors * (1 - self.remove * (i + 1)))

            # Nothing to discard at this iteration
            if k >= nlors:
                continue

            # A minimum distance point needs at least two lines
            if k < 2:
                break

            part = np.argpartition(d2, k)
            trimmed = weights.copy()
            trimmed[part[k:]] = 0

            # Removals accumulate across iterations; stop before too few
            # lines are left to solve for a point
            if np.count_nonzero(trimmed) < 2:
                break

            weights = trimmed
            x = LinesCentroids.centroid(lors, weights)

        # Add timestamp as the mean LoRs' time
        return np.hstack((lors[:, 0].mean(), x))


    def fit_sample(self, lines):
        '''Return a PointData with one `[t, x, y, z]` row per input sample of
        lines, carrying over the first sample's extra attributes.
        '''
        # Type-checking inputs
        if isinstance(lines, LineData):
            list_lines = [lines]
        elif isinstance(lines, np.ndarray):
            list_lines = [LineData(lines)]
        else:
            list_lines = list(lines)

        centroids = [self.predict(sample.lines) for sample in list_lines]
        return PointData(np.vstack(centroids), **list_lines[0].extra_attrs())
class Condition(Filter):
    '''Select only data satisfying multiple conditions, given as a string, a
    function or list thereof; e.g. ``Condition("error < 15")`` selects all
    points whose "error" column value is smaller than 15.

    Filter signature:

    ::

        PointData -> Condition.fit_sample -> PointData
        LineData -> Condition.fit_sample -> LineData

    In the simplest case, a column name is specified, plus a comparison, e.g.
    ``Condition("error < 15, y > 100")``; multiple conditions may be
    concatenated using a comma. Commas inside parentheses or brackets (e.g.
    function arguments) do not separate conditions.

    More complex conditions - where the column name is not the first operand -
    can be constructed using single quotes, e.g. using NumPy functions in
    ``Condition("np.isfinite('x')")`` to filter out NaNs and Infs. Quotes can
    be used to index columns too: ``Condition("'0' < 150")`` selects all rows
    whose first column is smaller than 150.

    Generally, you can use any function returning a boolean mask, either as a
    string of code ``Condition("np.isclose('x', 3)")`` or a user-defined
    function receiving a NumPy array ``Condition(lambda x: x[:, 0] < 10)``.

    Finally, multiple such conditions may be supplied separately:
    ``Condition(lambda x: x[:, -1] > 10, "'t' < 50")``.

    String conditions are evaluated as Python code, so they must only come
    from trusted sources.
    '''

    def __init__(self, *conditions):
        # Calls the conditions setter which does parsing
        self.conditions = conditions


    @property
    def conditions(self):
        return self._conditions


    @conditions.setter
    def conditions(self, conditions):
        # Accept a single string, a single callable or an iterable of either
        if isinstance(conditions, str):
            self._conditions = Condition._parse_condition(conditions)
        elif callable(conditions):
            self._conditions = [conditions]
        else:
            cs = []
            for cond in conditions:
                cs.extend(Condition._parse_condition(cond))
            self._conditions = cs


    @staticmethod
    def _split_top_level(text):
        # Split `text` at commas that are not enclosed in (), [] or {}, so
        # that function-call arguments stay within a single condition. With
        # no brackets present this matches `text.split(",")`.
        pieces = []
        current = []
        depth = 0

        for char in text:
            if char in "([{":
                depth += 1
            elif char in ")]}":
                depth = max(depth - 1, 0)

            if char == "," and depth == 0:
                pieces.append("".join(current))
                current = []
            else:
                current.append(char)

        pieces.append("".join(current))
        return pieces


    @staticmethod
    def _parse_condition(cond):
        # Translate one user-supplied condition into a list of callables /
        # strings of code indexing `data`, ready for `fit_sample`
        if callable(cond):
            return [cond]

        conditions = Condition._split_top_level(str(cond).replace(" ", ""))

        # Compile regex object to find quoted strings
        finder = re.compile(r"'\w+'")

        parsed = []
        for condition in conditions:
            # Replace single-quoted column numbers / names
            if "'" in condition:
                parsed.append(finder.sub(Condition._replace_quoted, condition))
                continue

            # If condition is a simple comparison, allow using non-quoted
            # column names; the first operator found in this order is used
            op = next(
                (o for o in ("<", ">", "!", "==") if o in condition),
                None,
            )

            if op is None:
                raise ValueError(textwrap.fill((
                    f"The input `conditions[i] = {condition}` did not "
                    "contain an operator or single-quoted terms."
                )))

            # Only the first operand is taken to be a column name
            cs = condition.split(op)
            cs[0] = Condition._replace_term(cs[0])
            parsed.append(op.join(cs))

        return parsed


    @staticmethod
    def _replace_term(term: str):
        return f"data[:, sample.columns.index('{term}')]"


    @staticmethod
    def _replace_quoted(term):
        # Remove single quotes
        if isinstance(term, re.Match):
            term = term.group()
        term = term.split("'")[1]

        # Integers index columns directly; anything else is a column name
        try:
            index = int(term)
            return f"data[:, {index}]"
        except ValueError:
            return f"data[:, sample.columns.index('{term}')]"


    @beartype
    def fit_sample(self, sample: IterableSamples):
        '''Return a copy of `sample` keeping only the rows that satisfy every
        condition; conditions are applied one after another.
        '''
        # The parsed strings refer to the local names `data` and `sample`
        data = sample.data
        for cond in self.conditions:
            if callable(cond):
                data = data[cond(data)]
            else:
                # NOTE(security): `eval` runs arbitrary code - condition
                # strings must never come from untrusted input
                data = data[eval(cond, globals(), locals())]

        return sample.copy(data = data)
class Remove(Filter):
    '''Remove columns (either column names or indices) from `pept.LineData`
    or `pept.PointData`.

    Filter signature:

    ::

        pept.LineData -> Remove.fit_sample -> pept.LineData
        pept.PointData -> Remove.fit_sample -> pept.PointData

    Examples
    --------
    To remove a single column named "line_index":

    >>> import pept
    >>> from pept.tracking import *
    >>> points = pept.PointData(...)    # Some dummy data
    >>> rem = Remove("line_index")
    >>> points_without = rem.fit_sample(points)

    Remove all columns starting with "line_index" using a glob operator (*):

    >>> points_without = Remove("line_index*").fit_sample(points)

    Remove the first column based on its index:

    >>> points_without = Remove(0).fit_sample(points)

    Finally, multiple removals may be given as separate arguments or chained
    into a list:

    >>> points_without = Remove("line_index*", -1).fit_sample(points)
    >>> points_without = Remove(["line_index*", -1]).fit_sample(points)
    '''

    def __init__(self, *columns):
        self._indices = []
        self._filters = []

        # Calls the `columns` setter which does parsing
        self.columns = columns


    @property
    def columns(self):
        return self._columns


    @columns.setter
    def columns(self, columns):
        # A lone name / index is treated as a single removal
        if isinstance(columns, (str, Number)):
            columns = [columns]

        # Flatten one level of nesting so that `Remove([a, b])` behaves like
        # `Remove(a, b)`
        flat = []
        for col in columns:
            if isinstance(col, (list, tuple)):
                flat.extend(col)
            else:
                flat.append(col)

        self._columns = [Remove._parse(col) for col in flat]

        # Split the removers into regex strings and column indices. Both are
        # rebuilt from scratch so re-assigning `columns` replaces - rather
        # than adds to - the previous removals
        self._filters = [c for c in self._columns if isinstance(c, str)]
        self._indices = [c for c in self._columns if not isinstance(c, str)]


    @staticmethod
    def _parse(col):
        # Strings become regex patterns ("*" matches any run of word
        # characters); numbers become integer column indices
        if isinstance(col, str):
            return col.replace("*", r"\w*")
        elif isinstance(col, Number):
            return int(col)
        else:
            raise ValueError(textwrap.fill((
                "Each input argument in `columns` must be a string or an "
                f"integer. One of them was `type(col) = {type(col)}`."
            )))


    @beartype
    def fit_sample(self, sample: IterableSamples):
        '''Return a copy of `sample` without the columns whose name fully
        matches one of the patterns or whose (possibly negative) index was
        given.
        '''
        # Extract the relevant `sample` attributes
        columns = sample.columns
        ncols = len(columns)

        # The regex filters to use and column numbers to remove
        filters = self._filters
        indices = self._indices

        # Column indices and names that remain after the removals
        indices_filtered = []
        columns_filtered = []

        for i, c in enumerate(columns):
            by_name = any(re.fullmatch(r, c) for r in filters)

            # Also handle negative indices
            by_index = any(i == ind or i == ind + ncols for ind in indices)

            if not (by_name or by_index):
                indices_filtered.append(i)
                columns_filtered.append(c)

        data = sample.data[:, indices_filtered]
        return sample.copy(data = data, columns = columns_filtered)
class SplitAll(Reducer):
    '''Stack all samples and split them into a list according to a named /
    numeric column index.

    Reducer signature:

    ::

        LineData -> SplitAll.fit -> list[LineData]
        list[LineData] -> SplitAll.fit -> list[LineData]
        PointData -> SplitAll.fit -> list[PointData]
        list[PointData] -> SplitAll.fit -> list[PointData]
        numpy.ndarray -> SplitAll.fit -> list[numpy.ndarray]
        list[numpy.ndarray] -> SplitAll.fit -> list[numpy.ndarray]

    If using a LineData / PointData, you can use a columns name as a string,
    e.g. ``SplitAll("label")`` or a number ``SplitAll(4)``. If using a NumPy
    array, only numeric indices are accepted.
    '''

    def __init__(self, column):
        # Anything convertible to an integer is a column index; otherwise it
        # is kept as a column name
        try:
            index = int(column)
        except ValueError:
            self.column_name = str(column)
            self.column_index = None
        else:
            self.column_index = index
            self.column_name = None


    @beartype
    def fit(self, samples: Iterable):
        '''Stack `samples`, then return one entry per unique value found in
        the selected column.

        Raises
        ------
        TypeError
            If the stacked samples are neither a NumPy array nor an
            ``IterableSamples``.
        '''
        # Reduce / stack list of samples onto a single IterableSamples / array
        stacked = Stack().fit(samples)

        if isinstance(stacked, np.ndarray):
            return self._split_numpy(stacked)

        if isinstance(stacked, IterableSamples):
            return self._split_iterable_samples(stacked)

        raise TypeError(textwrap.fill((
            "The input samples must be NumPy arrays, PointData / LineData "
            f"or lists thereof. Received `type(samples) = {type(stacked)}`"
        )))


    def _split_numpy(self, samples):
        # Plain arrays carry no column names, so a numeric index is required
        if self.column_index is None:
            raise TypeError(textwrap.fill((
                "If the samples are NumPy arrays, you must use a numeric "
                f"column index; used a named column: `{self.column_name}`."
            )))

        selector = samples[:, self.column_index]
        unique_values = np.unique(selector)

        # If no labels exist, return a list with an empty sample
        if len(unique_values) == 0:
            return [samples[0:0]]

        return [samples[selector == value] for value in unique_values]


    def _split_iterable_samples(self, samples):
        # Resolve the column position, looking up the name when no index
        # was given
        if self.column_index is not None:
            position = self.column_index
        else:
            position = samples.columns.index(self.column_name)

        selector = samples.data[:, position]
        unique_values = np.unique(selector)

        # If no labels exist, return a list with an empty sample
        if len(unique_values) == 0:
            return [samples[0:0]]

        groups = []
        for value in unique_values:
            groups.append(samples.copy(data = samples.data[selector == value]))

        return groups