Skip to content
Snippets Groups Projects

Refactor offline

Closed Tamas Gal requested to merge refactor-offline into master
1 file
+ 62
5
Compare changes
  • Side-by-side
  • Inline
+ 295
310
@@ -2,389 +2,374 @@ import unittest
import numpy as np
from pathlib import Path
from km3io.offline import OfflineEvents, OfflineHits, OfflineTracks
from km3io import OfflineReader
from km3io.offline import _nested_mapper, Header
SAMPLES_DIR = Path(__file__).parent / 'samples'
OFFLINE_FILE = SAMPLES_DIR / 'aanet_v2.0.0.root'
OFFLINE_USR = SAMPLES_DIR / 'usr-sample.root'
OFFLINE_NUMUCC = SAMPLES_DIR / "numucc.root" # with mc data
class TestOfflineKeys(unittest.TestCase):
def setUp(self):
self.keys = OfflineReader(OFFLINE_FILE).keys
def test_events_keys(self):
# there are 22 "valid" events keys
self.assertEqual(len(self.keys.events_keys), 22)
self.assertEqual(len(self.keys.cut_events_keys), 22)
def test_hits_keys(self):
# there are 20 "valid" hits keys
self.assertEqual(len(self.keys.hits_keys), 20)
self.assertEqual(len(self.keys.mc_hits_keys), 20)
self.assertEqual(len(self.keys.cut_hits_keys), 20)
def test_tracks_keys(self):
# there are 22 "valid" tracks keys
self.assertEqual(len(self.keys.tracks_keys), 22)
self.assertEqual(len(self.keys.mc_tracks_keys), 22)
self.assertEqual(len(self.keys.cut_tracks_keys), 22)
def test_valid_keys(self):
# there are 106 valid keys: 22*2 + 22 + 20*2
# (fit keys are excluded)
self.assertEqual(len(self.keys.valid_keys), 106)
def test_fit_keys(self):
# there are 18 fit keys
self.assertEqual(len(self.keys.fit_keys), 18)
OFFLINE_FILE = OfflineReader(SAMPLES_DIR / 'aanet_v2.0.0.root')
OFFLINE_USR = OfflineReader(SAMPLES_DIR / 'usr-sample.root')
OFFLINE_NUMUCC = OfflineReader(SAMPLES_DIR / "numucc.root") # with mc data
class TestOfflineReader(unittest.TestCase):
def setUp(self):
self.r = OfflineReader(OFFLINE_FILE)
self.nu = OfflineReader(OFFLINE_NUMUCC)
self.Nevents = 10
self.r = OFFLINE_FILE
self.nu = OFFLINE_NUMUCC
self.n_events = 10
def test_number_events(self):
Nevents = len(self.r)
# check that there are 10 events
self.assertEqual(Nevents, self.Nevents)
def test_find_empty(self):
fitinf = self.nu.tracks.fitinf
rec_stages = self.nu.tracks.rec_stages
empty_fitinf = np.array(
[match for match in self.nu._find_empty(fitinf)])
empty_stages = np.array(
[match for match in self.nu._find_empty(rec_stages)])
self.assertListEqual(empty_fitinf[:5, 1].tolist(),
[23, 14, 14, 4, None])
self.assertListEqual(empty_stages[:5, 1].tolist(),
[False, False, False, False, None])
def test_find_rec_stages(self):
stages = np.array(
[match for match in self.nu._find_rec_stages([1, 2, 3, 4, 5])])
self.assertListEqual(stages[:5, 1].tolist(), [0, 0, 0, 0, None])
def test_get_reco_fit(self):
JGANDALF_BETA0_RAD = [
0.0020367251782607574, 0.003306725805622178, 0.0057877124222254885,
0.015581698352185896
]
reco_fit = self.nu.get_reco_fit([1, 2, 3, 4, 5])['JGANDALF_BETA0_RAD']
self.assertListEqual(JGANDALF_BETA0_RAD, reco_fit[:4].tolist())
with self.assertRaises(ValueError):
self.nu.get_reco_fit([1000, 4512, 5625], mc=True)
def test_get_reco_hits(self):
doms = self.nu.get_reco_hits([1, 2, 3, 4, 5], ["dom_id"])["dom_id"]
mc_doms = self.nu.get_reco_hits([], ["dom_id"], mc=True)["dom_id"]
self.assertEqual(doms.size, 9)
self.assertEqual(mc_doms.size, 10)
self.assertListEqual(doms[0][0:4].tolist(),
self.nu.hits[0].dom_id[0:4].tolist())
self.assertListEqual(mc_doms[0][0:4].tolist(),
self.nu.mc_hits[0].dom_id[0:4].tolist())
with self.assertRaises(ValueError):
self.nu.get_reco_hits([1000, 4512, 5625], ["dom_id"])
def test_get_reco_tracks(self):
assert self.n_events == len(self.r.events)
pos = self.nu.get_reco_tracks([1, 2, 3, 4, 5], ["pos_x"])["pos_x"]
mc_pos = self.nu.get_reco_tracks([], ["pos_x"], mc=True)["pos_x"]
self.assertEqual(pos.size, 9)
self.assertEqual(mc_pos.size, 10)
self.assertEqual(pos[0], self.nu.tracks[0].pos_x[0])
self.assertEqual(mc_pos[0], self.nu.mc_tracks[0].pos_x[0])
with self.assertRaises(ValueError):
self.nu.get_reco_tracks([1000, 4512, 5625], ["pos_x"])
def test_get_reco_events(self):
hits = self.nu.get_reco_events([1, 2, 3, 4, 5], ["hits"])["hits"]
mc_hits = self.nu.get_reco_events([], ["mc_hits"], mc=True)["mc_hits"]
self.assertEqual(hits.size, 9)
self.assertEqual(mc_hits.size, 10)
self.assertListEqual(hits[0:4].tolist(),
self.nu.events.hits[0:4].tolist())
self.assertListEqual(mc_hits[0:4].tolist(),
self.nu.events.mc_hits[0:4].tolist())
with self.assertRaises(ValueError):
self.nu.get_reco_events([1000, 4512, 5625], ["hits"])
def test_get_max_reco_stages(self):
rec_stages = self.nu.tracks.rec_stages
max_reco = self.nu._get_max_reco_stages(rec_stages)
self.assertEqual(len(max_reco.tolist()), 9)
self.assertListEqual(max_reco[0].tolist(), [[1, 2, 3, 4, 5], 5, 0])
def test_best_reco(self):
JGANDALF_BETA1_RAD = [
0.0014177681261476852, 0.002094094517471032, 0.003923368624980349,
0.009491461076780453
]
best = self.nu.get_best_reco()
self.assertEqual(best.size, 9)
self.assertEqual(best['JGANDALF_BETA1_RAD'][:4].tolist(),
JGANDALF_BETA1_RAD)
def test_reading_header(self):
# head is the supported format
head = OfflineReader(OFFLINE_NUMUCC).header
self.assertEqual(float(head['DAQ']), 394)
self.assertEqual(float(head['kcut']), 2)
class TestHeader(unittest.TestCase):
def test_str_header(self):
assert "MC Header" in str(OFFLINE_NUMUCC.header)
def test_warning_if_unsupported_header(self):
# test the warning for unsupported fheader format
with self.assertWarns(UserWarning):
self.r.header
OFFLINE_FILE.header
def test_missing_key_definitions(self):
head = {'a': '1 2 3', 'b': '4', 'c': 'd'}
header = Header(head)
assert 1 == header.a.field_0
assert 2 == header.a.field_1
assert 3 == header.a.field_2
assert 4 == header.b
assert 'd' == header.c
def test_missing_values(self):
head = {'can': '1'}
header = Header(head)
assert 1 == header.can.zmin
assert header.can.zmax is None
assert header.can.r is None
def test_additional_values_compared_to_definition(self):
head = {'can': '1 2 3 4'}
header = Header(head)
assert 1 == header.can.zmin
assert 2 == header.can.zmax
assert 3 == header.can.r
assert 4 == header.can.field_3
def test_header(self):
head = {
'DAQ': '394',
'PDF': '4',
'can': '0 1027 888.4',
'undefined': '1 2 test 3.4'
}
header = Header(head)
assert 394 == header.DAQ.livetime
assert 4 == header.PDF.i1
assert header.PDF.i2 is None
assert 0 == header.can.zmin
assert 1027 == header.can.zmax
assert 888.4 == header.can.r
assert 1 == header.undefined.field_0
assert 2 == header.undefined.field_1
assert "test" == header.undefined.field_2
assert 3.4 == header.undefined.field_3
def test_reading_header_from_sample_file(self):
head = OFFLINE_NUMUCC.header
assert 394 == head.DAQ.livetime
assert 4 == head.PDF.i1
assert 58 == head.PDF.i2
assert 0 == head.coord_origin.x
assert 0 == head.coord_origin.y
assert 0 == head.coord_origin.z
assert 100 == head.cut_nu.Emin
assert 100000000.0 == head.cut_nu.Emax
assert -1 == head.cut_nu.cosTmin
assert 1 == head.cut_nu.cosTmax
assert "diffuse" == head.sourcemode
assert 100000.0 == head.ngen
class TestOfflineEvents(unittest.TestCase):
def setUp(self):
self.events = OfflineReader(OFFLINE_FILE).events
self.hits = {0: 176, 1: 125, -1: 105}
self.Nevents = 10
self.events = OFFLINE_FILE.events
self.n_events = 10
self.det_id = [44] * self.n_events
self.n_hits = [176, 125, 318, 157, 83, 60, 71, 84, 255, 105]
self.n_tracks = [56, 55, 56, 56, 56, 56, 56, 56, 54, 56]
self.t_sec = [
1567036818, 1567036818, 1567036820, 1567036816, 1567036816,
1567036816, 1567036822, 1567036818, 1567036818, 1567036820
]
self.t_ns = [
200000000, 300000000, 200000000, 500000000, 500000000, 500000000,
200000000, 500000000, 500000000, 400000000
]
def test_reading_hits(self):
# test item selection
for event_id, hit in self.hits.items():
self.assertEqual(hit, self.events.hits[event_id])
def test_len(self):
assert self.n_events == len(self.events)
def reading_tracks(self):
self.assertListEqual(list(self.events.trks[:3]), [56, 55, 56])
def test_attributes_available(self):
for key in self.events._keymap.keys():
getattr(self.events, key)
def test_item_selection(self):
for event_id, hit in self.hits.items():
self.assertEqual(hit, self.events[event_id].hits)
def test_attributes(self):
assert self.n_events == len(self.events.det_id)
self.assertListEqual(self.det_id, list(self.events.det_id))
self.assertListEqual(self.n_hits, list(self.events.n_hits))
self.assertListEqual(self.n_tracks, list(self.events.n_tracks))
self.assertListEqual(self.t_sec, list(self.events.t_sec))
self.assertListEqual(self.t_ns, list(self.events.t_ns))
def test_len(self):
self.assertEqual(len(self.events), self.Nevents)
def test_keys(self):
assert np.allclose(self.n_hits, self.events['n_hits'])
assert np.allclose(self.n_tracks, self.events['n_tracks'])
assert np.allclose(self.t_sec, self.events['t_sec'])
assert np.allclose(self.t_ns, self.events['t_ns'])
def test_IndexError(self):
# test handling IndexError with empty lists/arrays
self.assertEqual(len(OfflineEvents(['whatever'], [])), 0)
def test_slicing(self):
s = slice(2, 8, 2)
s_events = self.events[s]
assert 3 == len(s_events)
self.assertListEqual(self.n_hits[s], list(s_events.n_hits))
self.assertListEqual(self.n_tracks[s], list(s_events.n_tracks))
self.assertListEqual(self.t_sec[s], list(s_events.t_sec))
self.assertListEqual(self.t_ns[s], list(s_events.t_ns))
def test_slicing_consistency(self):
for s in [slice(1, 3), slice(2, 7, 3)]:
assert np.allclose(self.events[s].n_hits, self.events.n_hits[s])
def test_index_consistency(self):
for i in [0, 2, 5]:
assert np.allclose(self.events[i].n_hits, self.events.n_hits[i])
def test_index_chaining(self):
assert np.allclose(self.events[3:5].n_hits, self.events.n_hits[3:5])
assert np.allclose(self.events[3:5][0].n_hits,
self.events.n_hits[3:5][0])
assert np.allclose(self.events[3:5].hits[1].dom_id[4],
self.events.hits[3:5][1][4].dom_id)
assert np.allclose(self.events.hits[3:5][1][4].dom_id,
self.events[3:5][1][4].hits.dom_id)
def test_iteration(self):
i = 0
for event in self.events:
i += 1
assert 10 == i
def test_iteration_2(self):
n_hits = [e.n_hits for e in self.events]
assert np.allclose(n_hits, self.events.n_hits)
def test_str(self):
self.assertEqual(str(self.events), 'Number of events: 10')
assert str(self.n_events) in str(self.events)
def test_repr(self):
self.assertEqual(repr(self.events),
'<OfflineEvents: 10 parsed events>')
class TestOfflineEvent(unittest.TestCase):
def test_event(self):
self.event = OfflineReader(OFFLINE_FILE).events[0]
assert str(self.n_events) in repr(self.events)
class TestOfflineHits(unittest.TestCase):
def setUp(self):
self.hits = OfflineReader(OFFLINE_FILE).hits
self.lengths = {0: 176, 1: 125, -1: 105}
self.total_item_count = 1434
self.r_mc = OfflineReader(OFFLINE_NUMUCC)
self.Nevents = 10
def test_item_selection(self):
self.assertListEqual(list(self.hits[0].dom_id[:3]),
[806451572, 806451572, 806451572])
def test_IndexError(self):
# test handling IndexError with empty lists/arrays
self.assertEqual(len(OfflineHits(['whatever'], [])), 0)
def test_repr(self):
self.assertEqual(repr(self.hits), '<OfflineHits: 10 parsed elements>')
self.hits = OFFLINE_FILE.events.hits
self.n_hits = 10
self.dom_id = {
0: [
806451572, 806451572, 806451572, 806451572, 806455814,
806455814, 806455814, 806483369, 806483369, 806483369
],
5: [
806455814, 806487219, 806487219, 806487219, 806487226,
808432835, 808432835, 808432835, 808432835, 808432835
]
}
self.t = {
0: [
70104010., 70104016., 70104192., 70104123., 70103096.,
70103797., 70103796., 70104191., 70104223., 70104181.
],
5: [
81861237., 81859608., 81860586., 81861062., 81860357.,
81860627., 81860628., 81860625., 81860627., 81860629.
]
}
def test_attributes_available(self):
for key in self.hits._keymap.keys():
getattr(self.hits, key)
def test_channel_ids(self):
self.assertTrue(all(c >= 0 for c in self.hits.channel_id.min()))
self.assertTrue(all(c < 31 for c in self.hits.channel_id.max()))
def test_str(self):
self.assertEqual(str(self.hits), 'Number of hits: 10')
def test_reading_dom_id(self):
dom_ids = self.hits.dom_id
for event_id, length in self.lengths.items():
self.assertEqual(length, len(dom_ids[event_id]))
self.assertEqual(self.total_item_count, sum(dom_ids.count()))
self.assertListEqual([806451572, 806451572, 806451572],
list(dom_ids[0][:3]))
def test_reading_channel_id(self):
channel_ids = self.hits.channel_id
for event_id, length in self.lengths.items():
self.assertEqual(length, len(channel_ids[event_id]))
self.assertEqual(self.total_item_count, sum(channel_ids.count()))
assert str(self.n_hits) in str(self.hits)
self.assertListEqual([8, 9, 14], list(channel_ids[0][:3]))
# channel IDs are always between [0, 30]
self.assertTrue(all(c >= 0 for c in channel_ids.min()))
self.assertTrue(all(c < 31 for c in channel_ids.max()))
def test_reading_times(self):
ts = self.hits.t
for event_id, length in self.lengths.items():
self.assertEqual(length, len(ts[event_id]))
self.assertEqual(self.total_item_count, sum(ts.count()))
self.assertListEqual([70104010.0, 70104016.0, 70104192.0],
list(ts[0][:3]))
def test_reading_mc_pmt_id(self):
pmt_ids = self.r_mc.mc_hits.pmt_id
lengths = {0: 58, 2: 28, -1: 48}
def test_repr(self):
assert str(self.n_hits) in repr(self.hits)
for hit_id, length in lengths.items():
self.assertEqual(length, len(pmt_ids[hit_id]))
def test_attributes(self):
for idx, dom_id in self.dom_id.items():
self.assertListEqual(dom_id,
list(self.hits.dom_id[idx][:len(dom_id)]))
for idx, t in self.t.items():
assert np.allclose(t, self.hits.t[idx][:len(t)])
self.assertEqual(self.Nevents, len(pmt_ids))
def test_slicing(self):
s = slice(2, 8, 2)
s_hits = self.hits[s]
assert 3 == len(s_hits)
for idx, dom_id in self.dom_id.items():
self.assertListEqual(dom_id[s], list(self.hits.dom_id[idx][s]))
for idx, t in self.t.items():
self.assertListEqual(t[s], list(self.hits.t[idx][s]))
def test_slicing_consistency(self):
for s in [slice(1, 3), slice(2, 7, 3)]:
for idx in range(3):
assert np.allclose(self.hits.dom_id[idx][s],
self.hits[idx].dom_id[s])
assert np.allclose(OFFLINE_FILE.events[idx].hits.dom_id[s],
self.hits.dom_id[idx][s])
def test_index_consistency(self):
for idx, dom_ids in self.dom_id.items():
assert np.allclose(self.hits[idx].dom_id[:self.n_hits],
dom_ids[:self.n_hits])
assert np.allclose(
OFFLINE_FILE.events[idx].hits.dom_id[:self.n_hits],
dom_ids[:self.n_hits])
for idx, ts in self.t.items():
assert np.allclose(self.hits[idx].t[:self.n_hits],
ts[:self.n_hits])
assert np.allclose(OFFLINE_FILE.events[idx].hits.t[:self.n_hits],
ts[:self.n_hits])
self.assertListEqual([677, 687, 689], list(pmt_ids[0][:3]))
def test_keys(self):
assert "dom_id" in self.hits.keys()
class TestOfflineHit(unittest.TestCase):
class TestOfflineTracks(unittest.TestCase):
def setUp(self):
self.hit = OfflineReader(OFFLINE_FILE)[0].hits[0]
self.f = OFFLINE_FILE
self.tracks = OFFLINE_FILE.events.tracks
self.tracks_numucc = OFFLINE_NUMUCC
self.n_events = 10
def test_item_selection(self):
self.assertEqual(self.hit[0], self.hit.id)
self.assertEqual(self.hit[1], self.hit.dom_id)
def test_attributes_available(self):
for key in self.tracks._keymap.keys():
getattr(self.tracks, key)
class TestOfflineTracks(unittest.TestCase):
def setUp(self):
self.tracks = OfflineReader(OFFLINE_FILE).tracks
self.r_mc = OfflineReader(OFFLINE_NUMUCC)
self.Nevents = 10
@unittest.skip
def test_attributes(self):
for idx, dom_id in self.dom_id.items():
self.assertListEqual(dom_id,
list(self.hits.dom_id[idx][:len(dom_id)]))
for idx, t in self.t.items():
assert np.allclose(t, self.hits.t[idx][:len(t)])
def test_item_selection(self):
self.assertListEqual(list(self.tracks[0].dir_z[:2]),
[-0.872885221293917, -0.872885221293917])
def test_IndexError(self):
# test handling IndexError with empty lists/arrays
self.assertEqual(len(OfflineTracks(['whatever'], [])), 0)
def test_repr(self):
self.assertEqual(repr(self.tracks),
'<OfflineTracks: 10 parsed elements>')
def test_str(self):
self.assertEqual(str(self.tracks), 'Number of tracks: 10')
def test_reading_tracks_dir_z(self):
dir_z = self.tracks.dir_z
tracks_dir_z = {0: 56, 1: 55, 8: 54}
for track_id, n_dir in tracks_dir_z.items():
self.assertEqual(n_dir, len(dir_z[track_id]))
# check that there are 10 arrays of tracks.dir_z info
self.assertEqual(len(dir_z), self.Nevents)
def test_reading_mc_tracks_dir_z(self):
dir_z = self.r_mc.mc_tracks.dir_z
tracks_dir_z = {0: 11, 1: 25, 8: 13}
for track_id, n_dir in tracks_dir_z.items():
self.assertEqual(n_dir, len(dir_z[track_id]))
# check that there are 10 arrays of tracks.dir_z info
self.assertEqual(len(dir_z), self.Nevents)
self.assertListEqual([0.230189, 0.230189, 0.218663],
list(dir_z[0][:3]))
assert " 10 " in repr(self.tracks)
def test_slicing(self):
tracks = self.tracks
assert 10 == len(tracks)
# track_selection = tracks[2:7]
# assert 5 == len(track_selection)
# track_selection_2 = tracks[1:3]
# assert 2 == len(track_selection_2)
# for _slice in [
# slice(0, 0),
# slice(0, 1),
# slice(0, 2),
# slice(1, 5),
# slice(3, -2)
# ]:
# self.assertListEqual(list(tracks.E[:, 0][_slice]),
# list(tracks[_slice].E[:, 0]))
class TestOfflineTrack(unittest.TestCase):
self.assertEqual(10, len(tracks))
self.assertEqual(1, len(tracks[0]))
track_selection = tracks[2:7]
assert 5 == len(track_selection)
track_selection_2 = tracks[1:3]
assert 2 == len(track_selection_2)
for _slice in [
slice(0, 0),
slice(0, 1),
slice(0, 2),
slice(1, 5),
slice(3, -2)
]:
self.assertListEqual(list(tracks.E[:, 0][_slice]),
list(tracks[_slice].E[:, 0]))
def test_nested_indexing(self):
self.assertAlmostEqual(
self.f.events.tracks.fitinf[3:5][1][9][2],
self.f.events[3:5].tracks[1].fitinf[9][2])
self.assertAlmostEqual(
self.f.events.tracks.fitinf[3:5][1][9][2],
self.f.events[3:5][1][9][2].tracks.fitinf)
self.assertAlmostEqual(
self.f.events.tracks.fitinf[3:5][1][9][2],
self.f.events[3:5][1].tracks[9][2].fitinf)
self.assertAlmostEqual(
self.f.events.tracks.fitinf[3:5][1][9][2],
self.f.events[3:5][1].tracks[9].fitinf[2])
class TestBranchIndexingMagic(unittest.TestCase):
def setUp(self):
self.track = OfflineReader(OFFLINE_FILE)[0].tracks[0]
self.events = OFFLINE_FILE.events
def test_item_selection(self):
self.assertEqual(self.track[0], self.track.fUniqueID)
self.assertEqual(self.track[10], self.track.E)
def test_foo(self):
self.assertEqual(318, self.events[2:4].n_hits[0])
assert np.allclose(self.events[3].tracks.dir_z[10],
self.events.tracks.dir_z[3, 10])
assert np.allclose(self.events[3:6].tracks.pos_y[:, 0],
self.events.tracks.pos_y[3:6, 0])
def test_str(self):
self.assertEqual(str(self.track).split('\n\t')[0], 'offline track:')
# test selecting with a list
self.assertEqual(3, len(self.events[[0, 2, 3]]))
class TestUsr(unittest.TestCase):
def setUp(self):
self.f = OfflineReader(OFFLINE_USR)
def test_str(self):
print(self.f.usr)
self.f = OFFLINE_USR
def test_nonexistent_usr(self):
f = OfflineReader(SAMPLES_DIR / "daq_v1.0.0.root")
self.assertListEqual([], f.usr.keys())
def test_str_flat(self):
print(self.f.events.usr)
def test_keys(self):
def test_keys_flat(self):
self.assertListEqual([
'RecoQuality', 'RecoNDF', 'CoC', 'ToT', 'ChargeAbove',
'ChargeBelow', 'ChargeRatio', 'DeltaPosZ', 'FirstPartPosZ',
'LastPartPosZ', 'NSnapHits', 'NTrigHits', 'NTrigDOMs',
'NTrigLines', 'NSpeedVetoHits', 'NGeometryVetoHits',
'ClassficationScore'
], self.f.usr.keys())
], self.f.events.usr.keys())
def test_getitem(self):
def test_getitem_flat(self):
assert np.allclose(
[118.6302815337638, 44.33580521344907, 99.93916717621543],
self.f.usr['CoC'])
self.f.events.usr['CoC'])
assert np.allclose(
[37.51967774166617, -10.280346193553832, 13.67595659707355],
self.f.usr['DeltaPosZ'])
self.f.events.usr['DeltaPosZ'])
def test_attributes(self):
@unittest.skip
def test_keys_nested(self):
self.assertListEqual(["a"], self.f.events.mc_tracks.usr.keys())
def test_attributes_flat(self):
assert np.allclose(
[118.6302815337638, 44.33580521344907, 99.93916717621543],
self.f.usr.CoC)
self.f.events.usr.CoC)
assert np.allclose(
[37.51967774166617, -10.280346193553832, 13.67595659707355],
self.f.usr.DeltaPosZ)
self.f.events.usr.DeltaPosZ)
class TestNestedMapper(unittest.TestCase):
def test_nested_mapper(self):
self.assertEqual('pos_x', _nested_mapper("trks.pos.x"))
Loading