Skip to content

Instantly share code, notes, and snippets.

@jmelloy
Created July 13, 2017 23:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jmelloy/a43f53160fe77650d6b253a8d1740f09 to your computer and use it in GitHub Desktop.
Save jmelloy/a43f53160fe77650d6b253a8d1740f09 to your computer and use it in GitHub Desktop.
def test_handler(self):
sqs = boto3.resource("sqs", "us-west-2")
job_queue = sqs.get_queue_by_name(QueueName=self.queue_name)
job_queue.send_message(
MessageBody=json.dumps(self.data)
)
bq_http = HttpMockSequence([
({'status': '200'}, self.bigquery_discovery),
({'status': '200'}, self.bigquery_done), ])
cs_http = HttpMockSequence([
({'status': '200'}, self.storage_discovery),
({'status': '200'}, ''), ])
def service(name, scope=None, v=None):
print(name)
if name == "bigquery":
return build("bigquery", "v2", http=bq_http)
elif name == "storage":
return build("storage", "v1", http=cs_http)
with patch.dict("os.environ", {"jobs": self.queue_name}):
with patch("google.service", service):
job_check.handler({}, FakeContext(10))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment