Skip to content
Snippets Groups Projects
Verified Commit dd2f9cac authored by Tamas Gal's avatar Tamas Gal :speech_balloon:
Browse files

Make H5File write operations thread-safe

parent 26828fba
No related branches found
No related tags found
No related merge requests found
......@@ -13,6 +13,7 @@ written to the target HDF5 path when full.
struct H5CompoundDataset{T}
dset::HDF5.Dataset
cache::H5CompoundDatasetCache{T}
_lock::ReentrantLock
end
Base.length(c::H5CompoundDatasetCache) = length(c.buffer)
isfull(c::H5CompoundDatasetCache) = length(c) >= c.size
......@@ -23,13 +24,16 @@ HDF5.read_attribute(cdset::H5CompoundDataset, name::AbstractString) = HDF5.read_
Forces the cache to be written to the HDF5 file.
"""
function Base.flush(d::H5CompoundDataset)
function Base.flush(d::H5CompoundDataset; nolock=false)
!nolock && lock(d._lock)
current_dims, _ = HDF5.get_extent_dims(d.dset)
idx = first(current_dims)
n = length(d.cache)
HDF5.set_extent_dims(d.dset, (idx + n,))
d.dset[idx+1:idx+n] = d.cache.buffer
empty!(d.cache.buffer)
!nolock && unlock(d._lock)
d
end
"""
......@@ -40,6 +44,7 @@ A wrapper for an HDF5 file used in KM3NeT.
struct H5File
_h5f::HDF5.File
_datasets::Dict{String, H5CompoundDataset}
_lock::ReentrantLock
function H5File(fname::AbstractString, mode::AbstractString="r")
h5f = h5open(fname, mode)
......@@ -51,7 +56,7 @@ struct H5File
attrs(h5f)["KM3io.jl"] = string(version)
end
end
new(h5f, Dict{String, H5CompoundDataset}())
new(h5f, Dict{String, H5CompoundDataset}(), ReentrantLock())
end
end
function Base.flush(f::H5File)
......@@ -64,7 +69,9 @@ function Base.close(f::H5File)
close(f._h5f)
end
function Base.write(f::H5File, path::AbstractString, data)
lock(f._lock)
write_dataset(f._h5f, path, data)
unlock(f._lock)
end
Base.getindex(f::H5File, args...) = getindex(f._h5f, args...)
HDF5.read_attribute(f::H5File, name::AbstractString) = HDF5.read_attribute(f._h5f, name)
......@@ -79,16 +86,21 @@ the HDF5 dataset will be extended, the buffer written and cleared.
To force the writing, use [`flush`](@ref)
"""
function HDF5.create_dataset(f::H5File, path::AbstractString, ::Type{T}; cache_size=10000, chunk=(10000,), filters=[Filters.Deflate(5)], kwargs...) where T
lock(f._lock)
dset = HDF5.create_dataset(f._h5f, path, T, ((0,), (-1,)); chunk=chunk, filters=filters, kwargs...)
attrs(dset)["struct_name"] = string(nameof(T))
cache = H5CompoundDatasetCache(T[], cache_size)
d = H5CompoundDataset(dset, cache)
d = H5CompoundDataset(dset, cache, f._lock)
f._datasets[path] = d
unlock(f._lock)
d
end
function Base.push!(d::H5CompoundDataset{T}, element::T) where T
lock(d._lock)
push!(d.cache.buffer, element)
isfull(d.cache) && flush(d)
isfull(d.cache) && flush(d; nolock=true)
unlock(d._lock)
d
end
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment