I am using OhMyThreads.jl function tmap!
. I would like to mutate the input array A
, but I am not sure, if it is safe, as I don't have much experience with parallel computation. Basically, I am running tmap!(f, A, A)
. Each calculation of f(A[i])
is independent of other elements of A
. There shouldn't be any racing happening, right? Each thread works on its own elements?
Yes, that should be fine.
Thanks!
It might not work right on A::BitArray
or some other types?
See also https://github.com/JuliaLang/julia/issues/53140
I have a related question, where I am trying to run a simulation across independent objects and then aggregate time-varying results into some storage arrays. Based on reading the OhMyThreads documentation, I think the following code would be susceptible to data races, though testing simulation
thousands of iterations I haven't encountered an assertion failure which I added to check for data races in this dummy case. Of course in the non-MWE, the inner loop is doing a lot more work.
My questions are:
function inner_loop!(output_vecs,n)
tid = Threads.threadid()
for i in 1:n
output_vecs.x[i,tid] += 1
output_vecs.y[i,tid] += 2
end
end
function simulation()
n = 10^5
proj_length = 1200
nt = Threads.nthreads()
output = (
x = zeros(proj_length,nt),
y = zeros(proj_length,nt),
)
Threads.@threads for i in 1:n
inner_loop!(output,500)
end
output = map(x -> vec(reduce(+, x, dims=2)), output)
@assert sum(output.x) == n * 500
@assert sum(output.y) == n * 500 *2
output
end
simulation()
Buffered channel with OhMyThreads.jl
function inner_loop_buffer!(output_vecs,n)
tid = Threads.threadid()
for i in 1:n
output_vecs.x[i] += 1
output_vecs.y[i] += 2
end
end
function simulation_buffer()
n = 10^5
proj_length = 1200
nt = Threads.nthreads()
output = (
x = zeros(proj_length),
y = zeros(proj_length),
)
chnl = Channel{typeof(output)}(nt)
foreach(1:nt) do _
put!(chnl, (
x = zeros(proj_length),
y = zeros(proj_length),
))
end
OhMyThreads.tmap(1:n) do i
C = take!(chnl)
inner_loop_buffer!(C,500)
put!(chnl,C)
end
close(chnl) # Ensure no more items will be put into the channel
for M in chnl
output.x .+= M.x
output.y .+= M.y
end
@assert sum(output.x) == n * 500
@assert sum(output.y) == n * 500 *2
output
end
simulation_buffer()
In addition to being more verbose, the buffered channel version is much slower (maybe just because creating more output arrays is the predominant time spent?
julia> @benchmark simulation()
BenchmarkTools.Trial: 1016 samples with 1 evaluation per sample.
Range (min … max): 4.871 ms … 7.371 ms ┊ GC (min … max): 0.00% … 32.04%
Time (median): 4.897 ms ┊ GC (median): 0.00%
Time (mean ± σ): 4.925 ms ± 131.609 μs ┊ GC (mean ± σ): 0.17% ± 1.79%
▂▅█▅▁
▁▂▇█████▆▅▅▄▄▂▂▄▄▄▄▄▃▃▂▃▃▂▂▃▂▂▃▃▃▂▂▂▂▂▃▂▃▂▂▂▂▂▂▂▂▂▁▂▂▁▁▁▁▁▁ ▂
4.87 ms Histogram: frequency by time 5.03 ms <
Memory estimate: 96.42 KiB, allocs estimate: 42.
julia> @benchmark simulation_buffer()
BenchmarkTools.Trial: 243 samples with 1 evaluation per sample.
Range (min … max): 19.796 ms … 24.860 ms ┊ GC (min … max): 0.00% … 0.00%
Time (median): 20.524 ms ┊ GC (median): 0.00%
Time (mean ± σ): 20.627 ms ± 559.408 μs ┊ GC (mean ± σ): 1.08% ± 2.01%
▃ ▁▄█ ▆█▁▇▁▆▁▁▇▄▂ ▆▂ ▂ ▂
▃▃▄▃█▇███▇███████████▇██▆█▆█▇▆█▇██▄▇▄▃▃▆▃▇▇▃▄▃▃▁▃▁▃▄▁▁▁▁▁▁▁▃ ▄
19.8 ms Histogram: frequency by time 22.2 ms <
Memory estimate: 4.60 MiB, allocs estimate: 163.
Hey @Alec, somehow I missed your post before. So to answer your two questions
yield
, but in general you can't actually know for sure if code will never yield
since it depends on things like optimization parameters, internal details like if it does @debug
logging, etc. Channel
is one option, or task local storage, but for this example, I'd say you should just use ChunkSplitters.jl directly, it'd probably be cleaner and nicer.i.e.
using ChunkSplitters
function simulation_cs()
n = 10^5
proj_length = 1200
output = (
x = zeros(proj_length),
y = zeros(proj_length),
)
tasks = map(chunks(1:n; n=Threads.nthreads())) do chunk
Threads.@spawn begin
C = (;x = zeros(proj_length), y = zeros(proj_length))
for i ∈ chunk
inner_loop_buffer!(C,500)
end
C
end
end
for task ∈ tasks
C = fetch(task)
output.x .+= C.x
output.y .+= C.y
end
@assert sum(output.x) == n * 500
@assert sum(output.y) == n * 500 *2
output
end
julia> @benchmark simulation()
BenchmarkTools.Trial: 1930 samples with 1 evaluation per sample.
Range (min … max): 1.997 ms … 6.052 ms ┊ GC (min … max): 0.00% … 45.98%
Time (median): 2.572 ms ┊ GC (median): 0.00%
Time (mean ± σ): 2.588 ms ± 228.110 μs ┊ GC (mean ± σ): 0.48% ± 3.05%
▁ ▁ ▁ ▁ ▅██▇▅▃▂▂▁▁▃▂▂ ▁ ▁
▅▄▇▆██▇████▇███▇██▇█▇██████████████████▇▇█▆▆▆▄▆▄▄▄▄▁▁▁▁▁▄▁▄ █
2 ms Histogram: log(frequency) by time 3.33 ms <
Memory estimate: 173.52 KiB, allocs estimate: 62.
julia> @benchmark simulation_cs()
BenchmarkTools.Trial: 3289 samples with 1 evaluation per sample.
Range (min … max): 1.176 ms … 5.898 ms ┊ GC (min … max): 0.00% … 67.40%
Time (median): 1.473 ms ┊ GC (median): 0.00%
Time (mean ± σ): 1.518 ms ± 260.415 μs ┊ GC (mean ± σ): 0.92% ± 3.70%
▂▇█▇▆▅▄▄▃▂▂▂▂▂▂▁▂▁▁ ▁ ▁
▇▇▇█▇▇█▇▇▇▇█▇▇████▇███████████████████████████▇▇▅▇▆▆▄▄▄▃▁▃▃ █
1.18 ms Histogram: log(frequency) by time 1.93 ms <
Memory estimate: 174.75 KiB, allocs estimate: 120.
Last updated: Apr 04 2025 at 04:42 UTC