diff --git a/CHANGELOG.rst b/CHANGELOG.rst index f1160cc6a926cf91e026e5162aa4284afcd47495..f6e4afba4873bab3be4c1da6f49f2908a61d196c 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,5 +1,9 @@ Unreleased changes ------------------ +* Added a new, high-performance Summaryslice reader ``km3io.online.SummarysliceReader`` +* The old ``km3io.OnlineReader.summaryslices`` is now using the new ``SummarysliceReader`` + which has a slightly different API (but at least an order of magnitude better + performance and much nicer high-level API thanks to AwkwardArrays) Version 0 --------- diff --git a/examples/plot_online_example.py b/examples/plot_online_example.py index 72a38ddfc9ccfefbe091d5a9632effa966c5c493..c735b128b0254e7e1719e0d3891844d749a8e253 100644 --- a/examples/plot_online_example.py +++ b/examples/plot_online_example.py @@ -40,29 +40,38 @@ print(f.events[0].snapshot_hits.tot) # Reading SummarySlices # --------------------- # The following example shows how to access summary slices, in particular the DOM -# IDs of the slice with the index 0: +# IDs of the slice with the index 0. +# The current implementation of the summaryslice I/O uses chunked reading for +# better performance, which means that when you iterate through `f.summaryslices`, +# you'll get chunks of summaryslices in each iteration instead of a single one. +# +# In the example below, we simulate a single iteration by using the `break` +# keyword and then use the data which has been "pulled out" of the ROOT file. -dom_ids = f.summaryslices.slices[0].dom_id -print(dom_ids) +for chunk in f.summaryslices: + break ##################################################### -# The .dtype attribute (or in general, <TAB> completion) is useful to find out -# more about the field structure: +# `chunk` now contains the first set of summaryslices so `chunk.slices[0]` refers +# to the first summaryslice in the ROOT file. To access e.g. 
# Pairs the address of a sub-branch inside the summaryslice tree with the
# uproot interpretation used to decode it.
BranchConfiguration = namedtuple(
    field_names=["branch_address", "interpretation"], typename="BranchConfiguration"
)


class SummarysliceReader:
    """
    A reader for summaryslices which are loaded as chunks given by step_size.

    To be used as an iterator (`for chunks in SummarysliceReader(...): ...`)
    or indexed by chunk number (`reader[0]`, `reader[-1]`). Every chunk is a
    ``SummarysliceChunk`` namedtuple with the fields ``headers`` and
    ``slices``, each holding up to ``step_size`` entries.

    Parameters
    ----------
    fobj : str, os.PathLike or an already opened uproot file
        Paths are opened with ``uproot.open``; any other object is used as
        is and must support ``fobj[TREE_ADDR]``.
    step_size : int, optional
        Number of summaryslices per chunk (default: 1000).

    Raises
    ------
    ValueError
        If ``step_size`` is an integer smaller than 1.
    """

    TREE_ADDR = "KM3NET_SUMMARYSLICE/KM3NET_SUMMARYSLICE"
    # The order of the entries must match the field order of
    # ``ChunksConstructor`` (headers first, then slices).
    _subbranches = [
        BranchConfiguration(
            "KM3NETDAQ::JDAQSummarysliceHeader",
            uproot.interpretation.numerical.AsDtype(
                [
                    (" cnt", "u4"),
                    (" vers", "u2"),
                    (" cnt2", "u4"),
                    (" vers2", "u2"),
                    (" cnt3", "u4"),
                    (" vers3", "u2"),
                    ("detector_id", ">i4"),
                    ("run", ">i4"),
                    ("frame_index", ">i4"),
                    (" cnt4", "u4"),
                    (" vers4", "u2"),
                    ("UTC_seconds", ">u4"),
                    ("UTC_16nanosecondcycles", ">u4"),
                ]
            ),
        ),
        BranchConfiguration(
            "vector<KM3NETDAQ::JDAQSummaryFrame>",
            uproot.interpretation.jagged.AsJagged(
                uproot.interpretation.numerical.AsDtype(
                    [
                        ("dom_id", ">i4"),
                        ("dq_status", ">u4"),
                        ("hrv", ">u4"),
                        ("fifo", ">u4"),
                        ("status3", ">u4"),
                        ("status4", ">u4"),
                    ]
                    + [(f"ch{c}", "u1") for c in range(31)]
                ),
                header_bytes=10,
            ),
        ),
    ]

    # Defined once on the class so that all readers hand out the same chunk
    # type; still reachable as ``reader.ChunksConstructor``.
    ChunksConstructor = namedtuple(
        field_names=["headers", "slices"], typename="SummarysliceChunk"
    )

    def __init__(self, fobj, step_size=1000):
        # Fail early instead of with a ZeroDivisionError in ``__len__``.
        if isinstance(step_size, (int, np.integer)) and step_size < 1:
            raise ValueError(
                f"step_size must be a positive integer, got {step_size}"
            )
        if isinstance(fobj, (str, os.PathLike)):
            self._fobj = uproot.open(fobj)
        else:
            self._fobj = fobj
        self._step_size = step_size
        self._branch = self._fobj[self.TREE_ADDR]
        # Created lazily so that ``next(reader)`` works without a prior
        # ``iter(reader)``.
        self._chunks = None

    def _make_chunk(self, arrays):
        """Wrap the arrays read for one chunk into a ``SummarysliceChunk``."""
        # Subscript by field name: the branch addresses are not valid Python
        # identifiers, so attribute-style access is fragile.
        return self.ChunksConstructor(
            *(arrays[bc.branch_address] for bc in self._subbranches)
        )

    def _chunks_generator(self):
        """Yield all chunks of the file in order, ``step_size`` entries each."""
        for arrays in self._branch.iterate(
            dict(self._subbranches), step_size=self._step_size
        ):
            yield self._make_chunk(arrays)

    def __getitem__(self, idx):
        """Return the chunk with index ``idx`` (negative indices count from the end).

        Raises
        ------
        IndexError
            If ``idx`` is outside ``[-len(self), len(self))``.
        """
        n_chunks = len(self)
        if idx >= n_chunks or idx < -n_chunks:
            raise IndexError("Chunk index out of range")
        if idx < 0:
            idx += n_chunks

        entry_start = idx * self._step_size
        arrays = self._branch.arrays(
            dict(self._subbranches),
            entry_start=entry_start,
            entry_stop=entry_start + self._step_size,
        )
        return self._make_chunk(arrays)

    def __iter__(self):
        # Every ``iter()`` call restarts the iteration from the first chunk.
        self._chunks = self._chunks_generator()
        return self

    def __next__(self):
        if self._chunks is None:
            self._chunks = self._chunks_generator()
        return next(self._chunks)

    def __len__(self):
        """Number of chunks, i.e. the entry count divided by step_size, rounded up."""
        # Integer ceil division, avoids the round trip through floats.
        return int(-(-self._branch.num_entries // self._step_size))

    def __repr__(self):
        step_size = self._step_size
        n_items = self._branch.num_entries
        cls_name = self.__class__.__name__
        n_chunks = len(self)
        return (
            f"<{cls_name} {n_items} items, step_size={step_size} "
            f"({n_chunks} chunk{'' if n_chunks == 1 else 's'})>"
        )
_read_headers(self): - """Reads a lazyarray of summary slice headers""" - tree = self._fobj[b"KM3NET_SUMMARYSLICE"][b"KM3NET_SUMMARYSLICE"] - return tree[b"KM3NETDAQ::JDAQSummarysliceHeader"].lazyarray( - uproot3.interpret(tree[b"KM3NETDAQ::JDAQSummarysliceHeader"], cntvers=True) - ) - - def __str__(self): - return "Number of summaryslices: {}".format(len(self.headers)) - - class Timeslices: """A simple wrapper for timeslices""" diff --git a/tests/test_online.py b/tests/test_online.py index 50e5689aab4b48dd8ee33b57ddaa027e058ebf20..8684f6c41e4e97e1bb71d1801956372d5210f615 100644 --- a/tests/test_online.py +++ b/tests/test_online.py @@ -3,11 +3,13 @@ import itertools import os import re import unittest +import numpy as np from km3net_testdata import data_path from km3io.online import ( OnlineReader, + SummarysliceReader, get_rate, has_udp_trailer, get_udp_max_sequence_number, @@ -173,7 +175,9 @@ class TestTimeslice(unittest.TestCase): class TestSummaryslices(unittest.TestCase): def setUp(self): - self.ss = OnlineReader(ONLINE_FILE).summaryslices + for chunk in OnlineReader(ONLINE_FILE).summaryslices: + self.ss = chunk + break def test_headers(self): assert 3 == len(self.ss.headers) @@ -185,9 +189,6 @@ class TestSummaryslices(unittest.TestCase): def test_slices(self): assert 3 == len(self.ss.slices) - def test_rates(self): - assert 3 == len(self.ss.rates) - def test_fifo(self): s = self.ss.slices[0] dct_fifo_stat = { @@ -698,7 +699,10 @@ class TestGetChannelFlags_Issue59(unittest.TestCase): r = OnlineReader( data_path("online/KM3NeT_00000049_00008456.summaryslice-167941.root") ) - summaryslice = r.summaryslices.slices[0] + + for chunks in r.summaryslices: + summaryslice = chunks.slices[0] + break for ours, ref in zip(summaryslice, ref_entries): assert ours.dom_id == to_num(ref.dom_id) @@ -731,3 +735,97 @@ class TestGetRate(unittest.TestCase): def test_vectorized_input(self): self.assertListEqual([2054], list(get_rate([1]))) self.assertListEqual([2054, 2111, 2169], 
class TestSummarysliceReader(unittest.TestCase):
    """Tests for the chunked ``SummarysliceReader``.

    The expected values below (three summaryslices with frame indices
    126..128 and 64/66/68 frames) are those of the sample file.
    """

    FILENAME = "online/km3net_online.root"

    def _reader(self, **kwargs):
        """Open the sample file with the given reader options."""
        return SummarysliceReader(data_path(self.FILENAME), **kwargs)

    def test_init(self):
        self._reader()

    def test_length(self):
        assert 1 == len(self._reader())
        assert 2 == len(self._reader(step_size=2))
        assert 1 == len(self._reader(step_size=3))

    def test_getitem_raises_when_out_of_range(self):
        sr = self._reader(step_size=1)
        with self.assertRaises(IndexError):
            sr[123]
        with self.assertRaises(IndexError):
            sr[-123]
        with self.assertRaises(IndexError):
            sr[3]
        sr[-3]  # this should still work, gives the first element in this case
        with self.assertRaises(IndexError):
            sr[-4]

    def test_getitem(self):
        sr = self._reader(step_size=1)
        for idx in range(len(sr)):
            assert len(sr[idx].headers) == 1
            assert len(sr[idx].slices) == 1

        first_frame_index = sr[0].headers.frame_index  # 126
        last_frame_index = sr[2].headers.frame_index  # 128

        assert 126 == first_frame_index
        assert 128 == last_frame_index

        sr = self._reader(step_size=2)
        assert len(sr[0].headers) == 2
        assert len(sr[0].slices) == 2
        assert len(sr[1].headers) == 1
        assert len(sr[1].slices) == 1
        # Only the lookup itself belongs here: it raises before anything
        # else in the block could run.
        with self.assertRaises(IndexError):
            sr[2]

        assert first_frame_index == sr[0].headers[0].frame_index
        assert last_frame_index == sr[1].headers[0].frame_index

        assert last_frame_index == sr[-1].headers[0].frame_index
        assert first_frame_index == sr[-2].headers[0].frame_index

    def test_iterate_with_step_size_one(self):
        sr = self._reader(step_size=1)
        assert sum(1 for _ in sr) == 3

    def test_iterate_with_step_size_bigger_than_number_of_elements(self):
        sr = self._reader(step_size=1000)
        assert sum(1 for _ in sr) == 1

    def test_iterate_gives_correct_data_slices(self):
        sr = self._reader(step_size=1000)

        for ss in sr:
            self.assertListEqual(
                ss.slices[0].dom_id[:3].to_list(), [806451572, 806455814, 806465101]
            )
            self.assertListEqual(
                ss.slices[0].dom_id[-3:].to_list(), [809526097, 809544058, 809544061]
            )
            assert len(ss.slices) == 3
            assert len(ss.slices[0]) == 64
            assert len(ss.slices[1]) == 66
            assert len(ss.slices[2]) == 68
            self.assertListEqual(ss.slices[0].ch5[:3].to_list(), [75, 62, 55])

        sr = self._reader(step_size=1)

        lengths = [64, 66, 68]

        for idx, ss in enumerate(sr):
            assert len(ss.slices) == 1
            assert len(ss.slices[0]) == lengths[idx]
            assert len(ss.slices[0].dom_id) == lengths[idx]
            assert len(ss.slices[0].ch3) == lengths[idx]