Skip to content
Snippets Groups Projects
Commit c3a50421 authored by Tamas Gal's avatar Tamas Gal :speech_balloon:
Browse files

Merge branch 'rewrite-online' into 'master'

Rewrite the online module

See merge request !68
parents 627995b6 b4a0949d
1 merge request!68Rewrite the online module
Pipeline #25960 passed
Unreleased changes
------------------
* Added a new, high-performance Summaryslice reader ``km3io.online.SummarysliceReader``
* The old ``km3io.OnlineReader.summarslices`` is now using the new ``SummarysliceReader``
which has a slightly different API (but at least an order of magnitude better
performance and much nicer high-level API thanks to AwkwardArrays)
Version 0
---------
......
......@@ -40,29 +40,38 @@ print(f.events[0].snapshot_hits.tot)
# Reading SummarySlices
# ---------------------
# The following example shows how to access summary slices, in particular the DOM
# IDs of the slice with the index 0:
# IDs of the slice with the index 0.
# The current implementation of the summaryslice I/O uses a chunked reading for
# better performance, which means that when you iterate through the `.slices`,
# you'll get chunks of summaryslices in each iteration instead of a single one.
#
# In the example below, we simulate a single iteration by using the `break`
# keyword and then use the data which has been "pulled out" of the ROOT file.
dom_ids = f.summaryslices.slices[0].dom_id
print(dom_ids)
for chunk in f.summaryslices:
break
#####################################################
# The .dtype attribute (or in general, <TAB> completion) is useful to find out
# more about the field structure:
# `chunk` now contains the first set of summaryslices so `chunk.slice[0]` refers
# to the first summaryslice in the ROOT file. To access e.g. the DOM IDs, use
# the `.dom_id` attribute
dom_ids = chunk.slices[0].dom_id
print(f.summaryslices.headers.dtype)
print(dom_ids)
#####################################################
# To read the frame index:
# The .type attribute (or in general, <TAB> completion) is useful to find out
# more about the field structure:
print(f.summaryslices.headers.frame_index)
print(chunk.slices.type)
#####################################################
# The resulting array is a ChunkedArray which is an extended version of a
# numpy array and behaves like one.
# Similar to the summaryslice data, the headers can be accessed the same way
# To read the frame index of all summaryslices in the obtained chunk:
print(chunk.headers.frame_index)
#####################################################
# Reading TimeSlices
# ------------------
# To be continued.
#
# To be continued...
import binascii
from collections import namedtuple
import os
import uproot
import uproot3
import numpy as np
......@@ -21,6 +23,116 @@ RATE_FACTOR = np.log(MAXIMAL_RATE_HZ / MINIMAL_RATE_HZ) / 255
CHANNEL_BITS_TEMPLATE = np.zeros(31, dtype=bool)
BranchConfiguration = namedtuple(
field_names=["branch_address", "interpretation"], typename="BranchConfiguration"
)
class SummarysliceReader:
"""
A reader for summaryslices which are loaded as chunks given by step_size.
To be used as an iterator (`for chunks in SummarysliceReader(...): ...`)
"""
TREE_ADDR = "KM3NET_SUMMARYSLICE/KM3NET_SUMMARYSLICE"
_subbranches = [
BranchConfiguration(
"KM3NETDAQ::JDAQSummarysliceHeader",
uproot.interpretation.numerical.AsDtype(
[
(" cnt", "u4"),
(" vers", "u2"),
(" cnt2", "u4"),
(" vers2", "u2"),
(" cnt3", "u4"),
(" vers3", "u2"),
("detector_id", ">i4"),
("run", ">i4"),
("frame_index", ">i4"),
(" cnt4", "u4"),
(" vers4", "u2"),
("UTC_seconds", ">u4"),
("UTC_16nanosecondcycles", ">u4"),
]
),
),
BranchConfiguration(
"vector<KM3NETDAQ::JDAQSummaryFrame>",
uproot.interpretation.jagged.AsJagged(
uproot.interpretation.numerical.AsDtype(
[
("dom_id", ">i4"),
("dq_status", ">u4"),
("hrv", ">u4"),
("fifo", ">u4"),
("status3", ">u4"),
("status4", ">u4"),
]
+ [(f"ch{c}", "u1") for c in range(31)]
),
header_bytes=10,
),
),
]
def __init__(self, fobj, step_size=1000):
if isinstance(fobj, str):
self._fobj = uproot.open(fobj)
else:
self._fobj = fobj
self._step_size = step_size
self._branch = self._fobj[self.TREE_ADDR]
self.ChunksConstructor = namedtuple(
field_names=["headers", "slices"], typename="SummarysliceChunk"
)
def _chunks_generator(self):
for chunk in self._branch.iterate(
dict(self._subbranches), step_size=self._step_size
):
yield self.ChunksConstructor(
*[getattr(chunk, bc.branch_address) for bc in self._subbranches]
)
def __getitem__(self, idx):
if idx >= len(self) or idx < -len(self):
raise IndexError("Chunk index out of range")
s = self._step_size
if idx < 0:
idx = len(self) + idx
chunk = self._branch.arrays(
dict(self._subbranches), entry_start=idx * s, entry_stop=(idx + 1) * s
)
return self.ChunksConstructor(
*[getattr(chunk, bc.branch_address) for bc in self._subbranches]
)
def __iter__(self):
self._chunks = self._chunks_generator()
return self
def __next__(self):
return next(self._chunks)
def __len__(self):
return int(np.ceil(self._branch.num_entries / self._step_size))
def __repr__(self):
step_size = self._step_size
n_items = self._branch.num_entries
cls_name = self.__class__.__name__
n_chunks = len(self)
return (
f"<{cls_name} {n_items} items, step_size={step_size} "
f"({n_chunks} chunk{'' if n_chunks == 1 else 's'})>"
)
@nb.vectorize(
[nb.int32(nb.int8), nb.int32(nb.int16), nb.int32(nb.int32), nb.int32(nb.int64)]
)
......@@ -181,74 +293,12 @@ class OnlineReader:
@property
def summaryslices(self):
if self._summaryslices is None:
self._summaryslices = SummarySlices(self._fobj)
self._summaryslices = SummarysliceReader(
uproot.open(self._filename)
) # TODO: remove when using uproot4
return self._summaryslices
class SummarySlices:
"""A wrapper for summary slices"""
def __init__(self, fobj):
self._fobj = fobj
self._slices = None
self._headers = None
self._rates = None
self._ch_selector = ["ch{}".format(c) for c in range(31)]
@property
def headers(self):
if self._headers is None:
self._headers = self._read_headers()
return self._headers
@property
def slices(self):
if self._slices is None:
self._slices = self._read_summaryslices()
return self._slices
@property
def rates(self):
if self._rates is None:
self._rates = self.slices[["dom_id"] + self._ch_selector]
return self._rates
def _read_summaryslices(self):
"""Reads a lazyarray of summary slices"""
tree = self._fobj[b"KM3NET_SUMMARYSLICE"][b"KM3NET_SUMMARYSLICE"]
return tree[b"vector<KM3NETDAQ::JDAQSummaryFrame>"].lazyarray(
uproot3.asjagged(
uproot3.astable(
uproot3.asdtype(
[
("dom_id", "i4"),
("dq_status", "u4"),
("hrv", "u4"),
("fifo", "u4"),
("status3", "u4"),
("status4", "u4"),
]
+ [(c, "u1") for c in self._ch_selector]
)
),
skipbytes=10,
),
basketcache=uproot3.cache.ThreadSafeArrayCache(
SUMMARYSLICE_FRAME_BASKET_CACHE_SIZE
),
)
def _read_headers(self):
"""Reads a lazyarray of summary slice headers"""
tree = self._fobj[b"KM3NET_SUMMARYSLICE"][b"KM3NET_SUMMARYSLICE"]
return tree[b"KM3NETDAQ::JDAQSummarysliceHeader"].lazyarray(
uproot3.interpret(tree[b"KM3NETDAQ::JDAQSummarysliceHeader"], cntvers=True)
)
def __str__(self):
return "Number of summaryslices: {}".format(len(self.headers))
class Timeslices:
"""A simple wrapper for timeslices"""
......
......@@ -3,11 +3,13 @@ import itertools
import os
import re
import unittest
import numpy as np
from km3net_testdata import data_path
from km3io.online import (
OnlineReader,
SummarysliceReader,
get_rate,
has_udp_trailer,
get_udp_max_sequence_number,
......@@ -173,7 +175,9 @@ class TestTimeslice(unittest.TestCase):
class TestSummaryslices(unittest.TestCase):
def setUp(self):
self.ss = OnlineReader(ONLINE_FILE).summaryslices
for chunk in OnlineReader(ONLINE_FILE).summaryslices:
self.ss = chunk
break
def test_headers(self):
assert 3 == len(self.ss.headers)
......@@ -185,9 +189,6 @@ class TestSummaryslices(unittest.TestCase):
def test_slices(self):
assert 3 == len(self.ss.slices)
def test_rates(self):
assert 3 == len(self.ss.rates)
def test_fifo(self):
s = self.ss.slices[0]
dct_fifo_stat = {
......@@ -698,7 +699,10 @@ class TestGetChannelFlags_Issue59(unittest.TestCase):
r = OnlineReader(
data_path("online/KM3NeT_00000049_00008456.summaryslice-167941.root")
)
summaryslice = r.summaryslices.slices[0]
for chunks in r.summaryslices:
summaryslice = chunks.slices[0]
break
for ours, ref in zip(summaryslice, ref_entries):
assert ours.dom_id == to_num(ref.dom_id)
......@@ -731,3 +735,97 @@ class TestGetRate(unittest.TestCase):
def test_vectorized_input(self):
self.assertListEqual([2054], list(get_rate([1])))
self.assertListEqual([2054, 2111, 2169], list(get_rate([1, 2, 3])))
class TestSummarysliceReader(unittest.TestCase):
def test_init(self):
sr = SummarysliceReader(data_path("online/km3net_online.root"))
def test_length(self):
sr = SummarysliceReader(data_path("online/km3net_online.root"))
assert 1 == len(sr)
sr = SummarysliceReader(data_path("online/km3net_online.root"), step_size=2)
assert 2 == len(sr)
sr = SummarysliceReader(data_path("online/km3net_online.root"), step_size=3)
assert 1 == len(sr)
def test_getitem_raises_when_out_of_range(self):
sr = SummarysliceReader(data_path("online/km3net_online.root"), step_size=1)
with self.assertRaises(IndexError):
sr[123]
with self.assertRaises(IndexError):
sr[-123]
with self.assertRaises(IndexError):
sr[3]
sr[-3] # this should still work, gives the first element in this case
with self.assertRaises(IndexError):
sr[-4]
def test_getitem(self):
sr = SummarysliceReader(data_path("online/km3net_online.root"), step_size=1)
for idx in range(len(sr)):
assert len(sr[idx].headers) == 1
assert len(sr[idx].slices) == 1
first_frame_index = sr[0].headers.frame_index # 126
last_frame_index = sr[2].headers.frame_index # 128
assert 126 == first_frame_index
assert 128 == last_frame_index
sr = SummarysliceReader(data_path("online/km3net_online.root"), step_size=2)
assert len(sr[0].headers) == 2
assert len(sr[0].slices) == 2
assert len(sr[1].headers) == 1
assert len(sr[1].slices) == 1
with self.assertRaises(IndexError):
assert len(sr[2].headers) == 0
assert len(sr[2].slices) == 0
assert first_frame_index == sr[0].headers[0].frame_index
assert last_frame_index == sr[1].headers[0].frame_index
assert last_frame_index == sr[-1].headers[0].frame_index
assert first_frame_index == sr[-2].headers[0].frame_index
def test_iterate_with_step_size_one(self):
sr = SummarysliceReader(data_path("online/km3net_online.root"), step_size=1)
i = 0
for ss in sr:
i += 1
assert i == 3
def test_iterate_with_step_size_bigger_than_number_of_elements(self):
sr = SummarysliceReader(data_path("online/km3net_online.root"), step_size=1000)
i = 0
for ss in sr:
i += 1
assert i == 1
def test_iterate_gives_correct_data_slices(self):
sr = SummarysliceReader(data_path("online/km3net_online.root"), step_size=1000)
for ss in sr:
self.assertListEqual(
ss.slices[0].dom_id[:3].to_list(), [806451572, 806455814, 806465101]
)
self.assertListEqual(
ss.slices[0].dom_id[-3:].to_list(), [809526097, 809544058, 809544061]
)
assert len(ss.slices) == 3
assert len(ss.slices[0]) == 64
assert len(ss.slices[1]) == 66
assert len(ss.slices[2]) == 68
self.assertListEqual(ss.slices[0].ch5[:3].to_list(), [75, 62, 55])
sr = SummarysliceReader(data_path("online/km3net_online.root"), step_size=1)
lengths = [64, 66, 68]
for idx, ss in enumerate(sr):
# self.assertListEqual(ss[0].dom_id[:3].to_list(), [806451572, 806455814, 806465101])
# self.assertListEqual(ss[0].dom_id[-3:].to_list(), [809526097, 809544058, 809544061])
assert len(ss.slices) == 1
assert len(ss.slices[0]) == lengths[idx]
assert len(ss.slices[0].dom_id) == lengths[idx]
assert len(ss.slices[0].ch3) == lengths[idx]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment