Stream: helpdesk (published)

Topic: Synchronization of multiple async tasks


view this post on Zulip Andrey Oskin (Aug 02 2021 at 18:47):

Today I was trying to solve an issue of synchronous termination of async tasks (only async, no multithreading) in case if one of them fail, and came to this solution

function foo()
    ts = Task[]
    for i in 1:20
        t = @task begin
            sleep(i/10)
            println(i)
            @assert i != 5
        end
        push!(ts, t)
    end
    foreach(schedule, ts)
    ht = @async begin
        while true
            all(istaskdone, ts) && break
            if any(istaskfailed, ts)
                for t in ts
                    istaskdone(t) && continue
                    istaskfailed(t) && continue
                    @async try Base.throwto(t, InterruptException()) catch end
                    yield()
                end
                break
            end
            yield()
        end
    end
    wait(ht)

    return nothing
end

I wonder is it complete madness or it is more or less ok? Maybe there are better, more idiomatic ways to solve this problem?

view this post on Zulip Takafumi Arakaki (tkf) (Aug 02 2021 at 19:18):

Base.throwto (or schedule) on task that you don't "own" is undefined behavior. Also, I'd avoid spinning like this as much as possible.

Funny story, I was answering a similar question in slack today https://julialang.slack.com/archives/C6SMTHQ3T/p1627916665016700?thread_ts=1627899423.013900&cid=C6SMTHQ3T

@sync let handles = Channel(Inf)
    for i in 1:100
        @async begin
            timer = Timer(i)
            try
                try
                    put!(handles, timer)
                catch
                    close(timer)  # closed by another task
                    return
                end
                try
                    wait(timer)
                catch err
                    err isa EOFError && return  # closed by another task
                    rethrow()
                end
                println(i)
                @assert i != 5
            catch
                close(handles)  # no more `put!`
                foreach(close, handles)  # cancel all `wait(timer)`
                rethrow()
            end
        end
    end
end

view this post on Zulip Sebastian Pfitzner (Aug 02 2021 at 19:29):

Base.throwto (or schedule) on task that you don't "own" is undefined behavior.

Can you elaborate on that? I know Jameson usually discourages people from doing that, but I'm not sure why (it might cause task queue inconsistencies or something, I guess)

view this post on Zulip Andrey Oskin (Aug 02 2021 at 19:31):

I also wonder, is it fundamental limitation that killing tasks that aren't owned is undefined? Or it's just current Julia implementation and it may change in the future?

I mean, from pedestrian point of view, there is nothing bad in killing some task (or delegate it to some authority if such exists).

view this post on Zulip Andrey Oskin (Aug 02 2021 at 19:32):

Funny story, I was answering a similar question in slack today

Yes, I have the same question from the same guy in our Telegram channel :-)

view this post on Zulip Sebastian Pfitzner (Aug 02 2021 at 19:33):

FWIW, Juno and the VSCode extension schedule InterruptExceptions on an execution task and, anecdotally, that seems to work fairly well

view this post on Zulip Sebastian Pfitzner (Aug 02 2021 at 19:34):

But in those cases I really want to be able to interrupt arbitrary code

view this post on Zulip Andrey Oskin (Aug 02 2021 at 19:37):

Hmm, but in Takafumi example, sleep is changed to timer. In real code, there will be no sleep of course, there will be some long running task (for example HTTP.get).

view this post on Zulip Andrey Oskin (Aug 02 2021 at 19:37):

Can this snippet be changed to support this situation as well?

view this post on Zulip Andrey Oskin (Aug 02 2021 at 19:40):

It should starts with something like

url = "http://example.url"
@sync let handles = Channel(Inf)
    for i in 1:100
        @async begin
            uri = "$url/$i"
           try
             HTTP.get(uri)
            ???
          catch
              ???
         end
     end
  end
end

view this post on Zulip Takafumi Arakaki (tkf) (Aug 02 2021 at 20:01):

Sebastian Pfitzner said:

FWIW, Juno and the VSCode extension schedule InterruptExceptions on an execution task and, anecdotally, that seems to work fairly well

Hasn't Workqueue inconsistency become a problem?

julia> t = @async nothing
       Base.throwto(t, KeyError(0))

WARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :runnable
^CERROR: InterruptException:

Nothing can stop throwto-ing to a task that is in the workqueue. And it'd confuse scheduler since it thinks everything there is runnable.

Also, schedule in general is problematic since it sets the result in the task field non-atomically. It means that doing so from different worker thread is a data race.

view this post on Zulip Takafumi Arakaki (tkf) (Aug 02 2021 at 20:03):

Andrey Oskin said:

Hmm, but in Takafumi example, sleep is changed to timer. In real code, there will be no sleep of course, there will be some long running task (for example HTTP.get).

Yeah, you'd need some close-able object there. Does HTTP have open-like API?

But I know it's tedious and non-composable pattern...

view this post on Zulip Takafumi Arakaki (tkf) (Aug 02 2021 at 20:25):

Andrey Oskin said:

I also wonder, is it fundamental limitation that killing tasks that aren't owned is undefined? Or it's just current Julia implementation and it may change in the future?

I think it's implementable but I still don't know the task runtime end-to-end.

view this post on Zulip Sebastian Pfitzner (Aug 02 2021 at 20:27):

Hasn't Workqueue inconsistency become a problem?

No. I also don't care about the return value of that task (it basically lives forever and I communicate with it via a Channel), so non-atomically setting the result doesn't matter

view this post on Zulip Takafumi Arakaki (tkf) (Aug 02 2021 at 20:43):

If you are using it with threading, non-atomically setting it breaks your program. There's absolutely no guarantee that the object you set is meaningful on the reader's side.

Even if everything is @async task, I think it'd be nice to handle this the right way in an important software like VS code plugin.

view this post on Zulip Sebastian Pfitzner (Aug 02 2021 at 20:49):

What I was trying to say is that the result is never read

view this post on Zulip Sebastian Pfitzner (Aug 02 2021 at 20:49):

But yes, I'm all for cleaning up that part of the codebase!

view this post on Zulip Andrey Oskin (Aug 02 2021 at 21:00):

Yeah, you'd need some close-able object there. Does HTTP have open-like API?

Well, according to their documentation there is something open like:

HTTP.open("GET", "https://tinyurl.com/bach-cello-suite-1-ogg") do http
    n = 0
    r = startread(http)
    l = parse(Int, HTTP.header(r, "Content-Length"))
    open(`vlc -q --play-and-exit --intf dummy -`, "w") do vlc
        while !eof(http)
            bytes = readavailable(http)
            write(vlc, bytes)
            n += length(bytes)
            println("streamed \$n-bytes \$((100*n)÷l)%\\u1b[1A")
        end
    end
end

The problem is they have many layers of request processing. Not sure at which point things became @async.

view this post on Zulip Takafumi Arakaki (tkf) (Aug 02 2021 at 21:16):

If close on HTTP.open is "@async-safe", maybe you can do something like

HTTP.open(...) do http
    put!(handles, http)
    ... # actual processing
end

where the handles is the Channel in my example.

view this post on Zulip Takafumi Arakaki (tkf) (Aug 02 2021 at 21:17):

Sebastian Pfitzner said:

What I was trying to say is that the result is never read

Exception is a result internally. Here's Base.throwto (similar code in schedule too)

function throwto(t::Task, @nospecialize exc)
    t.result = exc
    t._isexception = true
    set_next_task(t)
    return try_yieldto(identity)
end

https://github.com/JuliaLang/julia/blob/f711f0a9076b9f11f9034985086d73bc285c2ca6/base/task.jl#L772-L777

So, internally, the exception is just a result with _isexception flag set. To throw an exception, the task needs to load it from the .result field.

view this post on Zulip Sebastian Pfitzner (Aug 02 2021 at 21:20):

Ah, that makes sense.

view this post on Zulip Sebastian Pfitzner (Aug 02 2021 at 21:40):

I still don't know a) which consequences any data races could have in this case and b) what to do about it :stuck_out_tongue:

view this post on Zulip Takafumi Arakaki (tkf) (Aug 02 2021 at 22:12):

a) It can create an ill-formed Julia object. Since it can then corrupt the Julia runtime, anything can happen. You get a segfault if you are lucky. It's like a misplaced @inbounds. But worse, since the error can be hard to reproduce.

b) Supporting cancellation is tedious ATM, unfortunately. You can keep around close-able objects like my example above. Or, pass around Threads.Atomic{Bool} and check it from time to time.

view this post on Zulip Andrey Oskin (Aug 03 2021 at 03:08):

I wonder, how structured concurrency is possible then (or how trio framework is working). If I get it right, nursery gracefully shutdown all children, but it looks impossible in current Julia implementation?

