In LokiLogger.jl, which pushes log messages using HTTP requests, I want to do some buffering and only "flush" the logger, i.e. fire off the HTTP request, every 5 minutes or so. What would be a good way to manage a background task doing this?
I am thinking I can just schedule a task that does something like
task = @async begin
    sleep(5 * 60)
    # flush the logger
end
and store task in the logger struct, but how do I gracefully shut down that task?
One option would be Base.throwto in order to shut it down.
There's probably a more blessed way to do it though
Right, I have seen that trick but doesn't seem very "graceful" :)
Hmm, what about schedule(task, InterruptException(), error=true)? I'm not sure if that's any different than just throwto.
Does that wake up the task if it is currently sleeping?
Hm, reading https://discourse.julialang.org/t/stop-terminate-a-sub-task-started-with-async/32193/10 @Jameson Nash is saying that
No, there’s no generally safe way to interrupt a task. But you can interrupt any waitable object (by closing it).
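As a rough illustration of "closing a waitable object", here is a minimal sketch in which the background task waits on a repeating Timer instead of calling sleep. Closing the timer wakes the waiting task with an EOFError, which the task treats as its shutdown signal. The counter `flushes` and the short 0.05 s interval are assumptions made only to keep the demo quick; they are not part of LokiLogger.jl.

```julia
# Sketch: the task blocks in wait(timer); close(timer) is the shutdown signal.
flushes = Ref(0)                       # hypothetical stand-in for "flush the logger"
timer = Timer(0.05; interval = 0.05)   # would be 5 * 60 in the real use case

task = @async begin
    try
        while true
            wait(timer)      # wakes on every tick, throws EOFError once the timer is closed
            flushes[] += 1   # flush the logger here
        end
    catch err
        err isa EOFError || rethrow()   # anything else is a genuine failure
    end
    :stopped
end

sleep(0.3)              # let a few ticks happen
close(timer)            # wakes the task, which then exits its loop
result = fetch(task)    # :stopped; the task finished normally, not as "failed"
```

Unlike the throwto/schedule approach, the task ends in the done state rather than the failed state, and nothing is injected into it at an arbitrary point.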
Fredrik Ekre said:
Does that wake up the task if it is currently sleeping?
yes
julia> task = @async begin
           sleep(5 * 60)
           # flush the logger
end
Task (runnable) @0x00007fa6d39457b0
julia> task |> istaskstarted
true
julia> task |> istaskfailed
false
julia> schedule(task, InterruptException(), error=true)
Task (failed) @0x00007fa6d39457b0
InterruptException:
Stacktrace:
[1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
@ Base ./task.jl:705
[2] wait
@ ./task.jl:764 [inlined]
[3] wait(c::Base.GenericCondition{Base.Threads.SpinLock})
@ Base ./condition.jl:106
[4] _trywait(t::Timer)
@ Base ./asyncevent.jl:111
[5] wait
@ ./asyncevent.jl:129 [inlined]
[6] sleep(sec::Int64)
@ Base ./asyncevent.jl:214
[7] macro expansion
@ ./REPL[8]:2 [inlined]
[8] (::var"#4#5")()
@ Main ./task.jl:406
julia> task |> istaskfailed
true
Right, so for example Channel is a waitable object, and I think I have seen something like
julia> ch = Channel() do ch
           FLUSHTOKEN = 1
           SHUTDOWNTOKEN = 0
           for token in ch
               if token == FLUSHTOKEN
                   println("flushing...")
               elseif token == SHUTDOWNTOKEN
                   println("shutting down...")
                   break
               end
           end
end
Channel{Any}(0) (empty)
julia> put!(ch, 1);
flushing...
julia> put!(ch, 0);
shutting down...
suggested before. You could then (in a finalizer?) do
put!(ch, SHUTDOWNTOKEN)
wait(ch)
But then I need another task that, every 5 minutes, does put!(ch, FLUSHTOKEN) :P
This method only uses one task and one channel: https://discourse.julialang.org/t/how-to-kill-thread/34236/8?u=mason
That is, just put a while loop inside the Channel
Yea, but that just solves the problem by sleeping shorter (kinda). Perhaps that is fine.
Couldn't you just sleep the full 5 minutes and then at the end of the 5 minutes check if you put in the STOP token and break?
Don't I risk having to wait 5 minutes in the worst case scenario then?
There's not really any cost to finishing waiting, but there is a cost to killing the task
That said, it sounds like you want close(ch), so you can know to end?
In the for loop example you mean? In that case close(ch) would terminate directly IIUC. But when using a for loop I can only send the flush token whenever there is a new log message being processed, I guess.
So yea, that could be written like
julia> ch = Channel() do ch
           for _ in ch
               println("flushing...")
           end
end
I guess, and then keep track of time on the outside and call put! when it is time.
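To make the close(ch) variant concrete, here is a small sketch under the assumption that each put! means "flush now" and that closing the channel means "shut down". The counter `nflushes` is a hypothetical placeholder for the real flush; the for loop simply ends when the channel is closed, so no shutdown token is needed.

```julia
# Sketch: iteration over a Channel ends when the channel is closed.
nflushes = Ref(0)          # hypothetical stand-in for the actual flush
ch = Channel{Nothing}(0)   # unbuffered: put! blocks until the worker takes the item

worker = @async begin
    for _ in ch
        nflushes[] += 1    # flush the logger here
    end
    :done                  # reached once the channel has been closed
end

put!(ch, nothing)   # request a flush
put!(ch, nothing)   # and another
close(ch)           # ends the worker's for loop
status = fetch(worker)
```

The caller still has to decide when to put!, which is the "keep track of time on the outside" part.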
I guess that can just as well be spelled as:
function handle_message(...)
    if time_to_flush
        task = @async do_flush(...)
        yield()
        wait(task)
    end
end
For the question in the OP, it seems to be easier to use Timer(do_flush, 5 * 60; interval = 5 * 60), since you can close the timer to shut it down.
(Generic shutdown requires a manual cancellation token in Julia and is a much harder problem.)
True, in this case the Timer seems to be the actual resource
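A minimal sketch of how the Timer suggestion could look when the timer is stored in the logger struct. Everything here apart from Timer and close is hypothetical: `BufferedLogger`, `log!`, `do_flush!`, `start!` and `stop!` are illustrative names, not LokiLogger.jl's API, and `do_flush!` only empties a buffer where the real logger would fire the HTTP request. Note that the Timer callback receives the timer itself as its argument.

```julia
# Hypothetical buffering logger; names are illustrative only.
mutable struct BufferedLogger
    buffer::Vector{String}
    lock::ReentrantLock
    timer::Union{Timer,Nothing}
end
BufferedLogger() = BufferedLogger(String[], ReentrantLock(), nothing)

# Buffer a message instead of sending it right away.
log!(logger::BufferedLogger, msg::AbstractString) =
    lock(() -> push!(logger.buffer, msg), logger.lock)

# Stand-in for the HTTP request: empty the buffer and return what was "sent".
function do_flush!(logger::BufferedLogger)
    lock(logger.lock) do
        sent = copy(logger.buffer)
        empty!(logger.buffer)
        return sent
    end
end

# Flush every `interval` seconds; the callback argument (the timer) is ignored.
function start!(logger::BufferedLogger; interval = 5 * 60)
    logger.timer = Timer(_ -> do_flush!(logger), interval; interval = interval)
    return logger
end

# Graceful shutdown: stop the timer, then flush whatever is still buffered.
function stop!(logger::BufferedLogger)
    logger.timer === nothing || close(logger.timer)
    return do_flush!(logger)
end
```

Usage would be along the lines of `logger = start!(BufferedLogger())`, calling `log!(logger, "message")` as messages arrive, and `stop!(logger)` on shutdown.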
Last updated: Dec 28 2024 at 04:38 UTC