## ChannelBuffers.jl

Parallel tasks using pipe streams
Author KlausC
Popularity
4 Stars
Updated Last
6 Months Ago
Started In
November 2020

# ChannelBuffers

## Introduction

    run( ls → gzip() → "ls.gz")

The ChannelBuffers package integrates the concept of commandline pipelines into Julia. It is not only possible to execute external commands in parallel, but to mix them with internal Tasks and Threads.

If the user provides functions f, g, h, of the form f(input::IO, output::IO, args...), which read from in input stream and write their results to an output stream, they can execute the functions in parallel tasks.

Note:

Input/Output redirection is denoted by → (\rightarrow), which indicates the direction of data flow. Besides that we support | to denote task pipelines. The symbols < and > known from commandline shells cannot be used, because they bear the semantics of comparison operators in Julia.

## Examples

    tl = run("afile" → closure(f, fargs...) → closure(g, gargs...) → "bfile")
wait(tl)


Some standard closures are predefined, which make that possible:

    tl = run( curl("https::/myurltodownloadfrom.tgz") | gunzip() | tarx("targetdir") )

or

    a = my_object
run( serializer(a) → "file") |> wait

b = open("file") do cin
run(cin → deserializer()) |> fetch
end

## Predefined closures

    tarx(dir) # read files in input directory and write to output stream
tarc(dir) # read input stream and create files in target directory
gzip() # read input stream and write compressed data to output stream
gunzip() # reverse of gzip
transcoder(::Codec) # generalization for other kinds of TranscoderStreams
curl(URL) # download file from URL and write to output stream
serializer(obj) # write serialized for of input object to output stream
deserializer() # read input stream and reconstruct serialized object

## API

To create a user defined task, a function with the signature f(cin::IO, cout::IO, args...) is required. It can be transformed into a BClosure object

        fc = closure(f, args...)::BClosure

which can be run alone or combined with other closures and input/output specifiers. The following Base functions are redefined.

        Base: |, run, pipeline, wait, fetch

which are used as in

    tl = run(fc::BClosure)::BTaskList

pl = in → fc → gc → hc → out
pl = pipeline(fc, gc, hc, stdin=in stdout=out)::BClosureList

tl = run(pl::BClosureList)::BTaskList

The assignments to pl are equivalent.

The pipelined tasks are considered finished, when the statically last task in the list terminates. The calling task can wait for this event with

    wait(tl::BTaskList)::Nothing

If the last task in the pipeline calculates a value, if can be waited for and obtained by

    fetch(tl::BTaskList)::Any

Both wait and fetch throw TaskFailedException if the last task in the list failed.

## Implementation

The internal pipes are implemented by ChannelIO <: IO which uses Channel objects to transport data between tasks. The tasks are spawned on different threads, if multithreading is available (JULIA_NUM_THREADS > 1). Communication endpoints of the pipeline can be arbitrary IO objects or AbstractStrings denoting file names. The files given as strings are appropriately opened and closed.

Element type of BTaskList is BTask, a tagging wrapper around Task. It delegates the most important methods, like wait, fetch, istask....

The full functionality of Base.pipelines is extended with the integration of Base.Cmd and BClosure.

### Used By Packages

No packages found.