I mean, you have a function, function is wrapped in task. The only way to stop the task externally is to set this isexception field and then it removes itself from the schedule. But this wouldn't gracefully unallocate all resources, right?

So, how on earth, one can externally close the task in such a way that it would gracefully release all the resources? Task should keep track of all allocated resources (like all open files), but there is no way to do it, since there is no direct communication between function and wrapping task.

view this post on Zulip Andrey Oskin (Aug 03 2021 at 03:09):

Sorry maybe I mixing different concepts together, but it really looks more complicated now, then I thought before.

view this post on Zulip Takafumi Arakaki (tkf) (Aug 03 2021 at 03:26):

Yeah, it is implemented in other systems. It should also be possible to implement this as a package given all possibly blocking operations go through a consistent API. I think it is possible to do this in the Julia runtime with some manageable surgery. The main issue probably is convincing core devs that this is worth doing.

view this post on Zulip Andrey Oskin (Aug 03 2021 at 04:32):

Ah, but how it can be implemented? I am sorry, my knowledge of those things is too little.

Should it be made in such a way, that for example open(io) should be rewritten in such a way, that it leaves trace in wrapping Task? So that if task is interrupted it can call close on all such objects? It is it something different entirely (idk, some magic with stacktraces or may be even with garbage collector).

I mean, it would be interesting to see this technique implemented, even as a package prototype.

view this post on Zulip Andrey Oskin (Aug 03 2021 at 04:36):

I also seen word Tapir when things like this is discussed, but I do not know, is it relevant or not.

view this post on Zulip Takafumi Arakaki (tkf) (Aug 03 2021 at 05:56):

Yeah, that's the idea. I think creating a package for this would be useful.

(It is limited in the sense that you can't keep channel/timer/file/etc. open upon task termination, compared to full-blown Structured Concurrency systems like Trio. For this, you'd need to touch the runtime or have something like Go's select.)

view this post on Zulip Takafumi Arakaki (tkf) (Aug 03 2021 at 05:57):

Tapir is not related to the cancellation story.

view this post on Zulip Andrey Oskin (Aug 03 2021 at 06:33):

Sorry for keeping asking you questions, but what do you mean by "touching runtime"? I mean all things like closing/deallocating resources happens in a runtime, how is it diiferent?

And what is the technique behind Go select? Where one can read about it? Can it be implemented in Julia in some way (I mean, maybe even through touching low level things in LLVM somehow)? Or it will go very deep in Julia semantics?

view this post on Zulip Takafumi Arakaki (tkf) (Aug 04 2021 at 01:31):

I'm happy to nerd out about this :)

By "touching runtime," I meant inserting cancellation point for each yield to the runtime; e.g., reading a socket, take! on an empty channel, lock, etc. These operations need to "listen to" the actual I/O outcome (e.g., data read from the socket) and the cancellation signal.

Andrey Oskin said:

And what is the technique behind Go select? Where one can read about it?

See how Go's context https://pkg.go.dev/context (coupled with errgroup https://pkg.go.dev/golang.org/x/sync/errgroup) implements (something equivalent to) structured concurrency with the "Done channel idiom."

Andrey Oskin said:

Can it be implemented in Julia in some way (I mean, maybe even through touching low level things in LLVM somehow)?

I think it's totally implementable without just writing plain Julia code. No need to use LLVM hacking or even any kind of metaprogramming.

But the catch is that you'd need to re-implement all concurrent communication primitives (lock, condition variable, channel, ...) if you want to support cancellation (or select-like operations) on them. It is doable, and in fact I'm experimenting with it a bit.

It's probably easier to just directly implement channel and select only for channel (Go's strategy). But I have a bit more roundabout idea: (1) Implement Reagent (https://doi.org/10.1145/2254064.2254084) (2) Implement Concurrent ML (https://en.wikipedia.org/wiki/Concurrent_ML) on top of it (3) Implement Racket's Kill-safe synchronization abstractions (https://doi.org/10.1145/996893.996849) on top of it. This hopefully gives us very extensible concurrent synchronization mechanism which then let us create very user-friendly concurrency API like Trio on top of it.

view this post on Zulip Takafumi Arakaki (tkf) (Aug 04 2021 at 01:38):

More resources:

A talk on OCaml's Reagent implementation. A nice high-level overview:
LDN Functionals #8 KC Sivaramakrishnan: OCaml multicore and programming with Reagents - YouTube

A talk on the high-level idea of Concurrent ML. The code is kinda hard to read which is rather sad. But I think it's a nice accessible introduction. I learnt about Reagent via this talk.
Michael Sperber - Concurrent ML - The One That Got Away - Code Mesh 2017 - YouTube

Another talk on Concurrent ML. It was kinda hard to follow for me. But it goes into some implementation details.
Andy Wingo - Channels, Concurrency, and Cores: A new Concurrent ML implementation - YouTube

view this post on Zulip Takafumi Arakaki (tkf) (Aug 04 2021 at 02:11):

But, as I said before, it's totally possible that a package that automates the close-on-cancel pattern is sufficient for many practical problems. I know I'm in a rather deep stack of yak shaving :)

