From 088ae6bd815647d1c25d39273762a4ccf112f5ad Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Thu, 16 Feb 2023 13:52:42 +0900
Subject: [PATCH 1/6] adding base class generators

---
 src/graphnet/data/dataconverter.py         | 11 ++++
 src/graphnet/data/generators/__init__.py   |  5 ++
 src/graphnet/data/generators/generator.py  | 63 +++++++++++++++++++++++
 3 files changed, 79 insertions(+)
 create mode 100644 src/graphnet/data/generators/__init__.py
 create mode 100644 src/graphnet/data/generators/generator.py

diff --git a/src/graphnet/data/dataconverter.py b/src/graphnet/data/dataconverter.py
index 077c812e1..990006cd2 100644
--- a/src/graphnet/data/dataconverter.py
+++ b/src/graphnet/data/dataconverter.py
@@ -35,6 +35,11 @@ from graphnet.data.extractors import (
     I3TruthExtractor,
     I3GenericExtractor,
 )
+from graphnet.data.generators import (
+    Generator,
+    GeneratorCollection,
+)
+
 from graphnet.utilities.decorators import final
 from graphnet.utilities.filesys import find_i3_files
 from graphnet.utilities.imports import has_icecube_package
@@ -101,6 +106,7 @@ class DataConverter(ABC, LoggerMixin):
         outdir: str,
         gcd_rescue: Optional[str] = None,
         *,
+        generators: Optional[List[Generator]] = None,
         nb_files_to_batch: Optional[int] = None,
         sequential_batch_pattern: Optional[str] = None,
         input_file_batch_pattern: Optional[str] = None,
@@ -169,6 +175,8 @@ class DataConverter(ABC, LoggerMixin):
 
         # Create I3Extractors
         self._extractors = I3ExtractorCollection(*extractors)
+        # Create Generators (an empty, falsy collection if none are given)
+        self._generators = GeneratorCollection(*(generators or []))
 
         # Create shorthand of names of all pulsemaps queried
         self._table_names = [extractor.name for extractor in self._extractors]
@@ -453,6 +461,9 @@ class DataConverter(ABC, LoggerMixin):
             for table in data_dict.keys():
                 data_dict[table][self._index_column] = index
 
+            if self._generators:
+                data_dict = self._generators(data_dict)
+
             data.append(data_dict)
 
         return data
diff --git a/src/graphnet/data/generators/__init__.py b/src/graphnet/data/generators/__init__.py
new file mode 100644
index 000000000..3986334f7
--- /dev/null
+++ b/src/graphnet/data/generators/__init__.py
@@ -0,0 +1,5 @@
+"""Collection of I3Generators, for generating additional data from I3Frames."""
+
+from .generator import Generator, GeneratorCollection
+
+from .coarsepulsegenerator import CoarsePulseGenerator
diff --git a/src/graphnet/data/generators/generator.py b/src/graphnet/data/generators/generator.py
new file mode 100644
index 000000000..fcb6b35ef
--- /dev/null
+++ b/src/graphnet/data/generators/generator.py
@@ -0,0 +1,63 @@
+"""Base class for I3 generators."""
+
+from abc import ABC, abstractmethod
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+from collections import OrderedDict
+
+
+from graphnet.utilities.imports import has_icecube_package
+from graphnet.utilities.logging import LoggerMixin
+from copy import deepcopy
+
+if has_icecube_package() or TYPE_CHECKING:
+    from icecube import icetray, dataio  # pyright: reportMissingImports=false
+
+
+class Generator(ABC, LoggerMixin):
+    """Base class for generating additional data from Frames.
+
+    All classes inheriting from `Generator` should implement the `__call__`
+    method, and can be applied directly to an OrderedDict generated from an
+    icetray.I3Frame object to return the generated table.
+    """
+
+    def __init__(self, name: str):
+        """Construct Generator.
+
+        Args:
+            name: Name of the `Generator` instance. Used to keep track of the
+                provenance of different data, and to name tables to which this
+                data is saved.
+        """
+        # Member variable(s)
+        self._name: str = name
+
+    @abstractmethod
+    def __call__(self, data: Dict[str, Any]) -> dict:
+        """Return ordered dict with generated features."""
+        pass
+
+    @property
+    def name(self) -> str:
+        """Get name of generator instance."""
+        return self._name
+
+
+class GeneratorCollection(list):
+    """Collection of Generators, for generating additional data from Frames."""
+
+    def __init__(self, *generators: Generator) -> None:
+        """Construct GeneratorCollection."""
+        for generator in generators:
+            assert isinstance(
+                generator, Generator
+            ), "All generators must be of type Generator"
+
+        super().__init__(generators)
+
+    def __call__(self, data: OrderedDict) -> OrderedDict:
+        """Update input dict with generated features."""
+        tmp_data = deepcopy(data)
+        for generator in self:
+            data.update(generator(tmp_data))
+        return data
--
GitLab
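Before the first concrete implementation lands in the next patch, the contract established by PATCH 1/6 is worth spelling out with a toy subclass. The sketch below is not part of the patch series, and the class, table, and column names are illustrative assumptions, but it shows what `Generator` and `GeneratorCollection` expect: a callable that receives the per-event dict of extracted tables and returns a dict of new tables, which the collection merges back into the event data.

    from typing import Any, Dict

    from graphnet.data.generators import Generator, GeneratorCollection


    class TotalChargeGenerator(Generator):
        """Toy generator: sum a pulsemap's charge column into a new table."""

        def __init__(self, pulsemap: str, name: str):
            self._pulsemap = pulsemap  # name of the table to read from
            super().__init__(name)

        def __call__(self, data: Dict[str, Any]) -> dict:
            charge = data[self._pulsemap]["charge"]
            # One new table, keyed by the generator's name.
            return {self.name: {"total_charge": [sum(charge)]}}


    # generators = GeneratorCollection(
    #     TotalChargeGenerator("SRTInIcePulses", "total_charge")
    # )
    # data = generators(data)  # merges the new table into the event dict

Note that `GeneratorCollection.__call__` deep-copies the input before looping, so every generator sees the original tables even after earlier generators have added theirs.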
From 46dfec6b85b7e894ed550a9f6c13d8fcb46ddf70 Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Thu, 16 Feb 2023 13:55:59 +0900
Subject: [PATCH 2/6] adding coarsening generator

---
 .../data/generators/coarsepulsegenerator.py | 155 ++++++++++++++++++
 1 file changed, 155 insertions(+)
 create mode 100644 src/graphnet/data/generators/coarsepulsegenerator.py

diff --git a/src/graphnet/data/generators/coarsepulsegenerator.py b/src/graphnet/data/generators/coarsepulsegenerator.py
new file mode 100644
index 000000000..55d7565a7
--- /dev/null
+++ b/src/graphnet/data/generators/coarsepulsegenerator.py
@@ -0,0 +1,155 @@
+"""Generator class(es) for producing coarse pulsemaps."""
+
+from typing import TYPE_CHECKING, Any, Dict, OrderedDict, List, Optional
+from graphnet.data.generators import Generator
+
+from graphnet.utilities.imports import has_icecube_package
+from graphnet.utilities.logging import warn_once
+
+from torch import Tensor, unique
+
+from sklearn.cluster import KMeans
+
+import numpy as np
+
+if has_icecube_package() or TYPE_CHECKING:
+    from icecube import (
+        icetray,
+        dataclasses,
+    )  # pyright: reportMissingImports=false
+
+
+class CoarsePulseGenerator(Generator):
+    def __init__(
+        self,
+        pulsemap: str,
+        name: str,
+        method: str,
+        coarsen_on: Optional[List[str]] = None,
+        keep_columns: Optional[List[str]] = None,
+        reduc: int = 100,
+        min_n: int = 25,
+    ):
+        """Construct CoarsePulseGenerator.
+
+        Args:
+            pulsemap: Name of the pulse (series) map for which to extract
+                reconstructed features.
+            name: Name of the `Generator` instance.
+            method: Method to use for coarse pulsemap generation.
+            coarsen_on: List of pulsemap columns to use for pseudo-distance used by coarsening algorithm.
+            keep_columns: List of pulsemap columns to keep in final coarse pulsemap.
+        """
+        # Member variable(s)
+        self._pulsemap = pulsemap
+        # reduction method
+        self._method = method
+        # pseudo-distance to use for coarsening
+        self._coarsen_on = coarsen_on
+        # target reduction factor, i.e. 1/reduc
+        self.reduc = reduc
+        # minimum number of pulses to keep
+        self.min_n = min_n
+        # columns to keep in final coarse pulsemap
+        self._keep_columns = keep_columns
+
+        # set coarsen_on if not specified
+        if coarsen_on == None:
+            self._coarsen_on = [
+                "dom_x",
+                "dom_y",
+                "dom_z",
+            ]
+
+        # Base class constructor
+        super().__init__(name)
+
+    def __call__(self, data: Dict[str, Any]) -> dict:
+        """Generate a coarse pulsemap from the extracted pulse series.
+
+        Args:
+            data: Ordered dictionary generated from physics I3 frame.
+        Returns:
+            data: Dictionary containing the generated coarse pulse series.
+        """
+        # Get pulse series
+        self._pulse_data = data[self._pulsemap]
+        # get feature keys
+        self._pulse_names = self._pulse_data.keys()
+        # Get keep columns
+        if self._keep_columns is None:
+            self._keep_columns = self._pulse_names
+        # Get time index
+        self._time_index = [i for i in self._pulse_data if "time" in i]
+        # Get charge index
+        self._charge_index = [i for i in self._pulse_data if "charge" in i]
+        # Get coarse pulse series
+        coarse_pulse_data = self.get_coarse_pulse_data(self._method)
+        # return coarsened pulse series
+        return OrderedDict(self._pulsemap + "_coarse", coarse_pulse_data)
+
+    def get_coarse_pulse_data(self) -> any:
+        """Get coarse pulse series.
+
+        Args:
+            pulse_data: Pulse series to coarsen.
+        Returns:
+            coarse_pulse_data: Coarsened pulse series.
+        """
+        # Get coarse pulse series
+        data_CP = self._pulse_data.deepcopy()
+        # change time into distance using speed of light in ice.
+        data_CP[self._time_index] = (
+            data_CP[self._time_index] * 2.3 * 10 ** (-1)
+        )
+        # Take the spatial + time (transformed) values and use those for the coarsening algorithm
+        tensor = Tensor(data_CP[self._coarsen_on + self._time_index]).T
+        reduc = min([len(tensor), reduc])
+        min_n = int(len(tensor) / min_n)
+        # reduce by the target factor, ensuring not to go below min_n clusters (unless the event has fewer DOM activations)
+        n_clusters = max([reduc, min_n])
+        if self._method == "Kmeans":
+            clusterer = KMeans(
+                n_clusters=n_clusters, random_state=10, init="random", n_init=1
+            )
+        else:
+            raise ValueError("Method not implemented")
+
+        index = clusterer.fit_predict(tensor)
+
+        data_with_group = np.vstack([index, self._pulse_data])
+        data_with_group = data_with_group.T[data_with_group[0, :].argsort()]
+        data_grouped = np.array(
+            np.split(
+                data_with_group[:, 1:],
+                np.unique(data_with_group[:, 0], return_index=True)[1][1:],
+            ),
+            dtype=object,
+        )
+        # mget mean of grouped data and multiply charge by number of pulses in group.
+        for self._pulse_data, i in zip(data_grouped, range(len(data_grouped))):
+            counter = np.shape(self._pulse_data)[0]
+            data_grouped[i] = np.mean(self._pulse_data, axis=0)
+            data_grouped[i][self._charge_index] = (
+                data_grouped[i][self._charge_index] * counter
+            )
+        if len(np.shape(np.array(list(data_grouped)).T)) == 3:
+            data_grouped = data_grouped[:, 0, :]
+
+        result = np.array(list(data_grouped)).T
+        result = OrderedDict(result.T, columns=self._pulse_names)[
+            self._keep_columns
+        ]
+
+        return result
+
+    def get_time_index(self):
+        """Get time index.
+
+        Args:
+            None
+        Returns:
+            time_index: Index of time feature.
+        """
+        time_index = "dom_time"
+        return time_index
--
GitLab
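The least obvious step in the patch above is the vectorised group-by: stack the cluster label onto the pulse matrix, sort rows by label, then split at the positions where the label changes; `np.unique(..., return_index=True)` yields the first occurrence of each label in the sorted array. A stripped-down demonstration of just that trick, with made-up data rather than the patch's API:

    import numpy as np

    labels = np.array([1, 0, 1, 0, 2])      # cluster label per pulse
    pulses = np.array([[10.0, 1.0],         # two features per pulse
                       [20.0, 2.0],
                       [30.0, 3.0],
                       [40.0, 4.0],
                       [50.0, 5.0]])

    stacked = np.hstack([labels[:, None], pulses])  # label becomes column 0
    stacked = stacked[stacked[:, 0].argsort()]      # sort rows by label
    # Split where the label changes: first index of each label, minus the 0th.
    split_at = np.unique(stacked[:, 0], return_index=True)[1][1:]
    groups = np.split(stacked[:, 1:], split_at)
    print([g.mean(axis=0) for g in groups])
    # [array([30., 3.]), array([20., 2.]), array([50., 5.])]

Each element of `groups` holds the pulses of one cluster, ready to be averaged, which is exactly what the patch then does, scaling the charge column by the group size so charge is summed rather than averaged.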
From cccfbdab71ceabcaec956704bed15bd74694a6be Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Thu, 16 Feb 2023 17:04:36 +0900
Subject: [PATCH 3/6] Created coarse pulse generator

---
 .../data/generators/coarsepulsegenerator.py | 103 ++++++++--------
 1 file changed, 57 insertions(+), 46 deletions(-)

diff --git a/src/graphnet/data/generators/coarsepulsegenerator.py b/src/graphnet/data/generators/coarsepulsegenerator.py
index 55d7565a7..bcb76d1ab 100644
--- a/src/graphnet/data/generators/coarsepulsegenerator.py
+++ b/src/graphnet/data/generators/coarsepulsegenerator.py
@@ -6,11 +6,14 @@ from graphnet.data.generators import Generator
 
 from graphnet.utilities.imports import has_icecube_package
 from graphnet.utilities.logging import warn_once
 
+from copy import deepcopy
+
 from torch import Tensor, unique
 
 from sklearn.cluster import KMeans
 
 import numpy as np
+import pandas as pd
 
 if has_icecube_package() or TYPE_CHECKING:
     from icecube import (
@@ -20,12 +23,19 @@ if has_icecube_package() or TYPE_CHECKING:
 
 
 class CoarsePulseGenerator(Generator):
+    """Generator for producing coarse pulsemaps from I3PulseSeriesMaps."""
+
     def __init__(
         self,
         pulsemap: str,
         name: str,
         method: str,
-        coarsen_on: Optional[List[str]] = None,
+        coarsen_on: List[str] = [
+            "dom_x",
+            "dom_y",
+            "dom_z",
+            "dom_time",
+        ],
         keep_columns: Optional[List[str]] = None,
         reduc: int = 100,
         min_n: int = 25,
     ):
@@ -39,6 +49,8 @@ class CoarsePulseGenerator(Generator):
             method: Method to use for coarse pulsemap generation.
             coarsen_on: List of pulsemap columns to use for pseudo-distance used by coarsening algorithm.
             keep_columns: List of pulsemap columns to keep in final coarse pulsemap.
+            reduc: Target reduction factor for coarse pulsemap.
+            min_n: Minimum number of pulses to keep in coarse pulsemap.
         """
         # Member variable(s)
         self._pulsemap = pulsemap
         # reduction method
         self._method = method
         # pseudo-distance to use for coarsening
         self._coarsen_on = coarsen_on
         # target reduction factor, i.e. 1/reduc
-        self.reduc = reduc
+        self._reduc = reduc
         # minimum number of pulses to keep
-        self.min_n = min_n
+        self._min_n = min_n
         # columns to keep in final coarse pulsemap
         self._keep_columns = keep_columns
 
-        # set coarsen_on if not specified
-        if coarsen_on == None:
-            self._coarsen_on = [
-                "dom_x",
-                "dom_y",
-                "dom_z",
-            ]
-
         # Base class constructor
         super().__init__(name)
@@ -69,43 +73,55 @@ class CoarsePulseGenerator(Generator):
     def __call__(self, data: Dict[str, Any]) -> dict:
         """Generate a coarse pulsemap from the extracted pulse series.
 
         Args:
             data: Ordered dictionary generated from physics I3 frame.
+
         Returns:
             data: Dictionary containing the generated coarse pulse series.
         """
         # Get pulse series
         self._pulse_data = data[self._pulsemap]
         # get feature keys
-        self._pulse_names = self._pulse_data.keys()
+        self._pulse_names = list(self._pulse_data.keys())
+
         # Get keep columns
         if self._keep_columns is None:
             self._keep_columns = self._pulse_names
-        # Get time index
-        self._time_index = [i for i in self._pulse_data if "time" in i]
         # Get charge index
-        self._charge_index = [i for i in self._pulse_data if "charge" in i]
+        self._charge_index = [
+            self._pulse_names.index(i)
+            for i in self._pulse_names
+            if "charge" in i
+        ]
         # Get coarse pulse series
-        coarse_pulse_data = self.get_coarse_pulse_data(self._method)
+        coarse_pulse_data = self.get_coarse_pulse_data()
         # return coarsened pulse series
-        return OrderedDict(self._pulsemap + "_coarse", coarse_pulse_data)
+        return {self._name: coarse_pulse_data}
 
-    def get_coarse_pulse_data(self) -> any:
+    def get_coarse_pulse_data(self) -> dict:
         """Get coarse pulse series.
 
         Args:
             pulse_data: Pulse series to coarsen.
+
         Returns:
             coarse_pulse_data: Coarsened pulse series.
         """
+        # get index values of columns to keep.
+        if self._keep_columns is not None:
+            keep_index = [
+                self._pulse_names.index(i) for i in self._keep_columns
+            ]
+
+        data_CP = []
+        for i in self._coarsen_on:
+            data_CP.append(np.array(self._pulse_data[i]))
+        data_CP = np.array(data_CP)
         # Get coarse pulse series
-        data_CP = self._pulse_data.deepcopy()
         # change time into distance using speed of light in ice.
-        data_CP[self._time_index] = (
-            data_CP[self._time_index] * 2.3 * 10 ** (-1)
-        )
+        data_CP[-1] = data_CP[-1] * 2.3 * 10 ** (-1)
         # Take the spatial + time (transformed) values and use those for the coarsening algorithm
-        tensor = Tensor(data_CP[self._coarsen_on + self._time_index]).T
-        reduc = min([len(tensor), reduc])
-        min_n = int(len(tensor) / min_n)
+        tensor = Tensor(data_CP).T
+        min_n = min([len(tensor), self._min_n])
+        reduc = int(len(tensor) / self._reduc)
         # reduce by the target factor, ensuring not to go below min_n clusters (unless the event has fewer DOM activations)
         n_clusters = max([reduc, min_n])
         if self._method == "Kmeans":
@@ -116,8 +132,8 @@ class CoarsePulseGenerator(Generator):
             )
         else:
             raise ValueError("Method not implemented")
 
         index = clusterer.fit_predict(tensor)
-
-        data_with_group = np.vstack([index, self._pulse_data])
+        pulse_df = pd.DataFrame(self._pulse_data, index=None).T
+        data_with_group = np.vstack([index, pulse_df])
         data_with_group = data_with_group.T[data_with_group[0, :].argsort()]
         data_grouped = np.array(
             np.split(
                 data_with_group[:, 1:],
                 np.unique(data_with_group[:, 0], return_index=True)[1][1:],
             ),
             dtype=object,
         )
+
         # mget mean of grouped data and multiply charge by number of pulses in group.
-        for self._pulse_data, i in zip(data_grouped, range(len(data_grouped))):
-            counter = np.shape(self._pulse_data)[0]
-            data_grouped[i] = np.mean(self._pulse_data, axis=0)
-            data_grouped[i][self._charge_index] = (
-                data_grouped[i][self._charge_index] * counter
+        for data, ind in zip(data_grouped, range(len(data_grouped))):
+            counter = np.shape(data)[0]
+            data_grouped[ind] = np.mean(data, axis=0)
+            data_grouped[ind][self._charge_index] = (
+                data_grouped[ind][self._charge_index] * counter
             )
         if len(np.shape(np.array(list(data_grouped)).T)) == 3:
             data_grouped = data_grouped[:, 0, :]
 
         result = np.array(list(data_grouped)).T
-        result = OrderedDict(result.T, columns=self._pulse_names)[
-            self._keep_columns
-        ]
+        result = dict(
+            zip(
+                list(np.array(self._pulse_names)[keep_index]),
+                result[keep_index],
+            )
+        )
+        # update event_no
+        result.update({"event_no": self._pulse_data["event_no"]})
 
         return result
-
-    def get_time_index(self):
-        """Get time index.
-
-        Args:
-            None
-        Returns:
-            time_index: Index of time feature.
-        """
-        time_index = "dom_time"
-        return time_index
--
GitLab

From d79e712527a2457cb61cb4628595ceb36a5a681a Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Mon, 20 Feb 2023 12:50:32 +0900
Subject: [PATCH 4/6] generate table before indices

---
 src/graphnet/data/dataconverter.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/graphnet/data/dataconverter.py b/src/graphnet/data/dataconverter.py
index 8950e2afb..7560c71b5 100644
--- a/src/graphnet/data/dataconverter.py
+++ b/src/graphnet/data/dataconverter.py
@@ -456,6 +456,10 @@ class DataConverter(ABC, LoggerMixin):
             if isinstance(extractor, I3GenericExtractor):
                 data_dict.update(data_dict.pop(extractor._name))
 
+            if self._generators:
+                data_dict = self._generators(data_dict)
+
+            data.append(data_dict)
             # Get new, unique index and increment value
             if multi_processing:
                 with global_index.get_lock():  # type: ignore[name-defined]
@@ -469,11 +473,6 @@ class DataConverter(ABC, LoggerMixin):
             for table in data_dict.keys():
                 data_dict[table][self._index_column] = index
 
-            if self._generators:
-                data_dict = self._generators(data_dict)
-
-            data.append(data_dict)
-
         return data
 
     def get_map_function(
--
GitLab
From 5cb5efb3c2e2d3481270225581e2ed7339ccb7f2 Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Tue, 21 Feb 2023 12:59:46 +0900
Subject: [PATCH 5/6] bugfixing and tidying

---
 src/graphnet/data/dataconverter.py          |  3 +-
 .../data/generators/coarsepulsegenerator.py | 37 ++++++++++++-------
 2 files changed, 25 insertions(+), 15 deletions(-)

diff --git a/src/graphnet/data/dataconverter.py b/src/graphnet/data/dataconverter.py
index 7560c71b5..0c65714ec 100644
--- a/src/graphnet/data/dataconverter.py
+++ b/src/graphnet/data/dataconverter.py
@@ -459,7 +459,6 @@ class DataConverter(ABC, LoggerMixin):
             if self._generators:
                 data_dict = self._generators(data_dict)
 
-            data.append(data_dict)
             # Get new, unique index and increment value
             if multi_processing:
                 with global_index.get_lock():  # type: ignore[name-defined]
@@ -473,6 +472,8 @@ class DataConverter(ABC, LoggerMixin):
             for table in data_dict.keys():
                 data_dict[table][self._index_column] = index
 
+            data.append(data_dict)
+
         return data
 
     def get_map_function(
diff --git a/src/graphnet/data/generators/coarsepulsegenerator.py b/src/graphnet/data/generators/coarsepulsegenerator.py
index bcb76d1ab..5dd41e175 100644
--- a/src/graphnet/data/generators/coarsepulsegenerator.py
+++ b/src/graphnet/data/generators/coarsepulsegenerator.py
@@ -34,8 +34,8 @@ class CoarsePulseGenerator(Generator):
             "dom_x",
             "dom_y",
             "dom_z",
-            "dom_time",
         ],
+        time_label: str = "dom_time",
         keep_columns: Optional[List[str]] = None,
         reduc: int = 100,
         min_n: int = 25,
@@ -48,6 +48,7 @@ class CoarsePulseGenerator(Generator):
             name: Name of the `Generator` instance.
             method: Method to use for coarse pulsemap generation.
            coarsen_on: List of pulsemap columns to use for pseudo-distance used by coarsening algorithm.
+            time_label: Name of the time column in the pulsemap.
             keep_columns: List of pulsemap columns to keep in final coarse pulsemap.
             reduc: Target reduction factor for coarse pulsemap.
             min_n: Minimum number of pulses to keep in coarse pulsemap.
         """
         # Member variable(s)
         self._pulsemap = pulsemap
         # reduction method
         self._method = method
-        # pseudo-distance to use for coarsening
-        self._coarsen_on = coarsen_on
+        # pseudo-distance to use for coarsening; force time to be included at end.
+        self._coarsen_on = coarsen_on + [time_label]
         # target reduction factor, i.e. 1/reduc
         self._reduc = reduc
         # minimum number of pulses to keep
         self._min_n = min_n
         # columns to keep in final coarse pulsemap
         self._keep_columns = keep_columns
@@ -124,14 +125,21 @@ class CoarsePulseGenerator(Generator):
         reduc = int(len(tensor) / self._reduc)
         # reduce by the target factor, ensuring not to go below min_n clusters (unless the event has fewer DOM activations)
         n_clusters = max([reduc, min_n])
-        if self._method == "Kmeans":
-            clusterer = KMeans(
-                n_clusters=n_clusters, random_state=10, init="random", n_init=1
-            )
-        else:
-            raise ValueError("Method not implemented")
+        if len(tensor) > self._min_n:
+            if self._method == "Kmeans":
+                clusterer = KMeans(
+                    n_clusters=n_clusters,
+                    random_state=10,
+                    init="random",
+                    n_init=1,
+                )
+            else:
+                raise ValueError("Method not implemented")
+
+            index = clusterer.fit_predict(tensor)
+        else:  # if fewer DOM activations than clusters, just return the DOMs
+            index = np.arange(len(tensor))
 
-        index = clusterer.fit_predict(tensor)
         pulse_df = pd.DataFrame(self._pulse_data, index=None).T
         data_with_group = np.vstack([index, pulse_df])
         data_with_group = data_with_group.T[data_with_group[0, :].argsort()]
@@ -143,7 +151,7 @@ class CoarsePulseGenerator(Generator):
             dtype=object,
         )
 
-        # mget mean of grouped data and multiply charge by number of pulses in group.
+        # get mean of grouped data and multiply charge by number of pulses in group.
         for data, ind in zip(data_grouped, range(len(data_grouped))):
             counter = np.shape(data)[0]
             data_grouped[ind] = np.mean(data, axis=0)
@@ -154,13 +162,14 @@ class CoarsePulseGenerator(Generator):
             data_grouped = data_grouped[:, 0, :]
 
         result = np.array(list(data_grouped)).T
+        # turn the np array of np arrays into a list of lists
+        result = [list(i) for i in result[keep_index]]
+        # write to dict
         result = dict(
             zip(
                 list(np.array(self._pulse_names)[keep_index]),
-                result[keep_index],
+                result,
             )
         )
-        # update event_no
-        result.update({"event_no": self._pulse_data["event_no"]})
 
         return result
--
GitLab
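PATCH 5/6 does two things: it finishes the reordering started in the previous patch, so tables are only collected after being stamped with the event index, and it guards the clustering against events with too few pulses. The guard completes the cluster-count logic that has been evolving since patch 2, and it is compact enough to misread. In effect: aim for one cluster per `reduc` pulses, never go below `min_n` clusters, and skip clustering entirely when the event has no more than `min_n` pulses. The same decision as a plain function, illustrative rather than the patch's API:

    def choose_n_clusters(n_pulses: int, reduc: int = 100, min_n: int = 25) -> int:
        """Number of output pulses for an event of `n_pulses` input pulses."""
        if n_pulses <= min_n:
            return n_pulses          # small event: keep every pulse
        target = n_pulses // reduc   # aim for a 1/reduc reduction...
        return max(target, min_n)    # ...but never fewer than min_n clusters

    assert choose_n_clusters(10) == 10       # below min_n: identity mapping
    assert choose_n_clusters(1000) == 25     # floored at min_n
    assert choose_n_clusters(10000) == 100   # full 1/100 reduction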
From 9559c07da22683f00c1b3d1781c30bad09271b33 Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Wed, 22 Feb 2023 16:34:07 +0900
Subject: [PATCH 6/6] reorganizing and cleaning

---
 .../data/generators/coarsepulsegenerator.py | 197 ++++++++++++------
 1 file changed, 129 insertions(+), 68 deletions(-)

diff --git a/src/graphnet/data/generators/coarsepulsegenerator.py b/src/graphnet/data/generators/coarsepulsegenerator.py
index 5dd41e175..ffb4daec8 100644
--- a/src/graphnet/data/generators/coarsepulsegenerator.py
+++ b/src/graphnet/data/generators/coarsepulsegenerator.py
@@ -1,14 +1,11 @@
 """Generator class(es) for producing coarse pulsemaps."""
 
-from typing import TYPE_CHECKING, Any, Dict, OrderedDict, List, Optional
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
 from graphnet.data.generators import Generator
 
 from graphnet.utilities.imports import has_icecube_package
-from graphnet.utilities.logging import warn_once
-
-from copy import deepcopy
-
-from torch import Tensor, unique
+from torch import Tensor
 
 from sklearn.cluster import KMeans
@@ -37,7 +34,7 @@ class CoarsePulseGenerator(Generator):
         ],
         time_label: str = "dom_time",
         keep_columns: Optional[List[str]] = None,
-        reduc: int = 100,
+        reduce: int = 100,
         min_n: int = 25,
     ):
         """Construct CoarsePulseGenerator.
@@ -50,7 +47,7 @@ class CoarsePulseGenerator(Generator):
             coarsen_on: List of pulsemap columns to use for pseudo-distance used by coarsening algorithm.
             time_label: Name of the time column in the pulsemap.
             keep_columns: List of pulsemap columns to keep in final coarse pulsemap.
-            reduc: Target reduction factor for coarse pulsemap.
+            reduce: Target reduction factor for coarse pulsemap.
             min_n: Minimum number of pulses to keep in coarse pulsemap.
         """
         # Member variable(s)
@@ -59,8 +56,8 @@ class CoarsePulseGenerator(Generator):
         self._method = method
         # pseudo-distance to use for coarsening; force time to be included at end.
         self._coarsen_on = coarsen_on + [time_label]
-        # target reduction factor, i.e. 1/reduc
-        self._reduc = reduc
+        # target reduction factor, i.e. 1/reduce
+        self._reduce = reduce
         # minimum number of pulses to keep
         self._min_n = min_n
         # columns to keep in final coarse pulsemap
@@ -100,76 +97,140 @@ class CoarsePulseGenerator(Generator):
     def get_coarse_pulse_data(self) -> dict:
         """Get coarse pulse series.
 
-        Args:
-            pulse_data: Pulse series to coarsen.
-
         Returns:
             coarse_pulse_data: Coarsened pulse series.
         """
-        # get index values of columns to keep.
-        if self._keep_columns is not None:
-            keep_index = [
-                self._pulse_names.index(i) for i in self._keep_columns
-            ]
-
-        data_CP = []
-        for i in self._coarsen_on:
-            data_CP.append(np.array(self._pulse_data[i]))
-        data_CP = np.array(data_CP)
-        # Get coarse pulse series
-        # change time into distance using speed of light in ice.
-        data_CP[-1] = data_CP[-1] * 2.3 * 10 ** (-1)
-        # Take the spatial + time (transformed) values and use those for the coarsening algorithm
-        tensor = Tensor(data_CP).T
-        min_n = min([len(tensor), self._min_n])
-        reduc = int(len(tensor) / self._reduc)
-        # reduce by the target factor, ensuring not to go below min_n clusters (unless the event has fewer DOM activations)
-        n_clusters = max([reduc, min_n])
-        if len(tensor) > self._min_n:
-            if self._method == "Kmeans":
-                clusterer = KMeans(
-                    n_clusters=n_clusters,
-                    random_state=10,
-                    init="random",
-                    n_init=1,
-                )
-            else:
-                raise ValueError("Method not implemented")
-
-            index = clusterer.fit_predict(tensor)
-        else:  # if fewer DOM activations than clusters, just return the DOMs
-            index = np.arange(len(tensor))
-
-        pulse_df = pd.DataFrame(self._pulse_data, index=None).T
-        data_with_group = np.vstack([index, pulse_df])
-        data_with_group = data_with_group.T[data_with_group[0, :].argsort()]
-        data_grouped = np.array(
-            np.split(
-                data_with_group[:, 1:],
-                np.unique(data_with_group[:, 0], return_index=True)[1][1:],
-            ),
-            dtype=object,
+        # get index values for grouping
+        index = coarsening_index(
+            pulse_data=self._pulse_data,
+            coarsen_on=self._coarsen_on,
+            reduce=self._reduce,
+            min_n=self._min_n,
+            method=self._method,
+        )
+        # group pulses by index
+        coarse_pulse_data = group_by_index(
+            pulse_data=self._pulse_data,
+            index=index,
+            pulse_names=self._pulse_names,
+            charge_index=self._charge_index,
+            keep_columns=self._keep_columns,
         )
 
-        # get mean of grouped data and multiply charge by number of pulses in group.
-        for data, ind in zip(data_grouped, range(len(data_grouped))):
-            counter = np.shape(data)[0]
-            data_grouped[ind] = np.mean(data, axis=0)
-            data_grouped[ind][self._charge_index] = (
-                data_grouped[ind][self._charge_index] * counter
+        return coarse_pulse_data
+
+
+def coarsening_index(
+    pulse_data: dict,
+    coarsen_on: List[str],
+    reduce: int,
+    min_n: int,
+    method: str,
+) -> np.ndarray:
+    """Get coarsening index.
+
+    Args:
+        pulse_data: Pulse series to coarsen.
+        coarsen_on: List of pulsemap columns to use for pseudo-distance used by coarsening algorithm, time assumed included as last entry.
+        reduce: Target reduction factor for coarse pulsemap.
+        min_n: Minimum number of pulses to keep in coarse pulsemap.
+        method: Method to use for coarse pulsemap generation.
+
+    Returns:
+        index: Index list for grouping.
+    """
+    data = []
+    for i in coarsen_on:
+        data.append(np.array(pulse_data[i]))
+    data = np.array(data)
+    # Get coarse pulse series
+    # change time into distance using speed of light in ice.
+    data[-1] = data[-1] * 2.3 * 10 ** (-1)
+    # Take the spatial + time (transformed) values and use those for the coarsening algorithm
+    tensor = Tensor(data).T
+    min_n = min([len(tensor), min_n])
+    reduce = int(len(tensor) / reduce)
+    # reduce by the target factor, ensuring not to go below min_n clusters (unless the event has fewer DOM activations)
+    n_clusters = max([reduce, min_n])
+    if len(tensor) > min_n:
+        if method == "Kmeans":
+            clusterer = KMeans(
+                n_clusters=n_clusters,
+                random_state=10,
+                init="random",
+                n_init=1,
+            )
+        else:
+            raise ValueError("Method not implemented")
+
+        index = clusterer.fit_predict(tensor)
+    else:  # if fewer DOM activations than clusters, just return the DOMs
+        index = np.arange(len(tensor))
+
+    return index
+
+
+def group_by_index(
+    pulse_data: dict,
+    index: List[int],
+    pulse_names: List[str],
+    charge_index: Optional[List[int]] = None,
+    keep_columns: Optional[List[str]] = None,
+) -> dict:
+    """Group pulses by given grouping index.
+
+    Args:
+        pulse_data: Pulse series to group.
+        index: Index list for grouping.
+        pulse_names: List of pulsemap columns.
+        charge_index: Index of charge column.
+        keep_columns: List of pulsemap columns to keep in final coarse pulsemap.
+
+    Returns:
+        result: Pulsemap grouped by input index.
+    """
+    pulse_df = pd.DataFrame(pulse_data, index=None).T
+    data_with_group = np.vstack([index, pulse_df])
+    data_with_group = data_with_group.T[data_with_group[0, :].argsort()]
+    data_grouped = np.array(
+        np.split(
+            data_with_group[:, 1:],
+            np.unique(data_with_group[:, 0], return_index=True)[1][1:],
+        ),
+        dtype=object,
+    )
+
+    # get mean of grouped data and multiply charge by number of pulses in group.
+    for data, ind in zip(data_grouped, range(len(data_grouped))):
+        counter = np.shape(data)[0]
+        data_grouped[ind] = np.mean(data, axis=0)
+        if charge_index is not None:
+            data_grouped[ind][charge_index] = (
+                data_grouped[ind][charge_index] * counter
             )
-        if len(np.shape(np.array(list(data_grouped)).T)) == 3:
-            data_grouped = data_grouped[:, 0, :]
+    if len(np.shape(np.array(list(data_grouped)).T)) == 3:
+        data_grouped = data_grouped[:, 0, :]
 
-        result = np.array(list(data_grouped)).T
-        # turn the np array of np arrays into a list of lists
+    result = np.array(list(data_grouped)).T
+    # turn the np array of np arrays into a list of lists
+
+    # get index values of columns to keep, and keep only those columns.
+    if keep_columns is not None:
+        keep_index = [pulse_names.index(i) for i in keep_columns]
         result = [list(i) for i in result[keep_index]]
-        # write to dict
         result = dict(
             zip(
-                list(np.array(self._pulse_names)[keep_index]),
+                list(np.array(pulse_names)[keep_index]),
+                result,
+            )
+        )
+    else:  # if no keep columns specified, keep all columns.
+        result = [list(i) for i in result]
+        result = dict(
+            zip(
+                list(pulse_names),
                 result,
             )
        )
 
-        return result
+    return result
--
GitLab