run( `ls` → gzip() → "ls.gz")
ChannelBuffers package integrates the concept of commandline pipelines into
Julia. It is not only possible to execute external commands in parallel, but to mix them with internal
If the user provides functions
h, of the form
f(input::IO, output::IO, args...), which read from in input stream and write their
results to an output stream, they can execute the functions in parallel tasks.
Input/Output redirection is denoted by
\rightarrow), which indicates the direction of data flow.
Besides that we support
| to denote task pipelines. The symbols
> known from commandline shells cannot be used,
because they bear the semantics of comparison operators in
tl = run("afile" → closure(f, fargs...) → closure(g, gargs...) → "bfile") wait(tl)
Some standard closures are predefined, which make that possible:
tl = run( curl("https::/myurltodownloadfrom.tgz") | gunzip() | tarx("targetdir") )
a = my_object run( serializer(a) → "file") |> wait b = open("file") do cin run(cin → deserializer()) |> fetch end
tarx(dir) # read files in input directory and write to output stream tarc(dir) # read input stream and create files in target directory gzip() # read input stream and write compressed data to output stream gunzip() # reverse of gzip transcoder(::Codec) # generalization for other kinds of TranscoderStreams curl(URL) # download file from URL and write to output stream serializer(obj) # write serialized for of input object to output stream deserializer() # read input stream and reconstruct serialized object
To create a user defined task, a function with the signature
f(cin::IO, cout::IO, args...) is required.
It can be transformed into a
fc = closure(f, args...)::BClosure
which can be run alone or combined with other closures and input/output specifiers.
Base functions are redefined.
Base: |, run, pipeline, wait, fetch
which are used as in
tl = run(fc::BClosure)::BTaskList pl = in → fc → gc → hc → out pl = pipeline(fc, gc, hc, stdin=in stdout=out)::BClosureList tl = run(pl::BClosureList)::BTaskList
The assignments to
pl are equivalent.
The pipelined tasks are considered finished, when the statically last task in the list terminates. The calling task can wait for this event with
If the last task in the pipeline calculates a value, if can be waited for and obtained by
TaskFailedException if the last task in the list failed.
The internal pipes are implemented by
ChannelIO <: IO which uses
Channel objects to transport data between tasks.
The tasks are spawned on different threads, if multithreading is available (
JULIA_NUM_THREADS > 1).
Communication endpoints of the pipeline can be arbitrary
IO objects or
AbstractStrings denoting file names.
The files given as strings are appropriately opened and closed.
Element type of
BTask, a tagging wrapper around
Task. It delegates the most important
The full functionality of
Base.pipelines is extended with the integration of