Skip to content
Snippets Groups Projects
Commit 4b1a3da8 authored by Stefan Reck's avatar Stefan Reck
Browse files

Merge branch 'center_hits' into 'master'

Center hits

See merge request !13
parents 5fa360ae 3c46999d
No related branches found
No related tags found
1 merge request!13Center hits
......@@ -38,6 +38,11 @@ class BaseProcessor:
correct_timeslew : bool
If true, the time slewing of hits depending on their tot
will be corrected.
center_hits_to : tuple, optional
Translate the xyz positions of the hits (and mchits), as if
the detector was centered at the given position.
E.g., if its (0, 0, None), the hits and mchits will be
centered at xy = 00, and z will be left untouched.
add_t0 : bool
If true, add t0 to the time of hits and mchits. If using a
det_file, this will already have been done automatically
......@@ -87,6 +92,7 @@ class BaseProcessor:
center_time=True,
add_t0=False,
correct_timeslew=True,
center_hits_to=None,
event_skipper=None,
chunksize=32,
keep_event_info=False,
......@@ -98,6 +104,7 @@ class BaseProcessor:
self.center_time = center_time
self.add_t0 = add_t0
self.correct_timeslew = correct_timeslew
self.center_hits_to = center_hits_to
self.event_skipper = event_skipper
self.chunksize = chunksize
self.keep_event_info = keep_event_info
......@@ -184,7 +191,9 @@ class BaseProcessor:
if self.det_file:
cmpts.append((modules.DetApplier, {
"det_file": self.det_file,
"correct_timeslew": self.correct_timeslew}))
"correct_timeslew": self.correct_timeslew,
"center_hits_to": self.center_hits_to,
}))
if any((self.center_time, self.add_t0)):
cmpts.append((modules.TimePreproc, {
......
......@@ -397,17 +397,29 @@ class DetApplier(kp.Module):
correct_timeslew : bool
If true, the time slewing of hits depending on their tot
will be corrected.
center_hits_to : tuple, optional
Translate the xyz positions of the hits (and mchits), as if
the detector was centered at the given position.
E.g., if its (0, 0, None), the hits and mchits will be
centered at xy = 00, and z will be left untouched.
"""
def configure(self):
self.det_file = self.require("det_file")
self.correct_timeslew = self.get("correct_timeslew", default=True)
self.center_hits_to = self.get("center_hits_to", default=None)
self.cprint(f"Calibrating with {self.det_file}")
self.calib = kp.calib.Calibration(filename=self.det_file)
self._calib_checked = False
# dict dim_name: float
self._vector_shift = None
if self.center_hits_to:
self._cache_shift_center()
def process(self, blob):
if self._calib_checked is False:
if "pos_x" in blob["Hits"]:
......@@ -421,8 +433,32 @@ class DetApplier(kp.Module):
blob["Hits"], correct_slewing=self.correct_timeslew)
if "McHits" in blob:
blob["McHits"] = self.calib.apply(blob["McHits"])
if self.center_hits_to:
self.shift_hits(blob)
return blob
def shift_hits(self, blob):
""" Translate hits by cached vector. """
for dim_name in ("pos_x", "pos_y", "pos_z"):
blob["Hits"][dim_name] += self._vector_shift[dim_name]
if "McHits" in blob:
blob["McHits"][dim_name] += self._vector_shift[dim_name]
def _cache_shift_center(self):
det_center, shift = {}, {}
for i, dim_name in enumerate(("pos_x", "pos_y", "pos_z")):
center = self.calib.detector.dom_table[dim_name].mean()
det_center[dim_name] = center
if self.center_hits_to[i] is None:
shift[dim_name] = 0
else:
shift[dim_name] = self.center_hits_to[i] - center
self._vector_shift = shift
self.cprint(f"original detector center: {det_center}")
self.cprint(f"shift for hits: {self._vector_shift}")
class HitRotator(kp.Module):
"""
......
import os
from unittest import TestCase
import numpy as np
import orcasong.modules as modules
......@@ -7,6 +8,11 @@ import km3pipe as kp
__author__ = 'Stefan Reck'
test_dir = os.path.dirname(os.path.realpath(__file__))
MUPAGE_FILE = os.path.join(test_dir, "data", "mupage.root.h5")
DET_FILE = os.path.join(test_dir, "data", "KM3NeT_-00000001_20171212.detx")
class TestModules(TestCase):
def test_mc_info_maker(self):
""" Test the mcinfo maker on some dummy data. """
......@@ -385,6 +391,36 @@ class TestImageMaker(TestCase):
np.array(target["samples"]))
class TestDetApplier(TestCase):
def setUp(self):
self.deta = modules.DetApplier(
det_file=DET_FILE,
center_hits_to=(0, 5, None),
)
# self.pump = kp.io.HDF5Pump(filename=MUPAGE_FILE)
# self.blob = self.pump[0]
def test_cache_center(self):
target = {"pos_x": 58.75166782379619, "pos_y": -21.5, "pos_z": 0}
for d in ("pos_x", "pos_y", "pos_z"):
np.testing.assert_array_almost_equal(target[d], self.deta._vector_shift[d])
def test_shift_is_applied_to_hits(self):
blob = {"Hits": {
"pos_x": np.ones(3),
"pos_y": np.ones(3)*2,
"pos_z": np.ones(3)*3,
}}
target = {
"pos_x": np.ones(3) * 59.75166782379619,
"pos_y": np.ones(3) * -19.5,
"pos_z": np.ones(3) * 3,
}
self.deta.shift_hits(blob)
for d in ("pos_x", "pos_y", "pos_z"):
np.testing.assert_array_almost_equal(target[d], blob["Hits"][d])
class TestBinningStatsMaker(TestCase):
def test_it(self):
# (3 x 2) x-t binning
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment