ParametricProcesses offers a parametric Worker type and a ProcessManager API capable of facilitating multiple forms of parallel processing and high-level declarative Distributed worker management.
using Pkg; Pkg.add("ParametricProcesses")
# Unstable:
using Pkg; Pkg.add(name = "ParametricProcesses", rev = "Unstable")
Before trying to use threaded Workers (Workers{Threaded}), make sure to start Julia with multiple threads!
julia --threads 6
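As a quick sanity check before creating threaded workers, the thread count of the running session can be inspected with Julia's own Base.Threads module. This is a minimal sketch using only the standard library; the warning text is illustrative. Setting the JULIA_NUM_THREADS environment variable is an alternative to the --threads flag.

```julia
using Base.Threads: nthreads

# Number of threads this Julia session was started with.
n = nthreads()

if n < 2
    # A single thread leaves nothing for threaded workers to run on.
    @warn "Julia is running with a single thread; restart with `julia --threads N`." n
else
    println("Julia is running with $n threads.")
end
```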
- For a full list of exports, try
?ParametricProcesses
using ParametricProcesses
procs = processes(5)
x = 5
firstjob = new_job(x) do x::Int64
    for n in 1:x
        println("hello")
        sleep(2)
    end
end
secondjob = new_job(x) do x::Int64
    sleep(1)
    for n in 1:x
        println("world")
        sleep(2)
    end
end
distribute!(procs, firstjob, secondjob)
julia> distribute!(procs, firstjob, secondjob)
2-element Vector{Int64}:
7
8
julia> From worker 7: hello
From worker 8: world
From worker 7: hello
From worker 8: world
From worker 7: hello
From worker 8: world
From worker 7: hello
From worker 8: world
From worker 7: hello
From worker 8: world
The typical ParametricProcesses workflow involves creating a process manager with workers, then creating jobs and distributing them amongst those workers using assign! and distribute!. To get started, we can create a ProcessManager by using the processes Function. This Function takes an Int64 and, optionally, a Process type. The default process type is Threaded, so ensure you have multiple threads for the following example:
procs = processes(5)
We can create a process manager with workers of any type using this same Function, processes.
async_procs = processes(2, Async)
Workers are held in the ProcessManager.workers field. We can also add workers directly with the add_workers! function, or create workers manually and push! them.
julia> add_workers!(pm, 1, Threaded, "emma the worker <3")
2 |Threaded process: emma the worker <3 (inactive)
julia> w = Worker{Async}("steve the worker", 20)
20 |Async process: steve the worker (inactive)
julia> push!(pm, w)
2 |Threaded process: emma the worker <3 (inactive)
20 |Async process: steve the worker (inactive)
julia> join("$(w.name)\n" for w in pm.workers)
"emma the worker <3
steve the worker
"
Workers can be indexed by their name or their pid.
julia> pm["steve the worker"]
20 |Async process: steve the worker (inactive)
julia> pm[2]
2 |Threaded process: emma the worker <3 (inactive)
Here is a list of other functions used to manage workers.
- close(pm::ProcessManager) - closes all active Workers in pm.
- delete!(pm::ProcessManager, pid::Int64) - closes a Worker by pid.
- delete!(pm::ProcessManager, name::String) - closes a Worker by name.
- worker_pids(pm::ProcessManager) - returns worker process identifiers for all Workers in pm.workers.
- waitfor(pm::ProcessManager, pids::Any ...) - waits for pids to finish, then returns their returns in a Vector{Any}.
- put!(pm::ProcessManager, pids::Vector{Int64}, vals ...) - serializes data and defines it in the Main of each process in pids.
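The management functions listed above can be combined roughly as follows. This is a sketch that only uses the call shapes shown in this README (processes, worker_pids, new_job, assign!, waitfor, delete!, close); the job itself is a made-up example, and it assumes Julia was started with multiple threads because processes defaults to Threaded workers.

```julia
using ParametricProcesses

pm = processes(2)

# Look the identifiers up rather than assuming which pids were handed out.
pids = worker_pids(pm)

# Run a small job on the first worker and wait for its return value.
job = new_job(+, 2, 3)
assign!(pm, pids[1], job)
rets = waitfor(pm, pids[1])
println("worker $(pids[1]) returned: ", rets[1])

# Close one worker by pid, then close whatever is still active.
delete!(pm, pids[2])
close(pm)
```

Asking worker_pids for the identifiers keeps the example independent of how many processes the session has already opened.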
There is also @everywhere, used to define functions and modules across all workers, as well as @distribute, to use all available workers for iteration.
@time @distribute for x in 1:5
    sleep(3)
end
@time for x in 1:5
    sleep(3)
end
@everywhere is the more important of the two. put! can be used to transmit data, but this will not work for functions or modules -- @everywhere must be used for this, after the workers are open.
using ParametricProcesses
# make workers first
pm = processes(2)
# using a `Module`
@everywhere using JSON
# using a `Function`
@everywhere function sample()
    println("sample")
end
jbs = (new_job(JSON.parse, "{\"x\":5}"), new_job(sample))
pids = distribute!(pm, jbs ...)
# -- v output
From worker 3: sample
2-element Vector{Int64}:
2
3
# --
rets = waitfor(pm, pids ...)
println("x is $(rets[1]["x"])")
# - v output
x is 5
In order to use our threads to complete tasks, we will need to construct an instance of a sub-type of AbstractJob. The running type for this is ProcessJob, which may be constructed through the new_job binding. We provide this with a Function that takes arguments, as well as the arguments we seek to provide to that Function (if any).
new_job(f::Function, args ...; keyargs ...)
myjob = new_job(readdir, ".")
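Both call styles used in this README, new_job(readdir, ".") and new_job(x) do x ... end, have the same shape: Julia's do-block syntax passes the block as the first positional argument. The sketch below illustrates this with hypothetical stand-ins (ToyJob, toy_job, run_job) that are not part of ParametricProcesses; they only store a function with its arguments and call it later.

```julia
# Hypothetical stand-in for a job: it only stores a function and its arguments.
struct ToyJob
    f::Function
    args::Tuple
end

# Hypothetical constructor with the same argument order as new_job(f, args ...).
toy_job(f::Function, args...) = ToyJob(f, args)

# Call the stored function with the stored arguments.
run_job(job::ToyJob) = job.f(job.args...)

# Passing a named function directly ...
a = toy_job(uppercase, "hello")

# ... and passing an anonymous function with a do-block are the same call:
# the do-block body becomes the first positional argument.
b = toy_job("hello") do s::String
    uppercase(s)
end

println(run_job(a))
println(run_job(b))
```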
From here, we have access to the following functions to distribute our jobs amongst our Workers.
- distribute!
- assign!
- assign_open!
- distribute_open!
waitfor is used to wait for certain workers to finish their tasks, getting their returns as they complete. Consider the following waitfor example:
pm = processes(4)
jb = new_job() do
    sleep(10)
    @info "hello world!"
    return 55
end
assign!(pm, 2, jb)
ret = waitfor(pm, 2); println("worker 2 completed, it returned: ", ret[1])
# From worker 2: [ Info: hello world!
# worker 2 completed, it returned: 55
Feasibly, you can pass the ProcessManager to all workers and manage processes from different workers by using @everywhere.
This simple example shows how jobs (which ideally would be more CPU-intensive and less memory-intensive than this) can easily be distributed amongst workers -- especially for simple Function calls like parse_props below:
using ParametricProcesses
using Test
procs = processes(2)
@everywhere function parse_props(s::String)
    propkeys = split(s, ";")
    # build key => value pairs, dropping entries without a ":" separator
    filter!(t -> !isnothing(t), [begin
        splts = split(kp, ":")
        if length(splts) < 2
            nothing
        else
            splts[1] => splts[2]
        end
    end for kp in propkeys])
end
firstset = join("$(rand(500:5000)):$(rand(500:5000));" for n in 1:5000)
secondset = join("$(rand(500:5000)):$(rand(500:5000));" for n in 1:50000)
thirdset = join("$(rand(500:5000)):$(rand(500:5000));" for n in 1:5000)
fourthset = join("$(rand(500:5000)):$(rand(500:5000));" for n in 1:100000)
fifthset = join("$(rand(500:5000)):$(rand(500:5000));" for n in 1:50000)
sets = (firstset, secondset, thirdset, fourthset, fifthset)
ret = vcat([parse_props(set) for set in sets] ...)
jbs = (new_job(parse_props, set) for set in sets)
ids = distribute!(procs, worker_pids(procs), jbs ...)
mret = vcat(waitfor(procs, ids ...) ...)
@test length(ret) == length(mret)
In the above example, distribute! is used to spread the five parsing jobs across the two workers instead of running them all on one thread. While this does not necessarily offer a huge benefit to performance, as parsing CSS-style properties is pretty simple and it is more CPU work to serialize the data for the thread, this example does show pretty well how to easily replicate tasks across several workers.
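To get a feel for the serialization cost mentioned above, the time to round-trip one of these strings through a buffer can be compared with the time to do some light parsing on it. This is a rough sketch using only Julia's Serialization standard library; roundtrip and count_props are hypothetical helpers, and an in-memory buffer is only an approximation of what sending data to a worker involves.

```julia
using Serialization

# Build a "key:value;" string like the sets in the example above.
set = join("$(rand(500:5000)):$(rand(500:5000));" for _ in 1:50000)

# Round-trip the string through a buffer, roughly what handing it to a worker requires.
function roundtrip(s::String)
    io = IOBuffer()
    serialize(io, s)
    seekstart(io)
    return deserialize(io)
end

# A cheap stand-in for the parsing work: split into entries and count the non-empty ones.
count_props(s::String) = count(!isempty, split(s, ";"))

# First calls also compile the functions, so they are kept out of the timings.
restored = roundtrip(set)
n = count_props(set)

t_transfer = @elapsed roundtrip(set)
t_parse = @elapsed count_props(set)

println("serialize + deserialize: $(t_transfer) s, parse: $(t_parse) s, entries: $n")
```

When the transfer time is comparable to the parse time, distributing the work is unlikely to pay off; jobs that compute a lot on little data benefit the most.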
There are several ways to contribute to the ParametricProcesses package.
- submitting issues (guidelines)
- creating Worker extensions.
- forking and pull-requesting (guidelines)
- trying other chifi projects.
- contributing to other chifi projects (gives more attention here).
Adding your own Workers is pretty straightforward. We can create new functionality by creating a new <: Process or a new <: AbstractWorker. A Process is used to change the functionality of a Worker, whereas an AbstractWorker extension usually means we need to facilitate different types of Worker data or ProcessManager functionality. Creating a Process is very simple, as a Process is simply an abstract type.
abstract type CUDA <: Process end
From here, we have a few bindings which will need to be defined:
close(w::Worker{Process})
create_workers(n::Int64, of::Type{Process},
names::Vector{String} = ["$e" for e in 1:n])
assign!(assigned_worker::Worker{Process}, job::AbstractJob)
Pretty simple; these are the main pieces of functionality that change when we are using different hardware -- allocating jobs, creating workers to do the jobs, and closing the workers will all be different depending on what Process we are using. Fortunately, a Worker will fit entirely into the API by simply extending these three, so with these simple functions we can easily create high-level bindings to distribute our jobs over a myriad of different worker types. If we wanted to create our own AbstractWorker, things would get a little more complicated. It is also possible to make your own sub-type of AbstractProcessManager or AbstractJob and extend that way. All of the information needed to stay consistent with these super-types is available in the documentation.
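The extension pattern described above relies on dispatching on a Worker's type parameter. The sketch below shows that mechanism in isolation with a hypothetical mock module (ToyProcesses); none of its definitions come from ParametricProcesses, and the real package's Worker fields and method bodies will differ. Its assign! takes a function and arguments directly instead of an AbstractJob to keep the mock short.

```julia
# Hypothetical mock of the pattern; not the package's own types or methods.
module ToyProcesses

abstract type Process end
abstract type Inline <: Process end     # runs jobs immediately on the calling task
abstract type Logged <: Process end     # same, but announces each job first

mutable struct Worker{T <: Process}
    name::String
    pid::Int64
    active::Bool
end

# Builds n workers whose type parameter is the requested Process type.
create_workers(n::Int64, of::Type{<:Process}, names::Vector{String} = ["$e" for e in 1:n]) =
    [Worker{of}(names[e], e, true) for e in 1:n]

# One assign! method per Process type, selected by the Worker's type parameter.
assign!(w::Worker{Inline}, f::Function, args...) = f(args...)

function assign!(w::Worker{Logged}, f::Function, args...)
    println("worker $(w.pid) ($(w.name)) running a job")
    return f(args...)
end

# Marks the worker as closed; a real Process would release its resources here.
Base.close(w::Worker) = (w.active = false; w)

end # module

using .ToyProcesses: Inline, Logged, create_workers, assign!

ws = create_workers(2, Logged)
println(assign!(ws[1], +, 2, 3))

quiet = create_workers(1, Inline, ["quiet"])
println(assign!(quiet[1], length, "abc"))

close(ws[2])
```

Because every method is keyed on Worker{T}, adding another Process type means adding methods rather than editing existing ones, which is the property the three bindings above depend on.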
We are not super picky on contributions, as the goal of chifi is to get more people involved in computing. However, if you want your code merged, there are a few things to be aware of before contributing to this package.
- If there is no issue for what you want to do, create an issue
- If you have multiple issues, submit multiple issues rather than typing each issue into one issue.
- Make sure the issue you are solving or feature you want to implement is still feasible on Unstable -- this is the top-level development branch which represents the latest unstable changes.
- Please format your documentation using the technique presented in the rest of the file.
- Make sure Pkg.test("ParametricProcesses") works with your version of ParametricProcesses before making a pull-request.