write Blob returned by Pipeline.drain with io.HDF5Sink
Summary
The need for the following feature should be discussed first, of course; maybe it's not of general interest, or I'm simply misusing the classes.
Draining a pipeline returns a Blob, which is filled with the return value of the pre_finish method of each attached module.
Example
import numpy as np
import km3pipe as kp

class Finalizor(kp.Module):
    def pre_finish(self):
        return kp.Table({'useless': np.zeros(5)}, h5loc='/finally')

class Nihil(kp.Module):
    def process(self, blob):
        raise StopIteration  # stop the pipeline immediately

pipe = kp.Pipeline()
pipe.attach(Finalizor)
pipe.attach(Nihil)
final_blob = pipe.drain()

sink = kp.io.HDF5Sink(filename='test.h5')
for mod in final_blob:
    if isinstance(final_blob[mod], kp.dataclasses.Table):
        sink.write_table(final_blob[mod])
sink.finish()
Note that final_blob is

Blob([('Finalizor',
       Generic Table <class 'km3pipe.dataclasses.Table'> (rows: 5)),
      ('Nihil', None)])

and contains None for the attached Nihil module, which of course cannot be written to disk.
Feature request
The loop above, which checks final_blob for existing kp.Tables, could be implemented as a new service HDF5Sink.write_blob. In case the blob contains nested data, i.e. further Blobs with tables, write_blob could recursively iterate through final_blob and write the tables to disk.
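As a rough illustration of the proposed recursion (not an actual km3pipe API; the helper name write_blob and the is_table/is_blob predicates are hypothetical stand-ins for isinstance checks against kp.dataclasses.Table and kp.Blob), the idea could be sketched like this:

```python
def write_blob(blob, write_table, is_table, is_blob):
    """Hypothetical sketch: recursively write every table found in a blob.

    `blob` is any mapping (stand-in for kp.Blob), `write_table` is the
    sink's table-writing callable, and `is_table`/`is_blob` are type
    predicates (stand-ins for isinstance checks against kp.Table/kp.Blob).
    """
    for key, value in blob.items():
        if value is None:          # e.g. modules whose pre_finish returned nothing
            continue
        if is_table(value):
            write_table(value)
        elif is_blob(value):       # nested Blob: recurse into it
            write_blob(value, write_table, is_table, is_blob)

# Minimal usage with plain dicts and strings standing in for Blobs and Tables:
written = []
blob = {"Finalizor": "table_a",
        "Nested": {"Inner": "table_b"},
        "Nihil": None}
write_blob(blob,
           write_table=written.append,
           is_table=lambda v: isinstance(v, str),
           is_blob=lambda v: isinstance(v, dict))
# written == ["table_a", "table_b"]
```

In the real implementation, write_blob would presumably be a method on HDF5Sink calling self.write_table directly, so no callbacks would be needed.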
What do you think?
Edited by Matthias Bissinger