Skip to content

Instantly share code, notes, and snippets.

@IanCal
Created November 14, 2022 10:12
Show Gist options
  • Save IanCal/cd1160ebbf2faad216c18e13296ff9c6 to your computer and use it in GitHub Desktop.
Save IanCal/cd1160ebbf2faad216c18e13296ff9c6 to your computer and use it in GitHub Desktop.
import pyarrow.flight as fl
import json
client = fl.FlightClient("grpc://127.0.0.1:8815")
print(
client.do_get(
fl.Ticket(json.dumps({"contract": "0x", "event": "0x"}))
).read_pandas()
)
import pyarrow as pa
import duckdb
import json
from pyarrow.ipc import IpcWriteOptions
class FlightServer(pa.flight.FlightServerBase):
def __init__(self, location="grpc://0.0.0.0:8815", **kwargs):
super(FlightServer, self).__init__(location, **kwargs)
self._location = location
self.con = duckdb.connect(":memory:")
def do_get(self, context, ticket):
params = json.loads(ticket.ticket.decode("utf-8"))
rb = self.con.execute(
"""select * from parquet_scan('monthly/*.parquet')
where topic_1 = ? and address = ?""",
[params["event"], params["contract"]],
).fetch_record_batch()
return pa.flight.RecordBatchStream(
rb, options=IpcWriteOptions(compression="zstd")
)
server = FlightServer()
print("Server listening on port", server.port)
server.serve()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment