Is it possible to pipe text in realtime from one Julia process to another using Distributed.jl?
I'd like output to become available in the parent process as soon as it is produced in the child process and not have to wait for a pipe or channel to be closed.
Bonus points for being able to use the ::IO interface on both ends.
You could build something with shared memory, but that's probably quite a bit more work than necessary. The "canonical" IO-like interface for inter-process communication is a Pipe, which lets the OS handle buffering etc. Distributed.jl itself doesn't provide such functionality, other than through RemoteChannel (the default output streams of workers are used for setup/regular IO only).
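To illustrate the Pipe point outside of Distributed.jl: a minimal sketch, assuming it is acceptable to start the child as a plain external Julia process rather than a Distributed worker. The child script and the names `script`, `cmd` and `lines` are made up for this example. `open` on a command gives the parent an ::IO connected to the child's stdout, and `eachline` should hand over each line as it arrives.

```julia
# Hypothetical child script: prints three lines, flushing after each one
# so they reach the pipe right away instead of sitting in a buffer.
script = """
for i in 1:3
    println("tick ", i)
    flush(stdout)
    sleep(0.2)
end
"""

# Start a second Julia process using the same executable as the parent.
cmd = `$(Base.julia_cmd()) --startup-file=no -e $script`

lines = String[]
open(cmd, "r") do io            # io is an ::IO connected to the child's stdout
    for line in eachline(io)    # blocks until the next line is available
        push!(lines, line)
        println("parent received: ", line)
    end
end
```

The parent only sees output in realtime if the child flushes; without the `flush(stdout)` call the child may buffer its output until it exits.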
I don't think a channel needs to be closed for the data to be available to other processes?
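A quick way to check this, sketched with a plain Channel in a single process (the same should hold for a RemoteChannel, though that is an assumption here and not tested in this snippet). The names `producer`, `first_msg`, `still_open` and `rest` are just for illustration.

```julia
ch = Channel{String}(8)   # buffered channel

producer = Threads.@spawn begin
    put!(ch, "first line")
    sleep(0.5)            # the channel stays open during this pause
    put!(ch, "second line")
    close(ch)
end

first_msg = take!(ch)     # returns as soon as the first item has been put!
still_open = isopen(ch)   # checked while the producer is still sleeping
rest = collect(ch)        # drains whatever arrives until the channel is closed
wait(producer)

println(first_msg, " (channel still open: ", still_open, ")")
println(rest)
```

Closing the channel only matters for ending iteration (`for c in ch` or `collect(ch)`), not for making individual items visible to the consumer.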
Yeah, so how do I redirect stdout to a channel?
This isn't really my wheelhouse, but loosely inspired by how Pluto captures stdout (https://github.com/fonsp/Pluto.jl/blob/main/src/runner/PlutoRunner/src/PlutoRunner.jl#L2772-L2845) + some experience with channels, the following seems to work (obviously replace Channel with RemoteChannel for multiprocessing):
ch = Channel{UInt8}()
pipe = Pipe()
# Open both ends of the pipe with support for asynchronous I/O
Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)

# Forward every byte that arrives on the pipe to the channel
Threads.@spawn begin
    pipe_reader = Base.pipe_reader(pipe)
    while !eof(pipe_reader)
        for c in readavailable(pipe_reader)
            put!(ch, c)
        end
    end
    println("Pipe closed, pipe reader closing channel")
    close(ch)
    println("Pipe reader done")
end

# Consume bytes from the channel as they arrive
Threads.@spawn begin
    for c in ch
        println(c)
    end
    println("Pipe writer done")
end

redirect_stdout(Base.pipe_writer(pipe)) do
    println("Hello, world!")
end
println("Main task closing pipe")
close(Base.pipe_writer(pipe))
julia> include("pipe2channel.jl")
Main task closing pipe
72
julia> 101
julia> 108
108
111
44
32
119
111
114
108
100
33
10
Pipe closed, pipe reader closing channel
Pipe reader done
Pipe writer done
Here's a nicer example using RemoteChannel. Note that there's a bug in RemoteChannel's implementation of the iterator interface, so you can't simply do bytes = collect(ch). Fixed in this PR: https://github.com/JuliaLang/Distributed.jl/pull/100.
using Distributed

@everywhere function redirect_stdout_to_channel(f, ch)
    pipe = Pipe()
    Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)
    @sync begin
        # Forward every byte written to the pipe into the channel
        Threads.@spawn begin
            pipe_reader = Base.pipe_reader(pipe)
            while !eof(pipe_reader)
                for c in readavailable(pipe_reader)
                    put!(ch, c)
                end
            end
        end
        pipe_writer = Base.pipe_writer(pipe)
        redirect_stdout(f, pipe_writer)
        # Closing the writer lets the reader task see EOF and finish
        close(pipe_writer)
    end
end

ch = RemoteChannel(() -> Channel{UInt8}())
@sync begin
    # Collect bytes on the parent until the channel is closed
    Threads.@spawn begin
        bytes = UInt8[]
        for c in ch
            push!(bytes, c)
        end
        @show String(bytes)
    end
    # Run the function on a worker, then close the channel once it has finished
    Threads.@spawn begin
        @sync @spawnat :any redirect_stdout_to_channel(ch) do
            println("Hello from process $(myid())")
        end
        close(ch)
    end
end
julia> include("pipe2channel.jl")
String(bytes) = "Hello from process 2\n"
Thank you!
For completeness, since you asked for IO interface on both ends:
using Distributed

@everywhere function redirect_stdout_to_channel(f, ch)
    pipe = Pipe()
    Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)
    @sync begin
        # Forward every byte written to the pipe into the channel
        Threads.@spawn begin
            while !eof(Base.pipe_reader(pipe))
                for c in readavailable(Base.pipe_reader(pipe))
                    put!(ch, c)
                end
            end
        end
        redirect_stdout(f, Base.pipe_writer(pipe))
        close(Base.pipe_writer(pipe))
    end
end

function redirect_stdin_from_channel(f, ch)
    pipe = Pipe()
    Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)
    @sync begin
        # Write every byte received on the channel into the pipe
        Threads.@spawn begin
            for c in ch
                write(Base.pipe_writer(pipe), c)
            end
            # Channel closed: close the writer so that stdin reaches EOF
            close(Base.pipe_writer(pipe))
        end
        redirect_stdin(f, Base.pipe_reader(pipe))
    end
end

@sync begin
    ch = RemoteChannel(() -> Channel{UInt8}(64))
    # Parent side: read from the channel through stdin
    Threads.@spawn redirect_stdin_from_channel(ch) do
        while !eof(stdin)
            @show readline(stdin)
        end
    end
    # Worker side: write to the channel through stdout
    Threads.@spawn begin
        @sync @spawnat :any redirect_stdout_to_channel(ch) do
            println("Hello from process $(myid()) 👷")
            print("We've got cookies 🍪")
        end
        close(ch)
    end
end
julia> include("pipethroughchannel.jl")
readline(stdin) = "Hello from process 2 👷"
readline(stdin) = "We've got cookies 🍪"
Task (done) @0x00007b74e6e43d00
Thanks!
Last updated: Dec 28 2024 at 04:38 UTC