Skip to content

Instantly share code, notes, and snippets.

@nazo
Last active January 28, 2024 05:23
Show Gist options
  • Save nazo/527f7ae680fed5e2df919a151ea946a5 to your computer and use it in GitHub Desktop.
Save nazo/527f7ae680fed5e2df919a151ea946a5 to your computer and use it in GitHub Desktop.
gcloud-aio-bigquery snippet
import asyncio
from collections.abc import AsyncGenerator
import aiohttp
from gcloud.aio.bigquery import Job
from google.cloud import bigquery
from google.cloud.bigquery._helpers import _rows_from_json
from google.cloud.bigquery.table import _parse_schema_resource # type:ignore
async def query(
    sql: str,
    max_results: int = 100,
    *,
    poll_interval: float = 1.0,
) -> AsyncGenerator[list[bigquery.Row], None]:
    """Run a standard-SQL query on BigQuery and yield result rows page by page.

    The query is submitted with ``Job.query``. If the response says the job
    is not yet complete, the job is polled with ``Job.get_query_results``
    every ``poll_interval`` seconds until ``jobComplete`` is truthy. Each
    completed page is converted to ``bigquery.Row`` objects and yielded as a
    list. Further pages are fetched by following ``pageToken`` until a page
    arrives without one.

    Args:
        sql: SQL text to execute (sent with legacy SQL disabled).
        max_results: Maximum number of rows requested per page.
        poll_interval: Seconds to wait between polls while the job is still
            running. Keyword-only; defaults to 1 second.

    Yields:
        One list of ``bigquery.Row`` per result page. Nothing is yielded when
        ``totalRows`` is 0.

    Raises:
        RuntimeError: If any response contains an ``errors`` entry.
    """
    # The session picks up the running event loop itself; the ``loop``
    # argument to ClientSession is deprecated in aiohttp.
    async with aiohttp.ClientSession() as session:
        job = Job(session=session)
        query_request = {
            "query": sql,
            "maxResults": max_results,
            # useLegacySql is a JSON boolean in the REST API, not a string.
            "useLegacySql": False,
        }
        response = await job.query(query_request=query_request)
        while True:
            errors = response.get("errors")
            if errors is not None:
                raise RuntimeError(errors)
            page_token = response.get("pageToken")
            if response.get("jobComplete"):
                if int(response.get("totalRows", 0)) == 0:
                    return
                schema = _parse_schema_resource(response.get("schema", {}))
                yield _rows_from_json(response.get("rows", ()), schema)
                if page_token is None:
                    return
            else:
                # Job still running: wait before polling it again.
                await asyncio.sleep(poll_interval)

            job_ref = response["jobReference"]
            job = Job(
                job_id=job_ref["jobId"],
                project=job_ref["projectId"],
                session=session,
            )
            params = {"maxResults": max_results}
            # aiohttp/yarl reject None query-string values. A still-running
            # job's response carries no pageToken, so only send keys that
            # actually have a value.
            location = job_ref.get("location")
            if location is not None:
                params["location"] = location
            if page_token is not None:
                params["pageToken"] = page_token
            response = await job.get_query_results(params=params)
async def main() -> None:
    """Run a sample query and print each column of each row as ``key=value``."""
    sample_sql = "SELECT * FROM `project_id.dataset_id.table`"
    async for page in query(sample_sql):
        for record in page:
            for column, cell in record.items():
                print(f"{column}={cell}")


if __name__ == "__main__":
    # Script entry point: drive the coroutine on a fresh event loop.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment