Newer
Older
from km3io.definitions import fitparameters as kfit
from km3io.tools import (
to_num,
cached_property,
unfold_indices,
unique,
uniquecount,
fitinf,
count_nested,
mask,
best_track,
get_w2list_param,
get_multiplicity,
best_jmuon,
best_jshower,
best_aashower,
best_dusjshower,
is_bit_set,
is_3dshower,
is_mxshower,
is_3dmuon,
OFFLINE_FILE = OfflineReader(data_path("offline/km3net_offline.root"))
OFFLINE_USR = OfflineReader(data_path("offline/usr-sample.root"))
OFFLINE_MC_TRACK_USR = OfflineReader(
data_path(
"offline/mcv5.11r2.gsg_muonCChigherE-CC_50-5000GeV.km3_AAv1.jterbr00004695.jchain.aanet.498.root"
)
)
GENHEN_OFFLINE_FILE = OfflineReader(
data_path("offline/mcv5.1.genhen_anumuNC.sirene.jte.jchain.aashower.sample.root")
)
GSEAGEN_OFFLINE_FILE = OfflineReader(data_path("offline/numucc.root"))
class TestFitinf(unittest.TestCase):
def setUp(self):
self.tracks = OFFLINE_FILE.events.tracks
self.fit = self.tracks.fitinf
self.best = self.tracks[:, 0]
self.best_fit = self.best.fitinf
beta = fitinf(kfit.JGANDALF_BETA0_RAD, self.tracks)
assert beta[0][0] == self.fit[0][0][0]
assert beta[0][1] == self.fit[0][1][0]
assert beta[0][2] == self.fit[0][2][0]
def test_fitinf_from_one_event(self):
beta = fitinf(kfit.JGANDALF_BETA0_RAD, self.best)
assert beta[0] == self.best_fit[0][0]
assert beta[1] == self.best_fit[1][0]
assert beta[2] == self.best_fit[2][0]
class TestBestTrackSelection(unittest.TestCase):
def setUp(self):
self.events = OFFLINE_FILE.events
self.one_event = OFFLINE_FILE.events[0]
def test_best_track_selection_from_multiple_events_with_explicit_stages_in_list(
best = best_track(self.events.tracks, stages=[1, 3, 5, 4])
assert best.rec_stages[0].tolist() == [1, 3, 5, 4]
assert best.rec_stages[1].tolist() == [1, 3, 5, 4]
assert best.rec_stages[2].tolist() == [1, 3, 5, 4]
assert best.rec_stages[3].tolist() == [1, 3, 5, 4]
# test with a shorter set of rec_stages
best2 = best_track(self.events.tracks, stages=[1, 3])
assert len(best2) == 10
assert best2.rec_stages[0].tolist() == [1, 3]
assert best2.rec_stages[1].tolist() == [1, 3]
assert best2.rec_stages[2].tolist() == [1, 3]
assert best2.rec_stages[3].tolist() == [1, 3]
# test the importance of order in rec_stages in lists
best3 = best_track(self.events.tracks, stages=[3, 1])
assert len(best3) == 10
assert best3.rec_stages[0] is None
assert best3.rec_stages[1] is None
assert best3.rec_stages[2] is None
assert best3.rec_stages[3] is None
def test_best_track_selection_from_multiple_events_with_a_set_of_stages(
best = best_track(self.events.tracks, stages={1, 3, 4, 5})
assert best.rec_stages[0].tolist() == [1, 3, 5, 4]
assert best.rec_stages[1].tolist() == [1, 3, 5, 4]
assert best.rec_stages[2].tolist() == [1, 3, 5, 4]
assert best.rec_stages[3].tolist() == [1, 3, 5, 4]
best2 = best_track(self.events.tracks, stages={1, 3})
for rec_stages in best2.rec_stages:
for stage in {1, 3}:
def test_best_track_selection_from_multiple_events_with_start_end(self):
best = best_track(self.events.tracks, startend=(1, 4))
assert best.rec_stages[0].tolist() == [1, 3, 5, 4]
assert best.rec_stages[1].tolist() == [1, 3, 5, 4]
assert best.rec_stages[2].tolist() == [1, 3, 5, 4]
assert best.rec_stages[3].tolist() == [1, 3, 5, 4]
best2 = best_track(self.events.tracks, startend=(1, 3))
assert best2.rec_stages[0].tolist() == [1, 3]
assert best2.rec_stages[1].tolist() == [1, 3]
assert best2.rec_stages[2].tolist() == [1, 3]
assert best2.rec_stages[3].tolist() == [1, 3]
# test the importance of start as a real start of rec_stages
best3 = best_track(self.events.tracks, startend=(0, 3))
assert len(best3) == 10
assert best3.rec_stages[0] is None
assert best3.rec_stages[1] is None
assert best3.rec_stages[2] is None
assert best3.rec_stages[3] is None
# test the importance of end as a real end of rec_stages
best4 = best_track(self.events.tracks, startend=(1, 10))
assert len(best4) == 10
assert best4.rec_stages[0] is None
assert best4.rec_stages[1] is None
assert best4.rec_stages[2] is None
assert best4.rec_stages[3] is None
def test_best_track_from_a_single_event(self):
best = best_track(self.one_event.tracks, stages=[1, 3, 5, 4])
assert best.lik == ak.max(self.one_event.tracks.lik)
assert np.allclose(best.rec_stages.tolist(), [1, 3, 5, 4])
best2 = best_track(self.one_event.tracks, stages={1, 3, 4, 5})
assert best2.lik == ak.max(self.one_event.tracks.lik)
assert np.allclose(best2.rec_stages.tolist(), [1, 3, 5, 4])
best3 = best_track(self.one_event.tracks, startend=(1, 4))
assert best3.lik == ak.max(self.one_event.tracks.lik)
assert np.allclose(best3.rec_stages.tolist(), [1, 3, 5, 4])
tracks_slice = self.one_event.tracks[self.one_event.tracks.rec_type == 4000]
best = best_track(tracks_slice, stages=[1, 3, 5, 4])
assert best.lik == ak.max(tracks_slice.lik)
assert best.rec_stages.tolist() == [1, 3, 5, 4]
best2 = best_track(tracks_slice, stages={1, 3, 4, 5})
assert best2.rec_stages.tolist() == [1, 3, 5, 4]
tracks_slice = self.one_event.tracks[0:5]
assert best.lik == ak.max(tracks_slice.lik)
assert best.rec_stages[0] == 1
assert best.rec_stages[-1] == 4
def test_best_track_on_slices_with_explicit_rec_stages_one_event(self):
tracks_slice = self.one_event.tracks[0:5]
best = best_track(tracks_slice, stages=[1, 3, 5, 4])
assert best.lik == ak.max(tracks_slice.lik)
assert best.rec_stages[0] == 1
assert best.rec_stages[-1] == 4
def test_best_track_on_slices_multiple_events(self):
# stages in list
best = best_track(tracks_slice, stages=[1, 3, 5, 4])
assert len(best) == 5
assert np.allclose(
best.lik.tolist(),
[
294.6407542676734,
96.75133289411137,
560.2775306614813,
278.2872951665753,
99.59098153341449,
],
)
for i in range(len(best)):
assert best.rec_stages[i].tolist() == [1, 3, 5, 4]
assert np.allclose(
best.lik.tolist(),
[
294.6407542676734,
96.75133289411137,
560.2775306614813,
278.2872951665753,
99.59098153341449,
],
)
for i in range(len(best)):
assert best.rec_stages[i].tolist() == [1, 3, 5, 4]
start, end = (1, 4)
best = best_track(tracks_slice, startend=(start, end))
assert np.allclose(
best.lik.tolist(),
[
294.6407542676734,
96.75133289411137,
560.2775306614813,
278.2872951665753,
99.59098153341449,
],
)
for i in range(len(best)):
rs = best.rec_stages[i].tolist()
assert rs[0] == start
assert rs[-1] == end
def test_best_track_raises_when_unknown_stages(self):
with self.assertRaises(ValueError):
best_track(self.events.tracks)
def test_best_track_raises_when_too_many_inputs(self):
with self.assertRaises(ValueError):
best_track(self.events.tracks, startend=(1, 4), stages=[1, 3, 5, 4])
class TestBestJmuon(unittest.TestCase):
def test_best_jmuon(self):
best = best_jmuon(OFFLINE_FILE.events.tracks)
assert len(best) == 10
assert best.rec_stages[0].tolist() == [1, 3, 5, 4]
assert best.rec_stages[1].tolist() == [1, 3, 5, 4]
assert best.rec_stages[2].tolist() == [1, 3, 5, 4]
assert best.rec_stages[3].tolist() == [1, 3, 5, 4]
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
assert best.lik[0] == ak.max(OFFLINE_FILE.events.tracks.lik[0])
assert best.lik[1] == ak.max(OFFLINE_FILE.events.tracks.lik[1])
assert best.lik[2] == ak.max(OFFLINE_FILE.events.tracks.lik[2])
class TestBestJshower(unittest.TestCase):
def test_best_jshower(self):
# there are no jshower events in this file
best = best_jshower(OFFLINE_FILE.events.tracks)
assert len(best) == 10
assert best.rec_stages[0] is None
assert best.rec_stages[1] is None
assert best.rec_stages[2] is None
assert best.rec_stages[3] is None
assert best.lik[0] is None
assert best.lik[1] is None
assert best.lik[2] is None
class TestBestAashower(unittest.TestCase):
def test_best_aashower(self):
# there are no aashower events in this file
best = best_aashower(OFFLINE_FILE.events.tracks)
assert len(best) == 10
assert best.rec_stages[0] is None
assert best.rec_stages[1] is None
assert best.rec_stages[2] is None
assert best.rec_stages[3] is None
assert best.lik[0] is None
assert best.lik[1] is None
assert best.lik[2] is None
class TestBestDusjshower(unittest.TestCase):
def test_best_dusjshower(self):
# there are no aashower events in this file
best = best_dusjshower(OFFLINE_FILE.events.tracks)
assert len(best) == 10
assert best.rec_stages[0] is None
assert best.rec_stages[1] is None
assert best.rec_stages[2] is None
assert best.rec_stages[3] is None
assert best.lik[0] is None
assert best.lik[1] is None
assert best.lik[2] is None
class TestGetMultiplicity(unittest.TestCase):
def test_get_multiplicity(self):
multiplicity = get_multiplicity(OFFLINE_FILE.events.tracks, [1, 3, 5, 4])
assert len(multiplicity) == 10
assert multiplicity[0] == 1
assert multiplicity[1] == 1
assert multiplicity[2] == 1
assert multiplicity[3] == 1
# test with no nexisting rec_stages
multiplicity2 = get_multiplicity(OFFLINE_FILE.events.tracks, [1, 2, 3, 4, 5])
assert len(multiplicity2) == 10
assert multiplicity2[0] == 0
assert multiplicity2[1] == 0
assert multiplicity2[2] == 0
assert multiplicity2[3] == 0
class TestCountNested(unittest.TestCase):
def test_count_nested(self):
fit = OFFLINE_FILE.events.tracks.fitinf
assert count_nested(fit, axis=0) == 10
assert count_nested(fit, axis=1)[0:4].tolist() == [56, 55, 56, 56]
assert count_nested(fit, axis=2)[0][0:4].tolist() == [17, 11, 8, 8]
class TestRecStagesMasks(unittest.TestCase):
def setUp(self):
self.nested = ak.Array(
[
[[1, 2, 3], [1, 2, 3], [1]],
[[0], [1, 2, 3]],
[[0], [0, 1, 3], [0], [1, 2, 3], [1, 2, 3]],
]
)
def test_mask_with_explicit_rec_stages_in_list_with_multiple_events(self):
masks = mask(self.tracks.rec_stages, sequence=stages)
assert masks[0][0] == all(rec_stages[0][0] == ak.Array(stages))
assert masks[1][0] == all(rec_stages[1][0] == ak.Array(stages))
assert masks[0][1] == False
def test_mask_with_atleast_on_multiple_events(self):
stages = [1, 3, 4, 5]
masks = mask(self.tracks.rec_stages, atleast=stages)
tracks = self.tracks[masks]
assert 1 in tracks.rec_stages[0][0]
assert 3 in tracks.rec_stages[0][0]
assert 4 in tracks.rec_stages[0][0]
assert 5 in tracks.rec_stages[0][0]
def test_mask_with_start_and_end_of_rec_stages_with_multiple_events(self):
rec_stages = self.tracks.rec_stages
stages = [1, 3, 5, 4]
masks = mask(self.tracks.rec_stages, startend=(1, 4))
assert masks[0][0] == all(rec_stages[0][0] == ak.Array(stages))
assert masks[1][0] == all(rec_stages[1][0] == ak.Array(stages))
assert masks[0][1] == False
def test_mask_with_start_and_end_of_rec_stages_signle_event(self):
rec_stages = self.tracks.rec_stages[0][0]
stages = [1, 3, 5, 4]
track = self.tracks[0]
masks = mask(track.rec_stages, startend=(1, 4))
assert track[masks].rec_stages[0][0] == 1
assert track[masks].rec_stages[0][-1] == 4
def test_mask_with_explicit_rec_stages_with_single_event(self):
rec_stages = self.tracks.rec_stages[0][0]
stages = [1, 3]
track = self.tracks[0]
masks = mask(track.rec_stages, sequence=stages)
assert track[masks].rec_stages[0][0] == stages[0]
assert track[masks].rec_stages[0][1] == stages[1]
def test_mask_raises_when_no_inputs(self):
with self.assertRaises(ValueError):
mask(self.tracks)
class TestMask(unittest.TestCase):
def test_minmax_2dim_mask(self):
arr = ak.Array([[1, 2, 3, 4], [3, 4, 5], [1, 2, 5]])
m = mask(arr, minmax=(1, 4))
self.assertListEqual(m.tolist(), [True, False, False])
def test_minmax_3dim_mask(self):
arr = ak.Array([[[1, 2, 3, 4], [3, 4, 5], [1, 2, 5]], [[1, 2, 3]]])
m = mask(arr, minmax=(1, 4))
self.assertListEqual(m.tolist(), [[True, False, False], [True]])
def test_minmax_4dim_mask(self):
arr = ak.Array(
[[[[1, 2, 3, 4], [3, 4, 5], [1, 2, 5]], [[1, 2, 3]]], [[[1, 9], [3, 3]]]]
)
m = mask(arr, minmax=(1, 4))
self.assertListEqual(
m.tolist(), [[[True, False, False], [True]], [[False, True]]]
)
class TestUnique(unittest.TestCase):
def run_random_test_with_dtype(self, dtype):
max_range = 100
for i in range(23):
low = np.random.randint(0, max_range)
high = np.random.randint(low + 1, low + 2 + np.random.randint(max_range))
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
n = np.random.randint(max_range)
arr = np.random.randint(low, high, n).astype(dtype)
np_reference = np.sort(np.unique(arr))
result = np.sort(unique(arr, dtype=dtype))
try:
np.allclose(np_reference, result, atol=1e-1)
except ValueError:
print("low:", low)
print("high:", high)
print("n:", n)
print("arr =", list(arr))
print("np.unique(arr) =", np_reference)
print("unique(arr) =", result)
assert False
def test_unique_with_dtype_int8(self):
self.run_random_test_with_dtype(np.int8)
def test_unique_with_dtype_int16(self):
self.run_random_test_with_dtype(np.int16)
def test_unique_with_dtype_int32(self):
self.run_random_test_with_dtype(np.int32)
def test_unique_with_dtype_int64(self):
self.run_random_test_with_dtype(np.int64)
def test_unique_with_dtype_uint8(self):
self.run_random_test_with_dtype(np.uint8)
def test_unique_with_dtype_uint16(self):
self.run_random_test_with_dtype(np.uint16)
def test_unique_with_dtype_uint32(self):
self.run_random_test_with_dtype(np.uint32)
def test_unique_with_dtype_uint64(self):
self.run_random_test_with_dtype(np.uint64)
class TestUniqueCount(unittest.TestCase):
def test_uniquecount(self):
arr = ak.Array([[1, 2, 3], [2, 2, 2], [3, 4, 5, 6, 6], [4, 4, 3, 1]])
assert np.allclose([3, 1, 4, 3], uniquecount(arr))
def test_uniquecount_with_empty_subarrays(self):
arr = ak.Array([[1, 2, 3], [2, 2, 2], [], [4, 4, 3, 1]])
assert np.allclose([3, 1, 0, 3], uniquecount(arr))
class TestToNum(unittest.TestCase):
def test_to_num(self):
self.assertEqual(10, to_num("10"))
self.assertEqual(10.5, to_num("10.5"))
self.assertEqual("test", to_num("test"))
self.assertIsNone(to_num(None))
class TestCachedProperty(unittest.TestCase):
def test_cached_property(self):
class Test:
@cached_property
def prop(self):
pass
self.assertTrue(isinstance(Test.prop, cached_property))
class TestUnfoldIndices(unittest.TestCase):
def test_unfold_indices(self):
data = range(10)
indices = [slice(2, 5), 0]
assert data[indices[0]][indices[1]] == unfold_indices(data, indices)
assert data[indices[0]][indices[1]][indices[2]] == unfold_indices(data, indices)
def test_unfold_indices_raises_index_error(self):
data = range(10)
indices = [slice(2, 5), 99]
with self.assertRaises(IndexError):
def test_is_cc(self):
NC_file = is_cc(GENHEN_OFFLINE_FILE)
CC_file = is_cc(GSEAGEN_OFFLINE_FILE)
self.assertFalse(
all(NC_file) == True
) # this test fails because the CC flags are not reliable in old files
assert np.allclose(
[118.6302815337638, 44.33580521344907, 99.93916717621543],
)
assert np.allclose(
[37.51967774166617, -10.280346193553832, 13.67595659707355],
[0.0487],
usr(OFFLINE_MC_TRACK_USR.mc_tracks[0], "bx").tolist(),
class TestIsBitSet(unittest.TestCase):
def test_is_bit_set_for_single_values(self):
value = 2 # 10
assert not is_bit_set(value, 0)
assert is_bit_set(value, 1)
value = 42 # 101010
assert not is_bit_set(value, 0)
assert is_bit_set(value, 1)
assert not is_bit_set(value, 2)
assert is_bit_set(value, 3)
assert not is_bit_set(value, 4)
assert is_bit_set(value, 5)
def test_is_bit_set_for_lists(self):
values = [2, 42, 4]
assert np.allclose([True, True, False], is_bit_set(values, 1))
def test_is_bit_set_for_numpy_arrays(self):
values = np.array([2, 42, 4])
assert np.allclose([True, True, False], is_bit_set(values, 1))
def test_is_bit_set_for_awkward_arrays(self):
values = ak.Array([2, 42, 4])
assert np.allclose([True, True, False], list(is_bit_set(values, 1)))
class TestTriggerMaskChecks(unittest.TestCase):
def test_is_3dshower(self):
assert np.allclose(
[True, True, True, True, True, False, False, True, True, True],
list(is_3dshower(OFFLINE_FILE.events.trigger_mask)),
)
assert np.allclose(
[True, True, True, True, True, True, True, True, True, False],
list(is_3dshower(GENHEN_OFFLINE_FILE.events.trigger_mask)),
)
def test_is_mxshower(self):
assert np.allclose(
[True, True, True, True, True, True, True, True, True, True],
list(is_mxshower(OFFLINE_FILE.events.trigger_mask)),
)
assert np.allclose(
[False, False, False, False, False, False, False, False, False, False],
list(is_mxshower(GENHEN_OFFLINE_FILE.events.trigger_mask)),
)
def test_is_3dmuon(self):
assert np.allclose(
[True, True, True, True, True, False, False, True, True, True],
list(is_3dmuon(OFFLINE_FILE.events.trigger_mask)),
)
assert np.allclose(
[False, False, False, True, False, False, True, False, True, True],
list(is_3dmuon(GENHEN_OFFLINE_FILE.events.trigger_mask)),
def test_is_nanobeacon(self):
assert np.allclose(
[False, False, False, False, False, False, False, False, False, False],
list(is_nanobeacon(OFFLINE_FILE.events.trigger_mask)),
)
assert np.allclose(
[False, False, False, False, False, False, False, False, False, False],
list(is_nanobeacon(GENHEN_OFFLINE_FILE.events.trigger_mask)),
)