Skip to content
Snippets Groups Projects
test_modules.py 13.7 KiB
Newer Older
Stefan Reck's avatar
Stefan Reck committed
from unittest import TestCase
import numpy as np
import orcasong.modules as modules
from km3pipe.dataclasses import Table


__author__ = 'Stefan Reck'


class TestModules(TestCase):
    def test_mc_info_maker(self):
        """ Test the mcinfo maker on some dummy data. """
        def mc_info_extr(blob):
            hits = blob["Hits"]
            return {"dom_id_0": hits.dom_id[0],
                    "time_2": hits.time[2]}

        in_blob = {
            "Hits": Table({
                'dom_id': [2, 3, 3],
                'channel_id': [0, 1, 2],
                'time': [10.1, 11.2, 12.3]
            })
        }
        module = modules.McInfoMaker(
            mc_info_extr=mc_info_extr, store_as="test")
        out_blob = module.process(in_blob)

        self.assertSequenceEqual(list(out_blob.keys()), ["Hits", "test"])
        self.assertSequenceEqual(list(out_blob["test"].dtype.names),
                                 ('dom_id_0', 'time_2'))
        np.testing.assert_array_equal(out_blob["test"]["dom_id_0"],
Stefan Reck's avatar
Stefan Reck committed
                                      np.array([2, ], dtype="float64"))
Stefan Reck's avatar
Stefan Reck committed
        np.testing.assert_array_equal(out_blob["test"]["time_2"],
Stefan Reck's avatar
Stefan Reck committed
                                      np.array([12.3, ], dtype="float64"))

    def test_mc_info_maker_dtype(self):
        """ Test the mcinfo maker on some dummy data. """
        def mc_info_extr(blob):
            hits = blob["Hits"]
            return {"dom_id_0": hits.dom_id[0],
                    "time_2": hits.time[2]}

        in_blob = {
            "Hits": Table({
                'dom_id': np.array([2, 3, 3], dtype="int8"),
                'time': np.array([10.1, 11.2, 12.3], dtype="float32"),
            })
        }
        module = modules.McInfoMaker(
            mc_info_extr=mc_info_extr, store_as="test", to_float64=False)
        out_blob = module.process(in_blob)

        np.testing.assert_array_equal(
            out_blob["test"]["dom_id_0"], np.array([2, ], dtype="int8"))
        np.testing.assert_array_equal(
            out_blob["test"]["time_2"], np.array([12.3, ], dtype="float32"))
Stefan Reck's avatar
Stefan Reck committed

    def test_event_skipper(self):
        def event_skipper(blob):
Stefan Reck's avatar
Stefan Reck committed
            # skip if true
            return blob["a"] == 42
Stefan Reck's avatar
Stefan Reck committed

        module = modules.EventSkipper(event_skipper=event_skipper)

Stefan Reck's avatar
Stefan Reck committed
        self.assertEqual(module.process({"a": 42}), None)
        self.assertEqual(module.process({"a": 25}), {"a": 25})
Stefan Reck's avatar
Stefan Reck committed


class TestTimePreproc(TestCase):
    def setUp(self):
        self.in_blob = {
            "Hits": Table({
                'time': [1., 2., 3.],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            })
        }

        self.in_blob_mc = {
            "Hits": Table({
                'time': [1., 2., 3.],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            }),
            "McHits": Table({
                'time': [1., 2., 3.],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            })
        }

    def test_time_preproc_t0(self):
        module = modules.TimePreproc(
            add_t0=True, center_time=False)

        target = {
            "Hits": Table({
                'time': [1.1, 2.2, 3.3],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            })
        }

        out_blob = module.process(self.in_blob)

        self.assertSetEqual(set(out_blob.keys()), set(target.keys()))
        np.testing.assert_array_equal(np.array(out_blob["Hits"]),
                                      np.array(target["Hits"]))

    def test_time_preproc_center(self):
        module = modules.TimePreproc(
            add_t0=False, center_time=True)

        target = {
            "Hits": Table({
                'time': [-1., 0., 1.],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            })
        }

        out_blob = module.process(self.in_blob)

        self.assertSetEqual(set(out_blob.keys()), set(target.keys()))
        np.testing.assert_array_equal(np.array(out_blob["Hits"]),
                                      np.array(target["Hits"]))

    def test_time_preproc_t0_and_center(self):
        module = modules.TimePreproc(
            add_t0=True, center_time=True)

        target = {
            "Hits": Table({
                'time': [-1.1, 0., 1.1],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            })
        }

        out_blob = module.process(self.in_blob)

        self.assertSetEqual(set(out_blob.keys()), set(target.keys()))
        np.testing.assert_array_almost_equal(
            np.array(out_blob["Hits"].view("<f8")),
            np.array(target["Hits"].view("<f8")))

    def test_time_preproc_mchits_t0_and_center(self):
        module = modules.TimePreproc(
            add_t0=True, center_time=True)

        target = {
            "Hits": Table({
                'time': [-1.1, 0., 1.1],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            }),
            "McHits": Table({
                'time': [-1.1, 0., 1.1],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            }),
        }
        out_blob = module.process(self.in_blob_mc)

        self.assertSetEqual(set(out_blob.keys()), set(target.keys()))
        np.testing.assert_array_almost_equal(
            np.array(out_blob["McHits"].view("<f8")),
            np.array(target["McHits"].view("<f8")))


Stefan Reck's avatar
Stefan Reck committed
class TestPointMaker(TestCase):
    def setUp(self):
        self.input_blob_1 = {
            "Hits": Table({
                "x": [4, 5, 6],
                'time': [1., 2., 3.],
                "t0": [0.1, 0.2, 0.3],}),
            "EventInfo": Table({
                "pad": 1.
            })
        }

    def test_default_settings(self):
Stefan Reck's avatar
Stefan Reck committed
        pm = modules.PointMaker(
            max_n_hits=4)
        result = pm.process(self.input_blob_1)["samples"]
        self.assertTupleEqual(
            pm.finish()["hit_infos"], ("t0", "time", "x", "is_valid"))
Stefan Reck's avatar
Stefan Reck committed
        target = np.array(
            [[[0.1, 1, 4, 1],
              [0.2, 2, 5, 1],
              [0.3, 3, 6, 1],
              [0,   0, 0, 0]]], dtype="float32")
        np.testing.assert_array_equal(result, target)

    def test_input_blob_1(self):
Stefan Reck's avatar
Stefan Reck committed
        pm = modules.PointMaker(
Stefan Reck's avatar
Stefan Reck committed
            max_n_hits=4,
            hit_infos=("x", "time"),
            time_window=None,
            dset_n_hits=None,
Stefan Reck's avatar
Stefan Reck committed
        )
        result = pm.process(self.input_blob_1)["samples"]
        self.assertTupleEqual(
            pm.finish()["hit_infos"], ("x", "time", "is_valid"))
Stefan Reck's avatar
Stefan Reck committed
        target = np.array(
            [[[4, 1, 1],
              [5, 2, 1],
              [6, 3, 1],
              [0, 0, 0]]], dtype="float32")
        np.testing.assert_array_equal(result, target)

    def test_input_blob_1_max_n_hits(self):
        input_blob_long = {
            "Hits": Table({
                "x": np.random.rand(1000).astype("float32"),
        })}
        result = modules.PointMaker(
            max_n_hits=10,
            hit_infos=("x",),
            time_window=None,
            dset_n_hits=None,
        ).process(input_blob_long)["samples"]

        self.assertSequenceEqual(result.shape, (1, 10, 2))
        self.assertTrue(all(
            np.isin(result[0, :, 0], input_blob_long["Hits"]["x"])))

    def test_input_blob_time_window(self):
        result = modules.PointMaker(
            max_n_hits=4,
            hit_infos=("x", "time"),
            time_window=[1, 2],
            dset_n_hits=None,
        ).process(self.input_blob_1)["samples"]
        target = np.array(
            [[[4, 1, 1],
              [5, 2, 1],
              [0, 0, 0],
              [0, 0, 0]]], dtype="float32")
        np.testing.assert_array_equal(result, target)

    def test_input_blob_time_window_nhits(self):
        result = modules.PointMaker(
            max_n_hits=4,
            hit_infos=("x", "time"),
            time_window=[1, 2],
            dset_n_hits="EventInfo",
        ).process(self.input_blob_1)["EventInfo"]
        print(result)
        self.assertEqual(result["n_hits_intime"], 2)


Stefan Reck's avatar
Stefan Reck committed
class TestImageMaker(TestCase):
    def test_2d_xt_binning(self):
        # (3 x 2) x-t binning
        bin_edges_list = [
            ["x", [3.5, 4.5, 5.5, 6.5]],
            ["time", [0.5, 2, 3.5]]
        ]

Stefan Reck's avatar
Stefan Reck committed
        module = modules.ImageMaker(bin_edges_list=bin_edges_list)
Stefan Reck's avatar
Stefan Reck committed
        in_blob = {
            "Hits": Table({
                "x": [4, 5, 6],
                'time': [1., 2., 3.],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            })
        }

        target = {
            "Hits": Table({
                "x": [4, 5, 6],
                'time': [1., 2., 3.],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            }),
Stefan Reck's avatar
Stefan Reck committed
            "samples": np.array([[
Stefan Reck's avatar
Stefan Reck committed
                [1, 0],
                [0, 1],
                [0, 1],
            ]])
        }

        out_blob = module.process(in_blob)
        self.assertSetEqual(set(out_blob.keys()), set(target.keys()))
        np.testing.assert_array_almost_equal(
            np.array(out_blob["Hits"].view("<f8")),
            np.array(target["Hits"].view("<f8")))
        np.testing.assert_array_almost_equal(
Stefan Reck's avatar
Stefan Reck committed
            np.array(out_blob["samples"]),
            np.array(target["samples"]))
