Skip to content
Snippets Groups Projects
Verified Commit 07efad81 authored by Tamas Gal's avatar Tamas Gal :speech_balloon:
Browse files

Generalise summaryslice reader into a "chunks reader"

parent 3e5ba7b9
No related branches found
No related tags found
1 merge request!68Rewrite the online module
Pipeline #25157 failed
......@@ -23,10 +23,6 @@ RATE_FACTOR = np.log(MAXIMAL_RATE_HZ / MINIMAL_RATE_HZ) / 255
CHANNEL_BITS_TEMPLATE = np.zeros(31, dtype=bool)
SummarysliceChunk = namedtuple(
field_names=["headers", "slices"], typename="SummarysliceChunk"
)
BranchConfiguration = namedtuple(
field_names=["branch_address", "interpretation"], typename="BranchConfiguration"
)
......@@ -88,101 +84,30 @@ class SummarysliceReader:
self._step_size = step_size
self._branch = self._fobj[self.TREE_ADDR]
def _summaryslices_generator(self):
self.ChunksConstructor = namedtuple(
field_names=["headers", "slices"], typename="SummarysliceChunk"
)
def _chunks_generator(self):
for chunk in self._branch.iterate(
dict(self._subbranches), step_size=self._step_size
):
yield SummarysliceChunk(
yield self.ChunksConstructor(
*[getattr(chunk, bc.branch_address) for bc in self._subbranches]
)
def __iter__(self):
self._summaryslices = self._summaryslices_generator()
return self
def __next__(self):
return next(self._summaryslices)
def __len__(self):
return int(np.ceil(self._branch.num_entries / self._step_size))
def __repr__(self):
return f"<SummarysliceReader {self._branch.num_entries} summaryslices, step_size={self._step_size} ({len(self)} chunks)>"
class EventReader:
"""
A reader for DAQ events which are loaded as chunks given by step_size.
To be used as an iterator (`for chunks in EventReader(...): ...`)
"""
TREE_ADDR = "KM3NET_EVENT"
SUMMARYSLICES_BRANCH_NAME = "vector<KM3NETDAQ::JDAQSummaryFrame>"
HEADERS_BRANCH_NAME = "KM3NETDAQ::JDAQSummarysliceHeader"
SUMMARYSLICE_INTERPRETATION = uproot.interpretation.jagged.AsJagged(
uproot.interpretation.numerical.AsDtype(
[
("dom_id", ">i4"),
("dq_status", ">u4"),
("hrv", ">u4"),
("fifo", ">u4"),
("status3", ">u4"),
("status4", ">u4"),
]
+ [(f"ch{c}", "u1") for c in range(31)]
),
header_bytes=10,
)
HEADER_INTERPRETATION = uproot.interpretation.numerical.AsDtype(
[
(" cnt", "u4"),
(" vers", "u2"),
(" cnt2", "u4"),
(" vers2", "u2"),
(" cnt3", "u4"),
(" vers3", "u2"),
("detector_id", ">i4"),
("run", ">i4"),
("frame_index", ">i4"),
(" cnt4", "u4"),
(" vers4", "u2"),
("UTC_seconds", ">u4"),
("UTC_16nanosecondcycles", ">u4"),
]
)
def __init__(self, fobj, step_size=1000):
if isinstance(fobj, str):
self._fobj = uproot.open(fobj)
else:
self._fobj = fobj
self._step_size = step_size
self._branch = self._fobj[self.TREE_ADDR]
def _summaryslices_generator(self):
expressions = {
self.SUMMARYSLICES_BRANCH_NAME: self.SUMMARYSLICE_INTERPRETATION,
self.HEADERS_BRANCH_NAME: self.HEADER_INTERPRETATION,
}
for chunk in self._branch.iterate(expressions, step_size=self._step_size):
yield SummarysliceChunk(
chunk[self.HEADERS_BRANCH_NAME],
chunk[self.SUMMARYSLICES_BRANCH_NAME],
)
def __iter__(self):
self._summaryslices = self._summaryslices_generator()
self._chunks = self._chunks_generator()
return self
def __next__(self):
return next(self._summaryslices)
return next(self._chunks)
def __len__(self):
return int(np.ceil(self._branch.num_entries / self._step_size))
def __repr__(self):
return f"<SummarysliceReader {self._branch.num_entries} summaryslices, step_size={self._step_size} ({len(self)} chunks)>"
return f"<{self.__class__.__name__} {self._branch.num_entries} items, step_size={self._step_size} ({len(self)} chunks)>"
@nb.vectorize(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment