Skip to content

Instantly share code, notes, and snippets.

@cameronneylon
Created August 7, 2019 02:55
Show Gist options
  • Save cameronneylon/43b542f7ddc8746484692875c6c99773 to your computer and use it in GitHub Desktop.
Save cameronneylon/43b542f7ddc8746484692875c6c99773 to your computer and use it in GitHub Desktop.
COKI code for pulling data from Crossref Event Data (CED)
import datetime
import logging
import re
import time

import requests
from google.cloud import firestore
from google.cloud import storage
# Firestore client created once at import time; trigger() uses this
# module-level `db` to reach the "configuration/crossref-event" document.
db = firestore.Client()
def _release_lock(config_ref, update, fetch_status, attempts=3, backoff_step=3):
    """Write ``update`` to ``config_ref``, retrying with a linearly growing delay.

    Makes at most ``attempts`` tries, sleeping ``backoff_step``,
    ``2 * backoff_step``, ... seconds between them. ``fetch_status`` is only
    used to tag the log messages (e.g. "successful fetch").

    Raises:
        RuntimeError: chained from the last error once every attempt failed.
    """
    for attempt in range(1, attempts + 1):
        try:
            config_ref.update(update)
            return
        except Exception as err:
            if attempt == attempts:
                message = f"Update to firestore failed {attempts} times [{fetch_status}]"
                logging.exception(message)
                raise RuntimeError(message) from err
            backoff = backoff_step * attempt
            logging.warning(f"Update to firestore failed, trying again in {backoff} seconds [{fetch_status}]")
            time.sleep(backoff)


def trigger(event, context):
    """Fetch one page of Crossref Event Data and advance the stored cursor.

    Entry point called with ``(event, context)``; neither argument is read.
    State lives in the Firestore document ``configuration/crossref-event``,
    which must provide ``locked``, ``next_cursor``, ``next_itteration_number``,
    ``destination_bucket_name`` and ``email``.

    Flow:
      1. Atomically acquire the document's ``locked`` flag. If it is already
         held, log and return without doing anything.
      2. Fetch the page at ``next_cursor`` and upload it to the destination
         bucket.
      3. Release the lock. On success the cursor and iteration number are
         advanced. If the fetch or upload raised, the lock is released with
         the cursor unchanged (so a later run retries the same page) and the
         original exception is re-raised.

    Raises:
        RuntimeError: if the lock could not be released after three attempts.
    """
    config_ref = db.collection("configuration").document("crossref-event")
    transaction = db.transaction()
    configuration, locked = atomic_lock(transaction, config_ref)
    if locked:
        logging.info("crossref-event configuration is locked by another run; skipping")
        return

    next_cursor = configuration["next_cursor"]
    next_itteration_number = configuration["next_itteration_number"]
    destination_bucket_name = configuration["destination_bucket_name"]
    email = configuration["email"]

    try:
        results, filename, next_cursor, next_itteration_number = run_itteration(
            email, next_cursor, next_itteration_number
        )
        write_results(destination_bucket_name, filename, results)
    except Exception:
        # Without this, a single transient failure would leave "locked" set
        # forever and every later invocation would skip. The cursor is not
        # advanced, so the same page is retried next time.
        _release_lock(config_ref, {u"locked": False}, "failed fetch")
        raise

    _release_lock(
        config_ref,
        {
            u"locked": False,
            u"next_cursor": next_cursor,
            u"next_itteration_number": next_itteration_number,
        },
        "successful fetch",
    )
_EVENTS_URL = "https://api.eventdata.crossref.org/v1/events"
_NEXT_CURSOR_RE = re.compile(r'"next-cursor"\s*:\s*"([^"]*)"')
# The cursor is expected near the top of the body, ahead of the events list,
# so only a bounded prefix of the (potentially large) response is scanned.
_CURSOR_SEARCH_WINDOW = 4096


def _extract_next_cursor(text):
    """Return the ``next-cursor`` string found near the start of ``text``.

    Raises:
        ValueError: if no ``"next-cursor": "<value>"`` pair is present in the
            scanned prefix (for example when the value is ``null``), instead
            of returning an arbitrary slice of the body.
    """
    match = _NEXT_CURSOR_RE.search(text[:_CURSOR_SEARCH_WINDOW])
    if match is None:
        raise ValueError("No string 'next-cursor' found at the start of the Event Data response")
    return match.group(1)


