Base.Condition works exactly like I'd expect it to, given the docs, but I don't understand Threads.Condition at all.
See, for example, the following:
using Base.Threads
using Base.Threads: @spawn, Condition
function f(c::Condition)
    wait(c)
    println("hello f")
end
function g()
    c = Condition()
    lock(c)
    @spawn f(c)
    println("hello g")
    notify(c)
    unlock(c)
end
This only prints "hello g". There doesn't seem to be anything I can do to get the condition to stop blocking.
Also, I'm not completely sure I understand what lock is doing. I take it that it ensures a certain piece of memory can only be written to by the current thread?
I strongly suggest that you "never" write a rogue @spawn/@async like this. They should "always" be in @sync. (Of course, by "never", I mean as in "I never write @goto (except when I do)." But I do hope we can improve @sync.)
Anyway, @sync here is useful for finding out what went wrong:
julia> function g()
           c = Condition()
           @sync begin
               lock(c)
               @spawn f(c)
               println("hello g")
               notify(c)
               unlock(c)
           end
       end
g (generic function with 1 method)
julia> g()
hello g
ERROR: TaskFailedException:
concurrency violation detected
Stacktrace:
[1] error(::String) at ./error.jl:33
[2] concurrency_violation() at ./condition.jl:8
[3] assert_havelock at ./condition.jl:27 [inlined]
[4] assert_havelock at ./lock.jl:19 [inlined]
[5] assert_havelock at ./condition.jl:72 [inlined]
[6] wait(::Base.GenericCondition{ReentrantLock}) at ./condition.jl:102
[7] f at ./REPL[3]:2 [inlined]
[8] (::var"#3#4"{Base.GenericCondition{ReentrantLock}})() at ./threadingconstructs.jl:169
Stacktrace:
[1] sync_end(::Channel{Any}) at ./task.jl:314
[2] macro expansion at ./task.jl:333 [inlined]
[3] g() at ./REPL[6]:3
[4] top-level scope at REPL[7]:1
As you can see from
[6] wait(::Base.GenericCondition{ReentrantLock}) at ./condition.jl:102
[7] f at ./REPL[3]:2 [inlined]
the task system rejects the wait call in f as a concurrency violation: f calls wait(c) without holding the lock on c.
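To see that check in isolation, here is a minimal sketch. The helper name throws_without_lock is made up for illustration; it just calls wait or notify on a fresh Threads.Condition whose lock was never taken and reports whether an exception came out. The exact exception type seems to differ between Julia versions, so the sketch catches anything.

```julia
# Hypothetical helper: returns true if op(c) throws when c's lock is not held.
function throws_without_lock(op)
    c = Threads.Condition()
    try
        op(c)          # lock(c) was never called on this condition
        return false
    catch
        return true    # the havelock assertion fired
    end
end

throws_without_lock(wait)    # wait(c) without holding the lock
throws_without_lock(notify)  # notify(c) without holding the lock
```

Both calls fail the same lock-ownership assertion that shows up in the stack trace above.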
To actually understand Threads.Condition usage, the documentation https://docs.julialang.org/en/v1/base/multi-threading/#Base.Threads.Condition is helpful:
Therefore idiomatic use of a Threads.Condition c looks like the following:
lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
So, you need some object for representing thing_we_are_waiting_for. An MWE is something like this:
julia> function f(wait_for, c::Condition)
           lock(c) do
               while !wait_for[]
                   wait(c)
               end
           end
           println("hello f")
       end
f (generic function with 2 methods)
julia> function g()
           c = Condition()
           wait_for = Ref(false)
           @sync begin
               lock(c) do
                   @spawn f(wait_for, c)
                   wait_for[] = true
                   println("hello g")
                   notify(c)
               end
           end
       end
g (generic function with 1 method)
julia> g();
hello g
hello f
If you want to look at an example, see my implementation of Promise in FoldsThreads.jl.
what lock is doing
As you can see in the example and the documentation, there is typically a thing_we_are_waiting_for and it is possible that it is not OK to access it concurrently. In this case, a lock is required for avoiding data races.
Another way to understand lock/unlock around wait/notify is to read them as "start/stop synchronization region" (if that makes sense).
(BTW, if I were to be very honest, the best place to ask this is Slack (and then maybe Discourse), since there are more threading gurus out there. I'll do my best here, though.)
You need locks both for notify and for wait. Also, they are difficult to compose. For example, this works:
using .Threads
f(c) = lock(c) do
    wait(c)
    println("hello f")
end
function g(c)
    lock(c) do
        println("hello g")
        notify(c)
    end
    nothing
end
then:
julia> c = Threads.Condition();
julia> t = Threads.@spawn f(c)
Task (runnable) @0x0000000167508cd0
julia> g(c)
hello g
hello f
julia> t
Task (done) @0x0000000167508cd0
Ok, thanks all. I think the main thing that had me confused was that I was getting to notify before the thread was spun up and got to wait.
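That ordering problem can be sketched on its own. In the snippet below (lost_notify_demo and ready are names made up for illustration), notify runs before any task is waiting, so it wakes nobody; the late waiter still finishes because it checks the flag under the lock before ever calling wait. This is only a sketch of the predicate-loop idiom from the docs, not a general recipe.

```julia
# Hypothetical demo: notify first, start the waiter afterwards.
function lost_notify_demo()
    c = Threads.Condition()
    ready = Ref(false)           # the "thing we are waiting for"

    # Set the flag and notify while no task is waiting on c yet.
    lock(c)
    woken = try
        ready[] = true
        notify(c)                # returns the number of tasks woken
    finally
        unlock(c)
    end

    # The waiter starts late. It sees ready[] == true under the lock,
    # so the loop body (wait) is never entered and it cannot hang.
    t = Threads.@spawn begin
        lock(c)
        try
            while !ready[]
                wait(c)
            end
        finally
            unlock(c)
        end
        "hello f"
    end

    return woken, fetch(t)
end

lost_notify_demo()
```

Without the flag, a waiter that arrives after the notify would block forever, since a notification with no waiters is simply dropped.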
@Takafumi Arakaki (tkf), about your comment on no rogue @spawn: what do you do in situations where you just need to continue on with the thread and not wait for a sync? For example, in a "producer/consumer" pattern, suppose you need a long-running "consumer" and are, e.g., waiting for stuff to come in for the producer? I can't really imagine wrapping something like that in a @sync, because then you'd have to wait for the whole thing.
These are almost the same questions I've asked in the "[Tangent] (Un)Structured concurrency" topic of the helpdesk (published) stream. If I understand correctly, in this case there is one big @sync over the whole server (so it's basically no @sync at all).
But (other than the infinite producer/consumer scenario) I get the feeling that such a restriction (wrap everything in @sync) can actually help the compiler infer more from the code and, as a result, produce more efficient code. This is a worthwhile tradeoff, I think.
You can handle producer/consumer style with "nursery passing style" as I noted in the last paragraph of https://julialang.zulipchat.com/#narrow/stream/274208-helpdesk-%28published%29/topic/.5BTangent.5D.20%28Un%29Structured.20concurrency/near/229164272
The point is that you need to handle errors at some point if you want to write a correct/debuggable/testable program. Structured concurrency provides a coherent strategy for doing this; i.e., the end of @sync enforces this.
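For a rough picture of a producer/consumer pair that stays inside one @sync, here is a minimal sketch. It is not the "nursery passing style" from the linked message, just a plain scoped version; run_pipeline, the Channel size of 8, and the doubling step are all assumptions made up for the example.

```julia
# Hypothetical pipeline: one producer and one consumer scoped by a single @sync.
function run_pipeline(items)
    ch = Channel{Int}(8)         # buffered channel between the two tasks
    results = Int[]
    @sync begin
        # Producer: feed the channel, and always close it so the consumer stops.
        Threads.@spawn begin
            try
                for x in items
                    put!(ch, x)
                end
            finally
                close(ch)
            end
        end
        # Consumer: iterating a Channel ends once it is closed and drained.
        # Only this task touches results until @sync returns.
        Threads.@spawn for x in ch
            push!(results, 2x)
        end
    end                          # both tasks have finished (or failed) here
    return results
end

run_pipeline(1:3)
```

The caller only continues once both tasks are done, and a failure in either task surfaces at the end of the @sync block instead of disappearing silently.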
Last updated: Dec 28 2024 at 04:38 UTC