Skip to content

Instantly share code, notes, and snippets.

View tseaver's full-sized avatar

Tres Seaver tseaver

View GitHub Profile
import csv
import datetime
import io
import random
import string
from google.api_core import exceptions
from google.cloud.storage import Client
BUCKET_NAME = "gcp-storage-repro-462"
import datetime
import requests
from google.api_core import exceptions
from google.cloud import storage
BUCKET_NAME = 'repro-gcp-9163'
BLOB_NAME = 'test.txt'
PAYLOAD = b"DEADBEEF"
@tseaver
tseaver / repro_gcp_7756.py
Created August 1, 2019 20:54
Reproducer for google-cloud-python #7756
import sys
from google.cloud.firestore_v1 import Client
def cleanup(coll):
for doc in coll.list_documents():
print("Removing: {}".format(doc.path))
doc.delete()
@tseaver
tseaver / update-st-nested-dot.textproto
Created February 6, 2019 20:29
textproto conformance test case for google-cloud-python #7125
description: "update: nested ServerTimestamp field via dotted names"
update: <
doc_ref_path: "projects/projectID/databases/(default)/documents/C/d"
json_data: "{\"a.b\": 1, \"a.c\": \"ServerTimestamp\"}"
request: <
database: "projects/projectID/databases/(default)"
writes: <
update: <
name: "projects/projectID/databases/(default)/documents/C/d"
fields: <
@tseaver
tseaver / gcs_example_with_pipes.py
Created January 30, 2019 17:54
GCS example with pipes
import os
import threading
from google.cloud.storage import Client
PAYLOAD = b'DEADBEEF' * 100000
def reader(in_file):
count = 0
@tseaver
tseaver / repro_gcp_7215.py
Created January 29, 2019 17:21
Reproduce gcp-7215
import os
import pprint
from google.oauth2 import service_account
from google.cloud import firestore_v1beta1
def prepare_document(client):
document = client.collection('test').document()
document.set({
"foo":{
@tseaver
tseaver / test_gcp_5111.py
Created July 25, 2018 20:19
GCP #5111: sample showing nested 'exclude_from_indexes'
import datetime
import uuid
from google.cloud import datastore
def main():
ds = datastore.Client()
data = {
# System test for 'Blob.generate_signed_url' w/ non-ASCII blob name.
def test_create_signed_read_url_w_non_ascii_name(self):
blob = self.bucket.blob(u'Caf\xe9.txt')
payload = b'Test signed URL for blob w/ non-ASCII name'
blob.upload_from_string(payload)
self.case_blobs_to_delete.append(blob)
expiration = int(time.time() + 10)
signed_url = blob.generate_signed_url(expiration, method='GET',
import os
import sys
from google.cloud import storage
BUCKET_NAME = 'gcp-5254-repro'
FILE_NAME = 'dataset/VIDEO-2018-06-06-12-26-54.mp4'
import pprint
from google.api_core import exceptions
from google.cloud import bigquery
def setup_dataset(client):
ds_ref = client.dataset('gcp_5539')
try:
return client.get_dataset(ds_ref)