Last active
January 28, 2024 05:23
-
-
Save nazo/527f7ae680fed5e2df919a151ea946a5 to your computer and use it in GitHub Desktop.
gcloud-aio-bigquery snippet
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from collections.abc import AsyncGenerator | |
import aiohttp | |
from gcloud.aio.bigquery import Job | |
from google.cloud import bigquery | |
from google.cloud.bigquery._helpers import _rows_from_json | |
from google.cloud.bigquery.table import _parse_schema_resource # type:ignore | |
async def query(
    sql: str, max_results: int = 100
) -> AsyncGenerator[list[bigquery.Row], None]:
    """Run a standard-SQL query and yield its result rows one page at a time.

    The query is submitted through ``Job.query``; afterwards
    ``Job.get_query_results`` is called repeatedly until a response reports
    ``jobComplete`` and carries no ``pageToken``. While the job is still
    running, the loop sleeps for one second between polls.

    Args:
        sql: Query text sent as the ``query`` field of the request.
        max_results: Page size sent as ``maxResults`` on every request.

    Yields:
        One list of rows per result page. Nothing is yielded when a completed
        response reports ``totalRows`` of zero.

    Raises:
        RuntimeError: If a response contains an ``errors`` entry; the entry
            itself is passed as the exception argument.
    """
    # ClientSession attaches to the running event loop by itself; the explicit
    # ``loop=`` argument is deprecated in aiohttp, so it is no longer passed.
    async with aiohttp.ClientSession() as session:
        response = await Job(session=session).query(
            query_request={
                "query": sql,
                "maxResults": max_results,
                # Sent as a JSON boolean rather than the string "false".
                "useLegacySql": False,
            }
        )
        while True:
            errors = response.get("errors")
            if errors is not None:
                raise RuntimeError(errors)

            page_token = response.get("pageToken")
            if response.get("jobComplete"):
                if int(response.get("totalRows", 0)) == 0:
                    return
                schema = _parse_schema_resource(response.get("schema", {}))
                yield _rows_from_json(response.get("rows", ()), schema)
                if page_token is None:
                    # Last page delivered.
                    return
            else:
                # Job still running: wait before polling for results again.
                await asyncio.sleep(1.0)

            # Read lazily (only when another request is needed) so a
            # single-page result never depends on ``jobReference``.
            job_ref = response["jobReference"]
            results_job = Job(
                job_id=job_ref["jobId"],
                project=job_ref["projectId"],
                session=session,
            )
            params = {
                "location": job_ref["location"],
                "maxResults": max_results,
            }
            # While polling an unfinished job there is no page token yet.
            # Omit the key instead of sending ``None``, which is not a valid
            # query-string value for the HTTP client.
            if page_token is not None:
                params["pageToken"] = page_token
            response = await results_job.get_query_results(params=params)
async def main() -> None:
    """Run a sample query and print every cell as ``column=value``."""
    statement = "SELECT * FROM `project_id.dataset_id.table`"
    async for page in query(statement):
        for record in page:
            # Each row exposes its cells as (column name, value) pairs.
            for column, cell in record.items():
                print(f"{column}={cell}")
if __name__ == "__main__":
    # Script entry point: drive the async demo on a fresh event loop.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment