Created
May 22, 2020 14:24
-
-
Save aviatesk/fe3d41105d6b88861581489c93ccbbca to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module BQUtils | |
using CSV | |
using MacroTools: @>, @>> | |
# Thin wrapper around the text of a query, stored in the field `s`.
struct Query
    s::String
end

# Wrapping something that already is a `Query` is a no-op: the argument itself
# is handed back.
function Query(q::Query)
    return q
end
# `bq"..."` string macro: gives editors a hook for embedded syntax highlighting
# and serves as a wrapper that turns the literal into a `Query`.
macro bq_str(s)
    return Query(s)
end
# Run a query and return its result; all keyword arguments are forwarded to
# `_read_query`.
#
# - `read_query(q::Query; kwargs...)`: uses the query text wrapped by `q`.
# - `read_query(path; kwargs...)`: reads the whole file at `path` as a `String`
#   and uses that as the query text.
read_query(q::Query; kwargs...) = _read_query(q.s; kwargs...)
# NOTE: the source comes first and the target type second, i.e. `read(path, String)`;
# `read(String, path)` has no matching method and throws a `MethodError`.
read_query(path; kwargs...) = _read_query(read(path, String); kwargs...)
# Validate a query; all keyword arguments are forwarded to `_validate_query`.
#
# - `validate_query(q::Query; kwargs...)`: uses the query text wrapped by `q`.
# - `validate_query(path; kwargs...)`: reads the whole file at `path` as a
#   `String` and uses that as the query text.
validate_query(q::Query; kwargs...) = _validate_query(q.s; kwargs...)
# NOTE: the source comes first and the target type second, i.e. `read(path, String)`;
# `read(String, path)` has no matching method and throws a `MethodError`.
validate_query(path; kwargs...) = _validate_query(read(path, String); kwargs...)
# Option tables map an option name (a `Symbol`) to a value of any type.
const OptionDict = Dict{Symbol,Any}

# Default global options.
const DEFAULT_GLOBAL_OPTIONS = OptionDict(
    :job_id => nothing, # placeholder, to be overwritten
    :format => "csv",
)

# Default options for the query itself.
const DEFAULT_QUERY_OPTIONS = OptionDict(
    :use_legacy_sql => false,
    :use_cache => true,
    :max_rows => 100_000_000,
    :job_timeout_ms => 600_000, # 10 minutes, expressed in milliseconds
)

# Default options for reading the CSV output (none).
const DEFAULT_CSV_OPTIONS = OptionDict()
# Run `query` and load its CSV output with `CSV.read`, using a simple local cache.
#
# Keyword arguments:
# - `ignore_cache`: when `true`, an existing cache file is not reused (the fresh
#   result is still written to the cache path).
# - `sym`: prefix used for both the cache file name and the generated job id.
# - `validate`: when `true`, `_validate_query` is run before the real query.
# - `maxbyte`: forwarded to `_validate_query`.
# - `global_options`, `query_options`: forwarded to the command builders.
# - `csv_options`: merged over `DEFAULT_CSV_OPTIONS` and passed to `CSV.read`.
function _read_query(
    query;
    ignore_cache = false, sym = :bq_avijulia, validate = true, maxbyte = 10^12,
    global_options = OptionDict(),
    query_options = OptionDict(),
    csv_options = OptionDict(),
)
    # simple local caching: the file name is derived from the hash of the
    # normalized query text
    cache_name = string(sym, '_', hash(normalize_query(query)), ".csv")
    lookup_path = joinpath(tempdir(), cache_name)
    merged_csv_options = merge(DEFAULT_CSV_OPTIONS, csv_options)

    if !ignore_cache && isfile(lookup_path)
        @info "cache found at $lookup_path"
    else
        if validate
            _validate_query(
                query;
                maxbyte = maxbyte,
                global_options = global_options,
                query_options = query_options,
            )
        end
        job_id = create_job_id(sym)
        run_cmd = query_cmd(query, job_id, global_options, query_options)
        out = try_run_cmd(run_cmd, cancel_cmd(job_id))
        write(lookup_path, out)
    end

    return CSV.read(lookup_path; kwargs(merged_csv_options)...)
end
# Run `cmd` and return everything it wrote to its standard output (as bytes).
#
# - If reading is interrupted (`InterruptException`) and `cmd_on_interrupt` is
#   given, the process is killed and `cmd_on_interrupt` is run.
# - If the process itself fails after its output was captured, that output is
#   raised as the message of an `ErrorException`.
# - Any other error is rethrown.
function try_run_cmd(cmd, cmd_on_interrupt = nothing)
    local query_process, result
    try
        open(cmd) do p
            try
                query_process = p
                result = read(p)
            catch err
                # only interrupts are handled here, and only when there is a
                # follow-up command to run
                if isnothing(cmd_on_interrupt) || !(err isa InterruptException)
                    rethrow(err)
                end
                kill(p)
                println(); @info "cancelling the process $p ..."
                run(cmd_on_interrupt)
            end
        end
    catch err
        # surface the captured output when it was our own process that failed
        if err isa ProcessFailedException && @isdefined(query_process) &&
           (query_process in err.procs) && @isdefined(result)
            rethrow(ErrorException(String(result)))
        end
        rethrow(err)
    end
    return result
end
# Turn `dict` into an iterator of `name => value` pairs backed by a `NamedTuple`,
# so that it can be splatted as keyword arguments. Keys are converted with `Symbol`.
function kwargs(dict)
    names = Tuple(Symbol(k) for k in keys(dict))
    vals = Tuple(values(dict))
    return pairs(NamedTuple{names}(vals))
end
# `Query` method: normalizes the wrapped text and wraps the result in a `Query` again.
function normalize_query(q::Query)
    return Query(normalize_query(q.s))
end
# Reduce query text to a canonical single-line form: `--` comments and blank
# lines are dropped, the remaining lines are joined with single spaces,
# whitespace around punctuation is removed and whitespace runs are collapsed.
#
# NOTE(review): this is purely textual. `--` and punctuation inside string
# literals are treated exactly like those outside of them.
function normalize_query(query)
    lines = map(strip ∘ strip_comment, split(query, '\n'))
    joined = join(filter(!isempty, lines), ' ')
    return minimize_whitespaces(normalize_symbols(joined))
end

# Remove a trailing `--` comment (and one preceding whitespace character) from
# `line`. `.*` rather than `.+` so that a bare `--` with nothing after it is
# removed as well.
strip_comment(line) = replace(line, r"\s?--.*$" => "")

# Drop the whitespace on either side of punctuation characters.
normalize_symbols(s) = replace(s, r"\s*(?<punc>[\,\(\)\<\>\+\-\*\/\=`])\s*" => s"\1")

# Collapse every run of whitespace into a single space.
minimize_whitespaces(s) = replace(s, r"\s+" => " ")
# Dry-run `query` and check how many bytes it is expected to process.
#
# The byte count is parsed from the dry-run output. An error is raised when no
# byte count can be found, or when the count is at least `maxbyte`. Otherwise the
# expected size is logged and the query is returned wrapped in a `Query`.
function _validate_query(query; maxbyte = 10^12, global_options = OptionDict(), query_options = OptionDict())
    dry_run_cmd = validate_query_cmd(query, global_options, query_options)
    out = String(try_run_cmd(dry_run_cmd))

    m = match(r"(?<bytes>\d+)\sbytes", out)
    if m === nothing
        error("invalid dry run: $out")
    end
    bytes = parse(Int, m[:bytes])

    # human-readable size for the log message
    sizes = if bytes < 10^9
        "$(bytes÷10^6) MB"
    elseif bytes < 10^12
        "$(bytes÷10^9) GB"
    else
        "$(bytes/10^12) TB"
    end

    if bytes ≥ maxbyte
        error("too much processed byte expected: $bytes (maxbyte: $maxbyte)")
    end
    @info "This query will process $sizes when run."
    return Query(query)
end
# Build the `bq ... query --dry_run=true ...` command used to validate `query`.
#
# User options are merged over the defaults. The global options `:job_id` and
# `:format` and the query option `:job_timeout_ms` are left out of the command.
function validate_query_cmd(query, global_options, query_options)
    flag(name, value) = string("--", name, '=', value)

    exec = ["bq"]
    for (k, v) in merge(DEFAULT_GLOBAL_OPTIONS, global_options)
        k in (:job_id, :format) || push!(exec, flag(k, v))
    end

    push!(exec, "query", "--dry_run=true")
    for (k, v) in merge(DEFAULT_QUERY_OPTIONS, query_options)
        k in (:job_timeout_ms, ) || push!(exec, flag(k, v))
    end

    push!(exec, query)
    return Cmd(exec)
end
# Build a job id of the form `<sym>_<hash>_<hash>`, where the first hash comes
# from a freshly generated symbol and the second from the current time.
function create_job_id(sym)
    unique_part = hash(gensym(sym))
    time_part = hash(time())
    return string(sym, '_', unique_part, '_', time_part)
end
# Build the `bq ... query ...` command that actually runs `query`.
#
# User options are merged over the defaults; the `:job_id` global option is
# always set to `job_id`, replacing whatever value was there.
function query_cmd(query, job_id, global_options, query_options)
    flag(name, value) = string("--", name, '=', value)

    all_global_options = merge(DEFAULT_GLOBAL_OPTIONS, global_options)
    all_global_options[:job_id] = job_id

    exec = ["bq"]
    for (k, v) in all_global_options
        push!(exec, flag(k, v))
    end

    push!(exec, "query")
    for (k, v) in merge(DEFAULT_QUERY_OPTIONS, query_options)
        push!(exec, flag(k, v))
    end

    push!(exec, query)
    return Cmd(exec)
end
# Build the `bq` command that cancels the job `job_id`, with
# `--synchronous_mode=false` set.
function cancel_cmd(job_id)
    args = ["bq", "--synchronous_mode=false", "cancel", job_id]
    return Cmd(args)
end
export @bq_str, read_query, validate_query | |
end # module |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
BQUtils.jl
BQUtils
provides utility functions to interact with the BigQuery command-line tools. Features:
read_query