DistributedQuery.jl

Author asaxton
Popularity
0 Stars
Updated Last
1 Year Ago
Started In
February 2023

jlDistributedQuery

DistributedQuery Is meant to host large datasets in partitioned across multiple workers with queryable access. Idealy, worker will be remote worker with their own memory. has 3 major part, deployDataStore(), sentinel(), make_query_channels(), query_client(). The basic exicution is the following,

using Test
using Distributed
using ClusterManagers
using DistributedQuery

if Base.current_project() != nothing
    proj_path = joinpath(["/", split(Base.current_project(), "/")[1:end-1]...])
    p = addprocs(SlurmManager(3),
                 time="00:30:00",
                 exeflags="--project=$(proj_path)", ntasks_per_node=1)
else
    p = addprocs(SlurmManager(3),
                 time="00:30:00",
                 ntasks_per_node=1)
end

@everywhere using DistributedQuery
@everywhere using DataFrames
@everywhere using CSV
#proc_chan, data_chan = make_query_channels(p, [1], chan_depth::Int=5);

_shard_file_list = ["../mockData/iris_df_1.jlb",
                    "../mockData/iris_df_2.jlb",
                    "../mockData/iris_df_3.jlb"]

shard_file_list = [joinpath(dirname(pathof(DistributedQuery)), sf) for sf in _shard_file_list]
serialized_file_list = shard_file_list
data_worker_pool = p
proc_worker_pool = [myid()]
fut = DistributedQuery.deployDataStore(data_worker_pool, serialized_file_list)

test_res = [fetch(fut[p]) == @fetchfrom p DistributedQuery.DataContainer for p in data_worker_pool]

@test all(test_res)

if all(test_res)
    print("DistributedQuery.deployDataStore passed\n")
else
    print("DistributedQuery.deployDataStore Failed\n test_res: $(test_res)")
end


proc_chan, data_chan = DistributedQuery.make_query_channels(data_worker_pool, proc_worker_pool)

status_chan = RemoteChannel(()->Channel{Any}(10000), myid())

query_f = (data, column) -> data[:, column]

query_args = ["sepal_l"]

agrigate_f = (x...) -> sum(vcat(x...))

sentinel_fut =
    [@spawnat p DistributedQuery.sentinel(DistributedQuery.DataContainer,
                                          data_chan[myid()] ,proc_chan,
                                          status_chan)
     for p in data_worker_pool]




query_task = @async DistributedQuery.query_client(data_chan, proc_chan[1], agrigate_f, query_f, query_args...)

[take!(status_chan) for i in 1:1000 if isready(status_chan)]
[put!(v, "Done") for (k,v) in data_chan]
[wait(f) for f in sentinel_fut]
[take!(status_chan) for i in 1:1000 if isready(status_chan)]

sentinel_fut =
    [@spawnat p DistributedQuery.sentinel(DistributedQuery.DataContainer,
                                          data_chan[myid()] ,proc_chan,
                                          status_chan)
     for p in data_worker_pool]

query_task = @async DistributedQuery.query_client(data_chan, proc_chan[1], agrigate_f, query_f, query_args...)
query_timeout_in_s = 10
sleep(query_timeout_in_s)


if istaskdone(query_task)
    local_result = sum([sum(fetch(f[2])[:, query_args[1]]) for f in fut])
    query_result = fetch(query_task)
    @test local_result == query_result
else
    print("test query was not done in query_timeout_in_s: $query_timeout_in_s")
    @test false
end


#rmprocs(p);

Used By Packages

No packages found.