Newer
Older
import tempfile
from unittest import TestCase
import numpy as np
import h5py
import orcasong.tools.concatenate as conc
__author__ = 'Stefan Reck'
class TestFileConcatenator(TestCase):
"""
Test concatenation on pre-generated h5 files. They are in tests/data.
@classmethod
def setUpClass(cls):
cls.dummy_file_1 = tempfile.NamedTemporaryFile()
_create_dummy_file(
cls.dummy_file_1, columns=10, val_array=1, val_recarray=(1, 3)
)
cls.dummy_file_2 = tempfile.NamedTemporaryFile()
_create_dummy_file(
cls.dummy_file_2, columns=15, val_array=2, val_recarray=(4, 5)
)
cls.dummy_files = (
cls.dummy_file_1.name,
cls.dummy_file_2.name,
cls.compt_opts = {
'complib': 'gzip', 'complevel': 1, 'chunksize': 5, "shuffle": False
@classmethod
def tearDownClass(cls):
cls.dummy_file_1.close()
cls.dummy_file_2.close()
def test_from_list(self):
with tempfile.NamedTemporaryFile("w+") as tf:
tf.writelines([f + "\n" for f in self.dummy_files])
tf.seek(0)
fc = conc.FileConcatenator.from_list(tf.name)
self.assertSequenceEqual(self.dummy_files, fc.input_files)
def test_get_compopts(self):
comptopts = conc.get_compopts(self.dummy_files[0])
self.assertDictEqual(comptopts, self.compt_opts)
def test_fc_get_comptopts(self):
fc = conc.FileConcatenator(self.dummy_files)
self.assertDictEqual(fc.comptopts, self.compt_opts)
def test_fc_get_comptopts_updates(self):
fc = conc.FileConcatenator(self.dummy_files, comptopts_update={'chunksize': 1})
target_compt_opts = dict(self.compt_opts)
target_compt_opts["chunksize"] = 1
self.assertDictEqual(fc.comptopts, target_compt_opts)
def test_get_cumu_rows(self):
fc = conc.FileConcatenator(self.dummy_files)
np.testing.assert_array_equal(fc.cumu_rows, [0, 10, 25])
def test_concatenate_used_files(self):
fc = conc.FileConcatenator(self.dummy_files)
with tempfile.TemporaryFile() as tf:
fc.concatenate(tf)
self.assertSequenceEqual(
f["used_files"][()].tolist(),
[n.encode("ascii", "ignore") for n in self.dummy_files],
)
def test_concatenate_attrs(self):
fc = conc.FileConcatenator(self.dummy_files)
with tempfile.TemporaryFile() as tf:
fc.concatenate(tf)
with h5py.File(tf, "r") as f:
target_attrs = dict(f.attrs)
target_dset_attrs = dict(f["numpy_array"].attrs)
with h5py.File(self.dummy_files[0], "r") as f:
source_attrs = dict(f.attrs)
source_dset_attrs = dict(f["numpy_array"].attrs)
self.assertDictEqual(source_attrs, target_attrs)
self.assertDictEqual(source_dset_attrs, target_dset_attrs)
def test_concatenate_array(self):
fc = conc.FileConcatenator(self.dummy_files)
with tempfile.TemporaryFile() as tf:
fc.concatenate(tf)
target = np.ones((25, 7, 3))
target[10:, :, :] = 2.
np.testing.assert_array_equal(
target,
f["numpy_array"][()]
)
fc = conc.FileConcatenator(self.dummy_files)
with tempfile.TemporaryFile() as tf:
fc.concatenate(tf)
[(1, 3, 1)] * 25,
dtype=[('x', '<f8'), ('y', '<i8'), ("group_id", "<i8")]
np.testing.assert_array_equal(
target,
f["rec_array"][()]
)
def _create_dummy_file(filepath, columns=10, val_array=1, val_recarray=(1, 3)):
""" Create a dummy h5 file with an array and a recarray in it. """
with h5py.File(filepath, "w") as f:
"numpy_array",
data=np.ones(shape=(columns, 7, 3))*val_array,
chunks=(5, 7, 3),
compression="gzip",
compression_opts=1
)
rec_array = np.array(
[val_recarray + (1, )] * columns,
dtype=[('x', '<f8'), ('y', '<i8'), ("group_id", "<i8")]
)
rec_array["group_id"] = np.arange(columns)
f.create_dataset(
"rec_array",
data=rec_array,
chunks=(5,),
compression="gzip",
compression_opts=1
)