view this post on Zulip Chris Foster (Aug 04 2021 at 05:40):

Haha, deep indeed. The other day I was reading https://carllerche.com/2021/06/17/six-ways-to-make-async-rust-easier/ which discusses kill safety (or the lack of it) in Rust. It's a good thoughtful writeup of the current state of some of these things in Rust. We don't have cancellation in Julia, so we don't have some of their problems yet. But otoh we have all the other problems that lack of cancellation causes :sweat_smile:

view this post on Zulip Sebastian Pfitzner (Aug 04 2021 at 09:11):

Related: CancellationTokens.jl

view this post on Zulip Takafumi Arakaki (tkf) (Aug 06 2021 at 06:19):

So, here's the first step in my attempt to support cancellation in a rather roundabout way: https://github.com/tkf/Reagents.jl

I tired to summarize what I was saying above here: https://github.com/tkf/Reagents.jl/discussions/5

view this post on Zulip Andrey Oskin (Aug 06 2021 at 08:25):

Oh, I am so anxious to give it a spin :-)
That nice feeling of christmas present unpacking :-D

view this post on Zulip Takafumi Arakaki (tkf) (Aug 06 2021 at 20:28):

This is still a very low-level infrastructure. So don't hold your breath yet :)

view this post on Zulip Jameson Nash (Aug 30 2021 at 00:06):

The rust link from @Chris Foster seems to be proposing the fix for all their issues is to do what we do now in Julia (with implicit cancellation tokens on resources):

Cancel spawned tasks by disabling resources from completing.

view this post on Zulip Jameson Nash (Aug 30 2021 at 00:07):

@Takafumi Arakaki (tkf) I like that code snippet you posted at the top. Have you ever thought about how we could automate that better? I have wanted to make @sync also be able to manage in-flight resources like that, but been unsure where to start on it.

view this post on Zulip Takafumi Arakaki (tkf) (Aug 30 2021 at 02:27):

Good to know that you like the example! It was basically an outcome of me trying to understand your philosophy of the error handling/cancellation of concurrent programs :), to figure out the best practice for doing this in current Julia.

To add something like handle to @sync, I think the main questions are:

(1) Should handle be dynamically or lexically scoped? I prefer dynamically scoped direction but then we'd need to use something like task-local or context variable or this.

Or maybe it can be done lexically by default but we can also add a macro @syncgorup() that returns an object wrapping the current sync_varname and handle. We can then bind a closable object to the "syncgroup" object. It's still a bit manual but it's nice to automate the try-catch dance. In the lexical environment of @sync, we can simply use (say) @syncbind or something.

(2) Should the exit be non-local? In my code I used returns like this

try
    wait(timer)
catch err
    err isa EOFError && return  # closed by another task
    rethrow()
end

I could use return here since I know it'd end @async. But we can't do this for arbitrary code (e.g., the user might want to push an object to a deal with the timer inside a open(...) do block). If we go this direction, I think we'd need something like a Cancelled exception.

Once these design decisions are address, I imagine implementation would be straightforward.

view this post on Zulip Takafumi Arakaki (tkf) (Aug 30 2021 at 02:28):

Also, FYI, lexically scoped non-local cancellation is now implemented in https://github.com/tkf/Julio.jl

view this post on Zulip Takafumi Arakaki (tkf) (Aug 30 2021 at 02:42):

Just reading the Rust blog, I didn't realize Tokio's select does not guarantee atomicity. It seems like an unfortunate design... (The blog post also discuss restricting it.)

Reagents actually help writing something like this. Julio.select does not consume an IO if it's not selected: https://tkf.github.io/Julio.jl/dev/tutorials/select/#Selecting-an-arbitrary-event (It's done using a manager task ATM but I think it's possible to avoid this by touching lower-level interface)


Last updated: Oct 02 2023 at 04:34 UTC