From 088ae6bd815647d1c25d39273762a4ccf112f5ad Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Thu, 16 Feb 2023 13:52:42 +0900
Subject: [PATCH 1/6] Add Generator base classes

---
 src/graphnet/data/dataconverter.py        | 11 ++++
 src/graphnet/data/generators/__init__.py  |  5 ++
 src/graphnet/data/generators/generator.py | 63 +++++++++++++++++++++++
 3 files changed, 79 insertions(+)
 create mode 100644 src/graphnet/data/generators/__init__.py
 create mode 100644 src/graphnet/data/generators/generator.py

diff --git a/src/graphnet/data/dataconverter.py b/src/graphnet/data/dataconverter.py
index 077c812e1..990006cd2 100644
--- a/src/graphnet/data/dataconverter.py
+++ b/src/graphnet/data/dataconverter.py
@@ -35,6 +35,11 @@ from graphnet.data.extractors import (
     I3TruthExtractor,
     I3GenericExtractor,
 )
+from graphnet.data.generators import (
+    Generator,
+    GeneratorCollection,
+)
+
 from graphnet.utilities.decorators import final
 from graphnet.utilities.filesys import find_i3_files
 from graphnet.utilities.imports import has_icecube_package
@@ -101,6 +106,7 @@ class DataConverter(ABC, LoggerMixin):
         outdir: str,
         gcd_rescue: Optional[str] = None,
         *,
+        generators: Optional[List[Generator]] = None,
         nb_files_to_batch: Optional[int] = None,
         sequential_batch_pattern: Optional[str] = None,
         input_file_batch_pattern: Optional[str] = None,
@@ -169,6 +175,8 @@ class DataConverter(ABC, LoggerMixin):
 
         # Create I3Extractors
         self._extractors = I3ExtractorCollection(*extractors)
+        generators = generators or []
+        self._generators = GeneratorCollection(*generators)
 
         # Create shorthand of names of all pulsemaps queried
         self._table_names = [extractor.name for extractor in self._extractors]
@@ -453,6 +461,9 @@ class DataConverter(ABC, LoggerMixin):
             for table in data_dict.keys():
                 data_dict[table][self._index_column] = index
 
+            if self._generators:
+                data_dict = self._generators(data_dict)
+
             data.append(data_dict)
 
         return data
diff --git a/src/graphnet/data/generators/__init__.py b/src/graphnet/data/generators/__init__.py
new file mode 100644
index 000000000..3986334f7
--- /dev/null
+++ b/src/graphnet/data/generators/__init__.py
@@ -0,0 +1,5 @@
+"""Collection of I3Generators, for generating additional data from I3Frames."""
+
+from .generator import Generator, GeneratorCollection
+
+from .coarsepulsegenerator import CoarsePulseGenerator
diff --git a/src/graphnet/data/generators/generator.py b/src/graphnet/data/generators/generator.py
new file mode 100644
index 000000000..fcb6b35ef
--- /dev/null
+++ b/src/graphnet/data/generators/generator.py
@@ -0,0 +1,63 @@
+"""Base class for I3 generators."""
+
+from abc import ABC, abstractmethod
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+from collections import OrderedDict
+
+
+from graphnet.utilities.imports import has_icecube_package
+from graphnet.utilities.logging import LoggerMixin
+from copy import deepcopy
+
+if has_icecube_package() or TYPE_CHECKING:
+    from icecube import icetray, dataio  # pyright: reportMissingImports=false
+
+
+class Generator(ABC, LoggerMixin):
+    """Base class for generating additional data from Frames.
+
+    All classes inheriting from `Generator` should implement the `__call__`
+    method, and can be applied directly to an OrderedDict generated from an
+    icetray.I3Frame object to return the generated table.
+    """
+
+    def __init__(self, name: str):
+        """Construct Generator.
+
+        Args:
+            name: Name of the `Generator` instance. Used to keep track of the
+                provenance of different data, and to name tables to which this
+                data is saved.
+        """
+        # Member variable(s)
+        self._name: str = name
+
+    @abstractmethod
+    def __call__(self, data: Dict[str, Any]) -> dict:
+        """Return ordered dict with generated features."""
+        pass
+
+    @property
+    def name(self) -> str:
+        """Get name of generator instance."""
+        return self._name
+
+
+class GeneratorCollection(list):
+    """Collection of Generators, for generating additional data from Frames."""
+
+    def __init__(self, *generators: Generator) -> None:
+        """Construct GeneratorCollection."""
+        for generator in generators:
+            assert isinstance(
+                generator, Generator
+            ), "All generators must be of type Generator"
+
+        super().__init__(generators)
+
+    def __call__(self, data: OrderedDict) -> OrderedDict:
+        """Update input dict with generated features."""
+        tmp_data = deepcopy(data)
+        for generator in self:
+            data.update(generator(tmp_data))
+        return data
-- 
GitLab
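
For orientation, here is what a concrete `Generator` could look like once this patch is applied. This is a hypothetical sketch: `DoubledChargeGenerator`, its table names, and the toy event dictionary are illustrative only and not part of the patch series.

    from typing import Any, Dict

    from graphnet.data.generators import Generator, GeneratorCollection


    class DoubledChargeGenerator(Generator):
        """Toy generator deriving a new table from an existing pulsemap."""

        def __init__(self, pulsemap: str, name: str):
            self._pulsemap = pulsemap
            super().__init__(name)

        def __call__(self, data: Dict[str, Any]) -> dict:
            # Read one table from the event dictionary and emit a new one,
            # keyed by this generator's name.
            charges = data[self._pulsemap]["charge"]
            return {self._name: {"charge_doubled": [2 * c for c in charges]}}


    # The collection applies each generator to a copy of the event dictionary
    # and merges the returned tables back into the original.
    generators = GeneratorCollection(
        DoubledChargeGenerator(pulsemap="pulses", name="pulses_doubled")
    )
    event = {"pulses": {"charge": [0.5, 1.0, 1.5]}}
    event = generators(event)  # adds the "pulses_doubled" table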


From 46dfec6b85b7e894ed550a9f6c13d8fcb46ddf70 Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Thu, 16 Feb 2023 13:55:59 +0900
Subject: [PATCH 2/6] Add coarsening generator

---
 .../data/generators/coarsepulsegenerator.py   | 155 ++++++++++++++++++
 1 file changed, 155 insertions(+)
 create mode 100644 src/graphnet/data/generators/coarsepulsegenerator.py

diff --git a/src/graphnet/data/generators/coarsepulsegenerator.py b/src/graphnet/data/generators/coarsepulsegenerator.py
new file mode 100644
index 000000000..55d7565a7
--- /dev/null
+++ b/src/graphnet/data/generators/coarsepulsegenerator.py
@@ -0,0 +1,155 @@
+"""I3Extractor class(es) for extracting specific, reconstructed features."""
+
+from typing import TYPE_CHECKING, Any, Dict, OrderedDict, List, Optional
+from graphnet.data.generators import Generator
+
+from graphnet.utilities.imports import has_icecube_package
+from graphnet.utilities.logging import warn_once
+
+from torch import Tensor, unique
+
+from sklearn.cluster import KMeans
+
+import numpy as np
+
+if has_icecube_package() or TYPE_CHECKING:
+    from icecube import (
+        icetray,
+        dataclasses,
+    )  # pyright: reportMissingImports=false
+
+
+class CoarsePulseGenerator(Generator):
+    def __init__(
+        self,
+        pulsemap: str,
+        name: str,
+        method: str,
+        coarsen_on: Optional[List[str]] = None,
+        keep_columns: Optional[List[str]] = None,
+        reduc: int = 100,
+        min_n: int = 25,
+    ):
+        """Construct CoarsePulsemapGenerator.
+
+        Args:
+            pulsemap: Name of the pulse (series) map for which to extract
+                reconstructed features.
+            name: Name of the `Generator` instance.
+            method: Method to use for coarse pulsemap generation.
+            coarsen_on: List of pulsemap columns that define the pseudo-distance used by the coarsening algorithm.
+            keep_columns: List of pulsemap columns to keep in the final coarse pulsemap.
+        """
+        # Member variable(s)
+        self._pulsemap = pulsemap
+        # reduction method
+        self._method = method
+        # pseudo-distance to use for coarsening
+        self._coarsen_on = coarsen_on
+        # target reduction factor, i.e., 1/reduc
+        self.reduc = reduc
+        # minimum number of pulses to keep
+        self.min_n = min_n
+        # columns to keep in final coarse pulsemap
+        self._keep_columns = keep_columns
+
+        # set coarsen_on if not specified
+        if coarsen_on == None:
+            self._coarsen_on = [
+                "dom_x",
+                "dom_y",
+                "dom_z",
+            ]
+
+        # Base class constructor
+        super().__init__(name)
+
+    def __call__(self, data: Dict[str, Any]) -> dict:
+        """Extract reconstructed features from `frame`.
+
+        Args:
+            data: Ordered dictionary generated from physics I3 frame.
+        Returns:
+            data:
+        """
+        # Get pulse series
+        self._pulse_data = data[self._pulsemap]
+        # get feature keys
+        self._pulse_names = self._pulse_data.keys()
+        # Get keep columns
+        if self._keep_columns is None:
+            self._keep_columns = self._pulse_names
+        # Get time index
+        self._time_index = [i for i in self._pulse_data if "time" in i]
+        # Get charge index
+        self._charge_index = [i for i in self._pulse_data if "charge" in i]
+        # Get coarse pulse series
+        coarse_pulse_data = self.get_coarse_pulse_data(self._method)
+        # return coarsened pulse series
+        return OrderedDict(self._pulsemap + "_coarse", coarse_pulse_data)
+
+    def get_coarse_pulse_data(self) -> any:
+        """Get coarse pulse series.
+
+        Args:
+            pulse_data: Pulse series to coarsen.
+        Returns:
+            coarse_pulse_data: Coarsened pulse series.
+        """
+        # Get coarse pulse series
+        data_CP = self._pulse_data.deepcopy()
+        # change time into distance using the speed of light in ice.
+        data_CP[self._time_index] = (
+            data_CP[self._time_index] * 2.3 * 10 ** (-1)
+        )
+        # Take the spatial + time (transformed) values and use those for the coarsening algorithm
+        tensor = Tensor(data_CP[self._coarsen_on + self._time_index]).T
+        reduc = min([len(tensor), reduc])
+        min_n = int(len(tensor) / min_n)
+        # reduce by the target factor, ensuring not to go below min_n clusters (unless the event has fewer DOM activations)
+        n_clusters = max([reduc, min_n])
+        if self._method == "Kmeans":
+            clusterer = KMeans(
+                n_clusters=n_clusters, random_state=10, init="random", n_init=1
+            )
+        else:
+            raise ValueError("Method not implemented")
+
+        index = clusterer.fit_predict(tensor)
+
+        data_with_group = np.vstack([index, self._pulse_data])
+        data_with_group = data_with_group.T[data_with_group[0, :].argsort()]
+        data_grouped = np.array(
+            np.split(
+                data_with_group[:, 1:],
+                np.unique(data_with_group[:, 0], return_index=True)[1][1:],
+            ),
+            dtype=object,
+        )
+        # mget mean of grouped data and multiply charge by number of pulses in group.
+        for self._pulse_data, i in zip(data_grouped, range(len(data_grouped))):
+            counter = np.shape(self._pulse_data)[0]
+            data_grouped[i] = np.mean(self._pulse_data, axis=0)
+            data_grouped[i][self._charge_index] = (
+                data_grouped[i][self._charge_index] * counter
+            )
+        if len(np.shape(np.array(list(data_grouped)).T)) == 3:
+            data_grouped = data_grouped[:, 0, :]
+
+        result = np.array(list(data_grouped)).T
+        result = OrderedDict(result.T, columns=self._pulse_names)[
+            self._keep_columns
+        ]
+
+        return result
+
+    def get_time_index(self):
+        """Get time index.
+
+        Args:
+            None
+        Returns:
+            time_index: Index of time feature.
+        """
+        time_index = "dom_time"
+        return time_index
-- 
GitLab
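
Stripped of the surrounding class, the reduction this patch aims for is ordinary scikit-learn k-means: scale the pulse times into a pseudo-distance, choose a cluster count proportional to the event size, and replace each cluster by its mean pulse. A self-contained sketch of that idea on synthetic pulses (all names and numbers are illustrative; 0.23 m/ns is the speed of light in ice used above):

    import numpy as np
    from sklearn.cluster import KMeans

    rng = np.random.default_rng(0)
    # Toy pulses: x, y, z in metres, time in nanoseconds.
    xyz = rng.normal(scale=50.0, size=(5000, 3))
    t = rng.uniform(0.0, 1000.0, size=(5000, 1))

    # Scale time to a pseudo-distance so all four axes are comparable.
    features = np.hstack([xyz, t * 2.3e-1])

    # Target a ~100x reduction, but never fewer than 25 clusters.
    n_clusters = max(len(features) // 100, 25)
    labels = KMeans(
        n_clusters=n_clusters, random_state=10, init="random", n_init=1
    ).fit_predict(features)

    # Coarse pulses: the mean of each cluster.
    coarse = np.array(
        [features[labels == k].mean(axis=0) for k in range(n_clusters)]
    )
    print(coarse.shape)  # (50, 4)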


From cccfbdab71ceabcaec956704bed15bd74694a6be Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Thu, 16 Feb 2023 17:04:36 +0900
Subject: [PATCH 3/6] Create coarse pulse generator

---
 .../data/generators/coarsepulsegenerator.py   | 103 ++++++++++--------
 1 file changed, 57 insertions(+), 46 deletions(-)

diff --git a/src/graphnet/data/generators/coarsepulsegenerator.py b/src/graphnet/data/generators/coarsepulsegenerator.py
index 55d7565a7..bcb76d1ab 100644
--- a/src/graphnet/data/generators/coarsepulsegenerator.py
+++ b/src/graphnet/data/generators/coarsepulsegenerator.py
@@ -6,11 +6,14 @@ from graphnet.data.generators import Generator
 from graphnet.utilities.imports import has_icecube_package
 from graphnet.utilities.logging import warn_once
 
+from copy import deepcopy
+
 from torch import Tensor, unique
 
 from sklearn.cluster import KMeans
 
 import numpy as np
+import pandas as pd
 
 if has_icecube_package() or TYPE_CHECKING:
     from icecube import (
@@ -20,12 +23,19 @@ if has_icecube_package() or TYPE_CHECKING:
 
 
 class CoarsePulseGenerator(Generator):
+    """Generator for producing coarse pulsemaps from I3PulseSeriesMaps."""
+
     def __init__(
         self,
         pulsemap: str,
         name: str,
         method: str,
-        coarsen_on: Optional[List[str]] = None,
+        coarsen_on: List[str] = [
+            "dom_x",
+            "dom_y",
+            "dom_z",
+            "dom_time",
+        ],
         keep_columns: Optional[List[str]] = None,
         reduc: int = 100,
         min_n: int = 25,
@@ -39,6 +49,8 @@ class CoarsePulseGenerator(Generator):
             method: Method to use for coarse pulsemap generation.
             coarsen_on: List of pulsemap columns that define the pseudo-distance used by the coarsening algorithm.
             keep_columns: List of pulsemap columns to keep in the final coarse pulsemap.
+            reduc: Target reduction factor for coarse pulsemap.
+            min_n: Minimum number of pulses to keep in coarse pulsemap.
         """
         # Member variable(s)
         self._pulsemap = pulsemap
@@ -47,20 +59,12 @@ class CoarsePulseGenerator(Generator):
         # pseudo-distance to use for coarsening
         self._coarsen_on = coarsen_on
         # target reduction factor, i.e., 1/reduc
-        self.reduc = reduc
+        self._reduc = reduc
         # minimum number of pulses to keep
-        self.min_n = min_n
+        self._min_n = min_n
         # columns to keep in final coarse pulsemap
         self._keep_columns = keep_columns
 
-        # set coarsen_on if not specified
-        if coarsen_on == None:
-            self._coarsen_on = [
-                "dom_x",
-                "dom_y",
-                "dom_z",
-            ]
-
         # Base class constructor
         super().__init__(name)
 
@@ -69,43 +73,55 @@ class CoarsePulseGenerator(Generator):
 
         Args:
             data: Ordered dictionary generated from a physics I3Frame.
+
         Returns:
             data: Dictionary containing the generated coarse pulsemap table.
         """
         # Get pulse series
         self._pulse_data = data[self._pulsemap]
         # get feature keys
-        self._pulse_names = self._pulse_data.keys()
+        self._pulse_names = list(self._pulse_data.keys())
+
         # Get keep columns
         if self._keep_columns is None:
             self._keep_columns = self._pulse_names
-        # Get time index
-        self._time_index = [i for i in self._pulse_data if "time" in i]
         # Get charge index
-        self._charge_index = [i for i in self._pulse_data if "charge" in i]
+        self._charge_index = [
+            self._pulse_names.index(i)
+            for i in self._pulse_names
+            if "charge" in i
+        ]
         # Get coarse pulse series
-        coarse_pulse_data = self.get_coarse_pulse_data(self._method)
+        coarse_pulse_data = self.get_coarse_pulse_data()
         # return coarsened pulse series
-        return OrderedDict(self._pulsemap + "_coarse", coarse_pulse_data)
+        return {self._name: coarse_pulse_data}
 
-    def get_coarse_pulse_data(self) -> any:
+    def get_coarse_pulse_data(self) -> dict:
         """Get coarse pulse series.
 
         Args:
             pulse_data: Pulse series to coarsen.
+
         Returns:
             coarse_pulse_data: Coarsened pulse series.
         """
+        # get index values of columns to keep.
+        if self._keep_columns is not None:
+            keep_index = [
+                self._pulse_names.index(i) for i in self._keep_columns
+            ]
+
+        data_CP = []
+        for i in self._coarsen_on:
+            data_CP.append(np.array(self._pulse_data[i]))
+        data_CP = np.array(data_CP)
         # Get coarse pulse series
-        data_CP = self._pulse_data.deepcopy()
         # change time into distance using the speed of light in ice.
-        data_CP[self._time_index] = (
-            data_CP[self._time_index] * 2.3 * 10 ** (-1)
-        )
+        data_CP[-1] = data_CP[-1] * 2.3 * 10 ** (-1)
         # Take the spatial + time (transformed) values and use those for the coarsening algorithm
-        tensor = Tensor(data_CP[self._coarsen_on + self._time_index]).T
-        reduc = min([len(tensor), reduc])
-        min_n = int(len(tensor) / min_n)
+        tensor = Tensor(data_CP).T
+        min_n = min([len(tensor), self._min_n])
+        reduc = int(len(tensor) / self._reduc)
         # reduce by the target factor, ensuring not to go below min_n clusters (unless the event has fewer DOM activations)
         n_clusters = max([reduc, min_n])
         if self._method == "Kmeans":
@@ -116,8 +132,8 @@ class CoarsePulseGenerator(Generator):
             raise ValueError("Method not implemented")
 
         index = clusterer.fit_predict(tensor)
-
-        data_with_group = np.vstack([index, self._pulse_data])
+        pulse_df = pd.DataFrame(self._pulse_data, index=None).T
+        data_with_group = np.vstack([index, pulse_df])
         data_with_group = data_with_group.T[data_with_group[0, :].argsort()]
         data_grouped = np.array(
             np.split(
@@ -126,30 +142,25 @@ class CoarsePulseGenerator(Generator):
             ),
             dtype=object,
         )
+
         # mget mean of grouped data and multiply charge by number of pulses in group.
-        for self._pulse_data, i in zip(data_grouped, range(len(data_grouped))):
-            counter = np.shape(self._pulse_data)[0]
-            data_grouped[i] = np.mean(self._pulse_data, axis=0)
-            data_grouped[i][self._charge_index] = (
-                data_grouped[i][self._charge_index] * counter
+        for data, ind in zip(data_grouped, range(len(data_grouped))):
+            counter = np.shape(data)[0]
+            data_grouped[ind] = np.mean(data, axis=0)
+            data_grouped[ind][self._charge_index] = (
+                data_grouped[ind][self._charge_index] * counter
             )
         if len(np.shape(np.array(list(data_grouped)).T)) == 3:
             data_grouped = data_grouped[:, 0, :]
 
         result = np.array(list(data_grouped)).T
-        result = OrderedDict(result.T, columns=self._pulse_names)[
-            self._keep_columns
-        ]
+        result = dict(
+            zip(
+                list(np.array(self._pulse_names)[keep_index]),
+                result[keep_index],
+            )
+        )
 
+        # update event_no
+        result.update({"event_no": self._pulse_data["event_no"]})
         return result
-
-    def get_time_index(self):
-        """Get time index.
-
-        Args:
-            None
-        Returns:
-            time_index: Index of time feature.
-        """
-        time_index = "dom_time"
-        return time_index
-- 
GitLab
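
With `__call__` fixed to return a `{name: table}` mapping, the generator can be handed to a converter through the `generators` argument introduced in patch 1. A hypothetical usage sketch, assuming graphnet's `SQLiteDataConverter` subclass and its constructor and call signature at this point in the history:

    from graphnet.data.extractors import (
        I3FeatureExtractorIceCube86,
        I3TruthExtractor,
    )
    from graphnet.data.generators import CoarsePulseGenerator
    from graphnet.data.sqlite import SQLiteDataConverter

    converter = SQLiteDataConverter(
        extractors=[
            I3FeatureExtractorIceCube86("SRTInIcePulses"),
            I3TruthExtractor(),
        ],
        outdir="./output",
        generators=[
            # Writes an extra table named after the generator instance.
            CoarsePulseGenerator(
                pulsemap="SRTInIcePulses",
                name="SRTInIcePulses_coarse",
                method="Kmeans",
            )
        ],
    )
    converter("/path/to/i3/files")  # directory of input I3 files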


From d79e712527a2457cb61cb4628595ceb36a5a681a Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Mon, 20 Feb 2023 12:50:32 +0900
Subject: [PATCH 4/6] Generate table before indices

---
 src/graphnet/data/dataconverter.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/graphnet/data/dataconverter.py b/src/graphnet/data/dataconverter.py
index 8950e2afb..7560c71b5 100644
--- a/src/graphnet/data/dataconverter.py
+++ b/src/graphnet/data/dataconverter.py
@@ -456,6 +456,10 @@ class DataConverter(ABC, LoggerMixin):
                 if isinstance(extractor, I3GenericExtractor):
                     data_dict.update(data_dict.pop(extractor._name))
 
+            if self._generators:
+                data_dict = self._generators(data_dict)
+
+            data.append(data_dict)
             # Get new, unique index and increment value
             if multi_processing:
                 with global_index.get_lock():  # type: ignore[name-defined]
@@ -469,11 +473,6 @@ class DataConverter(ABC, LoggerMixin):
             for table in data_dict.keys():
                 data_dict[table][self._index_column] = index
 
-            if self._generators:
-                data_dict = self._generators(data_dict)
-
-            data.append(data_dict)
-
         return data
 
     def get_map_function(
-- 
GitLab
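
The motivation for the move is that the index loop stamps `event_no` onto every table present in `data_dict` at that moment; running the generators first means the generated tables receive the index column as well. Schematically (a simplified sketch with plain dicts standing in for the extracted data):

    data_dict = {"pulses": {"charge": [0.5, 1.0]}}

    # Generators run first and add their tables...
    data_dict["pulses_coarse"] = {"charge": [1.5]}

    # ...so the subsequent index loop covers them too.
    index = 0
    for table in data_dict.keys():
        data_dict[table]["event_no"] = index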


From 5cb5efb3c2e2d3481270225581e2ed7339ccb7f2 Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Tue, 21 Feb 2023 12:59:46 +0900
Subject: [PATCH 5/6] Fix bugs and tidy up

---
 src/graphnet/data/dataconverter.py            |  3 +-
 .../data/generators/coarsepulsegenerator.py   | 37 ++++++++++++-------
 2 files changed, 25 insertions(+), 15 deletions(-)

diff --git a/src/graphnet/data/dataconverter.py b/src/graphnet/data/dataconverter.py
index 7560c71b5..0c65714ec 100644
--- a/src/graphnet/data/dataconverter.py
+++ b/src/graphnet/data/dataconverter.py
@@ -459,7 +459,6 @@ class DataConverter(ABC, LoggerMixin):
             if self._generators:
                 data_dict = self._generators(data_dict)
 
-            data.append(data_dict)
             # Get new, unique index and increment value
             if multi_processing:
                 with global_index.get_lock():  # type: ignore[name-defined]
@@ -473,6 +472,8 @@ class DataConverter(ABC, LoggerMixin):
             for table in data_dict.keys():
                 data_dict[table][self._index_column] = index
 
+            data.append(data_dict)
+
         return data
 
     def get_map_function(
diff --git a/src/graphnet/data/generators/coarsepulsegenerator.py b/src/graphnet/data/generators/coarsepulsegenerator.py
index bcb76d1ab..5dd41e175 100644
--- a/src/graphnet/data/generators/coarsepulsegenerator.py
+++ b/src/graphnet/data/generators/coarsepulsegenerator.py
@@ -34,8 +34,8 @@ class CoarsePulseGenerator(Generator):
             "dom_x",
             "dom_y",
             "dom_z",
-            "dom_time",
         ],
+        time_label: str = "dom_time",
         keep_columns: Optional[List[str]] = None,
         reduc: int = 100,
         min_n: int = 25,
@@ -48,6 +48,7 @@ class CoarsePulseGenerator(Generator):
             name: Name of the `Generator` instance.
             method: Method to use for coarse pulsemap generation.
             coarsen_on: List of pulsemap columns that define the pseudo-distance used by the coarsening algorithm.
+            time_label: Name of the time column in the pulsemap.
             keep_columns: List of pulsemap columns to keep in the final coarse pulsemap.
             reduc: Target reduction factor for coarse pulsemap.
             min_n: Minimum number of pulses to keep in coarse pulsemap.
@@ -56,8 +57,8 @@ class CoarsePulseGenerator(Generator):
         self._pulsemap = pulsemap
         # reduction method
         self._method = method
-        # pseudo-distance to use for coarsening
-        self._coarsen_on = coarsen_on
+        # pseudo-distance to use for coarsening; time is forced to be included at the end.
+        self._coarsen_on = coarsen_on + [time_label]
         # target reduction factor ie. 1/reduc
         self._reduc = reduc
         # minimum number of pulses to keep
@@ -124,14 +125,21 @@ class CoarsePulseGenerator(Generator):
         reduc = int(len(tensor) / self._reduc)
         # reduce by the target factor, ensuring not to go below min_n clusters (unless the event has fewer DOM activations)
         n_clusters = max([reduc, min_n])
-        if self._method == "Kmeans":
-            clusterer = KMeans(
-                n_clusters=n_clusters, random_state=10, init="random", n_init=1
-            )
-        else:
-            raise ValueError("Method not implemented")
+        if len(tensor) > self._min_n:
+            if self._method == "Kmeans":
+                clusterer = KMeans(
+                    n_clusters=n_clusters,
+                    random_state=10,
+                    init="random",
+                    n_init=1,
+                )
+            else:
+                raise ValueError("Method not implemented")
+
+            index = clusterer.fit_predict(tensor)
+        else:  # if fewer pulses than min_n, keep each pulse as its own group
+            index = np.arange(len(tensor))
 
-        index = clusterer.fit_predict(tensor)
         pulse_df = pd.DataFrame(self._pulse_data, index=None).T
         data_with_group = np.vstack([index, pulse_df])
         data_with_group = data_with_group.T[data_with_group[0, :].argsort()]
@@ -143,7 +151,7 @@ class CoarsePulseGenerator(Generator):
             dtype=object,
         )
 
-        # mget mean of grouped data and multiply charge by number of pulses in group.
+        # get mean of grouped data and multiply charge by number of pulses in group.
         for data, ind in zip(data_grouped, range(len(data_grouped))):
             counter = np.shape(data)[0]
             data_grouped[ind] = np.mean(data, axis=0)
@@ -154,13 +162,14 @@ class CoarsePulseGenerator(Generator):
             data_grouped = data_grouped[:, 0, :]
 
         result = np.array(list(data_grouped)).T
+        # turn the np array of np arrays into a list of lists
+        result = [list(i) for i in result[keep_index]]
+        # write to dict
         result = dict(
             zip(
                 list(np.array(self._pulse_names)[keep_index]),
-                result[keep_index],
+                result,
             )
         )
 
-        # update event_no
-        result.update({"event_no": self._pulse_data["event_no"]})
         return result
-- 
GitLab
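
The grouping step that survives into the next patch relies on a compact NumPy idiom: prepend the cluster labels, sort the rows by label, then `np.split` at the positions where the label changes. A small standalone demonstration of the idiom:

    import numpy as np

    labels = np.array([2, 0, 1, 0, 2, 1])
    values = np.array([[20.0], [0.0], [10.0], [1.0], [21.0], [11.0]])

    # Prepend the labels and sort the rows by label...
    data = np.hstack([labels[:, None], values])
    data = data[data[:, 0].argsort()]

    # ...then split wherever the label changes; np.unique's return_index
    # gives the first occurrence of each label in the sorted array.
    groups = np.split(
        data[:, 1:], np.unique(data[:, 0], return_index=True)[1][1:]
    )
    means = [g.mean(axis=0) for g in groups]
    print(means)  # [array([0.5]), array([10.5]), array([20.5])]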


From 9559c07da22683f00c1b3d1781c30bad09271b33 Mon Sep 17 00:00:00 2001
From: Aske-Rosted <askerosted@gmail.com>
Date: Wed, 22 Feb 2023 16:34:07 +0900
Subject: [PATCH 6/6] Reorganize and clean up

---
 .../data/generators/coarsepulsegenerator.py   | 197 ++++++++++++------
 1 file changed, 129 insertions(+), 68 deletions(-)

diff --git a/src/graphnet/data/generators/coarsepulsegenerator.py b/src/graphnet/data/generators/coarsepulsegenerator.py
index 5dd41e175..ffb4daec8 100644
--- a/src/graphnet/data/generators/coarsepulsegenerator.py
+++ b/src/graphnet/data/generators/coarsepulsegenerator.py
@@ -1,14 +1,11 @@
 """I3Extractor class(es) for extracting specific, reconstructed features."""
 
-from typing import TYPE_CHECKING, Any, Dict, OrderedDict, List, Optional
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
 from graphnet.data.generators import Generator
 
 from graphnet.utilities.imports import has_icecube_package
-from graphnet.utilities.logging import warn_once
 
-from copy import deepcopy
-
-from torch import Tensor, unique
+from torch import Tensor
 
 from sklearn.cluster import KMeans
 
@@ -37,7 +34,7 @@ class CoarsePulseGenerator(Generator):
         ],
         time_label: str = "dom_time",
         keep_columns: Optional[List[str]] = None,
-        reduc: int = 100,
+        reduce: int = 100,
         min_n: int = 25,
     ):
         """Construct CoarsePulsemapGenerator.
@@ -50,7 +47,7 @@ class CoarsePulseGenerator(Generator):
             coarsen_on: List of pulsemap columns that define the pseudo-distance used by the coarsening algorithm.
             time_label: Name of the time column in the pulsemap.
             keep_columns: List of pulsemap columns to keep in the final coarse pulsemap.
-            reduc: Target reduction factor for coarse pulsemap.
+            reduce: Target reduction factor for coarse pulsemap.
             min_n: Minimum number of pulses to keep in coarse pulsemap.
         """
         # Member variable(s)
@@ -59,8 +56,8 @@ class CoarsePulseGenerator(Generator):
         self._method = method
         # pseudo-distance to use for coarsening; time is forced to be included at the end.
         self._coarsen_on = coarsen_on + [time_label]
-        # target reduction factor, i.e., 1/reduc
-        self._reduc = reduc
+        # target reduction factor, i.e., 1/reduce
+        self._reduce = reduce
         # minimum number of pulses to keep
         self._min_n = min_n
         # columns to keep in final coarse pulsemap
@@ -100,76 +97,140 @@ class CoarsePulseGenerator(Generator):
     def get_coarse_pulse_data(self) -> dict:
         """Get coarse pulse series.
 
-        Args:
-            pulse_data: Pulse series to coarsen.
-
         Returns:
             coarse_pulse_data: Coarsened pulse series.
         """
-        # get index values of columns to keep.
-        if self._keep_columns is not None:
-            keep_index = [
-                self._pulse_names.index(i) for i in self._keep_columns
-            ]
-
-        data_CP = []
-        for i in self._coarsen_on:
-            data_CP.append(np.array(self._pulse_data[i]))
-        data_CP = np.array(data_CP)
-        # Get coarse pulse series
-        # change time into distance using the speed of light in ice.
-        data_CP[-1] = data_CP[-1] * 2.3 * 10 ** (-1)
-        # Take the spatial + time (transformed) values and use those for the coarsening algorithm
-        tensor = Tensor(data_CP).T
-        min_n = min([len(tensor), self._min_n])
-        reduc = int(len(tensor) / self._reduc)
-        # reduce by the target factor, ensuring not to go below min_n clusters (unless the event has fewer DOM activations)
-        n_clusters = max([reduc, min_n])
-        if len(tensor) > self._min_n:
-            if self._method == "Kmeans":
-                clusterer = KMeans(
-                    n_clusters=n_clusters,
-                    random_state=10,
-                    init="random",
-                    n_init=1,
-                )
-            else:
-                raise ValueError("Method not implemented")
-
-            index = clusterer.fit_predict(tensor)
-        else:  # if less dom activations than clusters, just return the doms
-            index = np.arange(len(tensor))
-
-        pulse_df = pd.DataFrame(self._pulse_data, index=None).T
-        data_with_group = np.vstack([index, pulse_df])
-        data_with_group = data_with_group.T[data_with_group[0, :].argsort()]
-        data_grouped = np.array(
-            np.split(
-                data_with_group[:, 1:],
-                np.unique(data_with_group[:, 0], return_index=True)[1][1:],
-            ),
-            dtype=object,
+        # get index values for grouping
+        index = coarsening_index(
+            pulse_data=self._pulse_data,
+            coarsen_on=self._coarsen_on,
+            reduce=self._reduce,
+            min_n=self._min_n,
+            method=self._method,
+        )
+        # group pulses by index
+        coarse_pulse_data = group_by_index(
+            pulse_data=self._pulse_data,
+            index=index,
+            pulse_names=self._pulse_names,
+            charge_index=self._charge_index,
+            keep_columns=self._keep_columns,
         )
 
-        # get mean of grouped data and multiply charge by number of pulses in group.
-        for data, ind in zip(data_grouped, range(len(data_grouped))):
-            counter = np.shape(data)[0]
-            data_grouped[ind] = np.mean(data, axis=0)
-            data_grouped[ind][self._charge_index] = (
-                data_grouped[ind][self._charge_index] * counter
+        return coarse_pulse_data
+
+
+def coarsening_index(
+    pulse_data: dict,
+    coarsen_on: List[str],
+    reduce: int,
+    min_n: int,
+    method: str,
+) -> np.ndarray:
+    """Get coarsening index.
+
+    Args:
+        pulse_data: Pulse series to coarsen.
+        coarsen_on: List of pulsemap columns that define the pseudo-distance used by the coarsening algorithm; time is assumed to be the last entry.
+        reduce: Target reduction factor for coarse pulsemap.
+        min_n: Minimum number of pulses to keep in coarse pulsemap.
+        method: Method to use for coarse pulsemap generation.
+
+    Returns:
+        index: Index list for grouping.
+    """
+    data = []
+    for i in coarsen_on:
+        data.append(np.array(pulse_data[i]))
+    data = np.array(data)
+    # Get coarse pulse series
+    # change time into  distance using speed of light in ice.
+    data[-1] = data[-1] * 2.3 * 10 ** (-1)
+    # Take the spatial + time (transformed) values and use those for the coarsening algorithm
+    tensor = Tensor(data).T
+    min_n = min([len(tensor), min_n])
+    reduce = int(len(tensor) / reduce)
+    # reduce by the target factor, ensuring not to go below min_n clusters (unless the event has fewer DOM activations)
+    n_clusters = max([reduce, min_n])
+    if len(tensor) > min_n:
+        if method == "Kmeans":
+            clusterer = KMeans(
+                n_clusters=n_clusters,
+                random_state=10,
+                init="random",
+                n_init=1,
+            )
+        else:
+            raise ValueError("Method not implemented")
+
+        index = clusterer.fit_predict(tensor)
+    else:  # if fewer pulses than min_n, keep each pulse as its own group
+        index = np.arange(len(tensor))
+
+    return index
+
+
+def group_by_index(
+    pulse_data: dict,
+    index: List[int],
+    pulse_names: List[str],
+    charge_index: Optional[List[int]] = None,
+    keep_columns: Optional[List[str]] = None,
+) -> dict:
+    """Group pulses by given grouping index.
+
+    Args:
+        pulse_data: Pulse series to group.
+        index: Index list for grouping.
+        pulse_names: List of pulsemap columns.
+        charge_index: Indices of the charge column(s).
+        keep_columns: List of pulsemap columns to keep in the final coarse pulsemap.
+
+    Returns:
+        result: Pulsemap grouped by input index.
+    """
+    pulse_df = pd.DataFrame(pulse_data, index=None).T
+    data_with_group = np.vstack([index, pulse_df])
+    data_with_group = data_with_group.T[data_with_group[0, :].argsort()]
+    data_grouped = np.array(
+        np.split(
+            data_with_group[:, 1:],
+            np.unique(data_with_group[:, 0], return_index=True)[1][1:],
+        ),
+        dtype=object,
+    )
+
+    # get mean of grouped data and multiply charge by number of pulses in group.
+    for data, ind in zip(data_grouped, range(len(data_grouped))):
+        counter = np.shape(data)[0]
+        data_grouped[ind] = np.mean(data, axis=0)
+        if charge_index is not None:
+            data_grouped[ind][charge_index] = (
+                data_grouped[ind][charge_index] * counter
             )
-        if len(np.shape(np.array(list(data_grouped)).T)) == 3:
-            data_grouped = data_grouped[:, 0, :]
+    if len(np.shape(np.array(list(data_grouped)).T)) == 3:
+        data_grouped = data_grouped[:, 0, :]
 
-        result = np.array(list(data_grouped)).T
-        # turn the np array of np arrays into a list of lists
+    result = np.array(list(data_grouped)).T
+    # turn the np array of np arrays into a list of lists
+
+    # get index values of columns to keep, and keep only those columns.
+    if keep_columns is not None:
+        keep_index = [pulse_names.index(i) for i in keep_columns]
         result = [list(i) for i in result[keep_index]]
-        # write to dict
         result = dict(
             zip(
-                list(np.array(self._pulse_names)[keep_index]),
+                list(np.array(pulse_names)[keep_index]),
+                result,
+            )
+        )
+    else:  # if no keep columns specified, keep all columns.
+        result = [list(i) for i in result]
+        result = dict(
+            zip(
+                list(pulse_names),
                 result,
             )
         )
 
-        return result
+    return result
-- 
GitLab
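
After this final reorganisation the two stages can be exercised in isolation. A minimal sketch on synthetic pulse data, assuming the module layout above (column names match the defaults; torch and scikit-learn are required by `coarsening_index`, pandas by `group_by_index`):

    import numpy as np
    from graphnet.data.generators.coarsepulsegenerator import (
        coarsening_index,
        group_by_index,
    )

    rng = np.random.default_rng(0)
    n = 500
    pulse_data = {
        "dom_x": rng.normal(scale=50.0, size=n),
        "dom_y": rng.normal(scale=50.0, size=n),
        "dom_z": rng.normal(scale=50.0, size=n),
        "dom_time": rng.uniform(0.0, 1000.0, size=n),
        "charge": rng.exponential(1.0, size=n),
    }
    names = list(pulse_data.keys())

    # Stage 1: assign each pulse to a cluster.
    index = coarsening_index(
        pulse_data=pulse_data,
        coarsen_on=["dom_x", "dom_y", "dom_z", "dom_time"],
        reduce=100,
        min_n=25,
        method="Kmeans",
    )

    # Stage 2: average each cluster; charge is scaled by cluster size
    # (i.e. summed rather than averaged).
    coarse = group_by_index(
        pulse_data=pulse_data,
        index=index,
        pulse_names=names,
        charge_index=[names.index("charge")],
    )
    print(len(coarse["charge"]))  # ~25 coarse pulses for this event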