Skip to content

Instantly share code, notes, and snippets.

@aviatesk
Created May 22, 2020 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aviatesk/fe3d41105d6b88861581489c93ccbbca to your computer and use it in GitHub Desktop.
Save aviatesk/fe3d41105d6b88861581489c93ccbbca to your computer and use it in GitHub Desktop.
module BQUtils
using CSV
using MacroTools: @>, @>>
"""
    Query(s)

Thin wrapper marking a string as BigQuery SQL text.

`read_query` and `validate_query` dispatch on it to tell literal query text apart from
the path of a file that contains the query. Usually created with the `bq"..."` macro.
"""
struct Query
    s::String
end

# Re-wrapping is a no-op, so `Query(x)` can be applied to a value that may already be a `Query`.
Query(q::Query) = q

"""
    bq"SELECT ..."

Create a `Query` from the literal text. The `bq` prefix marks the literal as SQL (useful
for editors that highlight embedded languages in string macros) and wraps it so that
`read_query`/`validate_query` treat it as query text rather than as a file path.
"""
macro bq_str(s)
    return Query(s)
end

"""
    read_query(q::Query; kwargs...)
    read_query(path; kwargs...)

Run a query and load its result with `CSV.read`. The query is either a `Query`
(e.g. `bq"SELECT 1"`) or the path of a file holding the SQL text. All keyword
arguments are forwarded to `_read_query`.
"""
read_query(q::Query; kwargs...) = _read_query(q.s; kwargs...)
# `read(path, String)` is Base's "whole file as a String" method; the previous
# `read(String, path)` has no matching method and always threw a MethodError.
read_query(path; kwargs...) = _read_query(read(path, String); kwargs...)

"""
    validate_query(q::Query; kwargs...)
    validate_query(path; kwargs...)

Dry-run a query (a `Query` or the path of a file holding the SQL text) without running
it, log the estimated amount of processed data, and return the query as a `Query`.
Throws if the estimate reaches `maxbyte`. Keyword arguments are forwarded to
`_validate_query`.
"""
validate_query(q::Query; kwargs...) = _validate_query(q.s; kwargs...)
validate_query(path; kwargs...) = _validate_query(read(path, String); kwargs...)
# Option dictionaries used throughout this module, keyed by `Symbol`.
const OptionDict = Dict{Symbol,Any}

# Defaults for the `bq` global flags (emitted before the `query` sub-command as `--key=value`).
# `:job_id` is only a placeholder: `query_cmd` overwrites it with a freshly generated id,
# and the dry-run command omits both `:job_id` and `:format`.
const DEFAULT_GLOBAL_OPTIONS = OptionDict(
    :job_id => nothing,
    :format => "csv",
)

# Defaults for the `bq query` flags. The dry-run command omits `:job_timeout_ms`.
const DEFAULT_QUERY_OPTIONS = OptionDict(
    :use_legacy_sql => false,
    :use_cache => true,
    :max_rows => 100_000_000,
    :job_timeout_ms => 600_000,  # 10 minutes
)

# Defaults forwarded as keyword arguments to `CSV.read` (none).
const DEFAULT_CSV_OPTIONS = OptionDict()
"""
    _read_query(query; ignore_cache=false, sym=:bq_avijulia, validate=true, maxbyte=10^12,
                global_options=OptionDict(), query_options=OptionDict(), csv_options=OptionDict())

Run the SQL string `query` with the `bq` command line tool and load the output with
`CSV.read`.

The raw output is cached in `tempdir()` under a file name built from `sym` and a hash of
the normalized query text *together with* `global_options` and `query_options`, so a
rerun of the same query with the same options is read from disk instead of BigQuery.

# Keywords
- `ignore_cache`: when `true`, always run the query (the cached file is overwritten).
- `sym`: prefix of the cache file name and of the generated BigQuery job id.
- `validate`: when `true`, dry-run the query first with `_validate_query`, which throws
  if the estimated processed bytes reach `maxbyte`.
- `maxbyte`: processed-bytes limit passed to `_validate_query`.
- `global_options`, `query_options`: extra `bq` flags, merged over the module defaults.
- `csv_options`: keyword arguments forwarded to `CSV.read`.

NOTE(review): `CSV.read(path; kw...)` without a sink argument is the API of older CSV.jl
releases; newer releases require `CSV.read(path, sink)` — confirm the pinned version.
"""
function _read_query(
    query;
    ignore_cache = false, sym = :bq_avijulia, validate = true, maxbyte = 10^12,
    global_options = OptionDict(),
    query_options = OptionDict(),
    csv_options = OptionDict(),
)
    # The options belong in the cache key: the same SQL run against another project or with
    # e.g. a different `use_legacy_sql`/`max_rows` can return different data.
    # (`Base.hash` is not guaranteed stable across Julia versions; that only causes misses.)
    key = hash((normalize_query(query), global_options, query_options))
    lookup_path = joinpath(tempdir(), string(sym, '_', key, ".csv"))
    csv_kwargs = kwargs(merge(DEFAULT_CSV_OPTIONS, csv_options))
    if !ignore_cache && isfile(lookup_path)
        @info "cache found at $lookup_path"
        return CSV.read(lookup_path; csv_kwargs...)
    end
    validate && _validate_query(
        query;
        maxbyte = maxbyte, global_options = global_options, query_options = query_options,
    )
    job_id = create_job_id(sym)
    out = try_run_cmd(query_cmd(query, job_id, global_options, query_options), cancel_cmd(job_id))
    # Write to a side file and move it into place, so an interruption mid-write never leaves
    # a truncated file that a later call would mistake for a complete cached result.
    tmp_path = string(lookup_path, '.', getpid(), ".part")
    write(tmp_path, out)
    mv(tmp_path, lookup_path; force = true)
    return CSV.read(lookup_path; csv_kwargs...)
end
"""
    try_run_cmd(cmd, cmd_on_interrupt = nothing) -> Vector{UInt8}

Run `cmd` and return everything it writes to stdout as bytes.

- If the process exits with a failure status after its output was read, an
  `ErrorException` whose message is that output is thrown instead of the bare
  `ProcessFailedException`, so the tool's own error text is shown.
- If reading is interrupted (`InterruptException`, e.g. Ctrl-C) and `cmd_on_interrupt`
  is given, the process is killed, `cmd_on_interrupt` is run (e.g. to cancel the remote
  job), and the `InterruptException` is rethrown.
- Any other error is rethrown unchanged.
"""
function try_run_cmd(cmd, cmd_on_interrupt = nothing)
    local query_process, result
    try
        open(cmd) do p
            query_process = p
            try
                result = read(p)
            catch err
                if !isnothing(cmd_on_interrupt) && err isa InterruptException
                    kill(p)
                    println(); @info "cancelling the process $p ..."
                    run(cmd_on_interrupt)
                end
                # Always propagate: swallowing the interrupt made `open` report a confusing
                # ProcessFailedException/IOError for the killed process instead.
                rethrow()
            end
        end
    catch err
        if err isa ProcessFailedException && @isdefined(query_process) &&
           query_process in err.procs && @isdefined(result)
            # The failed command's stdout holds its error message; surface that text.
            throw(ErrorException(String(result)))
        end
        rethrow()
    end
    return result
end
"""
    kwargs(dict)

Convert `dict` into an iterator of `Symbol => value` pairs that can be splatted as
keyword arguments, e.g. `f(; kwargs(dict)...)`. Every key is passed through `Symbol`,
so string keys are accepted as well.
"""
function kwargs(dict)
    names = Tuple(Symbol(key) for key in keys(dict))
    vals = Tuple(values(dict))  # `keys` and `values` of a dict iterate in the same order
    return pairs(NamedTuple{names}(vals))
end
# Normalizing a `Query` normalizes its SQL text and wraps the result in a new `Query`.
function normalize_query(q::Query)
    normalized = normalize_query(q.s)
    return Query(normalized)
end
"""
    normalize_query(query) -> String

Canonicalize SQL text so that cosmetically different spellings of a query produce the same
string (`_read_query` hashes it into the cache file name):

1. a trailing `--` comment is removed from every line, lines are stripped, and blank
   lines are dropped;
2. the remaining lines are joined with single spaces;
3. whitespace around `, ( ) < > + - * / =` and backticks is removed;
4. every remaining run of whitespace collapses to a single space.

NOTE(review): these rules are also applied inside string literals, so `'a  b'` and
`'a b'` normalize identically and a `--` inside a literal is treated as a comment.
Queries that differ only inside literals can therefore share a cache entry.
"""
function normalize_query(query)
    lines = (strip(strip_comment(line)) for line in split(query, '\n'))
    joined = join(Iterators.filter(!isempty, lines), ' ')
    return minimize_whitespaces(normalize_symbols(joined))
end

# Remove a trailing `--` comment (plus one whitespace character before it) from a line.
# `.*` rather than `.+` so that an empty comment (a bare `--` at end of line) is removed too.
strip_comment(line) = replace(line, r"\s?--.*$" => "")

# Drop whitespace around punctuation and operators, e.g. "a , b" -> "a,b".
normalize_symbols(s) = replace(s, r"\s*(?<punc>[\,\(\)\<\>\+\-\*\/\=`])\s*" => s"\1")

