Apache Accumulo is a database based on Google’s BigTable
Accumulo.jl is a client library for Apache Accumulo, built using the Accumulo Thrift Proxy API.
To connect to the server, create an instance of AccumuloSession.
session = AccumuloSession()
Without any parameters, this attempts to connect to a server running on
A remote server can be connected to by specifying the hostname and port number.
session = AccumuloSession("localhost", 42424)
As of now only SASL-Plain authentication is supported, without any
qop. The default implementation
authenticates with the same user-id as that of the login shell. That can be overridden by providing
an appropriate instance of
session = AccumuloSession("localhost", 42424, AccumuloAuthSASLPlain("uid", "pwd", "zid"))
TCompactProtocol is used by default, which is also the default for the server setup.
Other protocols can be used by specifying the optional named parameter
As of now,
:compact protocols are supported.
session = AccumuloSession("localhost", 10000; tprotocol=:compact)
All tables in a setup can be listed with a call to
Existence of a particular table can be checked with
Row/column identifiers and cell data are treated as raw bytes.
Any type that can be converted to bytes can be used wherever row/column identifiers or values are expected in the APIs.
Since Julia already includes this
convert method for strings, they can be used as it is. Any other type can also get the same treatment by defining either of
the following two methods:
Creating / Deleting
A table can be created, deleted or renamed by calling the corresponding function passing the table name.
table_create(session, tablename; versioning=true, logical_timestamp=false) table_delete(session, tablename) table_rename(session, tablename)
By default, tables are configured to use the
VersioningIterator and keep one version.
The version policy can be changed by changing the
VersioningIterator options with the
Using logical timestamps ensures timestamps always move forward. Though the actual millisecond time will not be used anymore, with this, tablet servers need not be time synchronized pefectly and multile updates at the same millisecond do not cause any issue.
A table can be cloned with:
table_clone(session, tablename, newtablename; flush=true, set_properties=Dict(), exclude_properties=())
If the flush option is not enabled, then any data the source table currently has in memory will not exist in the clone. A cloned table copies the configuration of the source table. However the permissions of the source table are not copied to the clone. After a clone is created, only the user that created the clone can read and write to it.
To get or set table specific properties:
table_config(session, tablename) table_config!(session, tablename, property::AbstractString, value::AbstractString)
table_versions! changes the number of versions the
table_versions!(session, tablename, nversions::Integer)
table_du prints how much space, in bytes, is used by files referenced by a table. When multiple tables are specified it prints how much space, in bytes, is used by files shared between tables, if any.
Constraints are applied on mutations at insert time and only those that satify them are allowed. Constraints must be implemented as a Java class and included in Accumulo classpath.
# list constraints as a map of the class name and an integer ID constraints(session, tablename) # add and remove constraints add_constraints(session, tablename, names::AbstractString...) remove_constraints(session, tablename, ids::Integer...)
Tablets constituting a table are automatically split based on a size threshold configurable per table. Tablets can also be manually split or merged through the following APIs to tweak performance. Split points are specified as row values.
# list current splits (upto max_splits) table_splits(session, tablename, max_splits=1024) # manually split / merge tablets table_split(session, tablename, splits...) table_merge(session, tablename, start_split, end_split)
Exporting / Importing
Tables can be moved across clusters by exporting them from the source, copying them via the hadoop
distcp command, and importing them at the destination.
Exporting and importing tables preserves the tables configuration, splits, and logical time.
table_export(session, tablename, export_dir::AbstractString) table_import(session, tablename, import_dir::AbstractString)
Table must be kept offline during an export while discp runs (to prevent files from being deleted). A table can be cloned and the clone taken offline to be able to access the table during an export.
table_offline(session, tablename; wait=true) table_online(session, tablename; wait=true)
Iterators are executed by Accumulo while scanning or compacting tables. They are a way of doing distributed operations on tables.
Iterators must be implemented as a Java class and included in Accumulo classpath.
They can be configured to be used during scanning or compaction events. The
IteratorScope enum lists allowed scopes.
# list all iterators as a dict of class name and the scopes registered iters(session, tablename) # retrieve a single matching iterator with its setting iter(session, tablename, itername::AbstractString, scope::Integer)
The below APIs allow configuring iterators for a table.
# create an iterator setting iter(name, class, priority::Integer, properties::Dict=Dict()) # verify whether the iterator settings will be valid when applied on a table check_iter(session, tablename, iter_setting::IteratorSetting, scopes) # add or remote iterators configured on a table add_iter(session, tablename, iter_setting::IteratorSetting, scopes; check=true) remove_iter(session, tablename, iter_setting::IteratorSetting, scopes) remove_iter(session, tablename, iter_name::AbstractString, scopes)
Data is written to tables as batches of updates.
# initialize a batch updates = batch() # add mutations to the batch update(updates, row, col_family, col_qualifier, col_visibility, value, timestamp::Integer=time_ms()) delete(updates, row, col_family, col_qualifier) # update table update(session, tablename, updates)
Updates can also be conditional. A condition is created using
# initialize a conditional batch updates = conditional_batch() # add a condition; comparing values, timestamps and also running additional iterators to evaluate complex conditions where(updates, row, col_family, col_qualifier, col_visibility; value=nothing, timestamp=-1, iterators=IteratorSetting) # add mutations update(updates, row, col_family, col_qualifier, col_visibility, value, timestamp::Integer=time_ms()) delete(updates, row, col_family, col_qualifier) # update the table update(session, tablename, updates)
Multiple batch updates can be applied by first obtaining a table writer.
batch_writer(session, tablename) do writer for i in 1:10 updates = batch() for rownum in 1:N update(upd, "row$rownum", "colf", "colq", "", "value$rownum") end update(writer, upd) flush(writer) end end
And similarly for conditional updates:
conditional_batch_writer(session, tablename) do writer for i in 1:10 upd = conditional_batch() for rownum in 1:N row = "row$rownum" val = "value$rownum" update(where(upd, row, "colf", "colq"; value=val), "colf", "colq", "", "newvalue$rownum") end update(writer, upd) flush(writer) end end
Data can be read from tables using a scanner and iterating over the records.
# create a scanner scanner(session, tablename) do scan # iterate over the records for rec in records(scan) # each record has key and value key = rec.key val = rec.value # the key consists of the row and column identifiers row = key.row colfam = key.colFamily colqual = key.colQualifier colvis = key.colVisibility # ... end end
Only a subset of the columns can be fetched by specifying the list of columns as:
scanner(session, tablename; columns=[("colfam1","colqual1"), ("colfam2","colqual2")])
Additional iterators can be specified to be used with the scanner:
# iterator settings iter1 = iter(name, class, priority::Integer, properties::Dict=Dict()) iter2 = iter(name, class, priority::Integer, properties::Dict=Dict()) # specify additional iterators to be used scanner(session, tablename; iterators=[iter1, iter2])
The scanner can be restricted to only a single row or a subset of matching rows by specifying a range of keys to iterate over.
# assign appropriate key or range of keys to rng scanner(session, tablename; rng=:())
A scanner key can be created using:
scanner_key(row; col_family="", col_qualifier="", col_visibility="", timestamp=0x7FFFFFFFFFFFFFFF)
A range of keys can be specified as expressions of the form
k1 <= x < k2 where:
k2are the bounds. They can either be just the row value, or a key created using
>=help specify the lower and upper bounds and their inclusivity
xis just any symbol to complete the expression syntax
The range bounds (
k2 above) can be one of the following:
- just the row (bytes or type that can be converted to bytes)
- a key created by
- an expression that evaluates to one of the above
- suppport for
- table compactions
- locality groups
- additional authentication methods
- nicer API