Source code for pept.tracking.peptml.peptml

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


#    pept is a Python library that unifies Positron Emission Particle
#    Tracking (PEPT) research, including tracking, simulation, data analysis
#    and visualisation tools.
#
#    If you used this codebase or any software making use of it in a scientific
#    publication, you must cite the following paper:
#        Nicuşan AL, Windows-Yule CR. Positron emission particle tracking
#        using machine learning. Review of Scientific Instruments.
#        2020 Jan 1;91(1):013329.
#        https://doi.org/10.1063/1.5129251
#
#    Copyright (C) 2019-2021 the pept developers
#
#    This program is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    This program is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with this program.  If not, see <https://www.gnu.org/licenses/>.


# File   : peptml.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date   : 28.08.2019


import copy
import os
import textwrap
import time
import warnings

import  numpy               as      np

from    joblib              import  Parallel, delayed
from    tqdm                import  tqdm

import  cma
import  hdbscan

import  pept




class HDBSCANClusterer:
    '''Efficient, optionally-parallel HDBSCAN-based clustering for cutpoints
    computed from LoRs (or generic 3D points).

    Given a sample of (cut)points, they are clustered using the HDBSCAN [1]_
    algorithm; the tracers' locations are computed as the "centre" of each
    cluster, corresponding to the average of all the points in the cluster
    (this is the original implementation [2]_). Optionally, the cluster
    exemplars - that is, the most stable points in the cluster - can be used
    for the average (if `select_exemplars` == True).

    This algorithm requires no prior knowledge of the number of tracers within
    the system; it successfully distinguished multiple particles separated by
    distances as small as 2 mm. The technique’s spatial resolution was observed
    to be invariant with the number of tracers used, allowing large numbers of
    particles to be tracked simultaneously, with no loss of data quality [2]_.

    This class is a wrapper around the `hdbscan` package, providing tools for
    parallel clustering of samples of cutpoints. It can return `PointData`
    classes for ease of manipulation and visualisation.

    Two main methods are provided: `fit_sample` for clustering a single numpy
    array of cutpoints (i.e. a single sample) and `fit` which clusters all the
    samples encapsulated in a `pept.PointData` class (such as the one returned
    by the `Cutpoints` class) *in parallel*.

    Optionally, this class can find the optimum clustering parameters for a
    given dataset using the `optimise` method.

    Attributes
    ----------
    clusterer : hdbscan.HDBSCAN object
        The HDBSCAN object that will be used to cluster points. The settings
        below apply to it. If `allow_single_cluster` is set to the default
        "auto", a second clusterer object (`clusterer_single`) will be used for
        samples in which no cluster was found. This is useful for experiments
        in which tracers leave and enter the field of view, or coalesce. The
        `optimise` method can find the optimal settings for a given dataset.

    clusterer_single : hdbscan.HDBSCAN object or None
        The second HDBSCAN object, defined only if `allow_single_cluster` is
        "auto". It can have different settings than `clusterer`, which is
        useful for long experiments in which the data looks very different
        when a single tracer remains in the field of view (e.g. if two tracers
        coalesce, the resulting "single tracer" will be much more active). The
        `optimise` method can find the optimal settings for a given dataset,
        independently from `clusterer`.

    min_cluster_size : int
        (Taken from hdbscan's documentation): The minimum size of clusters;
        single linkage splits that contain fewer points than this will be
        considered points “falling out” of a cluster rather than a cluster
        splitting into two new clusters.

    min_samples : int
        (Taken from hdbscan's documentation): The number of samples in a
        neighbourhood for a point to be considered a core point. The default is
        `None`, being set automatically to the `min_cluster_size`.

    min_cluster_size_single : int or None
        The `min_cluster_size` value for `clusterer_single`, defined only if
        `allow_single_cluster` is "auto".

    min_samples_single : int
        The `min_samples` value for `clusterer_single`, defined only if
        `allow_single_cluster` is "auto".

    allow_single_cluster : bool or "auto"
        By default HDBSCAN will not produce a single cluster - this creates
        "tighter" clusters (i.e. more accurate positions). Setting this to
        `False` will discard datasets with a single tracer, but will produce
        more accurate positions. Setting this to `True` will also work for
        single tracers, at the expense of lower accuracy for cases with more
        tracers. This class provides a third option, "auto", in which case two
        clusterers will be used: one with `allow_single_cluster` set to `False`
        and another with it set to `True`; the latter will only be used if the
        first did not find any clusters.

    select_exemplars : bool, default False
        If `False`, the tracer position is computed as the average of *all* the
        points in a cluster. If `True`, the tracer position is computed as the
        average of the *cluster exemplars*. This can yield tighter clusters in
        the majority of cases, but miss some tracers if the points are very
        sparse.

    labels : (N,) numpy.ndarray, dtype = int
        A 1D array of the cluster labels for cutpoints fitted using
        `fit_sample` or `fit`. If `fit_sample` is used, `labels` correspond to
        each row in the sample array fitted. If `fit` is used with the setting
        `get_labels = True`, `labels` correpond to the stacked labels for every
        sample in the given `pept.PointData` class.

    Methods
    -------
    fit_sample(sample, get_labels = False, as_array = True, verbose = False,\
               _set_labels = True)
        Fit one sample of cutpoints and return the cluster centres and
        (optionally) the labelled cutpoints.

    fit(cutpoints, get_labels = False, max_workers = None, verbose = True)
        Fit cutpoints (an instance of `PointData`) and return the cluster
        centres and (optionally) the labelled cutpoints.

    optimise(points, nsamples = 16, max_workers = None, verbose = True,\
             _stability_params = None)
        Optimise HDBSCANClusterer settings against a given dataset of `points`.

    Examples
    --------
    A typical workflow would involve reading LoRs from a file, computing their
    cutpoints, clustering them and plotting them.

    >>> import pept
    >>> from pept.tracking import peptml
    >>>
    >>> lors = pept.LineData(...)
    >>> cutpoints = peptml.Cutpoints(lors, 0.15)
    >>>
    >>> clusterer = peptml.HDBSCANClusterer()
    >>> clusterer.optimise(cutpoints)           # optional
    >>> centres = clusterer.fit(cutpoints)
    >>>
    >>> grapher = PlotlyGrapher()
    >>> grapher.add_points(centres)
    >>> grapher.show()

    For more advanced uses of HDBSCANClusterer such as 2-pass clustering, do
    check out the tutorials available on the Birmingham's Positron Imaging
    Centre's GitHub repository at github.com/uob-positron-imaging-centre.

    References
    ----------
    .. [1] McInnes L, Healy J. Accelerated Hierarchical Density Based
       Clustering In: 2017 IEEE International Conference on Data Mining
       Workshops (ICDMW), IEEE, pp 33-42. 2017
    .. [2] Nicuşan AL, Windows-Yule CR. Positron emission particle tracking
       using machine learning. Review of Scientific Instruments. 2020 Jan 1;
       91(1):013329.

    See Also
    --------
    pept.tracking.peptml.Cutpoints : Compute cutpoints from `pept.LineData`.
    pept.LineData : Encapsulate LoRs for ease of iteration and plotting.
    pept.PointData : Encapsulate points for ease of iteration and plotting.
    pept.utilities.read_csv : Fast CSV file reading into numpy arrays.
    PlotlyGrapher : Easy, publication-ready plotting of PEPT-oriented data.
    '''

    def __init__(
        self,
        min_cluster_size = 20,
        min_samples = None,
        min_cluster_size_single = None,
        min_samples_single = None,
        allow_single_cluster = "auto",
        select_exemplars = False,
        core_dist_n_jobs = 1,
        **kwargs
    ):
        '''`HDBSCANClusterer` class constructor.

        Parameters
        ----------
        min_cluster_size : int, default 20
            (Taken from hdbscan's documentation): The minimum size of clusters;
            single linkage splits that contain fewer points than this will be
            considered points “falling out” of a cluster rather than a cluster
            splitting into two new clusters.

        min_samples : int, optional
            (Taken from hdbscan's documentation): The number of samples in a
            neighbourhood for a point to be considered a core point. The
            default is `None`, being set automatically to the
            `min_cluster_size`.

        min_cluster_size_single : int, optional
            The `min_cluster_size` value for `clusterer_single`, defined only
            if `allow_single_cluster` is "auto". The default is `None`, being
            set automatically to the `min_cluster_size`.

        min_samples_single : int, optional
            The `min_samples` value for `clusterer_single`, defined only if
            `allow_single_cluster` is "auto". The default is `None`, being set
            automatically to the `min_cluster_size`.

        allow_single_cluster : bool or str, default "auto"
            By default HDBSCAN will not produce a single cluster - this creates
            "tighter" clusters (i.e. more accurate positions). Setting this to
            `False` will discard datasets with a single tracer, but will
            produce more accurate positions. Setting this to `True` will also
            work for single tracers, at the expense of lower accuracy for cases
            with more tracers. This class provides a third option, "auto", in
            which case two clusterers will be used: one with
            `allow_single_cluster` set to `False` and another with it set to
            `True`; the latter will only be used if the first did not find any
            clusters.

        select_exemplars : bool, default False
            If `False`, the tracer position is computed as the average of *all*
            the points in a cluster. If `True`, the tracer position is computed
            as the average of the *cluster exemplars*. This can yield tighter
            clusters in the majority of cases, but miss some tracers if the
            points are very sparse.

        core_dist_n_jobs : int or None, default 1
            (Taken from hdbscan's documentation): Number of parallel jobs to
            run in core distance computations (if supported by the specific
            algorithm). If unset (`None`), the value returned by
            `os.cpu_count()` is used. This should be left to the default 1, as
            parallelism is more advantageous between samples (i.e. when calling
            `fit`) rather than for each clustering algorithm run.

        kwargs : keyword arguments
            Other keyword arguments that will be passed to the HDBSCAN
            instantiation.

        Raises
        ------
        ValueError
            If `allow_single_cluster` is not `True`, `False` or "auto".
        '''

        # Type-check min_cluster_size and min_samples
        min_cluster_size = int(round(min_cluster_size))

        if min_samples is None:
            min_samples = min_cluster_size
        else:
            min_samples = int(round(min_samples))

        if min_cluster_size_single is None:
            min_cluster_size_single = min_cluster_size
        else:
            min_cluster_size_single = int(round(min_cluster_size_single))

        if min_samples_single is None:
            min_samples_single = min_samples
        else:
            min_samples_single = int(round(min_samples_single))

        if core_dist_n_jobs is None:
            core_dist_n_jobs = os.cpu_count()

        self._select_exemplars = bool(select_exemplars)
        self._labels = None

        # Will manually supply allow_single_cluster. Only need prediction_data
        # if select_exemplars is `True`.
        clusterer_options = {
            "min_cluster_size": min_cluster_size,
            "min_samples": min_samples,
            "core_dist_n_jobs": core_dist_n_jobs,
            "gen_min_span_tree": True,
            "prediction_data": self._select_exemplars,
            "cluster_selection_method": "leaf",         # Quite harsh
            **kwargs
        }

        clusterer_single_options = {
            "min_cluster_size": min_cluster_size_single,
            "min_samples": min_samples_single,
            "core_dist_n_jobs": core_dist_n_jobs,
            "gen_min_span_tree": True,
            "prediction_data": self._select_exemplars,
            "cluster_selection_method": "eom",          # More lenient
            **kwargs
        }

        # Create inner clusterers (including clusterer_single if
        # allow_single_cluster == "auto")
        if allow_single_cluster is True:
            self._allow_single_cluster = allow_single_cluster
            self.clusterer = hdbscan.HDBSCAN(
                allow_single_cluster = True,
                **clusterer_options
            )

            self.clusterer_single = None

        elif allow_single_cluster is False:
            self._allow_single_cluster = allow_single_cluster
            self.clusterer = hdbscan.HDBSCAN(
                allow_single_cluster = False,
                **clusterer_options
            )

            self.clusterer_single = None

        elif str(allow_single_cluster).lower() == "auto":
            self._allow_single_cluster = "auto"
            self.clusterer = hdbscan.HDBSCAN(
                allow_single_cluster = False,
                **clusterer_options
            )

            self.clusterer_single = hdbscan.HDBSCAN(
                allow_single_cluster = True,
                **clusterer_single_options
            )

        else:
            raise ValueError((
                "\n[ERROR]: `allow_single_cluster` should be either `True`, "
                f"`False` or 'auto'. Received {allow_single_cluster}.\n"
            ))


    @property
    def min_cluster_size(self):
        return self.clusterer.min_cluster_size


    @min_cluster_size.setter
    def min_cluster_size(self, new_min_cluster_size):
        if new_min_cluster_size < 2:
            warnings.warn((
                "\n[WARNING]: new_min_cluster_size was set to 2, as it was "
                f"{new_min_cluster_size} < 2.\n"
            ), RuntimeWarning)
            new_min_cluster_size = 2
        else:
            new_min_cluster_size = int(round(new_min_cluster_size))

        self.clusterer.min_cluster_size = new_min_cluster_size


    @property
    def min_samples(self):
        return self.clusterer.min_samples


    @min_samples.setter
    def min_samples(self, new_min_samples):
        if new_min_samples < 2:
            warnings.warn((
                "\n[WARNING]: new_min_samples was set to 2, as it was "
                f"{new_min_samples} < 2.\n"
            ), RuntimeWarning)
            new_min_samples = 2
        else:
            new_min_samples = int(round(new_min_samples))

        self.clusterer.min_samples = new_min_samples


    @property
    def min_cluster_size_single(self):
        if self.clusterer_single is None:
            return None

        return self.clusterer_single.min_cluster_size


    @min_cluster_size_single.setter
    def min_cluster_size_single(self, new_min_cluster_size):
        if new_min_cluster_size < 2:
            warnings.warn((
                "\n[WARNING]: new_min_cluster_size was set to 2, as it was "
                f"{new_min_cluster_size} < 2.\n"
            ), RuntimeWarning)
            new_min_cluster_size = 2
        else:
            new_min_cluster_size = int(round(new_min_cluster_size))

        self.clusterer_single.min_cluster_size = new_min_cluster_size


    @property
    def min_samples_single(self):
        if self.clusterer_single is None:
            return None

        return self.clusterer_single.min_samples


    @min_samples_single.setter
    def min_samples_single(self, new_min_samples):
        if new_min_samples < 2:
            warnings.warn((
                "\n[WARNING]: new_min_samples was set to 2, as it was "
                f"{new_min_samples} < 2.\n"
            ), RuntimeWarning)
            new_min_samples = 2
        else:
            new_min_samples = int(round(new_min_samples))

        self.clusterer_single.min_samples = new_min_samples


    @property
    def allow_single_cluster(self):
        return self._allow_single_cluster


    @allow_single_cluster.setter
    def allow_single_cluster(self, allow_single_cluster):
        if allow_single_cluster is True:
            self._allow_single_cluster = allow_single_cluster
            self.clusterer.allow_single_cluster = True
            self.clusterer_single = None

        elif allow_single_cluster is False:
            self._allow_single_cluster = allow_single_cluster
            self.clusterer.allow_single_cluster = False
            self.clusterer_single = None

        elif str(allow_single_cluster).lower() == "auto":
            self._allow_single_cluster = "auto"
            self.clusterer.allow_single_cluster = False

            # Make a deep copy of the "normal" clusterer, and only change the
            # `allow_single_cluster` option to `True`.
            self.clusterer_single = copy.deepcopy(self.clusterer)
            self.clusterer_single.allow_single_cluster = True

        else:
            raise ValueError((
                "\n[ERROR]: `allow_single_cluster` should be either `True`, "
                f"`False` or 'auto'. Received {allow_single_cluster}.\n"
            ))


    @property
    def select_exemplars(self):
        return self._select_exemplars


    @select_exemplars.setter
    def select_exemplars(self, select_exemplars):
        self._select_exemplars = bool(select_exemplars)

        self.clusterer.prediction_data = self._select_exemplars

        if self.clusterer_single is not None:
            self.clusterer_single.prediction_data = self._select_exemplars


    @property
    def labels(self):
        return self._labels


    def fit_sample(
        self,
        sample,
        get_labels = False,
        as_array = True,
        verbose = False,
        _set_labels = True
    ):
        '''Fit one sample of cutpoints and return the cluster centres and
        (optionally) the labelled cutpoints.

        Parameters
        ----------
        sample : (N, M >= 4) numpy.ndarray
            The sample of points that will be clustered. The expected columns
            are `[time, x, y, z, etc]`. Only columns `[1, 2, 3]` are used for
            clustering.

        get_labels : bool, default False
            If set to True, the input `sample` is also returned with an extra
            column representing the label of the cluster that each point is
            associated with. This label is an `int`, numbering clusters
            starting from 0; noise is represented with the value -1.

        as_array : bool, default True
            If set to True, the centres of the clusters and the labelled
            cutpoints are returned as numpy arrays. If set to False, they are
            returned inside instances of `pept.PointData`.

        verbose : bool, default False
            Provide extra information when computing the cutpoints: time the
            operation and show a progress bar.

        _set_labels : bool, default True
            This is an internal setting that an end-user should not normally
            care about. If `True`, the class property `labels` will be set
            after fitting. Setting this to `False` is helpful for multithreaded
            contexts - when calling `fit_sample` in parallel, it makes sure
            no internal attributes are mutated at the same time.

        Returns
        -------
        centres : numpy.ndarray or pept.PointData
            The centroids of every cluster found with columns
            `[time, x, y, z, ..., cluster_size]`. They are computed as the
            column-wise average of the points included in each cluster (i.e.
            for each label) or the cluster exemplars (if the `select_exemplars`
            attribute is `True`). Another column is added to the initial data
            in `sample`, signifying the cluster size - that is, the number of
            points included in the cluster. If `as_array` is set to True, it is
            a numpy array, otherwise the centres are stored in a
            `pept.PointData` instance.

        sample_labelled : optional, numpy.ndarray or pept.PointData
            Returned if `get_labels` is `True`. It is the input `sample` with
            an appended column representing the label of the cluster that the
            point was associated with. The labels are integers starting from 0.
            The points classified as noise have the number -1 associated. If
            `as_array` is set to True, it is a numpy array, otherwise the
            labelled points are stored in a `pept.PointData` instance.

        Raises
        ------
        ValueError
            If `sample` is not a numpy array of shape (N, M), where M >= 4.

        Notes
        -----
        If no clusters were found (i.e. all labels are -1), the returned values
        are empty numpy arrays.
        '''

        if verbose:
            start = time.time()

        # sample columns: [time, x, y, z, ...]
        sample = np.asarray(sample, dtype = float, order = "C")
        if sample.ndim != 2 or sample.shape[1] < 4:
            raise ValueError((
                "\n[ERROR]: `sample` should have two dimensions (M, N), where "
                f"N >= 4. Received {sample.shape}.\n"
            ))

        labels = self.clusterer.fit_predict(sample[:, 1:4])
        max_label = labels.max()

        # If `allow_single_cluster` is "auto", check if no clusters were found
        # and try again using the hdbscan option allow_single_cluster = True.
        if max_label == -1 and self._allow_single_cluster == "auto":
            labels = self.clusterer_single.fit_predict(sample[:, 1:4])
            max_label = labels.max()

            if self._select_exemplars:
                prediction_data = \
                    self.clusterer_single._prediction_data
                selected_clusters = \
                    self.clusterer_single.condensed_tree_._select_clusters()
                raw_condensed_tree = \
                    self.clusterer_single.condensed_tree_._raw_tree

        elif self._select_exemplars:
            prediction_data = \
                self.clusterer._prediction_data
            selected_clusters = \
                self.clusterer.condensed_tree_._select_clusters()
            raw_condensed_tree = \
                self.clusterer.condensed_tree_._raw_tree

        # Select the cluster exemplars' indices
        if self._select_exemplars:
            exemplars = []
            for cluster in selected_clusters:
                cluster_exemplars = []
                for leaf in prediction_data._recurse_leaf_dfs(cluster):
                    leaf_max_lambda = raw_condensed_tree['lambda_val'][
                        raw_condensed_tree['parent'] == leaf
                    ].max()
                    points = raw_condensed_tree['child'][
                        (raw_condensed_tree['parent'] == leaf) &
                        (raw_condensed_tree['lambda_val'] == leaf_max_lambda)
                    ]
                    cluster_exemplars.append(points)
                exemplars.append(np.hstack(cluster_exemplars))

            exemplars = np.hstack(exemplars)

            # Create a boolean mask for rows in `sample` which are exemplars
            indices_exemplars = np.zeros(len(sample), dtype = bool)
            indices_exemplars[exemplars] = True

        if _set_labels:
            self._labels = labels

        # The tracer location (centre of a cluster's exemplars) is the average
        # of the time, x, y, z columns + the number of points in that cluster
        # (i.e. cluster size)
        # centres columns: [time, x, y, z, ..etc.., cluster_size]
        centres = []
        for i in range(0, max_label + 1):
            # Average time, x, y, z of the exemplars in cluster with label i
            indices_cluster = (labels == i)

            if self._select_exemplars:
                centre = np.mean(sample[indices_cluster & indices_exemplars],
                                 axis = 0)
            else:
                centre = np.mean(sample[indices_cluster], axis = 0)

            # Append the number of points of label i => cluster_size
            centre = np.append(centre, indices_cluster.sum())
            centres.append(centre)

        centres = np.array(centres)

        if not as_array and len(centres) != 0:
            centres = pept.PointData(
                centres,
                sample_size = 0,
                overlap = 0,
                verbose = False
            )

        if verbose:
            end = time.time()
            print("Fitting one sample took {} seconds".format(end - start))

        # If labels are requested, also return the initial sample with appended
        # labels. Labels go from 0 to max_label; -1 represents noise.
        if get_labels:
            sample_labelled = np.append(
                sample, labels[:, np.newaxis], axis = 1
            )

            if not as_array and len(sample_labelled) != 0:
                sample_labelled = pept.PointData(
                    sample_labelled,
                    sample_size = 0,
                    overlap = 0,
                    verbose = False
                )
            return centres, sample_labelled

        # Otherwise just return the found centres
        return centres


    def fit(
        self,
        cutpoints,
        get_labels = False,
        max_workers = None,
        verbose = True
    ):
        '''Fit cutpoints (an instance of `PointData`) and return the cluster
        centres and (optionally) the labelled cutpoints.

        This is a convenience function that clusters each sample in an instance
        of `pept.PointData` *in parallel*, using joblib. For more fine-grained
        control over the clustering, the `fit_sample` method can be used for
        each individual sample.

        Parameters
        ----------
        cutpoints : an instance of `pept.PointData`
            The samples of points that will be clustered. Be careful to set the
            appropriate `sample_size` and `overlap` for good results. If the
            `sample_size` is too low, the less radioactive tracers might not be
            found; if it is too high, temporal resolution is decreased. If the
            `overlap` is too small, the tracked points might be very "sparse".
            Note: when transforming LoRs into cutpoints using the `Cutpoints`
            class, the `sample_size` is automatically set based on the average
            number of cutpoints found per sample of LoRs.

        get_labels : bool, default False
            If set to True, the labelled cutpoints are returned along with the
            centres of the clusters. The labelled cutpoints are a list of
            `pept.PointData` for each sample of cutpoints, with an appended
            column representing the cluster labels (starting from 0; noise is
            encoded as -1).

        max_workers : int, optional
            The maximum number of threads that will be used for asynchronously
            clustering the samples in `cutpoints`. If unset (`None`), the
            number of threads available on the machine (as returned by
            `os.cpu_count()`) will be used.

        verbose : bool, default True
            Provide extra information when computing the cutpoints: time the
            operation and show a progress bar.

        Returns
        -------
        centres : pept.PointData
            The centroids of every cluster found with columns
            `[time, x, y, z, ..., cluster_size]`. They are computed as the
            column-wise average of the points included in each cluster (i.e.
            for each label) or the cluster exemplars (if the `select_exemplars`
            attribute is `True`). Another column is added to the initial data
            in `sample`, signifying the cluster size - that is, the number of
            points included in the cluster.

        labelled_cutpoints : optional, pept.PointData
            Returned if `get_labels` is `True`. It is a `pept.PointData`
            instance in which every sample is the corresponding sample in
            `cutpoints`, but with an appended column representing the label of
            the cluster that the point was associated with. The labels are
            integers starting from 0. The points classified as noise have the
            number -1 associated. Note that the labels are only consistent
            within the same sample; that is, for tracers A and B, if in one
            sample A gets the label 0 and B the label 1, in another sample
            their order might be inversed. The `trajectory_separation` module
            might be used to separate them out.

        Raises
        ------
        TypeError
            If `cutpoints` is not an instance (or a subclass) of
            `pept.PointData`.

        Notes
        -----
        If no clusters were found (i.e. all labels are -1), the returned values
        are empty numpy arrays.
        '''

        if verbose:
            start = time.time()

        if not isinstance(cutpoints, pept.PointData):
            raise TypeError((
                "\n[ERROR]: cutpoints should be an instance of "
                "`pept.PointData` (or any class inheriting from it). Received "
                f"{type(cutpoints)}.\n"
            ))

        get_labels = bool(get_labels)

        # Fit all samples in `cutpoints` in parallel using joblib
        # Collect all outputs as a list. If verbose, show progress bar with
        # tqdm
        if verbose:
            cutpoints = tqdm(cutpoints)

        if max_workers is None:
            max_workers = os.cpu_count()

        data_list = Parallel(n_jobs = max_workers)(
            delayed(self.fit_sample)(
                sample,
                get_labels = get_labels,
                as_array = True,
                verbose = False,
                _set_labels = False
            ) for sample in cutpoints
        )

        if not get_labels:
            # data_list is a list of arrays. Only choose the arrays with at
            # least one row.
            centres = [r for r in data_list if len(r) != 0]
        else:
            # data_list is a list of tuples, in which the first element is an
            # array of the centres, and the second element is an array of the
            # labelled cutpoints.
            centres = [r[0] for r in data_list if len(r[0]) != 0]

        if len(centres) != 0:
            centres = pept.PointData(
                np.vstack(centres),
                sample_size = 0,
                overlap = 0,
                verbose = False
            )

        if verbose:
            end = time.time()
            print("\nFitting cutpoints took {} seconds.\n".format(end - start))

        if get_labels:
            # data_list is a list of tuples, in which the first element is an
            # array of the centres, and the second element is an array of the
            # labelled cutpoints.
            labelled_cutpoints = [r[1] for r in data_list if len(r[1]) != 0]
            if len(labelled_cutpoints) != 0:
                # Encapsulate `labelled_cutpoints` in a `pept.PointData`
                # instance in which every sample is the corresponding sample in
                # `cutpoints`, but with an appended column representing the
                # labels. Therefore, the `sample_size` is the same as for
                # `cutpoints`, which is equal to the length of every array in
                # `labelled_cutpoints`
                labelled_cutpoints = pept.PointData(
                    np.vstack(labelled_cutpoints),
                    sample_size = len(labelled_cutpoints[0]),
                    overlap = 0,
                    verbose = False
                )

            # Set the attribute `labels` to the stacked labels of all the
            # labelled cutpoints; that is, the last column in the
            # labelled_cutpoints internal data:
            self._labels = labelled_cutpoints.points[:, -1]

            return centres, labelled_cutpoints

        return centres


    def optimise(
        self,
        points,
        nsamples = 16,
        selected_indices = None,
        max_workers = None,
        verbose = True,
        _stability_params = None
    ):
        '''Optimise HDBSCANClusterer settings against a given dataset of
        `points`.

        The `points` are either a single sample (i.e. an (M, N>=4) numpy array)
        or an instance of `PointData` (i.e. multiple samples). In the latter
        case, the settings are optimised against `nsamples` random ramples
        (if `selected_indices` is None), or the samples at indices
        `selected_incides`.

        The CMA-ES (Covariance Matrix Adaptation Evolution Strategy) [1]_
        algorithm is used to find the `min_cluster_size` and
        `min_samples` combination that maximises the overall cluster
        stabilities, quantified with a fitness function that uses:

        1. The probability of each clustered point of being in the assigned
           cluster.
        2. The persistence of each cluster, gauging the relative coherence of
           the clusters.

        As the samples used for optimisation are chosen randomly and CMA-ES is
        a stochastic algorithm, some variance in the results of different
        optimisation passes is expected.

        Parameters
        ----------
        points : (M, N>=4) numpy.ndarray or pept.PointData
            The dataset for which the HDBSCAN parameters will be optimised. It
            is either a single sample (a numpy array) or an instance of
            `PointData` (multiple samples).

        nsamples : int, default 16
            If `points` is an instance of `pept.PointData` and
            `selected_indices` is `None`, `nsamples` random samples are
            selected to optimise against. A larger number may yield better
            parameters, but take more time to find. The default of 16 provides
            a good trade-off between the two.

        selected_indices : list-like of int, optional
            If `None`, `nsamples` random samples are selected from `points`.
            Otherwise, select the samples in `points` at the indices contained
            in `selected_indices`. Only has an effect if `points` is a
            `pept.PointData`.

        max_workers : int, optional
            The maximum number of threads that will be used for asynchronously
            clustering the selected samples in `points`. If unset (`None`), the
            number of threads available on the machine (as returned by
            `os.cpu_count()`) will be used.

        verbose : bool, default True
            Provide extra information during the optimisation pass.

        _stability_params : list-like of length 2 or 4, optional
            If set, the function will no longer run optimisations; rather, it
            will only return the negative fitness score, computed for the
            parameters in `_stability_params`. If `allow_single_cluster` is
            "auto", this parameter should contain the `min_cluster_size` and
            `min_samples` values for `self.clusterer`, followed by the values
            for `self.clusterer_single`. Otherwise, it should contain only the
            values for `self.clusterer`. This is useful for debugging and
            using a different algorithm than CMA-ES.

        Returns
        -------
        stabilities : float or list[float] of length 2
            If `_stability_params` is not `None`, this method will return the
            cluster stability scores. If `allow_single_cluster` is "auto", it
            will return the scores for both the `clusterer` and
            `clusterer_single`; otherwise it returns only the `clusterer`
            score. If `_stability_params` is `None`, nothing is returned.

        Raises
        ------
        ValueError
            If `selected_indices` is not `None` and is not a list-like with a
            single dimension.

        ValueError
            If `points` is a single sample and does not have shape (M, N>=4).

        Notes
        -----
        If the optimisation is successful, this class' `min_cluster_size` and
        `min_samples` attributes will be automatically set to the optimum
        values found.

        For datasets with 2000 points per sample, the optimisation should have
        around 200 function evaluations (i.e. clustering runs) and take
        somewhere under 2 minutes.

        References
        ----------
        .. [1] Nikolaus Hansen, Youhei Akimoto, and Petr Baudis. CMA-ES/pycma
           on Github. Zenodo, DOI:10.5281/zenodo.2559634, February 2019.

        '''

        # Select samples for clustering
        # points is either a single sample or PointData (i.e. multiple samples)
        if isinstance(points, pept.PointData):
            if selected_indices is None:
                # Select `nsamples` random samples
                rng = np.random.default_rng()
                selected_indices = rng.integers(0, len(points) - 1, nsamples)
            else:
                # Ensure `selected_indices` is a list-like of integers
                selected_indices = np.asarray(selected_indices, dtype = int)
                if selected_indices.ndim != 1:
                    raise ValueError(textwrap.fill(
                        "[ERROR]: If `selected_indices` is defined, it must "
                        "be list-like with a single dimension. Received "
                        f"{selected_indices.shape}"
                    ))

            selected = [points[i] for i in selected_indices]

        else:
            # We only have a single sample
            points = np.asarray(points, dtype = float)
            if points.ndim != 2 or points.shape[1] < 4:
                raise ValueError(textwrap.fill(
                    "[ERROR]: If `points` is a single sample, it should have "
                    f"shape (M, N>=4). Received {points.shape}."
                ))

            selected = [points]

        # If `_stability_params` were given, don't optimise and only return the
        # stability scores.
        if _stability_params is not None:
            score = self._optimise(
                self.clusterer, selected, max_workers, verbose,
                _stability_params[:2]
            )

            if self.clusterer_single is not None:
                self.clusterer.min_cluster_size = \
                    int(round(_stability_params[0]))
                self.clusterer.min_samples = int(round(_stability_params[1]))

                score_single = self._optimise_single(
                    self.clusterer_single, self.clusterer, selected,
                    max_workers, verbose, _stability_params[2:]
                )

                return score, score_single

            return score

        # Print optimisation info to the terminal.
        if verbose:
            print((
                "\n" + "-" * 79 + "\n" +
                "Optimising HDBSCANClusterer settings against given "
                "dataset.\n" + "-" * 79 + "\n"
            ))

            if self.clusterer_single is not None:
                print((
                    '`allow_single_cluster` is "auto" - will optimise '
                    "clusterers individually.\n"
                ))

        # Optimise `self.clusterer` settings. `self.clusterer_single` is not
        # used here!
        min_cluster_size, min_samples, score = self._optimise(
            self.clusterer, selected, max_workers, verbose, _stability_params
        )

        # Warning if no feasible settings / clusters were found. Otherwise set
        # min_cluster_size and min_samples to the found optimal values.
        if score == 0.0:
            warnings.warn(
                textwrap.fill((
                    "\n[WARNING]: Could not find settings combination that "
                    "improves the clusters' stability. Are there any clusters "
                    "in the input dataset?\nDid not set `min_cluster_size` "
                    "and `min_samples`.\n"
                ), replace_whitespace = False),
                RuntimeWarning
            )
        else:
            self.min_cluster_size = min_cluster_size
            self.min_samples = min_samples

        # If `allow_single_cluster` == "auto" (i.e. clusterer_single != None),
        # also optimise `self.clusterer_sigle` settings, only for the samples
        # in which `self.clusterer` does not find anything.
        if self.clusterer_single is not None:
            min_cluster_size_single, min_samples_single, score_single = \
                self._optimise_single(
                    self.clusterer_single, self.clusterer, selected,
                    max_workers, verbose, _stability_params
                )

            # Warning if no samples with single clusters were found. Otherwise
            # set `min_cluster_size_single` and `min_samples` to the found
            # optimal values.
            if score_single == 0.0:
                warnings.warn(
                    textwrap.fill((
                        "\n[WARNING]: Could not find settings combination that"
                        " improves the clusters' stability for the single "
                        "cluster case. Are there any samples with a single "
                        "tracer in the input dataset?\n\n"
                        "Setting `min_cluster_size` and `min_samples` to the "
                        "multi-tracer settings.\n"
                    ), replace_whitespace = False),
                    RuntimeWarning
                )
                self.min_cluster_size_single = min_cluster_size
                self.min_samples_single = min_samples
            else:
                self.min_cluster_size_single = min_cluster_size_single
                self.min_samples_single = min_samples_single

        # Print optimisation results to terminal.
        if verbose:
            print((
                "-" * 79 +
                "\nOptimal parameters found:\n"
                f"min_cluster_size = {self.min_cluster_size}\n"
                f"min_samples =      {self.min_samples}"
            ))

            if self.clusterer_single is not None:
                print((
                    "\n"
                    f"min_cluster_size_single = {self.min_cluster_size_single}"
                    f"\nmin_samples_single =      {self.min_samples_single}"
                ))

            print("-" * 79 + "\n\n")


    def _optimise(
        self,
        clusterer,
        selected,
        max_workers = None,
        verbose = True,
        _stability_params = None
    ):
        if verbose:
            start = time.time()

        if max_workers is None:
            max_workers = os.cpu_count()

        def stability_sample(clusterer, sample):
            # Only cluster based on [x, y, z].
            labels = clusterer.fit_predict(sample[:, 1:4])
            max_label = labels.max()

            prob = clusterer.probabilities_
            pers = clusterer.cluster_persistence_

            score = 0.0
            for i, label in enumerate(range(0, max_label + 1)):
                score += prob[labels == label].mean() * pers[i]

            return -score / (max_label + 2)

        def stability(params):
            params = np.asarray(params)
            clusterer.min_cluster_size = int(round(params[0]))
            clusterer.min_samples = int(round(params[1]))

            # Compute stabilities for each selected sample in parallel
            stabilities = Parallel(n_jobs = max_workers)(
                delayed(stability_sample)(clusterer, sample)
                for sample in selected
            )

            stabilities = np.array(stabilities)
            score = stabilities.mean()

            # Decrease score if there is more than 50% difference between
            # `min_cluster_size` and `min_samples`
            correction = params.min() / params.max()

            return score * correction if correction < 0.5 else score

        if _stability_params is not None:
            return stability(_stability_params)

        sample_size = len(selected[0])
        x0 = [2, 2]                 # starting guess = smallest parameters
        sigma0 = 0.1 * sample_size  # initial standard deviation
        bounds = [2, sample_size]   # bounds for min_{cluster_size | samples}

        es = cma.CMAEvolutionStrategy(x0, sigma0, dict(
            bounds = bounds,
            integer_variables = [0, 1],
            tolflatfitness = 10,                # Try 10 times if f is flat
            tolfun = -np.inf,                   # Only stop when tolx < 1
            tolx = 1,                           # Stop if changes in x < 1
            verbose = 3 if verbose else -9
        ))
        es.optimize(stability)

        min_cluster_size = int(round(es.result.xfavorite[0]))
        min_samples = int(round(es.result.xfavorite[1]))
        score = es.result.fbest

        if verbose:
            es.result_pretty()

            end = time.time()
            print(f"\nOptimisation took {end - start} s.\n")

        return min_cluster_size, min_samples, score


    def _optimise_single(
        self,
        clusterer_single,
        clusterer_multi,
        selected,
        max_workers = None,
        verbose = True,
        _stability_params = None
    ):

        if verbose:
            start = time.time()

        if max_workers is None:
            max_workers = os.cpu_count()

        def stability_sample(clusterer_single, clusterer_multi, sample):
            # Only cluster based on [x, y, z].
            labels = clusterer_multi.fit_predict(sample[:, 1:4])
            max_label = labels.max()

            # Only score if single cluster...
            if max_label != -1:
                return 0

            labels = clusterer_single.fit_predict(sample[:, 1:4])
            max_label = labels.max()

            prob = clusterer_single.probabilities_
            pers = clusterer_single.cluster_persistence_

            score = 0.0
            for i, label in enumerate(range(0, max_label + 1)):
                score += prob[labels == label].mean() * pers[i]

            return -score / (max_label + 2)

        def stability(params):
            params = np.asarray(params)
            clusterer_single.min_cluster_size = int(round(params[0]))
            clusterer_single.min_samples = int(round(params[1]))

            # Compute stabilities for each selected sample in parallel
            stabilities = Parallel(n_jobs = max_workers)(
                delayed(stability_sample)(
                    clusterer_single, clusterer_multi, sample
                ) for sample in selected
            )

            stabilities = np.array(stabilities)
            score = stabilities.mean()

            # Decrease score if there is more than 50% difference between
            # `min_cluster_size` and `min_samples`
            correction = params.min() / params.max()

            return score * correction if correction < 0.5 else score

        if _stability_params is not None:
            return stability(_stability_params)

        sample_size = len(selected[0])
        x0 = [2, 2]                 # starting guess = smallest parameters
        sigma0 = 0.1 * sample_size  # initial standard deviation
        bounds = [2, sample_size]   # bounds for min_{cluster_size | samples}

        es = cma.CMAEvolutionStrategy(x0, sigma0, dict(
            bounds = bounds,
            integer_variables = [0, 1],
            tolflatfitness = 10,                # Try 10 times if f is flat
            tolfun = -np.inf,                   # Only stop when tolx < 1
            tolx = 1,                           # Stop if changes in x < 1
            verbose = 3 if verbose else -9
        ))
        es.optimize(stability)

        min_cluster_size = int(round(es.result.xfavorite[0]))
        min_samples = int(round(es.result.xfavorite[1]))
        score = es.result.fbest

        if verbose:
            es.result_pretty()

            end = time.time()
            print(f"\nOptimisation took {end - start} s.\n")

        return min_cluster_size, min_samples, score


    def __str__(self):
        # Shown when calling print(class)
        docstr = (
            f"clusterer:\n{self.clusterer}\n\n"
            f"min_cluster_size =            {self.min_cluster_size}\n"
            f"min_samples =                 {self.min_samples}\n\n"
            f"clusterer_single:\n{self.clusterer_single}\n\n"
            f"min_cluster_size_single =     {self.min_cluster_size_single}\n"
            f"min_samples_single =          {self.min_samples_single}\n\n"
            f"allow_single_cluster =        {self.allow_single_cluster}\n"
            f"labels =                      {self.labels}"
        )

        return docstr


    def __repr__(self):
        # Shown when writing the class on a REPL
        docstr = (
            "Class instance that inherits from `peptml.HDBSCANClusterer`.\n"
            f"Type:\n{type(self)}\n\n"
            "Attributes\n"
            "----------\n"
            f"{self.__str__()}\n"
        )

        return docstr




# The code below is adapted from the PEPT-EM algorithm developed by Antoine
# Renaud and Sam Manger
def _centroid(lors, weights):
    nx = np.newaxis

    m = np.identity(3)[nx, :, :] - lors[:, nx, 4:7] * lors[:, 4:7, nx]
    m *= weights[:, nx, nx]

    n = np.sum(m, axis = 0)
    v = np.sum(np.sum(m * lors[:, nx, 1:4], axis = -1), axis = 0)

    return np.matmul(np.linalg.inv(n), v)


def _dist_matrix(x, lors):
    y = x[np.newaxis, :3] - lors[:, 1:4]
    return np.sum(y**2, axis = -1) - np.sum(y * lors[:, 4:7], axis = -1)**2


def _latent_weights(d2, s):
    w = np.exp(-d2 / 2 / s**2) / (s * 2.50662827463)
    w /= np.sum(w)
    return w


def _linear_weights(d2):
    return (d2.max() - d2) / (d2.max() - d2.min())


def _predict_gaussian(lines, iters = 2):
    # Rewrite LoRs in the vectorial form y(x) = position + x * direction
    lors = lines[:, :7].copy(order = "C")

    lors[:, 4:7] = lors[:, 4:7] - lors[:, 1:4]
    lors[:, 4:7] /= np.linalg.norm(lors[:, 3:], axis = -1)[:, np.newaxis]

    # Begin with equal weights for all LoRs
    weights = np.ones(len(lors))

    # Run two EM iterations so that a reasonable amount of LoRs remain with
    # significant weights
    for _ in range(iters):
        x = _centroid(lors, weights)
        d2 = _dist_matrix(x, lors)
        # variance = np.sqrt(np.mean(d2 * weights))
        # print(variance)
        weights = _latent_weights(d2, 5.0)

    return x, weights


def _predict_simple(lines):
    # Rewrite LoRs in the vectorial form y(x) = position + x * direction
    lors = lines[:, :7].copy(order = "C")

    lors[:, 4:7] = lors[:, 4:7] - lors[:, 1:4]
    lors[:, 4:7] /= np.linalg.norm(lors[:, 3:], axis = -1)[:, np.newaxis]

    # Begin with equal weights for all LoRs
    weights = np.ones(len(lors))
    x = _centroid(lors, weights)
    d2 = _dist_matrix(x, lors)

    for i in range(6):
        k = int(len(d2) * (1 - 0.1 * (i + 1)))
        part = np.argpartition(d2, k)
        weights[part[k:]] = 0

        x = _centroid(lors, weights)
        d2 = _dist_matrix(x, lors)

    weights[weights != 0] = _linear_weights(d2[weights != 0])
    return x, weights




[docs]class HDBSCAN(pept.base.PointDataFilter): '''Use HDBSCAN to cluster some ``pept.PointData`` and append a cluster label to each point. Filter signature: :: PointData -> HDBSCAN.fit_sample -> PointData The only free parameter to select is the ``true_fraction``, a relative measure of the ratio of inliers to outliers. A noisy sample - e.g. first pass of clustering of cutpoints - may need a value of `0.15`. A cleaned up dataset - e.g. a second pass of clustering - can work with `0.6`. You can also set the maximum number of tracers visible at any one time in the system in ``max_tracers`` (default 1). This is simply an inverse scaling factor, but the ``true_fraction`` is quite robust with varying numbers of tracers. '''
[docs] def __init__(self, true_fraction, max_tracers = 1): self.true_fraction = float(true_fraction) self.max_tracers = int(max_tracers) self.clusterer = hdbscan.HDBSCAN( allow_single_cluster = False, core_dist_n_jobs = 1, )
def _inject_cluster(self, points): # Inject artificial cluster in a clearly outlying region quantiles = np.quantile(points[:, 1:4], [0.25, 0.75], axis = 0) qrange = quantiles[1, :] - quantiles[0, :] # Artificial cluster centre = Tukey's upper fence artificial = quantiles[1, :] + 1.5 * qrange # Pre-allocate array for the artificial cluster's points num_points = int(len(points) * self.true_fraction / self.max_tracers) cluster = np.zeros((num_points, points.shape[1])) # Fill artificial cluster's points' coordinates with normally # distributed points of a variance comparable with that of the points cluster[:, 1:4] = np.random.normal( artificial, 0.3 * qrange, size = (num_points, 3), ) return np.vstack((points, cluster))
[docs] def fit_sample(self, sample_points): # Type-checking inputs if not isinstance(sample_points, pept.PointData): sample_points = pept.PointData(sample_points) points = sample_points.points # If there are no points, return empty PointData with the same attrs if len(points) <= 1: return sample_points.copy( data = np.empty((0, points.shape[1] + 1)), columns = sample_points.columns + ["label"], ) # Inject artificial cluster outside FoV to enforce characteristic # length. Will remove labels given to those points after fitting points_art = self._inject_cluster(points) # Set HDBSCAN's parameters phi = int(self.true_fraction * len(points_art) / (self.max_tracers + 1)) if phi >= len(points): phi = len(points) if phi < 2: phi = 2 self.clusterer.min_cluster_size = phi self.clusterer.min_samples = phi # Cluster the points given and get each point's label with warnings.catch_warnings(): # Ignore deprecation warning from HDBSCAN's use of `np.bool` warnings.simplefilter("ignore", category = DeprecationWarning) labels = self.clusterer.fit_predict(points_art[:, 1:4]) # Assign noise to all points included in the artificial cluster labels_art = labels[len(points):] for label in np.unique(labels_art): labels[labels == label] = -1 # Remove labels given to the artificial cluster's points labels = labels[:len(points)] # Map labels from [0, 2, 2, 3, 0] to [0, 1, 1, 2, 0] good = (labels != -1) _, labels[good] = np.unique(labels[good], return_inverse = True) # Construct new PointData instance with the same attributes return sample_points.copy( data = np.c_[points, labels], columns = sample_points.columns + ["label"], )