Stream: helpdesk (published)

Topic: how does `Threads.Condition` work?


Expanding Man (Mar 06 2021 at 18:36):

Base.Condition works exactly like I'd expect it to, given the docs, but I don't understand Threads.Condition at all.

See, for example, the following:

using Base.Threads
using Base.Threads: @spawn, Condition

function f(c::Condition)
    wait(c)
    println("hello f")
end

function g()
    c = Condition()
    lock(c)
    @spawn f(c)
    println("hello g")
    notify(c)
    unlock(c)
end

This only prints "hello g". There doesn't seem to be anything I can do to get the condition to stop blocking.

Also, I'm not completely sure I understand what lock is doing. I take it that it ensures a certain piece of memory can only be written to by the current thread?

Takafumi Arakaki (tkf) (Mar 06 2021 at 23:21):

I strongly suggest "never" writing rogue @spawn/@async like this. They should "always" be inside a @sync. (Of course, by "never", I mean as in "I never write @goto (except when I do)." But I do hope we can improve @sync.)
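To make the @sync advice concrete, here is a minimal sketch (the function name failure_reaches_caller is hypothetical, and the exact wrapper exception type differs between Julia versions): when a task spawned inside @sync throws, the failure is rethrown at the end of the block, so the caller cannot silently miss it.

```julia
using Base.Threads: @spawn

# Returns true if the failure of the spawned task reached the caller.
function failure_reaches_caller()
    try
        @sync begin
            @spawn error("boom")   # this child task fails...
        end
        return false               # ...so this line is never reached
    catch err
        # @sync rethrows the child's failure (wrapped, e.g. in a CompositeException)
        return true
    end
end

failure_reaches_caller()   # true
```

A bare `@spawn error("boom")` outside any @sync would instead just leave a failed Task object behind; nothing forces anyone to look at it.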

Anyway, @sync here is useful for finding out what went wrong:

julia> function g()
           c = Condition()
           @sync begin
               lock(c)
               @spawn f(c)
               println("hello g")
               notify(c)
               unlock(c)
           end
       end
g (generic function with 1 method)

julia> g()
hello g
ERROR: TaskFailedException:
concurrency violation detected
Stacktrace:
 [1] error(::String) at ./error.jl:33
 [2] concurrency_violation() at ./condition.jl:8
 [3] assert_havelock at ./condition.jl:27 [inlined]
 [4] assert_havelock at ./lock.jl:19 [inlined]
 [5] assert_havelock at ./condition.jl:72 [inlined]
 [6] wait(::Base.GenericCondition{ReentrantLock}) at ./condition.jl:102
 [7] f at ./REPL[3]:2 [inlined]
 [8] (::var"#3#4"{Base.GenericCondition{ReentrantLock}})() at ./threadingconstructs.jl:169
Stacktrace:
 [1] sync_end(::Channel{Any}) at ./task.jl:314
 [2] macro expansion at ./task.jl:333 [inlined]
 [3] g() at ./REPL[6]:3
 [4] top-level scope at REPL[7]:1

As you can see from

 [6] wait(::Base.GenericCondition{ReentrantLock}) at ./condition.jl:102
 [7] f at ./REPL[3]:2 [inlined]

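the task system doesn't like the wait in f and reports a concurrency violation. The assert_havelock frames show why: wait on a Threads.Condition requires the calling task to hold the condition's lock, and here the lock is held by the task running g, not by the spawned task running f.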
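A small sketch of the same rule in isolation, assuming Julia 1.6 or later (the helper throws_any is hypothetical). It only checks whether an exception is thrown, since the exact error type has changed across versions:

```julia
# true if calling `thunk` throws any exception
function throws_any(thunk)
    try
        thunk()
        return false
    catch
        return true
    end
end

c = Threads.Condition()

# Without the lock: both calls fail the "caller must hold the lock" check
# immediately, before wait would ever block.
wait_unlocked   = throws_any(() -> wait(c))
notify_unlocked = throws_any(() -> notify(c))

# With the lock held by *this* task, notify is allowed (no waiters, so it wakes nobody).
notify_locked = throws_any() do
    lock(c) do
        notify(c)
    end
end

(wait_unlocked, notify_unlocked, notify_locked)   # (true, true, false)
```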

To actually understand Threads.Condition usage, the documentation https://docs.julialang.org/en/v1/base/multi-threading/#Base.Threads.Condition is helpful:

Therefore idiomatic use of a Threads.Condition c looks like the following:

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end

So, you need some object for representing thing_we_are_waiting_for. An MWE is something like this:

julia> function f(wait_for, c::Condition)
           lock(c) do
               while !wait_for[]
                   wait(c)
               end
           end
           println("hello f")
       end
f (generic function with 2 methods)

julia> function g()
           c = Condition()
           wait_for = Ref(false)
           @sync begin
               lock(c) do
                   @spawn f(wait_for, c)
                   wait_for[] = true
                   println("hello g")
                   notify(c)
               end
           end
       end
g (generic function with 1 method)

julia> g();
hello g
hello f

If you want to look at an example, see my implementation of Promise in FoldsThreads.jl.
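For a self-contained illustration of the same lock / while-not-ready / wait pattern, here is a hypothetical one-shot value holder. It is only a sketch, not the Promise from FoldsThreads.jl; OneShot, put_once!, fetch_value, and demo_oneshot are made-up names.

```julia
using Base.Threads: @spawn

# Hypothetical one-shot "promise" (NOT the FoldsThreads.jl implementation).
# Every read and write of `value` happens while holding `cond`'s lock.
mutable struct OneShot{T}
    cond::Threads.Condition
    value::Union{Nothing,Some{T}}   # `nothing` means "not set yet"
end
OneShot{T}() where {T} = OneShot{T}(Threads.Condition(), nothing)

function put_once!(p::OneShot{T}, x) where {T}
    lock(p.cond) do
        p.value === nothing || error("value already set")
        p.value = Some{T}(x)
        notify(p.cond)              # wake all tasks currently blocked in fetch_value
    end
    return p
end

function fetch_value(p::OneShot)
    lock(p.cond) do
        while p.value === nothing   # the stored state, not the notify, decides when to stop
            wait(p.cond)            # releases the lock while waiting, reacquires it on wakeup
        end
        something(p.value)
    end
end

function demo_oneshot()
    p = OneShot{Int}()
    got = Ref(0)
    @sync begin
        @spawn begin
            got[] = fetch_value(p)  # consumer: blocks until a value is put
        end
        put_once!(p, 42)            # producer
    end
    return got[]
end

demo_oneshot()   # 42
```

Because fetch_value re-checks p.value under the lock, it does not matter whether put_once! runs before or after the consumer reaches wait.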

Takafumi Arakaki (tkf) (Mar 06 2021 at 23:26):

> what lock is doing

As you can see in the example and the documentation, there is typically a thing_we_are_waiting_for, and it may not be safe to access it concurrently. In that case, a lock is required to avoid data races.

Another way to understand lock/unlock around wait/notify is to read them as "start/stop synchronization region" (if that makes sense).
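A minimal sketch of what the lock buys you (count_with_lock is a hypothetical name). A lock does not protect memory by itself: it only guarantees that at most one task at a time runs the code between lock and unlock on the same lock object (a ReentrantLock is owned by a task, not a thread). The protection therefore holds only if every access to the shared state goes through that lock.

```julia
using Base.Threads: @spawn

# `total[] += 1` is a read-modify-write; without the lock, two tasks on different
# threads could read the same old value and one of the increments would be lost.
function count_with_lock(ntasks, nincr)
    lk = ReentrantLock()
    total = Ref(0)
    @sync for _ in 1:ntasks
        @spawn for _ in 1:nincr
            lock(lk) do
                total[] += 1      # only one task at a time runs this critical section
            end
        end
    end
    return total[]
end

count_with_lock(4, 10_000)   # 40000
```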

Takafumi Arakaki (tkf) (Mar 06 2021 at 23:46):

(BTW, if I were to be very honest, the best place to ask this is Slack (and then maybe Discourse), since there are more threading gurus out there. I'll do my best here, though.)

Paul Bayer (Mar 07 2021 at 09:04):

You need to hold the lock both for notify and for wait. Also, they are difficult to compose. For example, this works:

using .Threads

f(c) = lock(c) do
    wait(c)
    println("hello f")
end

function g(c)
    lock(c) do
        println("hello g")
        notify(c)
    end
    nothing
end

then:

julia> c = Threads.Condition();

julia> t = Threads.@spawn f(c)
Task (runnable) @0x0000000167508cd0

julia> g(c)
hello g
hello f

julia> t
Task (done) @0x0000000167508cd0

Expanding Man (Mar 07 2021 at 17:54):

Ok, thanks all. I think the main thing that had me confused was that I was getting to notify before the thread was spun up and got to wait.
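That ordering problem comes from Threads.Condition being edge-triggered. A short sketch of the difference, assuming Julia 1.1 or later (where Base.Event exists): a notify on a Threads.Condition with no waiters wakes nobody and is not remembered, whereas a Base.Event stays set once notified, so a later wait returns immediately.

```julia
using Base.Threads: @spawn

# 1) Threads.Condition is edge-triggered: notify only wakes tasks already in wait.
c = Threads.Condition()
woken = lock(c) do
    notify(c)            # returns the number of tasks woken up
end
# woken == 0: the notification is not stored anywhere, so a task that calls
# wait(c) afterwards would block until some *future* notify.

# 2) Base.Event is level-triggered: once notified, it stays set.
ev = Base.Event()
notify(ev)               # signal before anyone waits
t = @spawn begin
    wait(ev)             # returns immediately because ev is already set
    :done
end
result = fetch(t)        # :done
```

With a Threads.Condition, the equivalent fix is the one from the MWE earlier in the thread: keep an explicit flag and check it under the lock before waiting.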

Expanding Man (Mar 07 2021 at 17:56):

@Takafumi Arakaki (tkf) about your comment on no rogue @spawn: what do you do in situations where you just need to continue on with the thread and not wait for a sync? For example, in a "producer/consumer" pattern, suppose you need a long-running "consumer" that waits for items to come in from the producer. I can't really imagine wrapping something like that in a @sync, because then you'd have to wait for the whole thing.

Andrey Oskin (Mar 07 2021 at 18:19):

These are almost the same questions I asked in the helpdesk (published) topic "[Tangent] (Un)Structured concurrency". If I understand correctly, in this case there is one big @sync around the whole server (so it's basically no @sync at all).

Andrey Oskin (Mar 07 2021 at 18:22):

But (other than the infinite producer/consumer scenario) I get the feeling that such a restriction (wrap everything in @sync) can actually help the compiler infer more from the code and, as a result, produce more efficient code. I think this is a worthwhile tradeoff.

Takafumi Arakaki (tkf) (Mar 07 2021 at 22:15):

You can handle producer/consumer style with "nursery passing style" as I noted in the last paragraph of https://julialang.zulipchat.com/#narrow/stream/274208-helpdesk-%28published%29/topic/.5BTangent.5D.20%28Un%29Structured.20concurrency/near/229164272

The point is that you need to handle errors at some point if you want to write a correct/debuggable/testable program. Structured concurrency provides a coherent strategy for doing this; i.e., the end of @sync enforces it.
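As a rough sketch (not the nursery-passing code from the linked post; produce_consume is a hypothetical name), a long-running consumer can still live inside @sync if its lifetime is tied to a Channel. The consumer runs until the producer closes the channel, and bind closes the channel if the consumer task dies, so a failure on either side is reported to the caller instead of being lost in an orphaned task.

```julia
using Base.Threads: @spawn

function produce_consume(items)
    ch = Channel{Int}(8)             # bounded buffer: the producer blocks when it is full
    results = Int[]                  # written only by the consumer task
    @sync begin
        consumer = @spawn begin      # long-running consumer
            for x in ch              # loops until ch is closed *and* drained
                push!(results, 2x)
            end
        end
        bind(ch, consumer)           # if the consumer dies, ch is closed and put! throws
        try                          # the producer runs in the current task
            for x in items
                put!(ch, x)
            end
        finally
            close(ch)                # lets the consumer's loop terminate
        end
    end                              # both tasks are joined here
    return results
end

produce_consume(1:5)   # [2, 4, 6, 8, 10]
```

The design choice is that "long-running" is bounded by the channel rather than by the program: once the producer is done, the consumer finishes and @sync returns.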


Last updated: Oct 02 2023 at 04:34 UTC