Skip to content

Instantly share code, notes, and snippets.

@moiristo
Last active August 31, 2021 10:13
Show Gist options
  • Save moiristo/c4e0bb492601323dc21e7559b8eb3442 to your computer and use it in GitHub Desktop.
Save moiristo/c4e0bb492601323dc21e7559b8eb3442 to your computer and use it in GitHub Desktop.
Simple BigQuery Ruby client using OAuth2
# frozen_string_literal: true
module BigQuery
  # Small BigQuery REST API (v2) client. Authentication and token persistence
  # are inherited from Oauth2TokenStoreClient; request_json is overridden so
  # callers pass paths relative to ENDPOINT.
  class Client < Oauth2TokenStoreClient
    ENDPOINT = 'https://bigquery.googleapis.com/bigquery/v2'
    UPLOAD_ENDPOINT = 'https://bigquery.googleapis.com/upload/bigquery/v2'
    MULTIPART_BOUNDARY = 'bex835900724346'
    REQUIRED_SCOPE = 'https://www.googleapis.com/auth/bigquery'
    # Job states for which execute_job keeps polling; any other state ends polling.
    UNFINISHED_JOB_STATES = %w[PENDING RUNNING].freeze

    # Builds a client against Google's OAuth2 endpoints using a BigQuery::TokenStore.
    # NOTE(review): 'UID' / 'SECRET' look like placeholders — confirm they are replaced.
    def initialize
      super(
        client_id: 'UID',
        client_secret: 'SECRET',
        site: 'https://oauth2.googleapis.com',
        redirect_uri: 'https://www.example.com',
        authorize_url: 'https://accounts.google.com/o/oauth2/auth',
        token_url: '/token',
        token_store: BigQuery::TokenStore.new
      )
    end

    # Builds the authorization-code URL requesting REQUIRED_SCOPE with
    # access_type 'offline' and prompt 'consent'.
    #
    # @param state [String, nil] value passed through as the OAuth2 `state` parameter
    # @return [String]
    def authorize_url(state: nil)
      oauth2_client.auth_code.authorize_url(
        redirect_uri: redirect_uri,
        state: state,
        scope: REQUIRED_SCOPE,
        access_type: 'offline',
        prompt: 'consent'
      )
    end

    # Lists the projects visible to the current token.
    # NOTE(review): only a single response is read; any pagination token is ignored.
    #
    # @return [Hash{String => String}] project id => friendly name
    def projects
      response = request_json(:get, '/projects')
      # The list key may be absent from the response (e.g. nothing to list);
      # treat that as empty rather than calling a method on nil.
      Array(response['projects']).each_with_object({}) do |project, names_by_id|
        names_by_id[project.dig('projectReference', 'projectId')] = project['friendlyName']
      end
    end

    # @param project_id [String]
    # @return [Array<String>] ids of the datasets in the project (first response only)
    def dataset_ids(project_id:)
      response = request_json(:get, "/projects/#{project_id}/datasets")
      Array(response['datasets']).map { |dataset| dataset.dig('datasetReference', 'datasetId') }
    end

    # @param project_id [String]
    # @param dataset_id [String]
    # @return [Array<String>] ids of the tables in the dataset (first response only)
    def table_ids(project_id:, dataset_id:)
      response = request_json(:get, "/projects/#{project_id}/datasets/#{dataset_id}/tables")
      Array(response['tables']).map { |table| table.dig('tableReference', 'tableId') }
    end

    # Creates a dataset from db/big_query/dataset_definitions/<dataset_id>.json,
    # formatted with the project and dataset ids.
    #
    # @return [Hash] parsed API response
    def create_dataset(dataset_id, project_id:)
      dataset = load_resource_definitions(:dataset, dataset_id, project_id: project_id, dataset_id: dataset_id)
      request_json(:post, "/projects/#{project_id}/datasets", options: { body: dataset })
    end

    # Creates a table from db/big_query/table_definitions/<table_id>.json,
    # formatted with the project, dataset and table ids.
    # See: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
    #
    # @return [Hash] parsed API response
    def create_table(table_id, project_id:, dataset_id:)
      table = load_resource_definitions(:table, table_id, project_id: project_id, dataset_id: dataset_id, table_id: table_id)
      request_json(:post, "/projects/#{project_id}/datasets/#{dataset_id}/tables", options: { body: table })
    end

    # Sends rows to the tabledata insertAll endpoint. Each row's :id becomes its
    # insertId; the remaining keys are sent as the row's json.
    # Example: rows = [{ field1: 'Test successful?', field2: 'YES!' }]
    #
    # @return [Hash] parsed API response
    def insert_all(rows = [], project_id:, dataset_id:, table_id:)
      row_data = { rows: rows.map { |row| { insertId: row[:id], json: row.except(:id) } } }
      request_json(:post, "/projects/#{project_id}/datasets/#{dataset_id}/tables/#{table_id}/insertAll", options: { body: row_data })
    end

    # Uploads rows as newline-delimited JSON in a multipart load job targeting
    # the table described by the local table definition. Row :id keys are dropped.
    #
    # @param wait_until_done [Boolean] poll the job until it finishes (see execute_job)
    # @return [Hash] the last job resource seen
    def batch_insert_job(rows = [], wait_until_done: true, project_id:, dataset_id:, table_id:)
      table_metadata = JSON.parse(
        load_resource_definitions(:table, table_id, project_id: project_id, dataset_id: dataset_id, table_id: table_id)
      )
      job_metadata = {
        configuration: {
          load: {
            sourceFormat: 'NEWLINE_DELIMITED_JSON',
            schema: table_metadata['schema'],
            destinationTable: table_metadata['tableReference']
          }
        }
      }
      new_line_delimited_json = rows.map { |row| row.except(:id).to_json }.join("\n")
      body = build_multipart_post(job_metadata: job_metadata, new_line_delimited_json: new_line_delimited_json)

      execute_job(project_id: project_id, wait_until_done: wait_until_done) do
        # Uses the raw request (not request_json): the body is multipart, not JSON.
        request(
          :post, "#{UPLOAD_ENDPOINT}/projects/#{project_id}/jobs?uploadType=multipart",
          options: {
            headers: { 'Content-Type' => "multipart/related; boundary=#{MULTIPART_BOUNDARY}", 'Accept' => 'application/json' },
            body: body
          }
        ).parsed
      end
    end

    # Fetches a job resource.
    #
    # @param location [String, nil] optional job location, sent as the `location`
    #   query parameter when given
    # @return [Hash] parsed job resource
    def fetch_job(project_id:, job_id:, location: nil)
      request_json(:get, "/projects/#{project_id}/jobs/#{job_id}", options: location_options(location))
    end

    # Yields to create a job, then (when +wait_until_done+) re-fetches it every
    # +wait_interval+ seconds while its state is PENDING or RUNNING.
    # NOTE(review): there is no overall timeout; a job stuck unfinished polls forever.
    #
    # @yieldreturn [Hash] parsed job resource returned by the create request
    # @return [Hash] the last job resource seen
    def execute_job(project_id:, wait_until_done: true, wait_interval: 2)
      job = yield
      return job unless wait_until_done && job_unfinished?(job)

      job_id = job.dig('jobReference', 'jobId')
      # Reuse the job's own location so polling also works for non-default locations.
      location = job.dig('jobReference', 'location')
      loop do
        job = fetch_job(project_id: project_id, job_id: job_id, location: location)
        break unless job_unfinished?(job)

        sleep(wait_interval)
      end
      job
    end

    # Runs +sql+ with useLegacySql: false via POST /projects/{project_id}/queries.
    #
    # @return [Hash] parsed API response
    def query(sql, project_id:)
      request_json(:post, "/projects/#{project_id}/queries", options: { body: { query: sql, useLegacySql: false } })
    end

    # Runs +sql+ as a query job writing to the table described by the local
    # table definition (CREATE_IF_NEEDED, WRITE_TRUNCATE).
    #
    # @return [Hash] the last job resource seen
    def query_to_table(sql, wait_until_done: true, project_id:, dataset_id:, table_id:)
      table_metadata = JSON.parse(
        load_resource_definitions(:table, table_id, project_id: project_id, dataset_id: dataset_id, table_id: table_id)
      )
      job_metadata = {
        configuration: {
          query: {
            query: sql,
            useLegacySql: false,
            destinationTable: table_metadata['tableReference'],
            createDisposition: 'CREATE_IF_NEEDED',
            writeDisposition: 'WRITE_TRUNCATE'
          }
        }
      }
      execute_job(project_id: project_id, wait_until_done: wait_until_done) do
        request_json(:post, "/projects/#{project_id}/jobs", options: { body: job_metadata })
      end
    end

    # Fetches the results of a query job.
    #
    # @param location [String, nil] optional job location query parameter
    # @return [Hash] parsed API response
    def query_results(project_id:, job_id:, location: nil)
      request_json(:get, "/projects/#{project_id}/queries/#{job_id}", options: location_options(location))
    end

    # Prefixes +path+ with ENDPOINT and delegates to Oauth2TokenStoreClient#request_json.
    # Keywords (token:, options:) are forwarded explicitly: forwarding them through
    # a bare *args turns them into a positional Hash on Ruby 3, which the parent rejects.
    def request_json(http_method, path, **kwargs)
      super(http_method, "#{ENDPOINT}#{path}", **kwargs)
    end

    private

    # True while the job's status.state is PENDING or RUNNING.
    def job_unfinished?(job)
      UNFINISHED_JOB_STATES.include?(job&.dig('status', 'state'))
    end

    # Request options carrying the optional `location` query parameter.
    def location_options(location)
      location ? { params: { location: location } } : {}
    end

    # Reads db/big_query/<type>_definitions/<id>.json from the engine root and
    # fills its %{...} placeholders from +format_options+.
    def load_resource_definitions(resource_type, resource_id, **format_options)
      resource_data = File.read(BigQuery::Engine.root.join("db/big_query/#{resource_type}_definitions/#{resource_id}.json"))
      format(resource_data, **format_options)
    end

    # Fills db/big_query/multipart_template.txt with the job metadata JSON and the row payload.
    def build_multipart_post(job_metadata:, new_line_delimited_json:)
      template = File.read(BigQuery::Engine.root.join('db/big_query/multipart_template.txt'))
      format(template, job_metadata: job_metadata.to_json, new_line_delimited_json: new_line_delimited_json)
    end
  end
end
# frozen_string_literal: true
# Base class for API clients that authenticate with OAuth2 and keep their
# access token in a pluggable token store. The store is used through
# +fetch_token!(oauth2_client)+, +store_token!(token)+ and +with_lock { ... }+.
class Oauth2TokenStoreClient
  class Error < StandardError; end

  attr_accessor :oauth2_client, :token_store, :redirect_uri, :client_id, :client_secret

  # @param client_id [String] OAuth2 client id
  # @param client_secret [String] OAuth2 client secret
  # @param token_store [Object, nil] token persistence (see class comment)
  # @param redirect_uri [String] redirect URI passed to the OAuth2 client
  # @param oauth2_client_opts [Hash] extra options forwarded to OAuth2::Client.new
  def initialize(client_id:, client_secret:, token_store: nil, redirect_uri:, **oauth2_client_opts)
    @client_id = client_id
    @client_secret = client_secret
    @token_store = token_store
    @redirect_uri = redirect_uri
    @oauth2_client = OAuth2::Client.new(
      client_id, client_secret,
      redirect_uri: redirect_uri,
      **oauth2_client_opts
    )
  end

  # Exchanges an authorization code for an access token. The token is returned,
  # not stored; call store_token! to persist it.
  def get_token_for_authorization_code!(authorization_code, headers: {})
    oauth2_client.auth_code.get_token(
      authorization_code,
      redirect_uri: redirect_uri,
      headers: headers
    )
  end

  # Reads the token from the store. With +force_refresh+ the read happens
  # inside the store lock.
  #
  # @raise [Error] when no token store is configured
  def fetch_token!(force_refresh: false)
    ensure_token_store!
    return token_store.fetch_token!(oauth2_client) unless force_refresh

    token_store.with_lock { token_store.fetch_token!(oauth2_client) }
  end

  # Persists +token+ in the store.
  #
  # @raise [Error] when no token store is configured
  def store_token!(token)
    ensure_token_store!
    token_store.store_token!(token)
  end

  # Returns a usable token, refreshing and storing it first when expired.
  #
  # @return [Object, nil] the token, or nil when the store has none
  # @raise [Error] when no token store is configured
  # @raise [OAuth2::Error] when refreshing fails
  def current_token(force_refresh: false)
    ensure_token_store!

    token = fetch_token!(force_refresh: force_refresh)
    return token unless token&.expired?

    refresh_expired_token(token)
  end

  # Sends +http_method+ (e.g. :get, :post) to +endpoint+ through the token.
  #
  # @raise [Error] when no token is available
  def request(http_method, endpoint, token: current_token, options: {})
    raise Error, 'no token available!' if token.nil?

    token.public_send(http_method, endpoint, options)
  end

  # JSON variant of #request: encodes Hash/Array bodies as JSON, defaults the
  # Accept and Content-Type headers to application/json and returns the parsed
  # response. The caller's +options+ (and its :headers) are never mutated, so
  # frozen or reused option hashes are safe to pass.
  def request_json(http_method, endpoint, token: current_token, options: {})
    json_options = options ? options.dup : {}
    body = json_options[:body]
    json_options[:body] = body.to_json if body.is_a?(Hash) || body.is_a?(Array)

    headers = (json_options[:headers] || {}).dup
    headers['Accept'] ||= 'application/json'
    headers['Content-Type'] ||= 'application/json'
    json_options[:headers] = headers

    request(http_method, endpoint, token: token, options: json_options).parsed
  end

  private

  # Raises Error unless a token store has been configured.
  def ensure_token_store!
    raise Error, 'no token store set!' if token_store.nil?
  end

  # Refreshes +token+ under the store lock and persists the new token.
  # On an 'invalid_grant' error the token was probably already refreshed, so
  # the store is re-read; if that still yields no usable token, the original
  # error is raised.
  def refresh_expired_token(token)
    token_store.with_lock do
      token = token.refresh!
      store_token!(token)
    end
    token
  rescue OAuth2::Error => e
    raise unless e.code == 'invalid_grant'

    token = fetch_token!(force_refresh: true)
    raise e if token.nil? || token.expired?

    token
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment