Skip to content

Instantly share code, notes, and snippets.

@timhberry
Created January 17, 2023 12:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timhberry/9caae4cdbaa488a56f3a4ae7769dc98b to your computer and use it in GitHub Desktop.
Save timhberry/9caae4cdbaa488a56f3a4ae7769dc98b to your computer and use it in GitHub Desktop.
import csv
import json
import os
from concurrent import futures
from random import random
from time import sleep
from typing import Callable

from google.cloud import pubsub_v1
# Publisher configuration. Each value can be overridden with an environment
# variable so the script can target another project, topic or input file
# without editing the source; the defaults are the original hard-coded values.
project_id = os.environ.get("PUBSUB_PROJECT_ID", "ab-academy-demo")
topic_id = os.environ.get("PUBSUB_TOPIC_ID", "city_speeds")
filename = os.environ.get("CITY_SPEEDS_CSV", "city_daily_speeds-10k.csv")

# One publisher client, created at import time and reused for every publish.
client = pubsub_v1.PublisherClient()
# Fully qualified topic path built from the project and topic ids.
topic = client.topic_path(project_id, topic_id)

# Futures of every publish call, awaited together after the CSV is exhausted.
publish_futures = []
# Number of CSV rows handed to the publisher so far.
counter = 0
def get_callback(
    publish_future: "pubsub_v1.publisher.futures.Future", data: bytes
) -> Callable[["pubsub_v1.publisher.futures.Future"], None]:
    """Build a done-callback that reports the outcome of one publish call.

    Args:
        publish_future: The future returned by ``client.publish``. The callback
            receives the finished future as its own argument, so this one is
            not used; it is kept for interface compatibility.
        data: The published payload, included in failure messages so the
            failing message can be identified.

    Returns:
        A callable for ``Future.add_done_callback``. On success it prints the
        value returned by ``result()``. On failure it prints the payload and
        the error.
    """

    def callback(done_future: "pubsub_v1.publisher.futures.Future") -> None:
        try:
            # Done-callbacks only run once the future has finished, so
            # result() returns or raises immediately; the timeout is merely
            # a safety net.
            outcome = done_future.result(timeout=60)
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")
        except Exception as exc:  # callback boundary: report, don't lose it
            print(f"Publishing {data} failed: {exc!r}")
        else:
            print(outcome)

    return callback
def _row_to_record(row):
    """Convert one CSV data row into a dict matching the topic's Avro schema.

    Columns are read by position (0-9). Download, upload and distance are
    parsed as floats; the test count is parsed as an int.

    Raises:
        IndexError: the row has fewer than 10 columns.
        ValueError: a numeric column cannot be parsed.
    """
    return {
        "country": row[0],
        "country_code": row[1],
        "region": row[2],
        "region_code": row[3],
        "city": row[4],
        "date": row[5],
        "download_kbps": float(row[6]),
        "upload_kbps": float(row[7]),
        "total_tests": int(row[8]),
        "distance_miles": float(row[9]),
    }


# Stream the CSV and publish one JSON message per data row.
# Decode as UTF-8 so non-ASCII place names survive. The former
# ascii/surrogateescape pair turned each non-ASCII byte into a lone surrogate,
# which json.dumps then emitted as an invalid "\udcXX" escape. errors="replace"
# keeps stray undecodable bytes from aborting the run. newline="" is what the
# csv module requires for files it reads.
with open(filename, encoding="utf-8", errors="replace", newline="") as data_file:
    reader = csv.reader(data_file)
    # Skip the header row; the default avoids StopIteration on an empty file.
    next(reader, None)
    for line in reader:
        if not line:
            # csv.reader yields [] for a blank line (e.g. trailing newline).
            continue
        # Each row becomes a JSON record matching the Avro schema. The schema
        # itself is not needed here unless messages are to be written in
        # binary form with a DatumWriter.
        try:
            record = _row_to_record(line)
        except (IndexError, ValueError) as err:
            print(f"Skipping malformed row at line {reader.line_num}: {err}")
            continue
        counter = counter + 1
        data_str = json.dumps(record)
        data = data_str.encode("utf-8")
        publish_future = client.publish(topic, data)
        publish_future.add_done_callback(get_callback(publish_future, data))
        publish_futures.append(publish_future)
        print(counter)
        # Pace the stream: pause a random 0-1 seconds between messages.
        sleep(random())

# Block until every publish call has either succeeded or failed.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment