Commit 27acb916 authored by Stefan Reck

Merge branch 'v2.0.7' into 'master'

V2.0.7

See merge request !8
parents 2d1ec7ec fa954846
@@ -9,6 +9,34 @@ results when used on data other than the specified.
import numpy as np
def get_4line_bin_edges_list(time_resolution="low"):
"""
Designed for the four-line detector.
XYZ/PM: 1 DOM per bin
Time:
low:
100 bins, 6 ns/bin
mchits cut off (left/right): 1.34%, 4.20%
high:
300 bins, 3.33 ns/bin
mchits cut off (left/right): 0.6%, 0.73%
"""
time_resolutions = {
"low": np.linspace(-50, 550, 100 + 1),
"high": np.linspace(-100, 900, 300 + 1),
}
return [
["pos_x", np.array([-2 - 30, -2, -2 + 30])],
["pos_y", np.array([3.7 - 30, 3.7, 3.7 + 30])],
["pos_z", np.linspace(24, 198, 19)],
["time", time_resolutions[time_resolution]],
["channel_id", np.linspace(-0.5, 30.5, 31 + 1)],
]
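A minimal usage sketch (editor's illustration, not part of this diff): the returned (name, edges) pairs can be wrapped in a dict and handed to numpy, e.g. to histogram hit times; the hit values below are made up.

    edges = dict(get_4line_bin_edges_list(time_resolution="low"))
    hit_times = np.array([12.5, 80.0, 340.2])
    counts, _ = np.histogram(hit_times, bins=edges["time"])  # 100 time bins of 6 ns each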
def get_edges_2017_ztc():
"""
Designed for the 2017 runs with the one-line detector.
@@ -26,4 +54,3 @@ def get_edges_2017_ztc():
["channel_id", np.linspace(-0.5, 30.5, 31 + 1)],
]
return bin_edges_list
@@ -203,11 +203,11 @@ class FileBinner:
plot_binstats.plot_hist_of_files(
files=outfiles, save_as=outfolder+"binning_hist.pdf")
def build_pipe(self, infile, outfile):
def build_pipe(self, infile, outfile, timeit=True):
"""
Build the pipeline to generate images and mc_info for a file.
"""
pipe = kp.Pipeline()
pipe = kp.Pipeline(timeit=timeit)
if self.n_statusbar is not None:
pipe.attach(km.common.StatusBar, every=self.n_statusbar)
@@ -31,36 +31,38 @@ class McInfoMaker(kp.Module):
dtypes = [(key, np.float64) for key in track.keys()]
kp_hist = kp.dataclasses.Table(
track, dtype=dtypes, h5loc='y', name='event_info')
if len(kp_hist) != 1:
self.cprint(
"Warning: Extracted mc_info should have len 1, "
"but it has len {}".format(len(kp_hist))
)
blob[self.store_as] = kp_hist
return blob
class TimePreproc(kp.Module):
"""
Preprocess the time in the blob.
Can add t0 to hit times.
Times of hits and mchits can be centered with the time of the first
triggered hit.
Preprocess the time in the blob in various ways.
Attributes
----------
add_t0 : bool
If true, t0 will be added.
If true, t0 will be added to times of hits and mchits.
center_time : bool
If true, center hit and mchit times.
If true, center hit and mchit times with the time of the first
triggered hit.
subtract_t0_mchits : bool
If True, subtract t0 from the times of mchits.
"""
def configure(self):
self.add_t0 = self.require('add_t0')
self.center_time = self.get('center_time', default=True)
self.subtract_t0_mchits = self.get('subtract_t0_mchits', default=False)
self.has_mchits = None
self._t0_flag = False
self._cent_hits_flag = False
self._cent_mchits_flag = False
self._print_flags = set()
def process(self, blob):
if self.has_mchits is None:
@@ -68,42 +70,49 @@ class TimePreproc(kp.Module):
if self.add_t0:
blob = self.add_t0_time(blob)
if self.subtract_t0_mchits and self.has_mchits:
blob = self.subtract_t0_mctime(blob)
blob = self.center_hittime(blob)
return blob
def add_t0_time(self, blob):
if not self._t0_flag:
self._t0_flag = True
self.cprint("Adding t0 to hit times")
self._print_once("Adding t0 to hit times")
blob["Hits"].time = np.add(blob["Hits"].time, blob["Hits"].t0)
if self.has_mchits:
blob["McHits"].time = np.add(blob["McHits"].time,
blob["McHits"].t0)
self._print_once("Adding t0 to mchit times")
blob["McHits"].time = np.add(
blob["McHits"].time, blob["McHits"].t0)
return blob
def subtract_t0_mctime(self, blob):
self._print_once("Subtracting t0 from mchits")
blob["McHits"].time = np.subtract(
blob["McHits"].time, blob["McHits"].t0)
return blob
def center_hittime(self, blob):
hits_time = blob["Hits"].time
hits_triggered = blob["Hits"].triggered
t_first_trigger = np.min(hits_time[hits_triggered == 1])
if self.center_time:
if not self._cent_hits_flag:
self.cprint("Centering time of Hits")
self._cent_hits_flag = True
self._print_once("Centering time of Hits with first triggered hit")
blob["Hits"].time = np.subtract(hits_time, t_first_trigger)
if self.has_mchits:
if not self._cent_mchits_flag:
self.cprint("Centering time of McHits")
self._cent_mchits_flag = True
self._print_once("Centering time of McHits with first triggered hit")
mchits_time = blob["McHits"].time
blob["McHits"].time = np.subtract(mchits_time, t_first_trigger)
return blob
def _print_once(self, text):
if text not in self._print_flags:
self._print_flags.add(text)
self.cprint(text)
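The centering done by TimePreproc boils down to subtracting the time of the earliest triggered hit from all hit (and mchit) times. A standalone numpy sketch with made-up values:

    hits_time = np.array([820., 805., 950.])
    hits_triggered = np.array([0, 1, 1])
    t_first_trigger = np.min(hits_time[hits_triggered == 1])  # 805.0
    centered_time = np.subtract(hits_time, t_first_trigger)   # [ 15.,   0., 145.]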
class ImageMaker(kp.Module):
"""
@@ -3,6 +3,7 @@
import numpy as np
import km3pipe as kp
import km3modules as km
import matplotlib.pyplot as plt
import orcasong.modules as modules
@@ -42,6 +43,7 @@ class TimePlotter:
det_file=None,
center_time=True,
add_t0=False,
subtract_t0_mchits=False,
inactive_du=None):
if isinstance(files, str):
self.files = [files]
@@ -50,6 +52,7 @@ class TimePlotter:
self.do_mchits = do_mchits
self.det_file = det_file
self.add_t0 = add_t0
self.subtract_t0_mchits = subtract_t0_mchits
self.center_time = center_time
self.inactive_du = inactive_du
@@ -104,12 +107,18 @@ class TimePlotter:
else:
cal = None
if self.center_time or self.add_t0:
time_pp = modules.TimePreproc(add_t0=self.add_t0, center_time=self.center_time)
time_pp = modules.TimePreproc(
add_t0=self.add_t0,
center_time=self.center_time,
subtract_t0_mchits=self.subtract_t0_mchits,
)
else:
time_pp = None
pump = kp.io.hdf5.HDF5Pump(filename=file)
for blob in pump:
for i, blob in enumerate(pump):
if i % 1000 == 0:
print("Blob {}".format(i))
if cal is not None:
blob = cal.process(blob)
if time_pp is not None:
@@ -195,7 +195,9 @@ def plot_hist_of_files(save_as, files=None):
hists_list = []
print("Plotting stats of {} file(s)".format(len(files)))
for file in files:
for i, file in enumerate(files, start=1):
if i % 100 == 0:
print(f"Reading file {i} / {len(files)} ...")
try:
hists_list.append(read_hists_from_h5file(file))
except KeyError:
import time
import h5py
import numpy as np
import argparse
import warnings
__author__ = 'Stefan Reck, Michael Moser'
class FileConcatenator:
"""
For concatenating many small h5 files to a single large one.
Attributes
----------
input_files : list
List that contains all filepaths of the input files.
comptopts : dict
Options for compression. They are read from the first input file.
E.g. complib
cumu_rows : np.array
The cumulative number of rows (axis_0) of the specified
input .h5 files (i.e. [0,100,200,300,...] if each file has 100 rows).
"""
def __init__(self, input_files):
self.input_files = input_files
print(f"Checking {len(self.input_files)} files ...")
self.cumu_rows = self._get_cumu_rows()
print(f"Total rows:\t{self.cumu_rows[-1]}")
# Get compression options from first file in the list
self.comptopts = get_compopts(self.input_files[0])
print("\n".join([f"{k}:\t{v}" for k, v in self.comptopts.items()]))
self._append_mc_index = False
@classmethod
def from_list(cls, list_file, n_files=None, **kwargs):
"""
Initialize with a .txt list containing the filepaths.
Parameters
----------
list_file : str
Path to a txt file containing the input filepaths, one per line.
n_files : int, optional
Only load this many files from the list.
"""
input_files = []
with open(list_file) as f:
for line in f:
filepath = line.rstrip('\n')
if filepath != "":
input_files.append(filepath)
if n_files is not None:
input_files = input_files[:n_files]
return cls(input_files, **kwargs)
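Usage sketch for from_list (the list filename is a placeholder): given a text file with one input path per line,

    fc = FileConcatenator.from_list("input_files.txt", n_files=2)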
def concatenate(self, output_filepath):
""" Concatenate input files and save output to given path. """
f_out = h5py.File(output_filepath, 'x')
start_time = time.time()
for n, input_file in enumerate(self.input_files):
print(f'Processing file {n+1}/{len(self.input_files)}: {input_file}')
f_in = h5py.File(input_file, 'r')
# create metadata
if n == 0 and 'format_version' in list(f_in.attrs.keys()):
f_out.attrs['format_version'] = f_in.attrs['format_version']
for folder_name in f_in:
if is_folder_ignored(folder_name):
# we don't need datasets created by pytables anymore
continue
folder_data = f_in[folder_name][()]
if n > 0 and folder_name in [
'event_info', 'group_info', 'x_indices', 'y']:
# offset the group_ids / indices of the file being appended by the
# current maximum group_id / index in the output file
column_name = 'group_id' if folder_name in [
'event_info', 'group_info', 'y'] else 'index'
# add 1 because the group_ids / indices start with 0
folder_data[column_name] += np.amax(
f_out[folder_name][column_name]) + 1
if self._append_mc_index and folder_name == "event_info":
folder_data = self._modify_event_info(input_file, folder_data)
if n == 0:
# first file; create the dummy dataset with no max shape
print(f"\tCreating dataset '{folder_name}' with shape "
f"{(self.cumu_rows[-1],) + folder_data.shape[1:]}")
output_dataset = f_out.create_dataset(
folder_name,
data=folder_data,
maxshape=(None,) + folder_data.shape[1:],
chunks=(self.comptopts["chunksize"],) + folder_data.shape[1:],
compression=self.comptopts["complib"],
compression_opts=self.comptopts["complevel"],
)
output_dataset.resize(self.cumu_rows[-1], axis=0)
else:
f_out[folder_name][self.cumu_rows[n]:self.cumu_rows[n + 1]] = folder_data
f_in.close()
f_out.flush()
elapsed_time = time.time() - start_time
# include the used filepaths in the file
f_out.create_dataset(
"used_files",
data=[n.encode("ascii", "ignore") for n in self.input_files]
)
f_out.close()
print(f"\nConcatenation complete!"
f"\nElapsed time: {elapsed_time/60:.2f} min "
f"({elapsed_time/len(self.input_files):.2f} s per file)")
def _modify_event_info(self, input_file, folder_data):
raise NotImplementedError
def _get_cumu_rows(self):
"""
Get the cumulative number of rows of the input_files.
Also checks if all the files can be safely concatenated to the
first one.
"""
# names of datasets that will be in the output; read from first file
with h5py.File(self.input_files[0], 'r') as f:
keys_stripped = strip_keys(list(f.keys()))
rows_per_file = np.zeros(len(self.input_files) + 1, dtype=int)
for i, file_name in enumerate(self.input_files, start=1):
with h5py.File(file_name, 'r') as f:
if not all(k in f.keys() for k in keys_stripped):
raise KeyError(
f"File {file_name} does not have the "
f"keys of the first file! "
f"It has {f.keys()} First file: {keys_stripped}")
# length of each dataset
rows = [f[k].shape[0] for k in keys_stripped]
if not all(row == rows[0] for row in rows):
raise ValueError(
f"Datasets in file {file_name} have varying length! "
f"{dict(zip(keys_stripped, rows))}"
)
if not all(k in keys_stripped for k in strip_keys(list(f.keys()))):
warnings.warn(
f"Additional datasets found in file {file_name} compared "
f"to the first file, they wont be in the output! "
f"This file: {strip_keys(list(f.keys()))} "
f"First file {keys_stripped}"
)
rows_per_file[i] = rows[0]
return np.cumsum(rows_per_file)
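For example, with the two dummy test files (10 and 15 rows), rows_per_file is [0, 10, 15], so the cumulative sum gives the slice boundaries used in concatenate():

    rows_per_file = np.array([0, 10, 15])
    cumu_rows = np.cumsum(rows_per_file)   # [ 0, 10, 25]
    # file 0 fills output rows 0:10, file 1 fills rows 10:25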
def strip_keys(f_keys):
"""
Remove pytables folders containing '_i_', because the shape
of their first axis does not correspond to the number of events
in the file. All other folders normally have an axis_0 shape
that is equal to the number of events in the file.
Also remove bin_stats.
"""
return [x for x in f_keys if not is_folder_ignored(x)]
def is_folder_ignored(folder_name):
"""
Defines pytable folders which should be ignored during concat.
"""
return '_i_' in folder_name or "bin_stats" in folder_name
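Quick sanity check of this filter (folder names are illustrative):

    assert is_folder_ignored("_i_event_info")     # pytables index folder
    assert is_folder_ignored("bin_stats")
    assert not is_folder_ignored("event_info")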
def get_compopts(file):
"""
Extract the following compression options:
complib : str
Specifies the compression library that should be used for saving
the concatenated output files.
It's read from the first input file.
complevel : None/int
Specifies the compression level that should be used for saving
the concatenated output files.
A compression level is only available for gzip compression, not lzf!
It's read from the first input file.
chunksize : None/int
Specifies the chunksize for axis_0 in the concatenated output files.
It's read from the first input file.
"""
with h5py.File(file, 'r') as f:
dset = f[strip_keys(list(f.keys()))[0]]
comptopts = {}
comptopts["complib"] = dset.compression
if comptopts["complib"] == 'lzf':
comptopts["complevel"] = None
else:
comptopts["complevel"] = dset.compression_opts
comptopts["chunksize"] = dset.chunks[0]
return comptopts
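Usage sketch, assuming a file written like the gzip-compressed dummy test files further below:

    comptopts = get_compopts("dummy_file_1.h5")
    # -> {'complib': 'gzip', 'complevel': 1, 'chunksize': 5}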
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Concatenate many small h5 files to a single large one. '
'Compression options and the datasets to be created in '
'the new file will be read from the first input file.')
parser.add_argument(
'list_file', type=str, help='A txt list of files to concatenate. '
'One absolute filepath per line. ')
parser.add_argument(
'output_filepath', type=str, help='The absolute filepath of the output '
'.h5 file that will be created. ')
parsed_args = parser.parse_args()
fc = FileConcatenator.from_list(parsed_args.list_file)
fc.concatenate(parsed_args.output_filepath)
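Command-line sketch matching the argparse interface above (paths are placeholders):

    python -m orcasong.tools.concatenate input_files.txt concatenated.h5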
import h5py
import numpy as np
def create_dummy_file(filepath, columns=10, val_array=1, val_recarray=(1, 3)):
""" Create a dummy h5 file with an array and a recarray in it. """
with h5py.File(filepath, "w") as f:
f.create_dataset(
"numpy_array",
data=np.ones(shape=(columns, 7, 3))*val_array,
chunks=(5, 7, 3),
compression="gzip",
compression_opts=1
)
f.create_dataset(
"rec_array",
data=np.array([val_recarray] * columns, dtype=[('x', '<f8'), ('y', '<i8')]),
chunks=(5,),
compression="gzip",
compression_opts=1
)
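The two test fixtures referenced in the tests further below were presumably generated with calls like these (see the TestFileConcatenator docstring):

    create_dummy_file("dummy_file_1.h5", columns=10, val_array=1, val_recarray=(1, 3))
    create_dummy_file("dummy_file_2.h5", columns=15, val_array=2, val_recarray=(4, 5))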
@@ -29,7 +29,7 @@ setup(
entry_points={'console_scripts': [
'make_nn_images=legacy.make_nn_images:main',
'shuffle=orcasong_contrib.data_tools.shuffle.shuffle_h5:main',
'concatenate=orcasong_contrib.data_tools.concatenate.concatenate_h5:main',
'concatenate=orcasong.tools.concatenate:main',
'make_dsplit=orcasong_contrib.data_tools.make_data_split.make_data_split:main',
'plot_binstats=orcasong.plotting.plot_binstats:main']}
File added
File added
import tempfile
from unittest import TestCase
import os
import numpy as np
import h5py
import orcasong.tools.concatenate as conc
__author__ = 'Stefan Reck'
class TestFileConcatenator(TestCase):
"""
Test concatenation on pre-generated h5 files. They are in test/data.
create_dummy_file(
"dummy_file_1.h5", columns=10, val_array=1, val_recarray=(1, 3)
)
create_dummy_file(
"dummy_file_2.h5", columns=15, val_array=2, val_recarray=(4, 5)
)
"""
def setUp(self):
# the files to test on
data_dir = os.path.join(os.path.dirname(__file__), "data")
self.dummy_files = (
os.path.join(data_dir, "dummy_file_1.h5"), # 10 columns
os.path.join(data_dir, "dummy_file_2.h5"), # 15 columns
)
# their compression opts
self.compt_opts = {
'complib': 'gzip', 'complevel': 1, 'chunksize': 5
}
def test_from_list(self):
with tempfile.NamedTemporaryFile("w+") as tf:
tf.writelines([f + "\n" for f in self.dummy_files])
tf.seek(0)
fc = conc.FileConcatenator.from_list(tf.name)
self.assertSequenceEqual(self.dummy_files, fc.input_files)
def test_get_compopts(self):
comptopts = conc.get_compopts(self.dummy_files[0])
self.assertDictEqual(comptopts, self.compt_opts)
def test_fc_get_comptopts(self):
fc = conc.FileConcatenator(self.dummy_files)
self.assertDictEqual(fc.comptopts, self.compt_opts)
def test_get_cumu_rows(self):
fc = conc.FileConcatenator(self.dummy_files)
np.testing.assert_array_equal(fc.cumu_rows, [0, 10, 25])
def test_concatenate_used_files(self):
fc = conc.FileConcatenator(self.dummy_files)
with tempfile.TemporaryFile() as tf:
fc.concatenate(tf)
with h5py.File(tf) as f:
self.assertSequenceEqual(
f["used_files"][()].tolist(),
[n.encode("ascii", "ignore") for n in self.dummy_files],
)
def test_concatenate_array(self):
fc = conc.FileConcatenator(self.dummy_files)
with tempfile.TemporaryFile() as tf:
fc.concatenate(tf)
with h5py.File(tf) as f:
target = np.ones((25, 7, 3))
target[10:, :, :] = 2.
np.testing.assert_array_equal(
target,
f["numpy_array"][()]
)
def test_concatenate_recarray(self):
fc = conc.FileConcatenator(self.dummy_files)
with tempfile.TemporaryFile() as tf:
fc.concatenate(tf)
with h5py.File(tf) as f:
target = np.array(
[(1, 3)] * 25,
dtype=[('x', '<f8'), ('y', '<i8')]
)
target["x"][10:] = 4.
target["y"][10:] = 5.
np.testing.assert_array_equal(
target,
f["rec_array"][()]
)
\ No newline at end of file