JobSchedulers.jl
A Julia-based job scheduler and workload manager inspired by Slurm and PBS.
Package Features
- Job and task scheduler.
- Local workload manager.
- Supports CPU, memory, and run-time management.
- Supports running a job at a specific time, or after a delay (schedule).
- Supports deferring a job until other jobs reach given states (dependency).
- Supports automatic backup and reload.
Future development
- Support a command-line scheduler using DaemonMode.jl.
- Use Documenter.jl for documentation.
Installation
JobSchedulers.jl can be installed using the Julia package manager. From the Julia REPL, type ] to enter the Pkg REPL mode and run
pkg> add JobSchedulers
To use the package, type
using JobSchedulers
Quick start
using JobSchedulers, Dates
Scheduler Controls
scheduler_start()
# [ Info: Scheduler starts.
scheduler_status()
# ┌ Info: Scheduler is running.
# │ SCHEDULER_MAX_CPU = 32
# │ SCHEDULER_MAX_MEM = 121278191616
# │ SCHEDULER_UPDATE_SECOND = 5.0
# │ JOB_QUEUE_MAX_LENGTH = 10000
# └ SCHEDULER_TASK = Task (runnable) @0x00007fe205052e60
# scheduler_stop() # NO RUN
Scheduler Settings
Set the maximum CPU that the scheduler can use:
set_scheduler_max_cpu() # use all available CPUs
set_scheduler_max_cpu(4) # use 4 CPUs
set_scheduler_max_cpu(0.5) # use 50% of CPUs
Set the maximum RAM the scheduler can use:
set_scheduler_max_mem() # use 80% of total memory
set_scheduler_max_mem(4GB) # use 4GB memory
set_scheduler_max_mem(4096MB)
set_scheduler_max_mem(4194304KB)
set_scheduler_max_mem(4294967296B)
set_scheduler_max_mem(0.5) # use 50% of total memory
Set the update interval of the job queue:
set_scheduler_update_second(5.0) # update job queue every 5 seconds
Set the maximum number of finished jobs:
set_scheduler_max_job(10000) # If number of finished jobs > 10000, the oldest ones will be removed.
# It does not affect queuing or running jobs.
Job Controls
A Job is a wrapper of an AbstractCmd or a Task:
command_job = Job(
`echo command job done` # AbstractCmd to run
)
task_job = Job(
@task(println("task job done")) # Task to run
)
job_with_args = Job(
@task(println("job_with_args done")); # Task to run
name = "job_with_args", # Job name.
user = "me", # Job owner.
ncpu = 1, # Number of CPUs required.
mem = 1KB, # Amount of memory required (units: TB, GB, MB, KB, B).
schedule_time = Second(3), # Run after 3 seconds; also accepts a DateTime or Period.
wall_time = Hour(1), # Maximum wall time for the job.
priority = 20, # Lower value = higher priority.
dependency = [ # Defer job until some jobs reach some states.
DONE => command_job.id, # Left can be DONE, FAILED, CANCELLED, or even
DONE => task_job.id # QUEUING, RUNNING.
] # Right is the job id.
)
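As noted in the comments, schedule_time also accepts an absolute DateTime instead of a delay. A minimal sketch (the job name and the date below are placeholders, not from the original):

```julia
using JobSchedulers, Dates

# Sketch: schedule a job at an absolute time rather than after a delay.
# The DateTime below is a placeholder; use a real future time.
timed_job = Job(
    @task(println("timed job done"));
    name = "timed_job",
    schedule_time = DateTime(2030, 1, 1, 9, 0, 0)  # run at 2030-01-01 09:00
)
```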
Submit a job to queue:
submit!(command_job)
submit!(task_job)
submit!(job_with_args)
Cancel or interrupt a job:
cancel!(command_job)
Get the returned result:
result(job_with_args)
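Putting the pieces together, a typical submit-wait-collect sequence might look like the sketch below. It assumes the state field shown in the Job Query section is readable while the job runs, and uses the states listed in the dependency example (QUEUING, RUNNING, DONE, FAILED, CANCELLED):

```julia
using JobSchedulers

scheduler_start()

# A small task whose result we want back.
job = Job(@task(1 + 1); name = "sum_job")
submit!(job)

# Poll until the job leaves the queue.
while job.state in (QUEUING, RUNNING)
    sleep(1)
end

result(job)  # the task's return value, if it finished successfully
```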
Queue
Show queue (all jobs):
all_queue()
queue(all=true)
# 3×16 DataFrame. Omitted printing of 10 columns
# │ Row │ id │ name │ user │ ncpu │ mem │ schedule_time │
# │ │ Int64 │ String │ String │ Int64 │ Int64 │ DateTime │
# ├─────┼─────────────────┼───────────────┼────────┼───────┼───────┼─────────────────────────┤
# │ 1 │ 314268209759432 │ │ │ 1 │ 0 │ 0000-01-01T00:00:00 │
# │ 2 │ 314268298112225 │ │ │ 1 │ 0 │ 0000-01-01T00:00:00 │
# │ 3 │ 314268353241057 │ job_with_args │ me │ 1 │ 1024 │ 2021-04-16T12:02:37.511 │
Show queue (running and queuing jobs only):
queue()
# 0×16 DataFrame
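Since queue() and all_queue() return a DataFrame, the usual DataFrames.jl tools apply. A sketch, assuming DataFrames.jl is installed and that the column names follow the fields shown in the Job Query section:

```julia
using JobSchedulers, DataFrames

q = all_queue()

# Keep only jobs owned by "me", then show a few columns.
q_mine = filter(:user => ==("me"), q)
select(q_mine, [:id, :name, :state])
```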
Job Query
job_query(314268353241057)
job_query_by_id(314268353241057) # same result as job_query
# Job:
# id → 314268353241057
# name → "job_with_args"
# user → "me"
# ncpu → 1
# mem → 1024
# schedule_time → 2021-04-16T12:02:37.511
# create_time → 2021-04-16T12:02:40.587
# start_time → 2021-04-16T12:02:49.786
# stop_time → 2021-04-16T12:02:54.803
# wall_time → 1 hour
# state → :done
# priority → 20
# dependency → 2-element Array{Pair{Symbol,Int64},1}:
# :done => 314268209759432
# :done => 314268298112225
# task → Task (done) @0x00007fe7c027bd00
# stdout_file → ""
# stderr_file → ""
Backup
Set backup file:
set_scheduler_backup("/path/to/backup/file")
JobSchedulers writes to the backup file at exit. If the file exists, scheduler settings and job queue will be recovered from it automatically.
Stop backup and delete the old backup file:
set_scheduler_backup(delete_old=true)
Backup immediately:
backup()
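A restart-with-recovery session might look like the following sketch, based on the behavior described above (the backup path is a placeholder):

```julia
using JobSchedulers

# First session: point the scheduler at a backup file and save state.
set_scheduler_backup("/tmp/jobschedulers.backup")
backup()  # write the backup now (it is also written at exit)

# Later session: setting the same backup file recovers the scheduler
# settings and job queue from it automatically.
set_scheduler_backup("/tmp/jobschedulers.backup")
```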
Compatibility with Pipelines.jl
You can also create a Job using Program types from Pipelines.jl:
Job(p::Program; kwargs...)
Job(p::Program, inputs::Dict{String}; kwargs...)
Job(p::Program, inputs::Dict{String}, outputs::Dict{String}; kwargs...)
kwargs... includes the keyword arguments of Job(::Union{Base.AbstractCmd,Task}, ...) and run(::Program, ...). Details can be found by typing
julia> using Pipelines, JobSchedulers
julia> ?Job
julia> ?run
Example
using Pipelines, JobSchedulers
scheduler_start()
p = CmdProgram(
inputs = ["IN1", "IN2"],
outputs = ["OUT"],
cmd = pipeline(`echo inputs are: IN1 and IN2` & `echo outputs are: OUT`)
)
inputs = Dict(
"IN1" => `in1`,
"IN2" => 2
)
outputs = Dict(
"OUT" => "out"
)
# native Pipelines.jl method to run the program
run(p, inputs, outputs;
touch_run_id_file = false # do not create the file that marks the run as done and prevents re-running.
)
# inputs are: in1 and 2
# outputs are: out
# (true, Dict("OUT" => "out"))
# run the program by submitting to JobSchedulers.jl
program_job = Job(p, inputs, outputs; touch_run_id_file = false)
submit!(program_job)
# inputs are: in1 and 2
# outputs are: out
# get the returned result
result(program_job)
# (true, Dict("OUT" => "out"))