Today I was trying to solve the problem of synchronously terminating async tasks (only async, no multithreading) when one of them fails, and came up with this solution:
function foo()
    ts = Task[]
    for i in 1:20
        t = @task begin
            sleep(i/10)
            println(i)
            @assert i != 5
        end
        push!(ts, t)
    end
    foreach(schedule, ts)
    ht = @async begin
        while true
            all(istaskdone, ts) && break
            if any(istaskfailed, ts)
                for t in ts
                    istaskdone(t) && continue
                    istaskfailed(t) && continue
                    @async try Base.throwto(t, InterruptException()) catch end
                    yield()
                end
                break
            end
            yield()
        end
    end
    wait(ht)
    return nothing
end
I wonder: is this complete madness, or is it more or less OK? Maybe there are better, more idiomatic ways to solve this problem?
Base.throwto (or schedule) on task that you don't "own" is undefined behavior. Also, I'd avoid spinning like this as much as possible.
Funny story, I was answering a similar question in slack today https://julialang.slack.com/archives/C6SMTHQ3T/p1627916665016700?thread_ts=1627899423.013900&cid=C6SMTHQ3T
@sync let handles = Channel(Inf)
    for i in 1:100
        @async begin
            timer = Timer(i)
            try
                try
                    put!(handles, timer)
                catch
                    close(timer)  # closed by another task
                    return
                end
                try
                    wait(timer)
                catch err
                    err isa EOFError && return  # closed by another task
                    rethrow()
                end
                println(i)
                @assert i != 5
            catch
                close(handles)  # no more `put!`
                foreach(close, handles)  # cancel all `wait(timer)`
                rethrow()
            end
        end
    end
end
Base.throwto (or schedule) on task that you don't "own" is undefined behavior.
Can you elaborate on that? I know Jameson usually discourages people from doing that, but I'm not sure why (it might cause task queue inconsistencies or something, I guess)
I also wonder: is it a fundamental limitation that killing tasks you don't own is undefined? Or is it just the current Julia implementation, which may change in the future?
I mean, from a layman's point of view, there is nothing wrong with killing a task (or delegating that to some authority, if one exists).
Funny story, I was answering a similar question in slack today
Yes, I have the same question from the same guy in our Telegram channel :-)
FWIW, Juno and the VSCode extension schedule InterruptExceptions on an execution task and, anecdotally, that seems to work fairly well
But in those cases I really want to be able to interrupt arbitrary code
Hmm, but in Takafumi's example, sleep is changed to a timer. In real code there will be no sleep, of course; there will be some long-running task (for example HTTP.get).
Can this snippet be changed to support this situation as well?
It should start with something like
url = "http://example.url"
@sync let handles = Channel(Inf)
    for i in 1:100
        @async begin
            uri = "$url/$i"
            try
                HTTP.get(uri)
                ???
            catch
                ???
            end
        end
    end
end
Sebastian Pfitzner said:
FWIW, Juno and the VSCode extension schedule InterruptExceptions on an execution task and, anecdotally, that seems to work fairly well
Hasn't Workqueue inconsistency become a problem?
julia> t = @async nothing
Base.throwto(t, KeyError(0))
WARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :runnable
^CERROR: InterruptException:
Nothing can stop throwto-ing to a task that is in the workqueue. And it'd confuse the scheduler, since it thinks everything there is runnable.
Also, schedule in general is problematic since it sets the result in the task field non-atomically. It means that doing so from a different worker thread is a data race.
Andrey Oskin said:
Hmm, but in Takafumi's example, sleep is changed to a timer. In real code there will be no sleep, of course; there will be some long-running task (for example HTTP.get).
Yeah, you'd need some close-able object there. Does HTTP have an open-like API?
But I know it's a tedious and non-composable pattern...
Andrey Oskin said:
I also wonder: is it a fundamental limitation that killing tasks you don't own is undefined? Or is it just the current Julia implementation, which may change in the future?
I think it's implementable but I still don't know the task runtime end-to-end.
Hasn't Workqueue inconsistency become a problem?
No. I also don't care about the return value of that task (it basically lives forever and I communicate with it via a Channel), so non-atomically setting the result doesn't matter
If you are using it with threading, non-atomically setting it breaks your program. There's absolutely no guarantee that the object you set is meaningful on the reader's side.
Even if everything is an @async task, I think it'd be nice to handle this the right way in important software like the VS Code plugin.
What I was trying to say is that the result is never read
But yes, I'm all for cleaning up that part of the codebase!
Yeah, you'd need some close-able object there. Does HTTP have open-like API?
Well, according to their documentation there is something open-like:
HTTP.open("GET", "https://tinyurl.com/bach-cello-suite-1-ogg") do http
    n = 0
    r = startread(http)
    l = parse(Int, HTTP.header(r, "Content-Length"))
    open(`vlc -q --play-and-exit --intf dummy -`, "w") do vlc
        while !eof(http)
            bytes = readavailable(http)
            write(vlc, bytes)
            n += length(bytes)
            println("streamed $n-bytes $((100*n)÷l)%\u1b[1A")
        end
    end
end
The problem is that they have many layers of request processing. Not sure at which point things become @async.
If close on HTTP.open is "@async-safe", maybe you can do something like
HTTP.open(...) do http
    put!(handles, http)
    ...  # actual processing
end
where the handles is the Channel in my example.
Sebastian Pfitzner said:
What I was trying to say is that the result is never read
Exception is a result internally. Here's Base.throwto (similar code in schedule too)
function throwto(t::Task, @nospecialize exc)
    t.result = exc
    t._isexception = true
    set_next_task(t)
    return try_yieldto(identity)
end
So, internally, the exception is just a result with the _isexception flag set. To throw an exception, the task needs to load it from the .result field.
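The "exception is a result" point can be seen from ordinary code. This is only an illustrative sketch: reading t.result directly relies on the internal field shown in the snippet above and may change between Julia versions, and failure is a name made up for this example.

```julia
# A task that fails stores the thrown exception as its result.
t = @async error("boom")

# Waiting on a failed task rethrows, wrapped in a TaskFailedException.
failure = try
    wait(t)
    nothing
catch err
    err
end

println(istaskfailed(t))                  # did the task end with an exception?
println(failure isa TaskFailedException)  # wrapper seen by the waiting task
println(failure.task === t)               # the wrapper points back at the task
println(t.result isa ErrorException)      # internal field: exception stored as the result
```

The public way to inspect the failure is through the TaskFailedException; the direct field access is there only to connect it to the throwto source.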
Ah, that makes sense.
I still don't know a) which consequences any data races could have in this case and b) what to do about it :stuck_out_tongue:
a) It can create an ill-formed Julia object. Since it can then corrupt the Julia runtime, anything can happen. You get a segfault if you are lucky. It's like a misplaced @inbounds. But worse, since the error can be hard to reproduce.
b) Supporting cancellation is tedious ATM, unfortunately. You can keep around close-able objects like my example above. Or, pass around Threads.Atomic{Bool} and check it from time to time.
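A minimal sketch of the Threads.Atomic{Bool} variant, assuming the work can be split into small steps so the flag can be polled between them. The names run_workers, cancelled and fail_at are hypothetical, and the short sleeps stand in for real work.

```julia
# Cooperative cancellation through a shared flag (a sketch, not a library API).
function run_workers(n; fail_at = 5)
    cancelled = Threads.Atomic{Bool}(false)
    finished = Int[]                      # ids of workers that ran to completion
    try
        @sync for i in 1:n
            @async try
                # Work in small steps so the flag is checked "from time to time".
                for _ in 1:i
                    cancelled[] && return
                    sleep(0.02)
                end
                cancelled[] && return
                i == fail_at && error("worker $i failed")
                push!(finished, i)
            catch
                cancelled[] = true        # ask the remaining workers to stop
                rethrow()
            end
        end
    catch err
        # @sync reports failures of its tasks as a CompositeException.
        err isa CompositeException || rethrow()
    end
    return finished
end

finished = run_workers(20)
println(finished)
```

The obvious downside is latency: a worker only notices the cancellation at its next check, so a single long blocking call (like one HTTP.get) cannot be interrupted this way.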
I wonder how structured concurrency is possible then (or how the Trio framework works). If I understand correctly, a nursery gracefully shuts down all its children, but that looks impossible in the current Julia implementation?
I mean, you have a function, and the function is wrapped in a task. The only way to stop the task externally is to set this isexception field, after which it removes itself from the schedule. But this wouldn't gracefully release all resources, right?
So how on earth can one externally close a task in such a way that it gracefully releases all its resources? The task would have to keep track of all allocated resources (like all open files), but there is no way to do that, since there is no direct communication between the function and the wrapping task.
Sorry, maybe I'm mixing different concepts together, but it really looks more complicated now than I thought before.
Yeah, it is implemented in other systems. It should also be possible to implement this as a package given all possibly blocking operations go through a consistent API. I think it is possible to do this in the Julia runtime with some manageable surgery. The main issue probably is convincing core devs that this is worth doing.
Ah, but how can it be implemented? I am sorry, my knowledge of these things is too limited.
Should it be done in such a way that, for example, open(io) is rewritten so that it leaves a trace in the wrapping Task, so that if the task is interrupted it can call close on all such objects? Or is it something different entirely (I don't know, some magic with stack traces or maybe even with the garbage collector)?
I mean, it would be interesting to see this technique implemented, even as a package prototype.
I have also seen the word Tapir when things like this are discussed, but I don't know whether it is relevant or not.
Yeah, that's the idea. I think creating a package for this would be useful.
(It is limited in the sense that you can't keep channel/timer/file/etc. open upon task termination, compared to full-blown Structured Concurrency systems like Trio. For this, you'd need to touch the runtime or have something like Go's select.)
Tapir is not related to the cancellation story.
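For illustration, here is a rough sketch of what such a close-on-cancel helper might look like, built directly on the Channel-of-handles example earlier in the thread. CancelGroup, register!, cancel! and run_group are hypothetical names, not an existing package API.

```julia
# Hypothetical helper that automates the close-on-cancel pattern.
struct CancelGroup
    handles::Channel{Any}
end
CancelGroup() = CancelGroup(Channel{Any}(Inf))

# Track a close-able resource; returns false if the group is already cancelled.
function register!(group::CancelGroup, resource)
    try
        put!(group.handles, resource)
        return true
    catch
        close(resource)               # group was cancelled by another task
        return false
    end
end

# Cancel the group: refuse new resources, then close everything registered.
function cancel!(group::CancelGroup)
    close(group.handles)              # no more put!
    foreach(close, group.handles)     # wakes every task blocked on a resource
end

function run_group(n; fail_at = 5)
    group = CancelGroup()
    completed = Int[]
    try
        @sync for i in 1:n
            @async begin
                timer = Timer(i / 20)
                try
                    register!(group, timer) || return
                    try
                        wait(timer)
                    catch err
                        err isa EOFError && return   # timer closed by cancel!
                        rethrow()
                    end
                    i == fail_at && error("task $i failed")
                    push!(completed, i)
                catch
                    cancel!(group)
                    rethrow()
                end
            end
        end
    catch err
        err isa CompositeException || rethrow()
    end
    return completed
end

completed = run_group(20)
println(completed)
```

This only removes the bookkeeping; each blocking call still needs its own "was I closed?" handling, which is the part a runtime-level solution would take over.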
Sorry for continuing to ask you questions, but what do you mean by "touching runtime"? I mean, all things like closing/deallocating resources happen in the runtime, so how is it different?
And what is the technique behind Go select? Where can one read about it? Can it be implemented in Julia in some way (I mean, maybe even through touching low level things in LLVM somehow)? Or will it go very deep into Julia semantics?
I'm happy to nerd out about this :)
By "touching runtime," I meant inserting a cancellation point for each yield to the runtime; e.g., reading a socket, take! on a channel, lock, etc. These operations need to "listen to" both the actual I/O outcome (e.g., data read from the socket) and the cancellation signal.
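To make the contrast concrete: in current Julia a blocked take! has exactly one thing it listens to, the channel itself, so the only safe way to wake it from outside is to close that channel. A small sketch (inbox, outcome and consumer are made-up names):

```julia
inbox = Channel{Int}(0)
outcome = Ref{Symbol}(:pending)

consumer = @async try
    take!(inbox)                    # blocks: nobody ever sends a value
    outcome[] = :got_value
catch err
    # Closing the channel wakes the blocked take! with InvalidStateException.
    err isa InvalidStateException || rethrow()
    outcome[] = :cancelled
end

yield()          # give the consumer a chance to block in take!
close(inbox)     # the "cancellation signal" has to go through the same object
wait(consumer)
println(outcome[])
```

A real cancellation point would let take! wait on the channel and on a separate cancellation signal at once, without having to destroy the channel for everyone else.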
Andrey Oskin said:
And what is the technique behind Go select? Where can one read about it?
See how Go's context https://pkg.go.dev/context (coupled with errgroup https://pkg.go.dev/golang.org/x/sync/errgroup) implements (something equivalent to) structured concurrency with the "Done channel idiom."
Andrey Oskin said:
Can it be implemented in Julia in some way (I mean, maybe even through touching low level things in LLVM somehow)?
I think it's totally implementable by just writing plain Julia code. No need to use LLVM hacking or even any kind of metaprogramming.
But the catch is that you'd need to re-implement all concurrent communication primitives (lock, condition variable, channel, ...) if you want to support cancellation (or select-like operations) on them. It is doable, and in fact I'm experimenting with it a bit.
It's probably easier to just directly implement channel and select only for channel (Go's strategy). But I have a bit more roundabout idea: (1) Implement Reagent (https://doi.org/10.1145/2254064.2254084) (2) Implement Concurrent ML (https://en.wikipedia.org/wiki/Concurrent_ML) on top of it (3) Implement Racket's Kill-safe synchronization abstractions (https://doi.org/10.1145/996893.996849) on top of it. This hopefully gives us a very extensible concurrent synchronization mechanism which then lets us create a very user-friendly concurrency API like Trio on top of it.
More resources:
A talk on OCaml's Reagent implementation. A nice high-level overview:
LDN Functionals #8 KC Sivaramakrishnan: OCaml multicore and programming with Reagents - YouTube
A talk on the high-level idea of Concurrent ML. The code is kinda hard to read which is rather sad. But I think it's a nice accessible introduction. I learnt about Reagent via this talk.
Michael Sperber - Concurrent ML - The One That Got Away - Code Mesh 2017 - YouTube
Another talk on Concurrent ML. It was kinda hard to follow for me. But it goes into some implementation details.
Andy Wingo - Channels, Concurrency, and Cores: A new Concurrent ML implementation - YouTube
But, as I said before, it's totally possible that a package that automates the close-on-cancel pattern is sufficient for many practical problems. I know I'm in a rather deep stack of yak shaving :)
Haha, deep indeed. The other day I was reading https://carllerche.com/2021/06/17/six-ways-to-make-async-rust-easier/ which discusses kill safety (or the lack of it) in Rust. It's a good thoughtful writeup of the current state of some of these things in Rust. We don't have cancellation in Julia, so we don't have some of their problems yet. But otoh we have all the other problems that lack of cancellation causes :sweat_smile:
Related: CancellationTokens.jl
So, here's the first step in my attempt to support cancellation in a rather roundabout way: https://github.com/tkf/Reagents.jl
I tried to summarize what I was saying above here: https://github.com/tkf/Reagents.jl/discussions/5
Oh, I am so anxious to give it a spin :-)
That nice feeling of christmas present unpacking :-D
This is still a very low-level infrastructure. So don't hold your breath yet :)
The rust link from @Chris Foster seems to be proposing the fix for all their issues is to do what we do now in Julia (with implicit cancellation tokens on resources):
Cancel spawned tasks by disabling resources from completing.
@Takafumi Arakaki (tkf) I like that code snippet you posted at the top. Have you ever thought about how we could automate that better? I have wanted to make @sync also be able to manage in-flight resources like that, but have been unsure where to start on it.
Good to know that you like the example! It was basically an outcome of me trying to understand your philosophy of the error handling/cancellation of concurrent programs :), to figure out the best practice for doing this in current Julia.
To add something like handle to @sync, I think the main questions are:
(1) Should handle be dynamically or lexically scoped? I prefer the dynamically scoped direction, but then we'd need to use something like a task-local or context variable or this.
Or maybe it can be done lexically by default but we can also add a macro @syncgroup() that returns an object wrapping the current sync_varname and handle. We can then bind a closable object to the "syncgroup" object. It's still a bit manual but it's nice to automate the try-catch dance. In the lexical environment of @sync, we can simply use (say) @syncbind or something.
(2) Should the exit be non-local? In my code I used returns like this
try
    wait(timer)
catch err
    err isa EOFError && return  # closed by another task
    rethrow()
end
I could use return here since I know it'd end @async. But we can't do this for arbitrary code (e.g., the user might want to deal with the timer inside an open(...) do block). If we go this direction, I think we'd need something like a Cancelled exception.
Once these design decisions are addressed, I imagine implementation would be straightforward.
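A rough sketch of the non-local variant, to show why an exception composes where return does not: the cancellation unwinds through nested do blocks and finally clauses and is swallowed only at the task boundary. Cancelled here is a hypothetical type defined for this example, as are cancellable_wait and ignoring_cancel; the IOBuffer stands in for a real file or socket.

```julia
# Hypothetical exception type meaning "this task was cancelled".
struct Cancelled <: Exception end

# Wait on a timer, translating "closed by another task" into Cancelled.
function cancellable_wait(timer::Timer)
    try
        wait(timer)
    catch err
        err isa EOFError && throw(Cancelled())
        rethrow()
    end
end

# Run f at the task boundary, swallowing Cancelled but nothing else.
function ignoring_cancel(f)
    try
        return f()
    catch err
        err isa Cancelled || rethrow()
        return nothing
    end
end

timer = Timer(60)
events = String[]
task = @async begin
    ignoring_cancel() do
        resource = IOBuffer()         # stand-in for a file or socket
        try
            cancellable_wait(timer)   # may be arbitrarily deep in the call stack
            push!(events, "finished")
        finally
            close(resource)           # cleanup still runs while unwinding
            push!(events, "cleaned up")
        end
    end
end

close(timer)     # cancel from outside by closing the resource
wait(task)
println(events)
```

With a plain return, the code inside the inner block would have no way to exit the whole task, only its own closure.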
Also, FYI, lexically scoped non-local cancellation is now implemented in https://github.com/tkf/Julio.jl
Just reading the Rust blog, I didn't realize Tokio's select does not guarantee atomicity. It seems like an unfortunate design... (The blog post also discusses restricting it.)
Reagents actually help writing something like this. Julio.select does not consume an IO if it's not selected: https://tkf.github.io/Julio.jl/dev/tutorials/select/#Selecting-an-arbitrary-event (It's done using a manager task ATM but I think it's possible to avoid this by touching the lower-level interface)
Last updated: Nov 22 2024 at 04:41 UTC