# Source: GitHub Gist by @nburn42, created April 10, 2017 19:12
# https://gist.github.com/nburn42/3c2c9974c2885d1bfe5d69ee79bd896c
import json
import logging
import sys
import time

import apache_beam as beam
import googledatastore
import MySQLdb
from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.transforms import PTransform
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions
from apache_beam.utils.pipeline_options import StandardOptions
from google.cloud.proto.datastore.v1 import entity_pb2
from MySQLdb.cursors import SSCursor
# Cloud Datastore kind used in the key path of every entity built by get_rows.
kind = 'part'
def chunks(l, n):
    """Generate consecutive slices of ``l`` holding ``n`` items each.

    The last slice is shorter when ``len(l)`` is not a multiple of ``n``.
    Slices keep the type of ``l`` (list, tuple, str, ...).
    """
    for offset in range(0, len(l), n):
        piece = l[offset:offset + n]
        yield piece
class get_rows(iobase.BoundedSource):
    """Bounded Beam source that turns rows of the MySQL ``part`` table into
    Cloud Datastore entities.

    The table is treated as ``estimate_size()`` consecutive batches of
    ``batch_size`` rows. Positions used by ``split``, ``get_range_tracker``
    and ``read`` are batch indices. ``read`` fetches each claimed batch with
    its own ``LIMIT``/``OFFSET`` query, so bundles can be read independently.

    Only plain values are stored on the instance. Beam serializes (pickles)
    sources to hand them to workers, and a live MySQLdb connection or cursor
    cannot be pickled, so the connection is opened inside ``read``.
    """

    # Connection settings (placeholders, as in the original script).
    _DB_SETTINGS = dict(host="xxx", user="xxx", passwd="xxx", db="xxx")

    def __init__(self, total_rows=5000000, batch_size=400):
        """Create the source.

        Args:
          total_rows: upper bound on the number of rows in ``part``. Rows
            beyond this bound are never read. NOTE(review): this value was
            hard-coded in the original; confirm it still covers the table
            (e.g. with SELECT COUNT(*)).
          batch_size: number of rows fetched per batch (one batch per
            range-tracker position).

        Raises:
          ValueError: if ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be positive, got %r" % (batch_size,))
        self.total_rows = total_rows
        self.batch_size = batch_size
        # Running count of emitted entities, used only for progress logging.
        # Every deserialized copy of the source keeps its own count.
        self.i = 0

    def _normalize(self, start_position, end_position):
        """Replace ``None`` range endpoints with the full batch range."""
        start = 0 if start_position is None else start_position
        end = self.estimate_size() if end_position is None else end_position
        return start, end

    def estimate_size(self):
        """Return the number of batches, i.e. ceil(total_rows / batch_size).

        NOTE(review): Beam documents this as a size estimate in bytes. The
        batch count is kept from the original because ``split`` and
        ``_normalize`` use it as the end of the position range.
        """
        # Integer ceiling division; ``//`` keeps the result an int on Python 3.
        return (self.total_rows + self.batch_size - 1) // self.batch_size

    def split(self, desired_bundle_size, start_position=None,
              end_position=None, stop_position=None):
        """Yield one single-batch SourceBundle per batch in the range.

        ``desired_bundle_size`` is ignored. ``stop_position`` is accepted as
        an alias for ``end_position`` because that is the keyword name of
        ``BoundedSource.split``.
        """
        if end_position is None:
            end_position = stop_position
        start, end = self._normalize(start_position, end_position)
        for batch in range(start, end):
            # [batch, batch + 1) covers exactly one batch. The original used
            # stop == start, which describes an empty range.
            yield iobase.SourceBundle(weight=1,
                                      source=self,
                                      start_position=batch,
                                      stop_position=batch + 1)

    def get_range_tracker(self, start_position, end_position):
        """Return an OffsetRangeTracker over batch indices [start, end)."""
        start, end = self._normalize(start_position, end_position)
        return range_trackers.OffsetRangeTracker(start, end)

    def read(self, range_tracker):
        """Yield one Datastore entity per row of every batch the tracker grants.

        Stops early when a batch comes back empty, i.e. the table holds fewer
        rows than ``total_rows``.
        """
        conn = MySQLdb.connect(cursorclass=SSCursor, **self._DB_SETTINGS)
        try:
            cursor = conn.cursor()
            batch = range_tracker.start_position()
            while range_tracker.try_claim(batch):
                logging.debug("row batch %d", batch)
                # ORDER BY 1 (the first column, used as the default entity
                # name) keeps paging consistent across separate queries.
                # NOTE(review): assumes the first column is unique and indexed;
                # large OFFSETs still scan the skipped rows, so consider keyset
                # pagination on the real primary key.
                cursor.execute(
                    "SELECT * FROM part ORDER BY 1 LIMIT %s OFFSET %s",
                    (self.batch_size, batch * self.batch_size))
                rows = cursor.fetchall()
                if not rows:
                    logging.info("fetch done")
                    return
                for row in rows:
                    yield self._row_to_entity(row)
                batch += 1
        finally:
            conn.close()

    def _row_to_entity(self, row):
        """Build the Datastore entity for one ``part`` row.

        Column use, as read below: row[0] is the default name/ID, row[1] the
        pull URL, row[2] a JSON object of part attributes, row[3] the pull
        date. NOTE(review): confirm against the table schema.
        """
        # The name/ID for the new entity. A "partnumbervariations" entry in
        # the JSON payload replaces it below (the last variation wins).
        name = row[0]
        task_entity = entity_pb2.Entity()
        googledatastore.helper.add_properties(task_entity, {'pullurl': unicode(row[1])})
        googledatastore.helper.add_properties(task_entity, {'pulldate': unicode(row[3])})
        data = json.loads(row[2])
        # "title2" is used as the title when it is non-empty, else "title".
        has_title2 = bool(data.get("title2"))
        for key in data:
            value = data[key]
            if key == "partnumbervariations":
                for num in value:
                    name = num.replace("-", "").replace(" ", "")
            # Not an elif: "partnumbervariations" is also stored as a
            # property by the final branch below.
            if key == "title":
                if not has_title2:
                    googledatastore.helper.add_properties(task_entity, {"title": value})
            elif key == "title2":
                # The original tested len(...) >= 0, which is always true, so
                # an empty title2 was still written as the title.
                if has_title2:
                    googledatastore.helper.add_properties(task_entity, {"title": value})
            elif key == "uses":
                # Deliberately not stored.
                pass
            elif key == "description":
                googledatastore.helper.add_properties(
                    task_entity,
                    {key: [d.replace("Purchase this ", "") for d in value]})
            elif isinstance(value, (list, tuple)) and not value:
                # Empty lists are skipped rather than stored.
                pass
            else:
                googledatastore.helper.add_properties(task_entity, {key: value})
        self.i += 1
        if self.i % 100 == 0:
            logging.info("i %d %s", self.i, name)
        googledatastore.helper.add_key_path(task_entity.key, kind, name)
        return task_entity
class ReadFromCountingSource(PTransform):
    """Root transform that reads the MySQL ``part`` table through ``get_rows``.

    The name does not describe what is read. It is kept so existing
    references keep working.
    """

    def __init__(self, **kwargs):
        # Forward any keyword arguments unchanged to PTransform.
        super(ReadFromCountingSource, self).__init__(**kwargs)

    def expand(self, pcoll):
        """Apply a Read of a freshly constructed ``get_rows`` source to ``pcoll``."""
        rows_source = get_rows()
        read_transform = iobase.Read(rows_source)
        return pcoll | read_transform
# Pipeline configuration. The Google Cloud settings below are placeholders.
options = PipelineOptions()
# Save this script's main-session state so that objects defined here
# (get_rows, kind, ...) can be unpickled where the pipeline executes.
options.view_as(SetupOptions).save_main_session = True
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'xxxx'
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://xxx/staging'
google_cloud_options.temp_location = 'gs://xxx/temp'
# Uncomment to run on Cloud Dataflow instead of the default runner.
# options.view_as(StandardOptions).runner = 'DataflowRunner'

# NOTE(review): the raised recursion limit (25000) is kept from the original;
# the reason is not visible here (possibly deep pickling), so confirm it is
# still needed. ``//`` keeps the argument an int on Python 3 as well.
sys.setrecursionlimit(5000000 // 200)

p = beam.Pipeline(options=options)
(p
 | 'ProduceNumbers' >> ReadFromCountingSource()
 | 'Write To Datastore' >> WriteToDatastore(google_cloud_options.project))
result = p.run()
# run() may return before the pipeline has finished; block so the script
# does not exit early and so any failure surfaces here.
result.wait_until_finish()