diff --git a/src/graphnet/data/dataconverter.py b/src/graphnet/data/dataconverter.py
index f691afb4bb31e86ffa4568cde593a26e5d901feb..99bfebac610d60dc867a0e90ce27892456550d43 100644
--- a/src/graphnet/data/dataconverter.py
+++ b/src/graphnet/data/dataconverter.py
@@ -35,6 +35,11 @@ from graphnet.data.extractors import (
     I3TruthExtractor,
     I3GenericExtractor,
 )
+from graphnet.data.generators import (
+    Generator,
+    GeneratorCollection,
+)
+
 from graphnet.utilities.decorators import final
 from graphnet.utilities.filesys import find_i3_files
 from graphnet.utilities.imports import has_icecube_package
@@ -101,6 +106,7 @@ class DataConverter(ABC, LoggerMixin):
         outdir: str,
         gcd_rescue: Optional[str] = None,
         *,
+        generators: Optional[List[Generator]] = None,
         nb_files_to_batch: Optional[int] = None,
         sequential_batch_pattern: Optional[str] = None,
         input_file_batch_pattern: Optional[str] = None,
@@ -169,6 +175,8 @@ class DataConverter(ABC, LoggerMixin):
 
         # Create I3Extractors
         self._extractors = I3ExtractorCollection(*extractors)
+
+        # Create Generators; use an empty collection if none are given, so
+        # that `self._generators` is always defined.
+        self._generators = GeneratorCollection(*(generators or []))
 
         # Create shorthand of names of all pulsemaps queried
         self._table_names = [extractor.name for extractor in self._extractors]
@@ -444,6 +452,9 @@ class DataConverter(ABC, LoggerMixin):
                 if isinstance(extractor, I3GenericExtractor):
                     data_dict.update(data_dict.pop(extractor._name))
 
+            if self._generators:
+                data_dict = self._generators(data_dict)
+
             # Get new, unique index and increment value
             if multi_processing:
                 with global_index.get_lock():  # type: ignore[name-defined]
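For context, a minimal sketch of how the new `generators` argument would be used with a concrete converter. The converter subclass, extractor, and import paths below are assumptions based on the existing graphnet converters, not part of this diff:

```python
# Sketch only: `SQLiteDataConverter`, `I3FeatureExtractorIceCube86`, and the
# call signature are assumed, not introduced by this diff.
from graphnet.data.extractors import I3FeatureExtractorIceCube86
from graphnet.data.generators import CoarsePulseGenerator
from graphnet.data.sqlite import SQLiteDataConverter

converter = SQLiteDataConverter(
    extractors=[I3FeatureExtractorIceCube86("SRTInIcePulses")],
    outdir="/path/to/outdir",
    generators=[
        CoarsePulseGenerator(
            pulsemap="SRTInIcePulses",  # table produced by the extractor
            name="SRTInIcePulsesCoarse",  # name of the generated table
            method="Kmeans",
        )
    ],
)
converter("/path/to/i3/files")  # assumed: converter instances are callable
```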
diff --git a/src/graphnet/data/generators/__init__.py b/src/graphnet/data/generators/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..3986334f79c2d57fa241bbf2c91bddc35fad6af0
--- /dev/null
+++ b/src/graphnet/data/generators/__init__.py
@@ -0,0 +1,5 @@
+"""Collection of I3Generators, for generating additional data from I3Frames."""
+
+from .generator import Generator, GeneratorCollection
+
+from .coarsepulsegenerator import CoarsePulseGenerator
diff --git a/src/graphnet/data/generators/coarsepulsegenerator.py b/src/graphnet/data/generators/coarsepulsegenerator.py
new file mode 100644
index 0000000000000000000000000000000000000000..ffb4daec8f9d907fe3242d35ac6199a058f042fc
--- /dev/null
+++ b/src/graphnet/data/generators/coarsepulsegenerator.py
@@ -0,0 +1,236 @@
+"""I3Extractor class(es) for extracting specific, reconstructed features."""
+
+from typing import Any, Dict, List, Optional
+
+import numpy as np
+import pandas as pd
+from sklearn.cluster import KMeans
+
+from graphnet.data.generators import Generator
+
+
+class CoarsePulseGenerator(Generator):
+    """Generator for producing coarse pulsemaps from I3PulseSeriesMaps."""
+
+    def __init__(
+        self,
+        pulsemap: str,
+        name: str,
+        method: str,
+        coarsen_on: Optional[List[str]] = None,
+        time_label: str = "dom_time",
+        keep_columns: Optional[List[str]] = None,
+        reduce: int = 100,
+        min_n: int = 25,
+    ):
+        """Construct CoarsePulsemapGenerator.
+
+        Args:
+            pulsemap: Name of the pulse (series) map for which to extract
+                reconstructed features.
+            name: Name of the `Generator` instance.
+            method: Method to use for coarse pulsemap generation.
+            coarsen_on: List of pulsemap columns to use for pseudo-distance used by coarsening algorithm.
+            time_label: Name of the time column in the pulsemap.
+            keep_columns: List of pulsemap columns to keep in final coarse pulsemap.
+            reduce: Target reduction factor for coarse pulsemap.
+            min_n: Minimum number of pulses to keep in coarse pulsemap.
+        """
+        # Member variable(s)
+        self._pulsemap = pulsemap
+        # Reduction method
+        self._method = method
+        # Columns defining the pseudo-distance used for coarsening; the
+        # time column is forced to be the last entry.
+        if coarsen_on is None:
+            coarsen_on = ["dom_x", "dom_y", "dom_z"]
+        self._coarsen_on = coarsen_on + [time_label]
+        # Target reduction factor, i.e. 1/reduce
+        self._reduce = reduce
+        # Minimum number of pulses to keep
+        self._min_n = min_n
+        # Columns to keep in the final coarse pulsemap
+        self._keep_columns = keep_columns
+
+        # Base class constructor
+        super().__init__(name)
+
+    def __call__(self, data: Dict[str, Any]) -> dict:
+        """Extract reconstructed features from `frame`.
+
+        Args:
+            data: Ordered dictionary generated from physics I3 frame.
+
+        Returns:
+            data:
+        """
+        # Get pulse series
+        self._pulse_data = data[self._pulsemap]
+        # get feature keys
+        self._pulse_names = list(self._pulse_data.keys())
+
+        # Get keep columns (defaults to all columns, fixed on the first call)
+        if self._keep_columns is None:
+            self._keep_columns = self._pulse_names
+        # Get indices of the charge column(s)
+        self._charge_index = [
+            ix
+            for ix, name in enumerate(self._pulse_names)
+            if "charge" in name
+        ]
+        # Get coarse pulse series
+        coarse_pulse_data = self.get_coarse_pulse_data()
+        # return coarsened pulse series
+        return {self._name: coarse_pulse_data}
+
+    def get_coarse_pulse_data(self) -> dict:
+        """Get coarse pulse series.
+
+        Returns:
+            coarse_pulse_data: Coarsened pulse series.
+        """
+        # get index values for grouping
+        index = coarsening_index(
+            pulse_data=self._pulse_data,
+            coarsen_on=self._coarsen_on,
+            reduce=self._reduce,
+            min_n=self._min_n,
+            method=self._method,
+        )
+        # group pulses by index
+        coarse_pulse_data = group_by_index(
+            pulse_data=self._pulse_data,
+            index=index,
+            pulse_names=self._pulse_names,
+            charge_index=self._charge_index,
+            keep_columns=self._keep_columns,
+        )
+
+        return coarse_pulse_data
+
+
+def coarsening_index(
+    pulse_data: dict,
+    coarsen_on: List[str],
+    reduce: int,
+    min_n: int,
+    method: str,
+) -> np.ndarray:
+    """Get coarsening index.
+
+    Args:
+        pulse_data: Pulse series to coarsen.
+        coarsen_on: Pulsemap columns defining the pseudo-distance used by
+            the coarsening algorithm; the time column is assumed to be the
+            last entry.
+        reduce: Target reduction factor for coarse pulsemap.
+        min_n: Minimum number of pulses to keep in coarse pulsemap.
+        method: Method to use for coarse pulsemap generation.
+
+    Returns:
+        index: Cluster index for each pulse, used for grouping.
+    """
+    data = np.asarray([pulse_data[i] for i in coarsen_on], dtype=float)
+    # Convert time (ns) into an effective distance (m) using the speed of
+    # light in ice, ~0.23 m/ns, so that all coordinates share a common scale.
+    data[-1] = data[-1] * 2.3 * 10 ** (-1)
+    # Use the spatial + (transformed) time values for the coarsening
+    # algorithm; one row per pulse.
+    points = data.T
+    min_n = min(len(points), min_n)
+    # Reduce by the target factor, but never below `min_n` clusters (unless
+    # the event has fewer pulses than `min_n` to begin with).
+    n_clusters = max(int(len(points) / reduce), min_n)
+    if len(points) > min_n:
+        if method == "Kmeans":
+            clusterer = KMeans(
+                n_clusters=n_clusters,
+                random_state=10,
+                init="random",
+                n_init=1,
+            )
+        else:
+            raise ValueError(f"Method {method} not implemented")
+
+        index = clusterer.fit_predict(points)
+    else:  # Fewer DOM activations than clusters: keep every pulse as-is
+        index = np.arange(len(points))
+
+    return index
+
+
+def group_by_index(
+    pulse_data: dict,
+    index: List[int],
+    pulse_names: List[str],
+    charge_index: Optional[List[int]] = None,
+    keep_columns: Optional[List[str]] = None,
+) -> dict:
+    """Group pulses by given grouping index.
+
+    Args:
+        pulse_data: Pulse series to group.
+        index: Cluster index for each pulse, used for grouping.
+        pulse_names: List of pulsemap columns.
+        charge_index: Indices of the charge column(s), whose values are
+            summed rather than averaged within each group.
+        keep_columns: List of pulsemap columns to keep in the final coarse
+            pulsemap.
+
+    Returns:
+        result: Pulsemap grouped by input index.
+    """
+    # One row per feature, one column per pulse.
+    pulse_df = pd.DataFrame(pulse_data, index=None).T
+    # Prepend the group index as the first row, then sort pulses by group.
+    data_with_group = np.vstack([index, pulse_df])
+    data_with_group = data_with_group.T[data_with_group[0, :].argsort()]
+    # Split the sorted pulses into one array per group.
+    data_grouped = np.array(
+        np.split(
+            data_with_group[:, 1:],
+            np.unique(data_with_group[:, 0], return_index=True)[1][1:],
+        ),
+        dtype=object,
+    )
+
+    # Reduce each group to its mean pulse, rescaling charge by the number
+    # of pulses in the group so that total charge is conserved. Collecting
+    # the means in a fresh list avoids the numpy object-array broadcasting
+    # quirk that occurs when all groups happen to have the same size.
+    means = []
+    for group in data_grouped:
+        counter = np.shape(group)[0]
+        mean = np.mean(group, axis=0)
+        if charge_index is not None:
+            mean[charge_index] = mean[charge_index] * counter
+        means.append(mean)
+
+    # One row per feature, one column per group.
+    result = np.array(means).T
+
+    # Keep only the requested columns, and turn the array of arrays into a
+    # dictionary mapping column names to lists of values.
+    if keep_columns is not None:
+        keep_index = [pulse_names.index(i) for i in keep_columns]
+        result = [list(i) for i in result[keep_index]]
+        result = dict(
+            zip(
+                list(np.array(pulse_names)[keep_index]),
+                result,
+            )
+        )
+    else:  # if no keep columns specified, keep all columns.
+        result = [list(i) for i in result]
+        result = dict(
+            zip(
+                list(pulse_names),
+                result,
+            )
+        )
+
+    return result
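To make the two helper functions concrete, a small self-contained example on toy pulses (all values invented for illustration; assumes `coarsening_index` and `group_by_index` are in scope):

```python
import numpy as np

# Toy pulse series: a tight pair and a tight triplet of pulses.
pulse_data = {
    "dom_x": [0.0, 0.2, 50.0, 50.2, 50.4],
    "dom_y": [0.0, 0.0, 10.0, 10.0, 10.2],
    "dom_z": [0.0, 0.2, -5.0, -5.2, -5.4],
    "dom_time": [100.0, 101.0, 300.0, 301.0, 302.0],
    "charge": [1.0, 0.5, 2.0, 1.5, 1.0],
}

# Aim for int(5 / 2) = 2 clusters, keeping at least 2 pulses.
index = coarsening_index(
    pulse_data,
    coarsen_on=["dom_x", "dom_y", "dom_z", "dom_time"],
    reduce=2,
    min_n=2,
    method="Kmeans",
)

# Each cluster is reduced to its mean pulse; the charge column (position 4
# in `pulse_names`) is rescaled by the group multiplicity, so total charge
# is conserved.
coarse = group_by_index(
    pulse_data,
    index=index,
    pulse_names=list(pulse_data),
    charge_index=[4],
)
print(coarse)
# Expected, up to group ordering and KMeans initialisation:
# {"dom_x": [0.1, 50.2], ..., "charge": [1.5, 4.5]}
```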
diff --git a/src/graphnet/data/generators/generator.py b/src/graphnet/data/generators/generator.py
new file mode 100644
index 0000000000000000000000000000000000000000..fcb6b35ef6ef8764023b3d4a304c5149c92828bf
--- /dev/null
+++ b/src/graphnet/data/generators/generator.py
@@ -0,0 +1,63 @@
+"""Base class for I3 generators."""
+
+from abc import ABC, abstractmethod
+from typing import Any, Dict
+from collections import OrderedDict
+from copy import deepcopy
+
+from graphnet.utilities.logging import LoggerMixin
+
+
+class Generator(ABC, LoggerMixin):
+    """Base class for generating additional data from Frames.
+
+    All classes inheriting from `Generator` should implement the `__call__`
+    method, and can be applied directly on an OrderedDict generated from an
+    icetray.I3Frame objects to return generated table.
+    """
+
+    def __init__(self, name: str):
+        """Construct Generator.
+
+        Args:
+            name: Name of the `Generator` instance. Used to keep track of the
+                provenance of different data, and to name tables to which this
+                data is saved.
+        """
+        # Member variable(s)
+        self._name: str = name
+
+    @abstractmethod
+    def __call__(self, data: Dict[str, Any]) -> dict:
+        """Return ordered dict with generated features."""
+        pass
+
+    @property
+    def name(self) -> str:
+        """Get name of generator instance."""
+        return self._name
+
+
+class GeneratorCollection(list):
+    """Collection of Generators, for generating additional data from Frames."""
+
+    def __init__(self, *generators: Generator) -> None:
+        """Construct GeneratorCollection."""
+        for generator in generators:
+            assert isinstance(
+                generator, Generator
+            ), "All generators must be of type Generator"
+
+        super().__init__(generators)
+
+    def __call__(self, data: OrderedDict) -> OrderedDict:
+        """Update input dict with generated features."""
+        tmp_data = deepcopy(data)
+        for generator in self:
+            data.update(generator(tmp_data))
+        return data
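Finally, a minimal hypothetical `Generator` subclass to illustrate the contract the base class expects. The class, table names, and the assumed shape of `data` are invented for illustration:

```python
from typing import Any, Dict

class TotalChargeGenerator(Generator):
    """Hypothetical example: summarise a pulsemap by its total charge."""

    def __init__(self, pulsemap: str, name: str):
        self._pulsemap = pulsemap
        super().__init__(name)

    def __call__(self, data: Dict[str, Any]) -> dict:
        # `data` maps table names to column dictionaries, as produced by the
        # extractors; return a new table keyed by the generator name.
        total = sum(data[self._pulsemap]["charge"])
        return {self._name: {"total_charge": [total]}}

# The collection applies every generator to a deep copy of the original
# data and merges the generated tables back into the event dictionary.
generators = GeneratorCollection(
    TotalChargeGenerator("SRTInIcePulses", "srt_total_charge")
)
# data = generators(data)  # adds a "srt_total_charge" table to `data`
```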