def fetch_events(email, cursor, num_rows, timeout=None):
    """Request one page of events from the Crossref Event Data API.

    Args:
        email: Contact address sent as the ``mailto`` query parameter.
        cursor: Paging cursor from a previous response, or the literal
            ``"start"`` to request the first page (no ``cursor`` parameter).
        num_rows: Value of the ``rows`` query parameter.
        timeout: Optional ``requests`` timeout in seconds. ``None`` (the
            default) waits indefinitely, matching the previous behavior.

    Returns:
        ``(body, next_cursor)``: the raw response text and the cursor to use
        for the following page.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
        ValueError: if the response carries no string ``next-cursor``.
    """
    # Let requests build and URL-encode the query string.
    params = {"mailto": email, "rows": num_rows}
    if cursor != "start":
        params["cursor"] = cursor
    r = requests.get(_EVENTS_URL, params=params, timeout=timeout)
    # Fail loudly rather than persisting an error page and a bogus cursor.
    r.raise_for_status()
    return r.text, _extract_next_cursor(r.text)
def run_itteration(email, cursor, itteration_number, num_rows=10000):
    """Fetch one page of events and build the object name to store it under.

    Args:
        email: Contact address passed through to ``fetch_events``.
        cursor: Cursor of the page to fetch (``"start"`` for the first page).
        itteration_number: Sequence number of this page.
        num_rows: Events requested per page (default 10000, as before).

    Returns:
        ``(results, filename, next_cursor, next_itteration_number)`` where
        ``results`` is the raw response text, ``filename`` is
        ``"<folder>/<itteration_number>_<UTC timestamp>_<cursor>_<next_cursor>_<request seconds>.json"``
        and ``next_itteration_number`` is ``itteration_number + 1``.
    """
    timestamp = str(datetime.datetime.now(datetime.timezone.utc))
    start = time.monotonic()
    results, next_cursor = fetch_events(email, cursor, num_rows)
    request_duration = time.monotonic() - start
    # Group pages into folders of 100 files; with the default 10000 rows per
    # page that is roughly one million records per folder.
    folder = itteration_number // 100
    filename = f"{folder}/{itteration_number}_{timestamp}_{cursor}_{next_cursor}_{request_duration}.json"
    return results, filename, next_cursor, itteration_number + 1
def write_results(destination_bucket_name, destination_blob_name, results):
    """Upload ``results`` as an object in a Cloud Storage bucket.

    Args:
        destination_bucket_name: Name of the target bucket.
        destination_blob_name: Object name (may contain ``/`` folder prefixes).
        results: Text to store; uploaded with content type ``text/plain``.
    """
    # Client.bucket() only builds a local reference. get_bucket() would first
    # fetch the bucket's metadata, costing an extra request (and the
    # storage.buckets.get permission) on every upload.
    bucket = storage.Client().bucket(destination_bucket_name)
    bucket.blob(destination_blob_name).upload_from_string(results, content_type='text/plain')
@firestore.transactional
def atomic_lock(transaction, config_ref):
    """Try to take the ``locked`` flag on ``config_ref`` within ``transaction``.

    Reads the document through the transaction. If its ``locked`` field is
    exactly ``False``, the transaction sets it to ``True``.

    Returns:
        ``(configuration, already_locked)``: the document contents as read,
        plus ``True`` if the lock was already held (nothing is written) or
        ``False`` if this call acquired it.

    Raises:
        ValueError: if the configuration document does not exist.
        KeyError: if the document has no ``locked`` field.
    """
    configuration = config_ref.get(transaction=transaction).to_dict()
    if configuration is None:
        raise ValueError(f"Configuration document {config_ref.path} does not exist")
    if configuration["locked"] is not False:
        return configuration, True
    # Lock the same document that was read, not a hard-coded path, so the
    # lock taken always matches the configuration returned.
    transaction.update(config_ref, {u"locked": True})
    return configuration, False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment