This package is used with FluxML to train, evaluate (run inference with), and analyze models on a distributed cluster. At the moment, only the Slurm cluster manager has been tested.
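For example, the worker processes can be launched on a Slurm allocation with ClusterManagers.jl; the sketch below is only illustrative, and the partition name and time limit are placeholders for your site's configuration.

using Distributed, ClusterManagers

# Launch 3 Julia workers through Slurm (srun); adjust the partition and
# time limit to match your cluster.
addprocs(SlurmManager(3), partition="debug", t="00:10:00")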
Coming soon
These examples assume that you have already partitioned the data into multiple DataFrames and serialized them with the Serialization package, collecting the shard file paths in shard_file_list.
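A sketch of one way such shards might be produced from a single DataFrame df (the variable names and file names here are only illustrative):

using DataFrames, Serialization

# Split df into one shard per worker and serialize each shard to disk.
n_shards = 3
shard_file_list = ["iris_shard_$i.jls" for i in 1:n_shards]
row_chunks = Iterators.partition(1:nrow(df), ceil(Int, nrow(df)/n_shards))
for (path, rows) in zip(shard_file_list, row_chunks)
    serialize(path, df[rows, :])
end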
using Distributed
p = addprocs(3)
@everywhere using DistributedFluxML, Flux, Serialization, DataFrames
batch_size=8
epochs = 50
# Deserialize one data shard into a global rawData on each worker.
deser_fut = [@spawnat w global rawData = deserialize(f)
             for (w, f) in zip(p, shard_file_list)]
for fut in deser_fut
    wait(fut)
end
@everywhere p labels = ["Iris-versicolor", "Iris-virginica", "Iris-setosa"]
@everywhere p x_array =
    Array(rawData[:, [:sepal_l, :sepal_w,
                      :petal_l, :petal_w]])
@everywhere p y_array =
    Flux.onehotbatch(rawData[:, "class"], labels)
# Each worker serves its local shard, batch by batch, through a Channel
# that replays the data for the requested number of epochs.
@everywhere p dataChan = Channel(1) do ch
    n_chunk = ceil(Int, size(x_array)[1]/$batch_size)
    x_dat = Flux.chunk(transpose(x_array), n_chunk)
    y_dat = Flux.chunk(y_array, n_chunk)
    for epoch in 1:$epochs
        for d in zip(x_dat, y_dat)
            put!(ch, d)
        end
    end
    put!(ch, :End)
end
@everywhere p datRemChan = RemoteChannel(() -> dataChan, myid())
trainWorkers_shift = circshift(p, 1)
# ^^^ shift workers to reuse them as remote data hosts ^^^
# Map each training worker to the data channel hosted on its shifted neighbor.
datRemChansDict = Dict(k => (@fetchfrom w datRemChan) for (k, w) in zip(p, trainWorkers_shift))
loss_f = Flux.Losses.logitcrossentropy
opt = Flux.Optimise.ADAM(0.001)
model = Chain(Dense(4,8),Dense(8,16), Dense(16,3))
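The calls below reference status_array and status_chan, which are not set up earlier in this example. A minimal sketch of one possible setup is shown here, assuming status_chan is simply a RemoteChannel that train! pushes progress reports into and status_array is a local buffer that collects them:

# Hypothetical status-monitoring setup; the exact contents pushed to
# status_chan are determined by DistributedFluxML.train! itself.
status_array = Any[]
status_chan = RemoteChannel(() -> Channel{Any}(1_000), myid())

# Drain status reports into status_array in the background.
@async while true
    push!(status_array, take!(status_chan))
end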
empty!(status_array)
DistributedFluxML.train!(loss_f, model, datRemChansDict, opt, p; status_chan)
The grad_calc argument to train! is meant for novel normalization schemes. For example, if your data channel returns a 3-tuple, say (x, s, y), a desired grad calc could be
function node_norm_grad_calc(xsy, loss_f, model; device=gpu)
    x, s, y = xsy
    loss_f(model(x), y, agg=sum)/s
end
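Assuming grad_calc is accepted as a keyword argument (check the train! docstring for the exact signature), the call would then look like:

# Hypothetical call passing the custom gradient calculation to train!.
DistributedFluxML.train!(loss_f, model, datRemChansDict, opt, p;
                         status_chan, grad_calc=node_norm_grad_calc)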
using Distributed
p = addprocs(3)
@everywhere using DistributedFluxML
DistributedFluxML.OptOutAllReduce.init(p)
mock_vals = [1,2,3]
allR_fut = [@spawnat w DistributedFluxML.OptOutAllReduce.allReduce(+, v)
            for (v, w) in zip(mock_vals, p)]
[fetch(fut) for fut in allR_fut] # [(6,3), (6,3), (6,3)]
mock_vals = [1, :Skip, 3]
allR_fut = [@spawnat w DistributedFluxML.OptOutAllReduce.allReduce(+, v)
            for (v, w) in zip(mock_vals, p)]
[fetch(fut) for fut in allR_fut] # [(4,2), (4,2), (4,2)]
Workers that pass :Skip opt out of the reduction: here only 1 and 3 are summed over the 2 participating workers, so every worker receives (4, 2).
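Since each call returns a (reduced_value, participant_count) tuple, a cross-worker mean can be formed by dividing the two; a small sketch reusing the mock values above:

mean_fut = [@spawnat w begin
                tot, cnt = DistributedFluxML.OptOutAllReduce.allReduce(+, v)
                tot/cnt
            end
            for (v, w) in zip(mock_vals, p)]
[fetch(fut) for fut in mean_fut] # [2.0, 2.0, 2.0]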