Skip to content
Snippets Groups Projects
Verified Commit 3e5ba7b9 authored by Tamas Gal's avatar Tamas Gal :speech_balloon:
Browse files

Cleanup and preparation for event reader rewrite

parent 589cc961
No related branches found
No related tags found
1 merge request!68Rewrite the online module
Pipeline #25156 failed
...@@ -23,7 +23,13 @@ RATE_FACTOR = np.log(MAXIMAL_RATE_HZ / MINIMAL_RATE_HZ) / 255 ...@@ -23,7 +23,13 @@ RATE_FACTOR = np.log(MAXIMAL_RATE_HZ / MINIMAL_RATE_HZ) / 255
CHANNEL_BITS_TEMPLATE = np.zeros(31, dtype=bool) CHANNEL_BITS_TEMPLATE = np.zeros(31, dtype=bool)
SummarysliceChunk = namedtuple(field_names=["headers", "slices"], typename="SummarysliceChunk") SummarysliceChunk = namedtuple(
field_names=["headers", "slices"], typename="SummarysliceChunk"
)
BranchConfiguration = namedtuple(
field_names=["branch_address", "interpretation"], typename="BranchConfiguration"
)
class SummarysliceReader: class SummarysliceReader:
...@@ -32,7 +38,86 @@ class SummarysliceReader: ...@@ -32,7 +38,86 @@ class SummarysliceReader:
To be used as an iterator (`for chunks in SummarysliceReader(...): ...`) To be used as an iterator (`for chunks in SummarysliceReader(...): ...`)
""" """
TREE_ADDR = "KM3NET_SUMMARYSLICE/KM3NET_SUMMARYSLICE" TREE_ADDR = "KM3NET_SUMMARYSLICE/KM3NET_SUMMARYSLICE"
_subbranches = [
BranchConfiguration(
"KM3NETDAQ::JDAQSummarysliceHeader",
uproot.interpretation.numerical.AsDtype(
[
(" cnt", "u4"),
(" vers", "u2"),
(" cnt2", "u4"),
(" vers2", "u2"),
(" cnt3", "u4"),
(" vers3", "u2"),
("detector_id", ">i4"),
("run", ">i4"),
("frame_index", ">i4"),
(" cnt4", "u4"),
(" vers4", "u2"),
("UTC_seconds", ">u4"),
("UTC_16nanosecondcycles", ">u4"),
]
),
),
BranchConfiguration(
"vector<KM3NETDAQ::JDAQSummaryFrame>",
uproot.interpretation.jagged.AsJagged(
uproot.interpretation.numerical.AsDtype(
[
("dom_id", ">i4"),
("dq_status", ">u4"),
("hrv", ">u4"),
("fifo", ">u4"),
("status3", ">u4"),
("status4", ">u4"),
]
+ [(f"ch{c}", "u1") for c in range(31)]
),
header_bytes=10,
),
),
]
def __init__(self, fobj, step_size=1000):
if isinstance(fobj, str):
self._fobj = uproot.open(fobj)
else:
self._fobj = fobj
self._step_size = step_size
self._branch = self._fobj[self.TREE_ADDR]
def _summaryslices_generator(self):
for chunk in self._branch.iterate(
dict(self._subbranches), step_size=self._step_size
):
yield SummarysliceChunk(
*[getattr(chunk, bc.branch_address) for bc in self._subbranches]
)
def __iter__(self):
self._summaryslices = self._summaryslices_generator()
return self
def __next__(self):
return next(self._summaryslices)
def __len__(self):
return int(np.ceil(self._branch.num_entries / self._step_size))
def __repr__(self):
return f"<SummarysliceReader {self._branch.num_entries} summaryslices, step_size={self._step_size} ({len(self)} chunks)>"
class EventReader:
"""
A reader for DAQ events which are loaded as chunks given by step_size.
To be used as an iterator (`for chunks in EventReader(...): ...`)
"""
TREE_ADDR = "KM3NET_EVENT"
SUMMARYSLICES_BRANCH_NAME = "vector<KM3NETDAQ::JDAQSummaryFrame>" SUMMARYSLICES_BRANCH_NAME = "vector<KM3NETDAQ::JDAQSummaryFrame>"
HEADERS_BRANCH_NAME = "KM3NETDAQ::JDAQSummarysliceHeader" HEADERS_BRANCH_NAME = "KM3NETDAQ::JDAQSummarysliceHeader"
SUMMARYSLICE_INTERPRETATION = uproot.interpretation.jagged.AsJagged( SUMMARYSLICE_INTERPRETATION = uproot.interpretation.jagged.AsJagged(
...@@ -50,22 +135,22 @@ class SummarysliceReader: ...@@ -50,22 +135,22 @@ class SummarysliceReader:
header_bytes=10, header_bytes=10,
) )
HEADER_INTERPRETATION = uproot.interpretation.numerical.AsDtype( HEADER_INTERPRETATION = uproot.interpretation.numerical.AsDtype(
[ [
(" cnt", "u4"), (" cnt", "u4"),
(" vers", "u2"), (" vers", "u2"),
(" cnt2", "u4"), (" cnt2", "u4"),
(" vers2", "u2"), (" vers2", "u2"),
(" cnt3", "u4"), (" cnt3", "u4"),
(" vers3", "u2"), (" vers3", "u2"),
("detector_id", ">i4"), ("detector_id", ">i4"),
("run", ">i4"), ("run", ">i4"),
("frame_index", ">i4"), ("frame_index", ">i4"),
(" cnt4", "u4"), (" cnt4", "u4"),
(" vers4", "u2"), (" vers4", "u2"),
("UTC_seconds", ">u4"), ("UTC_seconds", ">u4"),
("UTC_16nanosecondcycles", ">u4"), ("UTC_16nanosecondcycles", ">u4"),
] ]
) )
def __init__(self, fobj, step_size=1000): def __init__(self, fobj, step_size=1000):
if isinstance(fobj, str): if isinstance(fobj, str):
...@@ -80,9 +165,7 @@ class SummarysliceReader: ...@@ -80,9 +165,7 @@ class SummarysliceReader:
self.SUMMARYSLICES_BRANCH_NAME: self.SUMMARYSLICE_INTERPRETATION, self.SUMMARYSLICES_BRANCH_NAME: self.SUMMARYSLICE_INTERPRETATION,
self.HEADERS_BRANCH_NAME: self.HEADER_INTERPRETATION, self.HEADERS_BRANCH_NAME: self.HEADER_INTERPRETATION,
} }
for chunk in self._branch.iterate( for chunk in self._branch.iterate(expressions, step_size=self._step_size):
expressions, step_size=self._step_size
):
yield SummarysliceChunk( yield SummarysliceChunk(
chunk[self.HEADERS_BRANCH_NAME], chunk[self.HEADERS_BRANCH_NAME],
chunk[self.SUMMARYSLICES_BRANCH_NAME], chunk[self.SUMMARYSLICES_BRANCH_NAME],
...@@ -262,7 +345,9 @@ class OnlineReader: ...@@ -262,7 +345,9 @@ class OnlineReader:
@property @property
def summaryslices(self): def summaryslices(self):
if self._summaryslices is None: if self._summaryslices is None:
self._summaryslices = SummarysliceReader(uproot.open(self._filename)) # TODO: remove when using uproot4 self._summaryslices = SummarysliceReader(
uproot.open(self._filename)
) # TODO: remove when using uproot4
return self._summaryslices return self._summaryslices
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment