Skip to content

Instantly share code, notes, and snippets.

@nburn42
Created April 10, 2017 20:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nburn42/2c2a06e383aa6b04f84ed31548f1cb09 to your computer and use it in GitHub Desktop.
import sys
import uuid
from apache_beam.io import iobase
from apache_beam.transforms import PTransform;
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import StandardOptions
from google.cloud.proto.datastore.v1 import entity_pb2
from google.cloud.proto.datastore.v1 import query_pb2
import googledatastore
import apache_beam as beam
import json
import time
import MySQLdb
from MySQLdb.cursors import SSCursor
# Datastore entity kind that every migrated row is written under, and a
# running count of rows converted (progress is printed every 100 rows).
kind = "Part"
i = 0

# Server-side cursor (SSCursor) streams the result set row-by-row instead
# of buffering the whole `part` table in client memory at once.
# NOTE(review): connection credentials are hard-coded placeholders ("xxx");
# move them to configuration/environment before running this for real.
conn = MySQLdb.connect(host="xxx", user="xxx", passwd="xxx", db="xxx",
                       cursorclass=SSCursor)
cursor = conn.cursor()
cursor.execute("SELECT * FROM part;")
while True:
rows = cursor.fetchmany(400)
if rows == ():
break
entities = []
for row in rows:
name = row[0]
task_entity = entity_pb2.Entity()
# Prepares the new entity
data = json.loads(row[2])
for key in data:
d = data[key]
if type(d) is list or type(d) is tuple:
if len(d) != 0:
googledatastore.helper.add_properties(task_entity, {key: d})
else:
googledatastore.helper.add_properties(task_entity, {key: d})
i += 1
if i % 100 == 0:
print "i", i, name
googledatastore.helper.add_key_path(task_entity.key, kind, name)
entities.append(task_entity)
count = 0
while count < 10:
try:
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'xxx'
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://xxx/staging'
google_cloud_options.temp_location = 'gs://xxx/temp'
#options.view_as(StandardOptions).runner = 'DataflowRunner'
gcloud_options = options.view_as(GoogleCloudOptions)
p = beam.Pipeline(options=options)
(p
| 'add seed' >> beam.Create(entities)
| 'Write To Datastore' >> WriteToDatastore(gcloud_options.project))
print p.run()
count = 10
except Exception as e:
print e
sleep((count + 1) * 2)
count += 1
conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment