Skip to content

Instantly share code, notes, and snippets.

@koolay
Created July 28, 2020 04:01
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save koolay/19f4434143aaf9b1d5c7c8410f0c31d3 to your computer and use it in GitHub Desktop.
Save koolay/19f4434143aaf9b1d5c7c8410f0c31d3 to your computer and use it in GitHub Desktop.
Demo of Arrow Flight + Dremio + vaex
from collections import namedtuple
import vaex
import time
import orjson
import os
import psutil
from pyarrow import flight
import pyarrow as pa
from graphql.error import GraphQLError, format_error as format_error_default
from graphql.execution import ExecutionResult, execute
from typing import Any, Callable, Collection, Dict, List, Optional, Type, Union
# import vaex, pandas as pd
# df_pandas = pd.from_csv('test.csv')
# Lightweight record types used by the GraphQL response helpers in this file.
# FormattedResult pairs a formatted result payload with a status code.
FormattedResult = namedtuple("FormattedResult", "result status_code")
# ServerResponse pairs an encoded response body with a status code.
ServerResponse = namedtuple("ServerResponse", "body status_code")
# GraphQLResponse carries "results" and "params" fields.
GraphQLResponse = namedtuple("GraphQLResponse", "results params")
# Handle on the current process; used to print its resident memory (RSS).
process = psutil.Process(os.getpid())
class HttpDremioClientAuthHandler(flight.ClientAuthHandler):
    """Arrow Flight client auth handler that performs a basic-auth handshake.

    The username/password pair is wrapped in a ``flight.BasicAuth`` object,
    serialized and written to the server during ``authenticate``; whatever
    the server sends back is kept as the token returned by ``get_token``.
    """

    def __init__(self, username, password):
        """Store the credentials; no token exists until ``authenticate`` runs."""
        # Zero-argument super() runs the initializer of the direct base
        # class.  The earlier ``super(flight.ClientAuthHandler, self)`` form
        # started the lookup *after* that class and therefore skipped it.
        super().__init__()
        self.basic_auth = flight.BasicAuth(username, password)
        self.token = None

    def authenticate(self, outgoing, incoming):
        """Write the serialized credentials and keep the server's reply as the token."""
        outgoing.write(self.basic_auth.serialize())
        self.token = incoming.read()

    def get_token(self):
        """Return the token captured by ``authenticate`` (``None`` before it has run)."""
        return self.token
def read_from_dremio(
    username: str = 'admin',
    password: str = 'changeMe!',
    sql: str = '''SELECT * FROM "local_mysql".imdb.title_basics where runtimeMinutes>0 limit 10000''',
    location: str = 'grpc+tcp://localhost:47470',
) -> vaex.dataframe.DataFrameArrays:
    """Run a SQL query over Arrow Flight and return the result as a vaex DataFrame.

    Timing and resident-memory figures for the read and the Arrow -> vaex
    conversion are printed along the way.

    Args:
        username: Login used for the basic-auth handshake.
        password: Password used for the basic-auth handshake.
        sql: Query text sent as the Flight command descriptor.
        location: URI of the Flight endpoint to connect to.

    Returns:
        The vaex DataFrame built from the Arrow table that was read.

    NOTE(review): the defaults are the demo values that used to be hard-coded
    in the body; pass real credentials explicitly outside a local demo.
    """
    client = flight.FlightClient(location)
    client.authenticate(HttpDremioClientAuthHandler(username, password))
    info = client.get_flight_info(flight.FlightDescriptor.for_command(sql))

    st = time.time()
    # Only the first endpoint reported for the flight is read.
    reader = client.do_get(info.endpoints[0].ticket)
    batches = []
    while True:
        # Keep the try body down to the one call that signals end-of-stream.
        try:
            batch, _ = reader.read_chunk()
        except StopIteration:
            break
        batches.append(batch)
    print('total: ', len(batches))

    # Passing the stream schema lets an empty result set still yield a
    # (zero-row) table instead of failing for lack of any batch to infer from.
    arrow_table = pa.Table.from_batches(batches, schema=reader.schema)
    t2 = time.time()
    print('read take: ', t2 - st)
    print('memory:', process.memory_info().rss)  # in bytes

    df = vaex.from_arrow_table(arrow_table)
    t3 = time.time()
    print('convert take:', t3 - t2)
    print('memory:', process.memory_info().rss)  # in bytes
    print('print take:', time.time() - t3)
    return df
def json_encode(data: Union[Dict, List], pretty: bool = False) -> str:
    """Serialize the given data (a dictionary or a list) to a JSON string.

    Serialization is done by orjson with naive datetimes treated as UTC and
    numpy serialization enabled.

    Args:
        data: The dictionary or list to serialize.
        pretty: When true, the output is indented by two spaces; otherwise
            the compact form is produced.

    Returns:
        The JSON document decoded to ``str`` as UTF-8.
    """
    options = orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY
    if pretty:
        # Previously ``pretty`` was accepted but silently ignored.
        options |= orjson.OPT_INDENT_2
    return orjson.dumps(data, option=options).decode('utf-8')
def encode_execution_results(
    execution_results: List[Optional[ExecutionResult]],
    format_error: Callable[[GraphQLError], Dict] = format_error_default,
    is_batch: bool = False,
    encode: Callable[[Dict], Any] = json_encode,
) -> ServerResponse:
    """Turn a list of ExecutionResults into an encoded ServerResponse.

    Every execution result is passed through format_execution_result() with
    the given ``format_error``.  The response status code is the highest
    status code among the formatted results.  With ``is_batch=True`` all
    formatted payloads are encoded together; otherwise only the first one is.
    ``encode`` is the function used to serialize the payload.

    Returns a ServerResponse whose first item is the encoded payload and whose
    second item is the status code.
    """
    formatted = [
        format_execution_result(item, format_error)
        for item in execution_results
    ]
    # Split the (result, status_code) pairs into two parallel tuples.
    payloads, codes = zip(*formatted)
    worst_code = max(codes)
    body = payloads if is_batch else payloads[0]
    return ServerResponse(encode(body), worst_code)
def format_error(error):
    """Convert an error object into a compact dictionary.

    Args:
        error: An object exposing ``message``, ``locations`` and ``path``
            attributes.

    Returns:
        A dict with three string entries:
        ``msg`` - the error message;
        ``loc`` - ``"line:column"`` of the first location, or ``""`` when the
        error carries no locations;
        ``pth`` - the path segments joined with ``/``, or ``""`` when the
        error carries no path.
    """
    locations = error.locations
    if locations:
        first = locations[0]
        loc = "{}:{}".format(first.line, first.column)
    else:
        # No source location attached: avoid indexing into None / an empty list.
        loc = ""
    return {
        "msg": error.message,
        "loc": loc,
        # Segments are stringified so that non-string segments (for example
        # integer list indices) do not make str.join raise TypeError.
        "pth": "/".join(str(segment) for segment in (error.path or [])),
    }
def format_execution_result(
    execution_result: Optional[ExecutionResult],
    format_error: Optional[Callable[[GraphQLError], Dict]] = format_error_default,
) -> FormattedResult:
    """Format an execution result into a FormattedResult.

    Args:
        execution_result: The result to format; a falsy value produces a
            ``None`` payload with status 200.
        format_error: Callable turning each error into a dictionary.  ``None``
            falls back to the default formatter.

    Returns:
        A FormattedResult holding the response dictionary (or ``None``) and a
        status code.  The status code is 400 when at least one error has no
        ``path``; in that case the payload carries only ``"errors"``.
        Otherwise the status code is 200 and the payload carries ``"data"``,
        plus ``"errors"`` when there were any.
    """
    if format_error is None:
        # The annotation permits None; honor it instead of failing on the call.
        format_error = format_error_default

    if not execution_result:
        return FormattedResult(None, 200)

    errors = execution_result.errors
    if not errors:
        return FormattedResult({"data": execution_result.data}, 200)

    response: Dict[str, Any] = {"errors": [format_error(e) for e in errors]}
    # An error without a path makes the whole request a 400 and no data is
    # attached to the response.
    if any(not getattr(e, "path", None) for e in errors):
        return FormattedResult(response, 400)

    response["data"] = execution_result.data
    return FormattedResult(response, 200)
if __name__ == "__main__":
    # Resident memory before any data has been read.
    print(process.memory_info().rss)  # in bytes

    df = read_from_dremio()

    # GraphQL query run against the DataFrame: first 10 rows with
    # runtimeMinutes > 0, selecting five columns.
    query = """
{
df(where: {runtimeMinutes: {_gt: 0}}){
row(offset: 0, limit: 10) {
titleType
primaryTitle
originalTitle
isAdult
runtimeMinutes
}
}
}"""

    query_started = time.time()
    result = df.graphql.execute(query)
    query_finished = time.time()
    print('query take:', query_finished - query_started)

    # Encode the single result (non-batch) using this file's format_error.
    response = encode_execution_results([result], format_error, False)
    body, status_code = response
    print('encode take:', time.time() - query_finished)

    print(body)
    print(status_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment