Stream: helpdesk (published)

Topic: Pipe output using Distributed.jl


view this post on Zulip Lilith Hafner (Apr 10 2024 at 00:28):

Is it possible to pipe text in realtime from one Julia process to another using Distributed.jl?

I'd like output to become available in the parent process as soon as it is produced in the child process and not have to wait for a pipe or channel to be closed.

Bonus points for being able to use the ::IO interface on both ends.

view this post on Zulip Sukera (Apr 10 2024 at 07:05):

You could build something with shared memory, but that's probably quite a bit more work than necessary. The "canonical" IO-like interface for inter process communication is a Pipe, which lets the OS handle buffering etc. Distributed.jl itself doesn't provide such functionality, other than through RemoteChannel (the default output streams of workers are used for setup/regular IO only).

view this post on Zulip Daniel Wennberg (Apr 10 2024 at 18:28):

I don't think a channel needs to be closed for the data to be available to other processes?

view this post on Zulip Lilith Hafner (Apr 10 2024 at 19:33):

Yeah, so how do I redirect stdout to a channel?

view this post on Zulip Daniel Wennberg (Apr 10 2024 at 20:48):

This isn't really my wheelhouse, but loosely inspired by how Pluto captures stdout (https://github.com/fonsp/Pluto.jl/blob/main/src/runner/PlutoRunner/src/PlutoRunner.jl#L2772-L2845) + some experience with channels, the following seems to work (obviously replace Channel with RemoteChannel for multiprocessing):

ch = Channel{UInt8}()
pipe = Pipe()
Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)

Threads.@spawn begin
    pipe_reader = Base.pipe_reader(pipe)
    while !eof(pipe_reader)
        for c in readavailable(pipe_reader)
            put!(ch, c)
        end
    end
    println("Pipe closed, pipe reader closing channel")
    close(ch)
    println("Pipe reader done")
end

Threads.@spawn begin
    for c in ch
        println(c)
    end
    println("Pipe writer done")
end

redirect_stdout(Base.pipe_writer(pipe)) do
    println("Hello, world!")
end
println("Main task closing pipe")
close(Base.pipe_writer(pipe))
julia> include("pipe2channel.jl")
Main task closing pipe
72

julia> 101
julia> 108
108
111
44
32
119
111
114
108
100
33
10
Pipe closed, pipe reader closing channel
Pipe reader done
Pipe writer done

view this post on Zulip Daniel Wennberg (Apr 10 2024 at 23:01):

Here's a nicer example using RemoteChannel. Note that there's a bug in RemoteChannels implementation of the iterator interface, so you can't simply do bytes = collect(ch). Fixed in this PR: https://github.com/JuliaLang/Distributed.jl/pull/100.

using Distributed

@everywhere function redirect_stdout_to_channel(f, ch)
    pipe = Pipe()
    Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)
    @sync begin
        Threads.@spawn begin
            pipe_reader = Base.pipe_reader(pipe)
            while !eof(pipe_reader)
                for c in readavailable(pipe_reader)
                    put!(ch, c)
                end
            end
        end
        pipe_writer = Base.pipe_writer(pipe)
        redirect_stdout(f, pipe_writer)
        close(pipe_writer)
    end
end

ch = RemoteChannel(() -> Channel{UInt8}())

@sync begin
    Threads.@spawn begin
        bytes = UInt8[]
        for c in ch
            push!(bytes, c)
        end
        @show String(bytes)
    end
    Threads.@spawn begin
        @sync @spawnat :any redirect_stdout_to_channel(ch) do
            println("Hello from process $(myid())")
        end
        close(ch)
    end
end
julia> include("pipe2channel.jl")
String(bytes) = "Hello from process 2\n"

view this post on Zulip Lilith Hafner (Apr 11 2024 at 01:08):

Thank you!

view this post on Zulip Daniel Wennberg (Apr 12 2024 at 03:13):

For completeness, since you asked for IO interface on both ends:

using Distributed

@everywhere function redirect_stdout_to_channel(f, ch)
    pipe = Pipe()
    Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)
    @sync begin
        Threads.@spawn begin
            while !eof(Base.pipe_reader(pipe))
                for c in readavailable(Base.pipe_reader(pipe))
                    put!(ch, c)
                end
            end
        end
        redirect_stdout(f, Base.pipe_writer(pipe))
        close(Base.pipe_writer(pipe))
    end
end

function redirect_stdin_from_channel(f, ch)
    pipe = Pipe()
    Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)
    @sync begin
        Threads.@spawn begin
            for c in ch
                write(Base.pipe_writer(pipe), c)
            end
            close(Base.pipe_writer(pipe))
        end
        redirect_stdin(f, Base.pipe_reader(pipe))
    end
end

@sync begin
    ch = RemoteChannel(() -> Channel{UInt8}(64))
    Threads.@spawn redirect_stdin_from_channel(ch) do
        while !eof(stdin)
            @show readline(stdin)
        end
    end
    Threads.@spawn begin
        @sync @spawnat :any redirect_stdout_to_channel(ch) do
            println("Hello from process $(myid()) 👷")
            print("We've got cookies 🍪")
        end
        close(ch)
    end
end
julia> include("pipethroughchannel.jl")
readline(stdin) = "Hello from process 2 👷"
readline(stdin) = "We've got cookies 🍪"
Task (done) @0x00007b74e6e43d00

view this post on Zulip Lilith Hafner (Apr 12 2024 at 14:07):

Thanks!


Last updated: Nov 22 2024 at 04:41 UTC