import binascii
from collections import namedtuple

import numba as nb
import numpy as np
import uproot
import uproot3

TIMESLICE_FRAME_BASKET_CACHE_SIZE = 523 * 1024 ** 2 # [byte]
SUMMARYSLICE_FRAME_BASKET_CACHE_SIZE = 523 * 1024 ** 2 # [byte]
BASKET_CACHE_SIZE = 110 * 1024 ** 2
BASKET_CACHE = uproot3.cache.ThreadSafeArrayCache(BASKET_CACHE_SIZE)
# Parameters for PMT rate conversions, since the rates in summary slices are
# stored as a single byte to save space. The values from 0-255 can be decoded
# using the `get_rate(value)` function, which will yield the actual rate
# in Hz.
MINIMAL_RATE_HZ = 2.0e3
MAXIMAL_RATE_HZ = 2.0e6
RATE_FACTOR = np.log(MAXIMAL_RATE_HZ / MINIMAL_RATE_HZ) / 255
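# A worked example of the decoding (illustrative, not part of the original
# module): value 0 is reserved for "no rate", the remaining values map
# exponentially onto [MINIMAL_RATE_HZ, MAXIMAL_RATE_HZ]:
#
#     get_rate(0)    ->  0 Hz (special case)
#     get_rate(1)    ->  MINIMAL_RATE_HZ * exp(1 * RATE_FACTOR)    ~ 2.05 kHz
#     get_rate(255)  ->  MINIMAL_RATE_HZ * exp(255 * RATE_FACTOR) == MAXIMAL_RATE_HZ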
CHANNEL_BITS_TEMPLATE = np.zeros(31, dtype=bool)
BranchConfiguration = namedtuple(
field_names=["branch_address", "interpretation"], typename="BranchConfiguration"
)
"""
A reader for summaryslices which are loaded as chunks given by step_size.
To be used as an iterator (`for chunks in SummarysliceReader(...): ...`)
"""
TREE_ADDR = "KM3NET_SUMMARYSLICE/KM3NET_SUMMARYSLICE"
_subbranches = [
BranchConfiguration(
"KM3NETDAQ::JDAQSummarysliceHeader",
uproot.interpretation.numerical.AsDtype(
[
(" cnt", "u4"),
(" vers", "u2"),
(" cnt2", "u4"),
(" vers2", "u2"),
(" cnt3", "u4"),
(" vers3", "u2"),
("detector_id", ">i4"),
("run", ">i4"),
("frame_index", ">i4"),
(" cnt4", "u4"),
(" vers4", "u2"),
("UTC_seconds", ">u4"),
("UTC_16nanosecondcycles", ">u4"),
]
),
),
BranchConfiguration(
"vector<KM3NETDAQ::JDAQSummaryFrame>",
uproot.interpretation.jagged.AsJagged(
uproot.interpretation.numerical.AsDtype(
[
("dom_id", ">i4"),
("dq_status", ">u4"),
("hrv", ">u4"),
("fifo", ">u4"),
("status3", ">u4"),
("status4", ">u4"),
]
+ [(f"ch{c}", "u1") for c in range(31)]
),
header_bytes=10,
),
),
]
def __init__(self, fobj, step_size=1000):
if isinstance(fobj, str):
self._fobj = uproot.open(fobj)
else:
self._fobj = fobj
self._step_size = step_size
self._branch = self._fobj[self.TREE_ADDR]
self.ChunksConstructor = namedtuple(
field_names=["headers", "slices"], typename="SummarysliceChunk"
)
def _chunks_generator(self):
for chunk in self._branch.iterate(
dict(self._subbranches), step_size=self._step_size
):
yield self.ChunksConstructor(
*[getattr(chunk, bc.branch_address) for bc in self._subbranches]
)
def __iter__(self):
self._chunks = self._chunks_generator()
return self
def __next__(self):
return next(self._chunks)
    def __len__(self):
        return int(np.ceil(self._branch.num_entries / self._step_size))
def __repr__(self):
return f"<{self.__class__.__name__} {self._branch.num_entries} items, step_size={self._step_size} ({len(self)} chunks)>"
@nb.vectorize(
[nb.int32(nb.int8), nb.int32(nb.int16), nb.int32(nb.int32), nb.int32(nb.int64)]
)
def get_rate(value): # pragma: no cover
"""Return the rate in Hz from the short int value"""
if value == 0:
return 0
else:
return MINIMAL_RATE_HZ * np.exp(value * RATE_FACTOR)
@nb.guvectorize(
"void(i8, b1[:], b1[:])", "(), (n) -> (n)", target="parallel", nopython=True
)
def unpack_bits(value, bits_template, out): # pragma: no cover
"""Return a boolean array for a value's bit representation.
    This function also accepts arrays as input; the output shape is then
    NxM, where N is the number of input values and M the length of the
    ``bits_template`` array. The template is just a dummy array, required
    only by numba's ``guvectorize`` signature system.
Parameters
----------
    value: int or np.array(int) with shape (N,)
        The binary value containing the bit information
    bits_template: np.array() with shape (M,)
        The template for the output array; only its shape matters
Returns
-------
np.array(bool) either with shape (M,) or (N, M)
"""
for i in range(bits_template.shape[0]):
out[30 - i] = value & (1 << i) > 0
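# Worked example (illustrative): for value = 0b101 the bits 0 and 2 are set,
# so unpack_bits(0b101, CHANNEL_BITS_TEMPLATE) returns a length-31 boolean
# array with out[30] (bit 0) and out[28] (bit 2) set to True; the bit order is
# reversed again by the np.flip in get_channel_flags below.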
def get_channel_flags(value):
"""Returns the hrv/fifo flags for the PMT channels (hrv/fifo)
Parameters
----------
value : int32
The integer value to be parsed.
"""
    # NOTE: the mask below is an assumption; it keeps the 31 channel bits and
    # drops bit 31, which is used as the UDP trailer flag (see has_udp_trailer).
    channel_bits = np.bitwise_and(value, 0x7FFFFFFF)
flags = unpack_bits(channel_bits, CHANNEL_BITS_TEMPLATE)
return np.flip(flags, axis=-1)
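# Illustrative example (using the made-up hrv word 0b1001): bits 0 and 3 are
# set, so channels 0 and 3 are flagged:
#
#     flags = get_channel_flags(0b1001)
#     flags[0], flags[3]   # -> True, True (all other entries are False)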
def get_number_udp_packets(value):
"""Returns the number of received UDP packets (dq_status)
Parameters
----------
value : int32
The integer value to be parsed.
"""
return np.bitwise_and(value, 0x7FFF)
def get_udp_max_sequence_number(value):
"""Returns the maximum sequence number of the received UDP packets (dq_status)
Parameters
----------
value : int32
The integer value to be parsed.
"""
return np.right_shift(value, 16)
def has_udp_trailer(value):
"""Returns the UDP Trailer flag (fifo)
Parameters
----------
value : int32
The integer value to be parsed.
"""
return np.any(np.bitwise_and(value, np.left_shift(1, 31)))
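# Illustrative decoding of a dq_status word, based on the helpers above: the
# lower 15 bits hold the number of received UDP packets and the upper 16 bits
# the maximum UDP sequence number (the word below is made up for the example):
#
#     dq_status = (42 << 16) | 7
#     get_number_udp_packets(dq_status)        # -> 7
#     get_udp_max_sequence_number(dq_status)   # -> 42
#     has_udp_trailer(fifo_word)               # True if bit 31 of the
#                                              # (placeholder) fifo word is set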
class OnlineReader:
"""Reader for online ROOT files"""
    def __init__(self, filename):
        self._fobj = uproot3.open(filename)
        self._filename = filename
        self._events = None
        self._timeslices = None
        self._summaryslices = None
        self._uuid = binascii.hexlify(self._fobj._context.uuid).decode("ascii")
def close(self):
self._fobj.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
    @property
    def events(self):
        if self._events is None:
            # NOTE: the tree/branch names "KM3NET_EVENT", "snapshotHits" and
            # "triggeredHits" are restored from context and may need checking;
            # the interpretations below are taken verbatim.
            tree = self._fobj["KM3NET_EVENT"]

            headers = tree["KM3NETDAQ::JDAQEventHeader"].lazyarray(
                uproot3.interpret(tree["KM3NETDAQ::JDAQEventHeader"], cntvers=True)
            )
            snapshot_hits = tree["snapshotHits"].lazyarray(
uproot3.asjagged(
uproot3.astable(
uproot3.asdtype(
[
("dom_id", ">i4"),
("channel_id", "u1"),
("time", "<u4"),
("tot", "u1"),
]
)
),
skipbytes=10,
)
)
            triggered_hits = tree["triggeredHits"].lazyarray(
                uproot3.asjagged(
uproot3.astable(
uproot3.asdtype(
[
("dom_id", ">i4"),
("channel_id", "u1"),
("time", "<u4"),
("tot", "u1"),
(" cnt", "u4"),
(" vers", "u2"),
("trigger_mask", ">u8"),
]
)
),
skipbytes=10,
)
)
            self._events = OnlineEvents(headers, snapshot_hits, triggered_hits)
        return self._events
@property
def timeslices(self):
        if self._timeslices is None:
            self._timeslices = Timeslices(self._fobj)
        return self._timeslices
@property
def summaryslices(self):
if self._summaryslices is None:
self._summaryslices = SummarysliceReader(
uproot.open(self._filename)
            )  # TODO: remove when using uproot4
        return self._summaryslices
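
# Illustrative usage sketch for OnlineReader (the filename is a placeholder):
#
#     r = OnlineReader("KM3NeT_online.root")
#     r.events          # OnlineEvents wrapper (headers, snapshot/triggered hits)
#     r.timeslices      # Timeslices wrapper for the L0/L1/L2/SN streams
#     r.summaryslices   # SummarysliceReader over the summaryslice tree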
class Timeslices:
"""A simple wrapper for timeslices"""
    def __init__(self, fobj):
        self._fobj = fobj
        self._timeslices = {}
        self._read_streams()
def _read_streams(self):
"""Read the L0, L1, L2 and SN streams if available"""
s.split(b"KM3NET_TIMESLICE_")[1].split(b";")[0]
for s in self._fobj.keys()
if b"KM3NET_TIMESLICE_" in s
)
tree = self._fobj[b"KM3NET_TIMESLICE_" + stream][
b"KM3NETDAQ::JDAQTimeslice"
]
headers = tree[b"KM3NETDAQ::JDAQTimesliceHeader"][b"KM3NETDAQ::JDAQHeader"][
b"KM3NETDAQ::JDAQChronometer"
]
if len(headers) == 0:
continue
superframes = tree[b"vector<KM3NETDAQ::JDAQSuperFrame>"]
hits_dtype = np.dtype([("pmt", "u1"), ("tdc", "<u4"), ("tot", "u1")])
            hits_buffer = superframes[
                # the branch name below is restored from context
                b"vector<KM3NETDAQ::JDAQSuperFrame>.hits"
            ].lazyarray(
                uproot3.asjagged(
                    uproot3.astable(uproot3.asdtype(hits_dtype)), skipbytes=6
                ),
                basketcache=uproot3.cache.ThreadSafeArrayCache(
                    TIMESLICE_FRAME_BASKET_CACHE_SIZE
                ),
            )
self._timeslices[stream.decode("ascii")] = (
headers,
superframes,
hits_buffer,
)
setattr(
self,
stream.decode("ascii"),
TimesliceStream(headers, superframes, hits_buffer),
)
def stream(self, stream, idx):
        ts = self._timeslices[stream]
        return Timeslice(*ts, idx, stream)

    def __str__(self):
        return "Available timeslice streams: {}".format(
", ".join(s for s in self._timeslices.keys())
)
def __repr__(self):
return str(self)
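
# Illustrative example (assuming an "L1" stream is present in the file):
#
#     ts = OnlineReader("KM3NeT_online.root").timeslices
#     print(ts)                        # lists the available streams
#     timeslice = ts.stream("L1", 0)   # first timeslice of the L1 stream
#     timeslice.frames                 # dict: module ID -> hits of that frame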

class TimesliceStream:
    def __init__(self, headers, superframes, hits_buffer):
# self.headers = headers.lazyarray(
# uproot3.asjagged(uproot3.astable(
# uproot3.asdtype(
# np.dtype([('a', 'i4'), ('b', 'i4'), ('c', 'i4'),
# ('d', 'i4'), ('e', 'i4')]))),
# skipbytes=6),
# basketcache=uproot3.cache.ThreadSafeArrayCache(
# TIMESLICE_FRAME_BASKET_CACHE_SIZE))
self.headers = headers
self.superframes = superframes
self._hits_buffer = hits_buffer
# def frames(self):
# n_hits = self._superframe[
# b'vector<KM3NETDAQ::JDAQSuperFrame>.numberOfHits'].lazyarray(
# basketcache=BASKET_CACHE)[self._idx]
# module_ids = self._superframe[
# b'vector<KM3NETDAQ::JDAQSuperFrame>.id'].lazyarray(basketcache=BASKET_CACHE)[self._idx]
# idx = 0
# for module_id, n_hits in zip(module_ids, n_hits):
# self._frames[module_id] = hits_buffer[idx:idx + n_hits]
# idx += n_hits

class Timeslice:
    """A wrapper for a timeslice"""

    def __init__(self, header, superframe, hits_buffer, idx, stream):
        self.header = header
        self._frames = {}
        self._superframe = superframe
        self._hits_buffer = hits_buffer
        self._idx = idx
        self._stream = stream
        self._n_frames = None
@property
def frames(self):
if not self._frames:
self._read_frames()
return self._frames
def _read_frames(self):
"""Populate a dictionary of frames with the module ID as key"""
hits_buffer = self._hits_buffer[self._idx]
b"vector<KM3NETDAQ::JDAQSuperFrame>.numberOfHits"
].lazyarray(basketcache=BASKET_CACHE)[self._idx]
b"vector<KM3NETDAQ::JDAQSuperFrame>.id"
].lazyarray(basketcache=BASKET_CACHE)[self._idx]
module_ids = (
self._superframe[
b"vector<KM3NETDAQ::JDAQSuperFrame>.KM3NETDAQ::JDAQModuleIdentifier"
]
.lazyarray(
uproot3.asjagged(
uproot3.astable(uproot3.asdtype([("dom_id", ">i4")]))
),
basketcache=BASKET_CACHE,
)[self._idx]
.dom_id
)
idx = 0
        for module_id, n_hits in zip(module_ids, n_hits):
            self._frames[module_id] = hits_buffer[idx : idx + n_hits]
            idx += n_hits
def __len__(self):
if self._n_frames is None:
self._n_frames = len(
self._superframe[b"vector<KM3NETDAQ::JDAQSuperFrame>.id"].lazyarray(
basketcache=BASKET_CACHE
)[self._idx]
            )
        return self._n_frames
return "{} timeslice with {} frames.".format(self._stream, len(self))
return "<{}: {} entries>".format(self.__class__.__name__, len(self.header))
class OnlineEvents:
"""A simple wrapper for online events"""
def __init__(self, headers, snapshot_hits, triggered_hits):
self.headers = headers
self.snapshot_hits = snapshot_hits
self.triggered_hits = triggered_hits
def __getitem__(self, item):
return OnlineEvent(
self.headers[item], self.snapshot_hits[item], self.triggered_hits[item]
)
def __len__(self):
return len(self.headers)
def __str__(self):
return "Number of events: {}".format(len(self.headers))
def __repr__(self):
return str(self)
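
# Illustrative example:
#
#     events = OnlineReader("KM3NeT_online.root").events
#     print(len(events))          # number of events in the file
#     event = events[0]           # an OnlineEvent
#     event.snapshot_hits         # dom_id, channel_id, time and tot per hit
#     event.triggered_hits        # same fields plus the trigger_mask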
class OnlineEvent:
"""A wrapper for a online event"""
def __init__(self, header, snapshot_hits, triggered_hits):
self.header = header
self.snapshot_hits = snapshot_hits
self.triggered_hits = triggered_hits
def __str__(self):
return "Online event with {} snapshot hits and {} triggered hits".format(