# Collapse every run of whitespace into a single space.
minimize_whitespaces(s) = replace(s, r"\s+" => " ")
"""
    _validate_query(query; maxbyte=10^12, global_options=OptionDict(), query_options=OptionDict())

Dry-run the SQL string `query` (`bq query --dry_run=true`, built by `validate_query_cmd`)
and check how many bytes it would process.

Throws an error if the dry-run output contains no "<n> bytes" figure, or if that figure
is `≥ maxbyte`. Otherwise logs a human-readable size and returns `Query(query)`.
"""
function _validate_query(query; maxbyte = 10^12, global_options = OptionDict(), query_options = OptionDict())
    dry_run = validate_query_cmd(query, global_options, query_options)
    report = String(try_run_cmd(dry_run))
    found = match(r"(?<bytes>\d+)\sbytes", report)
    found === nothing && error("invalid dry run: $report")
    nbytes = parse(Int, found[:bytes])
    # Decimal units: whole MB below 1 GB, whole GB below 1 TB, fractional TB above.
    human = if nbytes < 10^9
        "$(nbytes÷10^6) MB"
    elseif nbytes < 10^12
        "$(nbytes÷10^9) GB"
    else
        "$(nbytes/10^12) TB"
    end
    if nbytes ≥ maxbyte
        error("too much processed byte expected: $nbytes (maxbyte: $maxbyte)")
    end
    @info "This query will process $human when run."
    return Query(query)
end
"""
    validate_query_cmd(query, global_options, query_options) -> Cmd

Build `bq <global flags> query --dry_run=true <query flags> <query>`.

Options are merged over `DEFAULT_GLOBAL_OPTIONS` / `DEFAULT_QUERY_OPTIONS` and rendered
as `--key=value`. `:job_id` and `:format` are omitted from the global flags, and
`:job_timeout_ms` is omitted from the query flags. The arguments are passed to `Cmd`
as a vector, so no shell parsing is involved.

NOTE(review): the query is a positional argument; a query whose text begins with `-`
(e.g. a leading `-- comment`) may be mistaken for a flag by `bq` — confirm.
"""
function validate_query_cmd(query, global_options, query_options)
    flag(k, v) = string("--", k, '=', v)
    global_flags = [flag(k, v) for (k, v) in merge(DEFAULT_GLOBAL_OPTIONS, global_options)
                    if k ∉ (:job_id, :format)]
    query_flags = [flag(k, v) for (k, v) in merge(DEFAULT_QUERY_OPTIONS, query_options)
                   if k ∉ (:job_timeout_ms,)]
    return Cmd(String["bq"; global_flags; "query"; "--dry_run=true"; query_flags; query])
end
# Build a BigQuery job id: `sym`, a hash of a fresh `gensym(sym)` and a hash of the current
# time, joined with underscores. The fresh gensym makes ids differ between calls.
function create_job_id(sym)
    unique_part = hash(gensym(sym))
    time_part = hash(time())
    return string(sym, '_', unique_part, '_', time_part)
end
"""
    query_cmd(query, job_id, global_options, query_options) -> Cmd

Build `bq <global flags> query <query flags> <query>` for actually running `query`.

Options are merged over `DEFAULT_GLOBAL_OPTIONS` / `DEFAULT_QUERY_OPTIONS` (the defaults
themselves are not modified) and rendered as `--key=value`. The `:job_id` flag is always
set to `job_id`, overriding any default or caller-supplied value.
"""
function query_cmd(query, job_id, global_options, query_options)
    globals = merge(DEFAULT_GLOBAL_OPTIONS, global_options)
    globals[:job_id] = job_id
    queries = merge(DEFAULT_QUERY_OPTIONS, query_options)
    args = String["bq"]
    append!(args, string("--", k, '=', v) for (k, v) in globals)
    push!(args, "query")
    append!(args, string("--", k, '=', v) for (k, v) in queries)
    push!(args, query)
    return Cmd(args)
end
# Build the `bq cancel <job_id>` command run when a query is interrupted.
# `--synchronous_mode=false` is a `bq` global flag; presumably it makes the cancel request
# return without waiting for the job to stop — TODO confirm against `bq --help`.
function cancel_cmd(job_id)
    argv = ["bq", "--synchronous_mode=false", "cancel", job_id]
    return Cmd(argv)
end
export @bq_str, read_query, validate_query
end # module
@aviatesk
Copy link
Author

BQUtils.jl

BQUtils provides utility functions for interacting with BigQuery through the `bq` command-line tool.

Features:

  • any `bq` command-line option can be passed through a single interface, `read_query`
  • dry-run and validate queries before running them; you can set a limit on the number of processed bytes (defaults to 1 TB)
  • cache query results in the local temporary directory, avoiding repeated network I/O when the same query is run again
  • "normalize" queries before checking the cache, so caching is robust to changes in comments, newlines and whitespace
  • cancel the running BigQuery job when the process is interrupted with Ctrl-C

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment