Skip to content
Snippets Groups Projects
Verified Commit 6cc663cb authored by Tamas Gal's avatar Tamas Gal :speech_balloon:
Browse files

Multi threading using channels

parent b61ebef7
No related branches found
No related tags found
1 merge request!3Muonscanfit
......@@ -76,69 +76,22 @@ be empty if none of the directions had enough hits to perform the algorithm.
"""
function scanfit(params::MuonScanfitParameters, rhits::Vector{T}, directions::Vector{Direction{Float64}}) where T<:AbstractReducedHit
candidates = Vector{MuonScanfitCandidate}(undef, length(directions))
hit_buffers = Vector{Vector{HitR1}}()
covmatrices = Vector{CovMatrix}()
for _ in 1:Threads.nthreads()
push!(hit_buffers, Vector{HitR1}())
push!(covmatrices, CovMatrix(1.0, 5.0, params.nmaxhits)) # TODO: hardcoded values for alpha and sigma
xytsolvers = Channel{XYTSolver}(Threads.nthreads())
for _ in Threads.nthreads()
put!(xytsolvers, XYTSolver(params, 1.0))
end
Threads.@threads :static for (c_idx, dir) collect(enumerate(directions))
est = Line1ZEstimator(Line1Z(Position(0, 0, 0), 0))
χ² = Inf
R = rotator(dir)
rhits_copy = hit_buffers[Threads.threadid()]
length(rhits) > length(rhits_copy) && resize!(rhits_copy, length(rhits))
# rotate hits
for (idx, rhit) enumerate(rhits_copy)
rhit = rhits[idx]
rhits_copy[idx] = @set rhit.pos = R * rhit.pos
end
if length(rhits_copy) > params.nmaxhits
# TODO: review this block, here we may need a partial sort
sort!(rhits_copy; by=timetoz)
resize!(rhits_copy, params.nmaxhits)
end
clusterize!(rhits_copy, Match1D(params.roadwidth, params.tmaxlocal)) # 3053 allocations until here
NDF = length(rhits_copy) - est.NUMBER_OF_PARAMETERS
N = hitcount(rhits_copy)
if length(rhits_copy) <= est.NUMBER_OF_PARAMETERS
candidates[c_idx] = MuonScanfitCandidate(Position(0, 0, 0), dir, 0, -Inf, 0)
continue
end
sort!(rhits_copy)
try
estimate!(est, rhits_copy) # +11700 allocations
catch ex
# if isa(ex, SingularSVDException)
# @warn "Singular SVD"
candidates[c_idx] = MuonScanfitCandidate(Position(0, 0, 0), dir, 0, -Inf, 0)
continue
chunk_size = max(1, length(directions) ÷ Threads.nthreads())
chunks = Iterators.partition(directions, chunk_size)
tasks = map(chunks) do chunk
Threads.@spawn begin
xytsolver = take!(xytsolvers)
results = map(c -> xytsolver(rhits, c), chunk)
put!(xytsolvers, xytsolver)
results
end
# TODO: consider creating a "pos()" getter for everything
C = update!(covmatrices[Threads.threadid()], est.model.pos, rhits_copy) # 0 allocations
# TODO: this is really ugly... make update!() return the view itself
V = view(C.M, 1:length(rhits_copy), 1:length(rhits_copy))
Y = timeresvec(est.model, rhits_copy) # about 700 extra allocations here
V⁻¹ = inv(V) # +3000 allocations
χ² = transpose(Y) * V⁻¹ * Y # +700 allocations
fit_pos = R \ est.model.pos # 0 allocations
candidates[c_idx] = MuonScanfitCandidate(fit_pos, dir, est.model.t, quality(χ², N, NDF), NDF)
end
candidates
collect(Iterators.flatten(fetch.(tasks)))
end
struct MuonScanfitCandidate
......@@ -359,3 +312,66 @@ end
# TODO: generalise hits parameter
timeresvec(lz::Line1Z, hits) = [time(hit) - time(lz, hit.pos) for hit hits]
"""
A task worker whichs solves for x, y an t for a given set of hits and a direction.
"""
struct XYTSolver
hits_buffer::Vector{HitR1}
covmatrix::CovMatrix
N::Int
matcher::Match1D
# TODO: revise passing params since α is redundant
function XYTSolver(params::MuonScanfitParameters, α::Float64)
N = params.nmaxhits
new(Vector{HitR1}(), CovMatrix(α, params.σ, N), N, Match1D(params.roadwidth, params.tmaxlocal))
end
end
# TODO: revise if we really need to pass params here
function (s::XYTSolver)(hits::Vector{T}, dir::Direction{Float64}) where T<:AbstractReducedHit
est = Line1ZEstimator(Line1Z(Position(0, 0, 0), 0))
χ² = Inf
R = rotator(dir)
n_initial_hits = length(hits)
resize!(s.hits_buffer, n_initial_hits)
for (idx, hit) enumerate(hits) # rotate hits
s.hits_buffer[idx] = @set hit.pos = R * hit.pos
end
if n_initial_hits > s.N
# TODO: review this block, here we may need a partial sort
sort!(s.hits_buffer; by=timetoz)
resize!(s.hits_buffer, s.N)
end
clusterize!(s.hits_buffer, s.matcher) # 3053 allocations until here
NDF = length(s.hits_buffer) - est.NUMBER_OF_PARAMETERS
N = hitcount(s.hits_buffer)
length(s.hits_buffer) <= est.NUMBER_OF_PARAMETERS && return MuonScanfitCandidate(Position(0, 0, 0), dir, 0, -Inf, 0)
sort!(s.hits_buffer)
try
estimate!(est, s.hits_buffer) # +11700 allocations
catch ex
# if isa(ex, SingularSVDException)
# @warn "Singular SVD"
return MuonScanfitCandidate(Position(0, 0, 0), dir, 0, -Inf, 0)
end
# TODO: consider creating a "pos()" getter for everything
update!(s.covmatrix, est.model.pos, s.hits_buffer) # 0 allocations
# TODO: this is really ugly... make update!() return the view itself
V = view(s.covmatrix.M, 1:length(s.hits_buffer), 1:length(s.hits_buffer))
Y = timeresvec(est.model, s.hits_buffer) # about 700 extra allocations here
V⁻¹ = inv(V) # +3000 allocations
χ² = transpose(Y) * V⁻¹ * Y # +700 allocations
fit_pos = R \ est.model.pos # 0 allocations
MuonScanfitCandidate(fit_pos, dir, est.model.t, quality(χ², N, NDF), NDF)
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment