Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@
'

echo "+++ Run tests"
export JULIA_MPI_TEST_EXCLUDE="test_allreduce.jl,test_reduce.jl,test_scan.jl"
export JULIA_MPI_TEST_EXCLUDE="test_scan.jl"
julia --color=yes --project=. -e '
import Pkg
Pkg.test("MPI"; test_args=["--backend=AMDGPU"])
Expand Down
3 changes: 3 additions & 0 deletions docs/src/reference/collective.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ MPI.Ibarrier
MPI.Bcast!
MPI.Bcast
MPI.bcast
MPI.Ibcast!
```

## Gather/Scatter
Expand Down Expand Up @@ -55,8 +56,10 @@ MPI.Neighbor_alltoallv!
```@docs
MPI.Reduce!
MPI.Reduce
MPI.Ireduce!
MPI.Allreduce!
MPI.Allreduce
MPI.Iallreduce!
MPI.Scan!
MPI.Scan
MPI.Exscan!
Expand Down
113 changes: 113 additions & 0 deletions src/collective.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,29 @@ function bcast(obj, root::Integer, comm::Comm)
return obj
end


"""
Ibcast!(buf, comm::Comm; root::Integer=0[, req::AbstractRequest = Request()])

Broadcast the buffer `buf` from `root` to all processes in `comm`.

# External links
$(_doc_external("MPI_Ibcast"))
"""
Ibcast!(buf, comm::Comm; root::Integer=Cint(0)) =
Ibcast!(buf, root, comm)

function Ibcast!(buf::Buffer, root::Integer, comm::Comm, req::AbstractRequest = Request())
# int MPI_Ibcast(void *buffer, int count, MPI_Datatype datatype, int root,
# MPI_Comm comm, MPI_Request *request)
API.MPI_Ibcast(buf.data, buf.count, buf.datatype, root, comm, req)
return req
end
function Ibcast!(data, root::Integer, comm::Comm)
Ibcast!(Buffer(data), root, comm)
end


"""
Scatter!(sendbuf::Union{UBuffer,Nothing}, recvbuf, comm::Comm;
root::Integer=0)
Expand Down Expand Up @@ -716,6 +739,57 @@ function Reduce(object::T, op, root::Integer, comm::Comm) where {T}
end
end

## Ireduce

"""
    Ireduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)
    Ireduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)

Starts a nonblocking elementwise reduction using the operator `op` on the buffer `sendbuf` and
stores the result in `recvbuf` on the process of rank `root`.

On non-root processes `recvbuf` is ignored, and can be `nothing`.

To perform the reduction in place, provide a single buffer `sendrecvbuf`.

Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction.

# See also
- [`Reduce!`](@ref) the equivalent blocking operation.
- [`Iallreduce!`](@ref) to send reduction to all ranks.
- [`Op`](@ref) for details on reduction operators.

# External links
$(_doc_external("MPI_Ireduce"))
"""
function Ireduce!(sendrecvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0))
    return Ireduce!(sendrecvbuf, op, root, comm, req)
end
function Ireduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request(); root::Integer=Cint(0))
    return Ireduce!(sendbuf, recvbuf, op, root, comm, req)
end

# Lowest-level method: `op` is already an MPI operator handle and the send/recv pair
# has been wrapped in an RBuffer.
function Ireduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, root::Integer, comm::Comm, req::AbstractRequest=Request())
    # int MPI_Ireduce(const void* sendbuf, void* recvbuf, int count,
    #                 MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm,
    #                 MPI_Request* req)
    API.MPI_Ireduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, root, comm, req)
    # Attach the buffer to the request so it stays rooted until completion.
    setbuffer!(req, rbuf)
    return req
end

# Wrap a user-supplied reduction function (e.g. `+`) in an `Op` for the element type.
function Ireduce!(rbuf::RBuffer, op, root::Integer, comm::Comm, req::AbstractRequest=Request())
    return Ireduce!(rbuf, Op(op, eltype(rbuf)), root, comm, req)
end
# Combine separate send/recv arguments into an RBuffer.
function Ireduce!(sendbuf, recvbuf, op, root::Integer, comm::Comm, req::AbstractRequest=Request())
    return Ireduce!(RBuffer(sendbuf, recvbuf), op, root, comm, req)
end

# In-place variant: the root reduces into its own buffer via MPI_IN_PLACE; other
# ranks only contribute their data and receive nothing.
function Ireduce!(buf, op, root::Integer, comm::Comm, req::AbstractRequest=Request())
    if Comm_rank(comm) == root
        return Ireduce!(IN_PLACE, buf, op, root, comm, req)
    else
        return Ireduce!(buf, nothing, op, root, comm, req)
    end
end

## Allreduce

# mutating
Expand Down Expand Up @@ -775,6 +849,45 @@ Allreduce(sendbuf::AbstractArray, op, comm::Comm) =
Allreduce(obj::T, op, comm::Comm) where {T} =
Allreduce!(Ref(obj), Ref{T}(), op, comm)[]

## Iallreduce

"""
    Iallreduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()])
    Iallreduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()])

Starts a nonblocking elementwise reduction using the operator `op` on the buffer `sendbuf`, storing
the result in the `recvbuf` of all processes in the group.

If only one `sendrecvbuf` buffer is provided, then the operation is performed in-place.

Returns the [`AbstractRequest`](@ref) object for the nonblocking reduction.

# See also
- [`Allreduce!`](@ref) the equivalent blocking operation.
- [`Ireduce!`](@ref) to send reduction to a single rank.
- [`Op`](@ref) for details on reduction operators.

# External links
$(_doc_external("MPI_Iallreduce"))
"""
function Iallreduce!(rbuf::RBuffer, op::Union{Op, MPI_Op}, comm::Comm, req::AbstractRequest=Request())
    # NOTE(review): the request must not already be associated with an operation;
    # unlike Ireduce! this method enforces it — confirm whether both should.
    @assert isnull(req)
    # int MPI_Iallreduce(const void* sendbuf, void* recvbuf, int count,
    #                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
    #                    MPI_Request* req)
    API.MPI_Iallreduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, comm, req)
    # Attach the buffer to the request so it stays rooted until completion.
    setbuffer!(req, rbuf)
    return req
end
# Wrap a user-supplied reduction function (e.g. `+`) in an `Op` for the element type.
function Iallreduce!(rbuf::RBuffer, op, comm::Comm, req::AbstractRequest=Request())
    return Iallreduce!(rbuf, Op(op, eltype(rbuf)), comm, req)
end
# Combine separate send/recv arguments into an RBuffer.
function Iallreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request())
    return Iallreduce!(RBuffer(sendbuf, recvbuf), op, comm, req)
end

# In-place variant: every rank reduces into its own buffer via MPI_IN_PLACE.
function Iallreduce!(rbuf, op, comm::Comm, req::AbstractRequest=Request())
    return Iallreduce!(IN_PLACE, rbuf, op, comm, req)
end

## Scan

# mutating
Expand Down
41 changes: 41 additions & 0 deletions test/mpi_support_test.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
include("common.jl")

MPI.Init()

# Those MPI calls may be unsupported features (e.g. for GPU backends), and will raise SIGSEGV
# (or a similar signal) when called, which cannot be handled in Julia in a portable way.

op = ARGS[1]
if op == "Allreduce"
# Allreduce is unsupported for AMDGPU with UCX
send_arr = ArrayType(zeros(Int, 1))
recv_arr = ArrayType{Int}(undef, 1)
synchronize()
MPI.Allreduce!(send_arr, recv_arr, +, MPI.COMM_WORLD)

elseif op == "Iallreduce"
# Iallreduce is unsupported for CUDA with OpenMPI 5 + UCX
send_arr = ArrayType(zeros(Int, 1))
recv_arr = ArrayType{Int}(undef, 1)
synchronize()
req = MPI.Iallreduce!(send_arr, recv_arr, +, MPI.COMM_WORLD)
MPI.Wait(req)

elseif op == "Reduce"
# Reduce is unsupported for AMDGPU with UCX
send_arr = ArrayType(zeros(Int, 1))
recv_arr = ArrayType{Int}(undef, 1)
synchronize()
MPI.Reduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=0)

elseif op == "Ireduce"
# Ireduce is unsupported for CUDA with OpenMPI 5 + UCX
send_arr = ArrayType(zeros(Int, 1))
recv_arr = ArrayType{Int}(undef, 1)
synchronize()
req = MPI.Ireduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=0)
MPI.Wait(req)

else
error("unknown test: $op")
end
17 changes: 17 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,23 @@ if Sys.isunix()
include("mpiexecjl.jl")
end

# Probe (in a separate MPI process) whether `mpi_op` works with the current backend.
# Unsupported operations typically die with a signal, so the probe must run out of
# process; a non-zero exit status is interpreted as "unsupported".
function is_mpi_operation_supported(mpi_op, n=nprocs)
    script = joinpath(@__DIR__, "mpi_support_test.jl")
    probe = `$(mpiexec()) -n $n $(Base.julia_cmd()) --startup-file=no $script $mpi_op`
    # Discard the probe's stderr (expected to be noisy on crash) and never throw on
    # a failing exit status — the status itself is the answer.
    ok = success(run(pipeline(ignorestatus(probe); stderr=devnull)))
    if !ok
        @warn "$mpi_op is unsupported with $backend_name"
    end
    return ok
end

# Only GPU backends are expected to lack support for some reductions; probe each
# operation once up front and hand the result to the test files via the environment
# (ENV stringifies the Bool, so the tests compare against "true").
if ArrayType != Array
    for (envvar, mpi_op) in (
            ("JULIA_MPI_TEST_ALLREDUCE",  "Allreduce"),
            ("JULIA_MPI_TEST_REDUCE",     "Reduce"),
            ("JULIA_MPI_TEST_IALLREDUCE", "Iallreduce"),
            ("JULIA_MPI_TEST_IREDUCE",    "Ireduce"),
        )
        ENV[envvar] = is_mpi_operation_supported(mpi_op)
    end
end

excludefiles = split(get(ENV,"JULIA_MPI_TEST_EXCLUDE",""),',')

testdir = @__DIR__
Expand Down
25 changes: 25 additions & 0 deletions test/test_allreduce.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
include("common.jl")

# Reduction support is probed by runtests.jl (GPU backends only) and passed in via
# the environment; absent variables default to "supported".
allreduce_supported = get(ENV, "JULIA_MPI_TEST_ALLREDUCE", "true") == "true"
iallreduce_supported = get(ENV, "JULIA_MPI_TEST_IALLREDUCE", "true") == "true"
if !allreduce_supported
    @warn "Skipping all tests in 'test_allreduce.jl' as reductions are unsupported"
    exit(0)
end

MPI.Init()

comm_size = MPI.Comm_size(MPI.COMM_WORLD)
Expand All @@ -13,6 +20,7 @@ else
operators = [MPI.SUM, +, (x,y) -> 2x+y-x]
end


for T = [Int]
for dims = [1, 2, 3]
send_arr = ArrayType(zeros(T, Tuple(3 for i in 1:dims)))
Expand Down Expand Up @@ -43,6 +51,23 @@ for T = [Int]
vals = MPI.Allreduce(send_arr, op, MPI.COMM_WORLD)
@test vals isa ArrayType{T}
@test vals == comm_size .* send_arr

# Nonblocking
recv_arr = ArrayType{T}(undef, size(send_arr))
if iallreduce_supported
req = MPI.Iallreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD)
MPI.Wait(req)
@test recv_arr == comm_size .* send_arr
end

# Nonblocking (IN_PLACE)
recv_arr = copy(send_arr)
synchronize()
if iallreduce_supported
req = MPI.Iallreduce!(recv_arr, op, MPI.COMM_WORLD)
MPI.Wait(req)
@test recv_arr == comm_size .* send_arr
end
end
end
end
Expand Down
35 changes: 35 additions & 0 deletions test/test_ibcast.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
include("common.jl")
using Random

MPI.Init()

comm = MPI.COMM_WORLD
root = 0
matsize = (17,17)

for T in MPITestTypes
# This test depends on the stability of the rng and we have observed with
# CUDA.jl that it is not guaranteed that the same number of rand calls will
# occur on each rank. (This is a hypothesis). To be sure we shall seed the rng
# just before we call rand.
Random.seed!(17)
A = ArrayType(rand(T, matsize))
B = MPI.Comm_rank(comm) == root ? A : similar(A)
req = MPI.Ibcast!(B, comm; root=root)
sleep(rand())
MPI.Wait(req)
@test B == A
end

# Char
A = ['s', 't', 'a', 'r', ' ', 'w', 'a', 'r', 's']
B = MPI.Comm_rank(comm) == root ? A : similar(A)
req = MPI.Ibcast!(B, comm; root=root)
sleep(rand())
MPI.Wait(req)
@test B == A



MPI.Finalize()
@test MPI.Finalized()
36 changes: 36 additions & 0 deletions test/test_reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ const can_do_closures =
Sys.ARCH !== :aarch64 &&
!startswith(string(Sys.ARCH), "arm")

# Reduction support is probed by runtests.jl (GPU backends only) and passed in via
# the environment; absent variables default to "supported".
reduce_supported = get(ENV, "JULIA_MPI_TEST_REDUCE", "true") == "true"
ireduce_supported = get(ENV, "JULIA_MPI_TEST_IREDUCE", "true") == "true"
if !reduce_supported
    @warn "Skipping all tests in 'test_reduce.jl' as reductions are unsupported"
    exit(0)
end

using DoubleFloats

MPI.Init()
Expand Down Expand Up @@ -116,6 +123,26 @@ for T = [Int]
@test recv_arr isa ArrayType{T}
@test recv_arr == sz .* view(send_arr, 2:3)
end

# Nonblocking
recv_arr = ArrayType{T}(undef, size(send_arr))
if ireduce_supported
req = MPI.Ireduce!(send_arr, recv_arr, op, MPI.COMM_WORLD; root=root)
MPI.Wait(req)
if isroot
@test recv_arr == sz .* send_arr
end
end

# Nonblocking (IN_PLACE)
recv_arr = copy(send_arr)
if ireduce_supported
req = MPI.Ireduce!(recv_arr, op, MPI.COMM_WORLD; root=root)
MPI.Wait(req)
if isroot
@test recv_arr == sz .* send_arr
end
end
end
end
end
Expand All @@ -131,6 +158,15 @@ else
@test result === nothing
end

recv_arr = isroot ? zeros(eltype(send_arr), size(send_arr)) : nothing
if ireduce_supported
req = MPI.Ireduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=root)
MPI.Wait(req)
if rank == root
@test recv_arr ≈ [Double64(sz*i)/10 for i = 1:10] rtol=sz*eps(Double64)
end
end

MPI.Barrier( MPI.COMM_WORLD )

GC.gc()
Expand Down
Loading