Assume Julia >= v1.10.
Suppose a file stores a large matrix of floating point numbers in column-major order.
I can read the file sequentially with
# Sequential, single-threaded read of an m×n column-major matrix:
# for each of the n columns, skip a fixed number of `offset` bytes,
# then read the column's m values of type T one element at a time.
# (`io`, `n`, `m`, `offset`, `data`, and `T` are defined by the caller.)
for j in 1:n
skip(io, offset)
for i in 1:m
data[i, j] = read(io, T)
end
end
How to leverage multiple threads to read the io in parallel?
I think you would be better off optimising your read function and running it on a single thread.
A float matrix requires no parsing and can be copied directly into memory, so you should be able to hit your drive's maximum speed doing that
In your example, it looks like every column is identical. So you can read the first column, then use memcpy to fill all the other columns
I simplified the actual code. I need to do some basic processing, including changing the endianness and converting to a different floating point type.
But I will try to load everything into a large blob and convert later to the final type to compare the speed.
In a more complex example, I would have two channels, let's call them filled and emptied, both with buffers (i.e. Vector{UInt8} or similar). One task takes a buffer from emptied and overwrites it with raw, unparsed data from the file, then puts it into filled. Then you have N consumer tasks that take from filled, parse the content, fill in part of the matrix, then return the buffer by putting it back into emptied.
Could you please share a MWE with your idea @Jakob Nybo Andersen ?
Yeah, here:
It's not tested, nor with the requisite error checks, so just for illustration purposes. The actual file reading happens in only one thread, since it's essentially impossible to improve linear IO reading with multiple threads, but it's sped up by making sure the thread that does the IO doesn't need to do anything else, and all the parsing is offloaded to worker tasks
"""
    parse_into_matrix!(dst, from, data)

Reinterpret the raw bytes in `data` as `eltype(dst)` values and copy them
into `dst` starting at linear index `from`. Returns `dst`.

Intended to be called concurrently from multiple worker tasks; each call
writes a disjoint region of `dst`, so no locking is required.

Throws `ArgumentError` if `length(data)` is not a whole number of elements,
and `BoundsError` if the destination range falls outside `dst`.
"""
function parse_into_matrix!(
    dst::Matrix{Float64},
    from::Int,
    data::AbstractVector{UInt8},
)
    elsize = sizeof(eltype(dst))
    (n_elem, rest) = divrem(length(data), elsize)
    # A trailing partial element means the buffer was mis-sized; fail loudly
    # instead of silently dropping bytes (the original ignored `rest`).
    rest == 0 || throw(ArgumentError(
        "buffer length $(length(data)) is not a multiple of $elsize bytes"
    ))
    checkbounds(dst, from:from + n_elem - 1)
    # Keep both arrays rooted while raw pointers are live.
    # NOTE: the original referenced an undefined `src`; the parameter is `data`.
    GC.@preserve dst data begin
        unsafe_copyto!(pointer(dst, from), Ptr{eltype(dst)}(pointer(data)), n_elem)
    end
    return dst
end
"""
    worker_loop(matrix, filled, emptied)

Body of each parser task: repeatedly take a `(buffer, start)` pair from
`filled`, copy the buffer's bytes into `matrix` beginning at linear index
`start` via `parse_into_matrix!`, then recycle the buffer by putting it
back into `emptied`. Terminates when `filled` is closed and drained.
"""
function worker_loop(
    matrix::Matrix{Float64},
    # Channel takes a single type parameter; the original's
    # `Channel{Tuple{Vector{UInt8}}, Int}` was mis-bracketed and does not
    # construct — the element type is the tuple itself.
    filled::Channel{Tuple{Vector{UInt8}, Int}},
    emptied::Channel{Vector{UInt8}},
)
    # Iterating a Channel blocks until an item is available and ends
    # cleanly when the channel is closed and empty.
    for (buffer, start) in filled
        # The original passed an undefined `from`; use the destructured
        # start index.
        parse_into_matrix!(matrix, start, buffer)
        put!(emptied, buffer)
    end
    return nothing
end
"""
    main(io, tasks; dims=(1000, 1000), chunksize=8 * 1024)

Read a dense `Float64` matrix of size `dims` from `io` using the calling
task as the single reader plus `tasks` parser tasks, and return the filled
matrix. Raw chunks of up to `chunksize` bytes are read here and shipped to
the workers through a channel; buffers are recycled through a second channel.
"""
function main(
    io::IO,
    tasks::Int;
    dims::NTuple{2, Int} = (1000, 1000),   # matrix size (was hard-coded)
    chunksize::Int = 8 * 1024,             # bytes per read (was hard-coded)
)
    matrix = Matrix{Float64}(undef, dims...)
    # Channel takes one type parameter: the element type is the whole tuple.
    filled = Channel{Tuple{Vector{UInt8}, Int}}(Inf)
    emptied = Channel{Vector{UInt8}}(Inf)
    # Keep the Task handles so we can wait for completion and surface errors.
    workers = [Threads.@spawn worker_loop(matrix, filled, emptied) for _ in 1:tasks]
    # Seed the buffer pool shared by the reader and the workers. A plain
    # put! is sufficient; the explicit lock in the original was unnecessary.
    for _ in 1:tasks + 3
        put!(emptied, UInt8[])
    end
    # Read raw chunks on this task only; parsing is offloaded to workers.
    idx = 1  # linear index in `matrix` where the next chunk's data starts
    while !eof(io)
        v = take!(emptied)
        resize!(v, chunksize)
        nread = readbytes!(io, v)
        resize!(v, nread)
        put!(filled, (v, idx))
        # Advance by the number of whole elements just shipped. The original
        # computed this div and discarded it, so `idx` never moved and every
        # chunk would have been parsed into index 1.
        idx += div(nread, sizeof(Float64))
    end
    # Closing `filled` ends the workers' channel iteration; waiting on them
    # guarantees the matrix is fully written and rethrows any worker error.
    close(filled)
    foreach(wait, workers)
    close(emptied)
    return matrix
end
Wow, that is too low-level compared to what I had in mind :sweat_smile:
Yeah, multithreading IO is pretty difficult
Last updated: Nov 27 2025 at 04:44 UTC