This package is for implementing straggler-resilient distributed algorithms, e.g., stochastic gradient descent, in systems composed of a coordinator process and multiple worker processes. Stragglers are workers that experience bursts of high latency, e.g., due to network congestion. Such workers can significantly reduce the rate of convergence of distributed optimization methods unless the implementation is resilient against stragglers.
To initiate communication, one creates an `MPIAsyncPool` at the coordinator, which manages communication with the workers via `MPI.Irecv!` and `MPI.Isend`. For example:
```julia
pool = MPIAsyncPool([1, 2, 4]) # pool consisting of nodes with MPI ranks 1, 2, 4
pool = MPIAsyncPool(4)         # pool consisting of nodes with MPI ranks 1, 2, 3, 4
```

Next, the `asyncmap!` function is used to broadcast data, e.g., an iterate (in the case of gradient descent), to the workers and to collect responses. `asyncmap!` returns once results have been received from the `nwait` fastest workers. Alternatively, one can define a custom condition, e.g., to always wait for worker 1, as in the sketch below.
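For instance, a condition that always waits for worker 1 could be written along the following lines. This is only a sketch: it relies on the `nwait(epoch, repochs)` semantics described in the `asyncmap!` docstring further down, and the additional requirement that at least two workers respond is an arbitrary choice for illustration.

```julia
# Hypothetical custom condition: return only once worker 1 has responded in the
# current epoch and at least 2 workers (any ranks) have responded.
function wait_for_worker_1(epoch, repochs)
    repochs[1] == epoch && count(==(epoch), repochs) >= 2
end

# Passed via the nwait keyword argument, e.g.,
# asyncmap!(pool, sendbuf, recvbuf, isendbuf, irecvbuf, comm; nwait=wait_for_worker_1)
```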
The workers communicate using regular `MPI.Irecv!` and `MPI.Isend`; see the MPI.jl documentation, and the examples directory for a complete example of how to use `asyncmap!`.
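As a rough illustration, a worker process might run a loop like the one below. This is a sketch only, assuming the keyword-argument API of recent MPI.jl versions, a coordinator at rank 0, and a fixed buffer size of 10; the actual buffer sizes, tags, computation, and termination logic are application-specific and are shown in full in the examples directory.

```julia
using MPI

MPI.Init()
comm = MPI.COMM_WORLD
coordinator = 0                 # assumed rank of the coordinator process

recvbuf = zeros(Float64, 10)    # iterate received from the coordinator
sendbuf = zeros(Float64, 10)    # result sent back to the coordinator

while true
    # wait for the next iterate broadcast by the coordinator
    rreq = MPI.Irecv!(recvbuf, comm; source=coordinator, tag=0)
    MPI.Wait(rreq)

    sendbuf .= 2 .* recvbuf     # placeholder for the actual computation, e.g., a gradient

    # send the result back to the coordinator
    sreq = MPI.Isend(sendbuf, comm; dest=coordinator, tag=0)
    MPI.Wait(sreq)
end
```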
The docstring of `asyncmap!` is:
```julia
asyncmap!(pool::MPIAsyncPool, sendbuf::AbstractArray, recvbuf::AbstractArray, isendbuf::AbstractArray, irecvbuf::AbstractArray, comm::MPI.Comm; nwait::Union{<:Integer,Function}=pool.nwait, epoch::Integer=pool.epoch+1, tag::Integer=0)
```

Send the data in `sendbuf` asynchronously (via `MPI.Isend`) to all workers and wait for some of them to respond (via a corresponding `MPI.Isend`). If `nwait` is an integer, this function returns when at least `nwait` workers have responded. On the other hand, if `nwait` is a function, this function returns once `nwait(epoch, repochs)` evaluates to `true`. The second argument `repochs` is a vector of length equal to the number of workers in the pool, where the `i`-th element is the epoch at which transmission to the corresponding worker was initiated.

Returns the `repochs` vector. Similarly to the `MPI.Gather!` function, the elements of `recvbuf` are partitioned into a number of chunks equal to the number of workers (hence, the length of `recvbuf` must be divisible by the number of workers), and the data received from the `j`-th worker is stored in the `j`-th partition.

The buffers `isendbuf` and `irecvbuf` are for internal use by this function only and should never be changed or accessed outside of it. The length of `isendbuf` must be equal to the length of `sendbuf` multiplied by the number of workers, and `irecvbuf` must have length equal to that of `recvbuf`.
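For concreteness, a coordinator-side sketch of how the buffers might be sized and how `asyncmap!` might be called is shown below. The package name in the `using` statement, the worker count, the element type, the per-worker buffer length, and the `nwait` value are assumptions made for illustration; here each worker is also assumed to respond with the same number of elements it receives.

```julia
using MPI, MPIAsyncPools  # assumed package name, matching the MPIAsyncPool type

MPI.Init()
comm = MPI.COMM_WORLD

pool = MPIAsyncPool(4)      # workers with MPI ranks 1, 2, 3, 4
nworkers = 4
n = 10                      # number of elements sent to each worker

sendbuf = zeros(Float64, n)              # data broadcast to all workers
recvbuf = zeros(Float64, n * nworkers)   # the j-th chunk holds the response of the j-th worker
isendbuf = zeros(Float64, n * nworkers)  # internal buffer: length of sendbuf times the number of workers
irecvbuf = zeros(Float64, n * nworkers)  # internal buffer: same length as recvbuf

# broadcast sendbuf and return once the 2 fastest workers have responded
repochs = asyncmap!(pool, sendbuf, recvbuf, isendbuf, irecvbuf, comm; nwait=2)
```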