Stefan Reck's avatar
Stefan Reck committed

    def test_unknown_field(self):
        # (3 x 2) x-t binning
        bin_edges_list = [
            ["aggg", [3.5, 4.5, 5.5, 6.5]],
            ["time", [0.5, 2, 3.5]]
        ]

        module = modules.ImageMaker(
Stefan Reck's avatar
Stefan Reck committed
            bin_edges_list=bin_edges_list)
Stefan Reck's avatar
Stefan Reck committed
        in_blob = {
            "Hits": Table({
                "x": [4, 5, 6],
                'time': [1., 2., 3.],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            })
        }

        with self.assertRaises(ValueError):
            module.process(in_blob)

    def test_1d_binning(self):
        # (1, ) t binning
        bin_edges_list = [
            ["time", [2.5, 3.5]]
        ]

Stefan Reck's avatar
Stefan Reck committed
        module = modules.ImageMaker(bin_edges_list=bin_edges_list)
Stefan Reck's avatar
Stefan Reck committed
        in_blob = {
            "Hits": Table({
                'time': [1., 2., 3.],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            })
        }

        target = {
            "Hits": Table({
                'time': [1., 2., 3.],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            }),
Stefan Reck's avatar
Stefan Reck committed
            "samples": np.array([
Stefan Reck's avatar
Stefan Reck committed
                [1, ],
            ])
        }

        out_blob = module.process(in_blob)
        self.assertSetEqual(set(out_blob.keys()), set(target.keys()))
        np.testing.assert_array_almost_equal(
            np.array(out_blob["Hits"].view("<f8")),
            np.array(target["Hits"].view("<f8")))
        np.testing.assert_array_almost_equal(
Stefan Reck's avatar
Stefan Reck committed
            np.array(out_blob["samples"]),
            np.array(target["samples"]))
Stefan Reck's avatar
Stefan Reck committed

    def test_1d_binning_no_hits(self):
        # (1, ) t binning
        bin_edges_list = [
            ["time", [3.5, 4.5]]
        ]

Stefan Reck's avatar
Stefan Reck committed
        module = modules.ImageMaker(bin_edges_list=bin_edges_list)
Stefan Reck's avatar
Stefan Reck committed
        in_blob = {
            "Hits": Table({
                'time': [1., 2., 3.],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            })
        }

        target = {
            "Hits": Table({
                'time': [1., 2., 3.],
                "t0": [0.1, 0.2, 0.3],
                "triggered": [0, 1, 1],
            }),
Stefan Reck's avatar
Stefan Reck committed
            "samples": np.array([
Stefan Reck's avatar
Stefan Reck committed
                [0, ],
            ])
        }

        out_blob = module.process(in_blob)
        self.assertSetEqual(set(out_blob.keys()), set(target.keys()))
        np.testing.assert_array_almost_equal(
            np.array(out_blob["Hits"].view("<f8")),
            np.array(target["Hits"].view("<f8")))
        np.testing.assert_array_almost_equal(
Stefan Reck's avatar
Stefan Reck committed
            np.array(out_blob["samples"]),
            np.array(target["samples"]))
Stefan Reck's avatar
Stefan Reck committed


class TestBinningStatsMaker(TestCase):
    def test_it(self):
        # (3 x 2) x-t binning
        bin_edges_list = [
            ["x", [3.5, 4.5, 5.5, 6.5]],
            ["time", [0.5, 2, 3.5]],
            ["z", [1, 4]]
        ]

        in_blob = {
            "Hits": Table({
                "x": [4, 5, 6, 6],
                'time': [1., 2., 3., 50],
                "z": [0, 3, 4, 5],

                "t0": [0.1, 0.2, 0.3, 0.4],
                "triggered": [0, 1, 1, 1],
            })
        }

        target = {
            'x': {
                'hist': np.array([0., 0., 0., 1., 0., 1.]),
                'hist_bin_edges': np.array([3.5, 4., 4.5, 5., 5.5, 6., 6.5]),
                'bin_edges': [3.5, 4.5, 5.5, 6.5],
                'cut_off': np.array([0., 0.])
            },
            'time': {
                'hist': np.array([0., 2.]),
                'hist_bin_edges': [0.5, 2, 3.5],
                'bin_edges': [0.5, 2, 3.5],
                'cut_off': np.array([0., 1.])
            },
            'z': {
                'hist': np.array([0., 2.]),
                'hist_bin_edges': np.array([1., 2.5, 4.]),
                'bin_edges': [1, 4],
                'cut_off': np.array([1., 1.])
            }
        }

        module = modules.BinningStatsMaker(
            bin_edges_list=bin_edges_list, res_increase=2)
        module.process(in_blob)
        output = module.finish()
        check_dicts_n_ray(output, target)


def check_dicts_n_ray(a, b):
    """ Check if dicts with dicts with ndarrays are equal. """
    if set(a.keys()) != set(b.keys()):
        raise KeyError("{} != {}".format(a.keys(), b.keys()))
    for key in a.keys():
        if set(a[key].keys()) != set(b[key].keys()):
            raise KeyError("{} != {}".format(a[key].keys(), b[key].keys()))
        for skey in a[key].keys():
            np.testing.assert_array_almost_equal(a[key][skey], b[key][skey])