Stream: helpdesk (published)

Topic: managing background task


view this post on Zulip Fredrik Ekre (May 18 2021 at 18:38):

In LokiLogger.jl, which pushes log messages using HTTP requests, I want to do some buffering and only "flush" the logger, i.e. fire off the HTTP request, every 5 minutes or so. What would be a good way to manage a background task doing this?
I am thinking I can just schedule a task that does something like

task = @async begin
    sleep(5 * 60)
    # flush the logger
end

and store task in the logger struct, but how do I gracefully shut down that task?

view this post on Zulip Mason Protter (May 18 2021 at 18:41):

One option would be Base.throwto in order to shut it down

view this post on Zulip Mason Protter (May 18 2021 at 18:42):

There's probably a more blessed way to do it though

view this post on Zulip Fredrik Ekre (May 18 2021 at 18:42):

Right, I have seen that trick but doesn't seem very "graceful" :)

view this post on Zulip Mason Protter (May 18 2021 at 18:45):

Hmm, what about schedule(task, InterruptException(), error=true)? I'm not sure if that's any different than just throwto,

view this post on Zulip Fredrik Ekre (May 18 2021 at 18:46):

Does that wake up the task if it is currently sleeping?

view this post on Zulip Mason Protter (May 18 2021 at 18:47):

Hm, reading https://discourse.julialang.org/t/stop-terminate-a-sub-task-started-with-async/32193/10 @Jameson Nash is saying that

No, there’s no generally safe way to interrupt a task. But you can interrupt any waitable object (by closing it).

view this post on Zulip Mason Protter (May 18 2021 at 18:48):

Fredrik Ekre said:

Does that wake up the task if it is currently sleeping?

yes

view this post on Zulip Mason Protter (May 18 2021 at 18:49):

julia> task = @async begin
           sleep(5 * 60)
           # flush the logger
       end
Task (runnable) @0x00007fa6d39457b0

julia> task |> istaskstarted
true

julia> task |> istaskfailed
false

julia> schedule(task, InterruptException(), error=true)
Task (failed) @0x00007fa6d39457b0
InterruptException:
Stacktrace:
 [1] try_yieldto(undo::typeof(Base.ensure_rescheduled))
   @ Base ./task.jl:705
 [2] wait
   @ ./task.jl:764 [inlined]
 [3] wait(c::Base.GenericCondition{Base.Threads.SpinLock})
   @ Base ./condition.jl:106
 [4] _trywait(t::Timer)
   @ Base ./asyncevent.jl:111
 [5] wait
   @ ./asyncevent.jl:129 [inlined]
 [6] sleep(sec::Int64)
   @ Base ./asyncevent.jl:214
 [7] macro expansion
   @ ./REPL[8]:2 [inlined]
 [8] (::var"#4#5")()
   @ Main ./task.jl:406

julia> task |> istaskfailed
true

view this post on Zulip Fredrik Ekre (May 18 2021 at 18:50):

Right, so for example Channel is a waitable object, and I think I have seen something like

julia> ch = Channel() do ch
           FLUSHTOKEN = 1
           SHUTDOWNTOKEN = 0
           for token in ch
               if token == FLUSHTOKEN
                   println("flushing...")
               elseif token == SHUTDOWNTOKEN
                   println("shutting down...")
                   break
               end
           end
       end
Channel{Any}(0) (empty)

julia> put!(ch, 1);
flushing...

julia> put!(ch, 0);
shutting down...

suggested before. You could then (in a finalizer?) do

put!(ch, SHUTDOWNTOKEN)
wait(ch)

view this post on Zulip Fredrik Ekre (May 18 2021 at 18:51):

But then I need another task that, every 5 minutes, does put!(ch, FLUSHTOKEN) :P

view this post on Zulip Mason Protter (May 18 2021 at 18:53):

This method only uses one task and one channel: https://discourse.julialang.org/t/how-to-kill-thread/34236/8?u=mason

view this post on Zulip Mason Protter (May 18 2021 at 18:54):

That is, just put a while loop inside the Channel

view this post on Zulip Fredrik Ekre (May 18 2021 at 18:54):

Yea, but that just solves the problem by sleeping shorter (kinda). Perhaps that is fine.

view this post on Zulip Mason Protter (May 18 2021 at 18:54):

Couldn't you just sleep the full 5 minutes and then at the end of the 5 minutes check if you put in the STOP token and break?

view this post on Zulip Fredrik Ekre (May 18 2021 at 18:55):

Don't I risk having to wait 5 minutes in the worst case scenario then?

view this post on Zulip Jameson Nash (May 18 2021 at 18:57):

There's not really any cost to finishing waiting, but there is a cost to killing the task

view this post on Zulip Jameson Nash (May 18 2021 at 18:57):

That said, it sounds like you want close(ch), so you can know to end?

view this post on Zulip Fredrik Ekre (May 18 2021 at 18:59):

In the for loop example you mean? In that case close(ch) would terminate directly IIUC. But when using for loop I can only send the flush-token whenever there is a new log message beeing processed I guess.

view this post on Zulip Fredrik Ekre (May 18 2021 at 19:02):

So yea, that could be written like

julia> ch = Channel() do ch
           for _ in ch
               println("flushing...")
           end
       end

I guess, and then keep track of time on the outside and put!ing when it is time.

view this post on Zulip Fredrik Ekre (May 18 2021 at 19:12):

I guess that can just as well be spelled as:

function handle_message(...)
    if time_to_flush
        task = @async do_flush(...)
        yield()
        wait(task)
    end
end

view this post on Zulip Takafumi Arakaki (tkf) (May 19 2021 at 01:23):

For the question in the OP, it seems to be easier to use Timer(do_flush, 5 * 60; interval = 5 * 60), since you can close the timer to shut it down.

(Generic shutdown requires a manual cancellation token in Julia and is a much harder problem.)

view this post on Zulip Jameson Nash (May 19 2021 at 02:32):

True, in this case the Timer seems to be the actual resource


Last updated: Oct 02 2023 at 04:34 UTC