@chrisk314
Created November 24, 2016 10:30
Spark job script to load data messages from Moreover archive and insert them into BigTable.
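Example submission (a sketch only: the cluster name is illustrative, and the two positional arguments are the GCP project ID and the path to the BigTable processing config, as expected by the script):

    gcloud beta dataproc jobs submit pyspark --async --cluster my-cluster-01 archive_to_bigtable.py -- <PROJECT_ID> <CYTORA_BIGTABLE_PROCESSING_CONFIGFILE_PATH>
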
from __future__ import print_function
import json
import sys
import os
os.environ["THEANO_FLAGS"] = "base_compiledir=/home/csk13/.theano"
import cPickle as pkl
import logging
import random
from time import time
from data_message_schema.transformers import moreover_xml_to_data_message
from data_message_schema.wrapper_interfaces import ObjectNotCompleteError
from streaming_pipeline.utils.loading_utils import load_json_config
from streaming_pipeline.utils.io_utils import GCSIO
from streaming_pipeline.document_id_assignment.id_assigner import DocumentIdAssigner
from streaming_pipeline.bigtable_processing.bigtable_processor import BigTableProcessor

def print_doc_id(dm):
    print(dm.document_id, file=sys.stderr)
    return dm

def create_data_message(xml):
    # Convert raw Moreover XML into a data message; incomplete articles yield
    # None and are filtered out downstream.
    try:
        return moreover_xml_to_data_message(xml, '1.0.0')
    except ObjectNotCompleteError:
        return None

# Pick out a problem dm from the document_id
# def get_bad_dm(dm):
#     if dm.document_id == '2015-01-01T22:19:44Z_http://www.mykosmos.gr/loc_mk/news/lifestyle/3004421/hmerhsies-problepseis-gia-ola-ta-zwdia-gia-thn-paraskeuh-21.htm'.encode('utf-8'):
#         return dm
#     else:
#         return None

class BigTableProcessorWrapper(object):

    def __init__(self, project_id, bigtable_config_path):
        # Load BigTable config file
        self.project_id = project_id
        gcs_io = GCSIO(project_id=project_id)
        config = load_json_config(bigtable_config_path, gcs_io.load)
        bigtable_config = config['bigtable_config']
        self.bigtable_instance_id = bigtable_config['instance_id']
        # self.bigtable_document_table_id = bigtable_config['document_table_id']
        self.bigtable_document_table_id = 'test_historic_table'
        # self.dm = pkl.load(open('/home/csk13/dm.pkl', 'r'))

    def process_data_messages(self, dms):
        # Generator applied per partition: inserts each data message into
        # BigTable and yields the messages that were processed.
        bigtable_processor = BigTableProcessor(
            self.bigtable_document_table_id,
            self.bigtable_instance_id,
            self.project_id
        )
        attempt = 0
        for dm in dms:
            attempt += 1
            print('ATTEMPT {:d} @ {}'.format(attempt, time()), file=sys.stderr)
            try:
                bigtable_processor.process_data_message(dm)
                if bigtable_processor.bigtable_entry_exists(dm.document_id):
                    print('SUCCESS adding {} to BigTable'
                          .format(dm.document_id), file=sys.stderr)
                else:
                    print('FAILED adding {} to BigTable'
                          .format(dm.document_id), file=sys.stderr)
                yield dm
            except Exception as e:
                # Log the gRPC status code (assumes a _Rendezvous error from the
                # BigTable client) and abandon the rest of the partition.
                print('FAILED adding {} to BigTable with EXCEPTION'
                      .format(dm.document_id), file=sys.stderr)
                print('_RENDEZVOUS : {}'.format(e._state.code), file=sys.stderr)
                break


if len(sys.argv) != 3:
    print("Incorrect number of command line args", file=sys.stderr)
    print("Usage:\narchive_to_bigtable.py <PROJECT_ID> <CYTORA_BIGTABLE_PROCESSING_CONFIGFILE_PATH>", file=sys.stderr)
    sys.exit(1)
else:
    # Set project id and bigtable configuration path
    project_id = sys.argv[1]
    bigtable_config_path = sys.argv[2]
    # Alternatively, read project_id and bigtable config path from env variables
    # project_id = os.path.expandvars('$PROJECT_ID')
    # bigtable_config_path = os.path.expandvars('$CYTORA_BIGTABLE_PROCESSING_CONFIGFILE_PATH')

    # Create wrapper object for bigtable_processor
    bigtable_processor_wrapper = BigTableProcessorWrapper(project_id, bigtable_config_path)

    # These lines are needed if you want to submit the code as a job via:
    # $ gcloud beta dataproc jobs submit pyspark --async --cluster my-cluster-01 my_script.py
    import pyspark
    sc = pyspark.SparkContext()

    # Set input data set
    bq_outputs = sc.textFile("gs://cytora-test-bucket-1/moreover_all/xml_20150101/000000000000.json")
    # bq_outputs = sc.textFile("gs://cytora-test-bucket-1/moreover_all/xml_2015*")
    # bq_outputs = sc.textFile("gs://cytora-test-bucket-1/moreover_all/xml_20150101/*")

    # Load raw data messages
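    # Each input line is expected to be a JSON object with the raw Moreover XML
    # under the 'data' key, e.g. (illustrative): {"data": "<article>...</article>"}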
    articles_raw = bq_outputs.map(lambda record: json.loads(record)).map(lambda record: record['data'])

    # Create data messages from raw articles
    articles_dm = articles_raw.map(create_data_message).filter(lambda x: x is not None)
    print("\narticles_dm.count() {}\n".format(articles_dm.count()), file=sys.stderr)

    # Assign ID to historic data messages
    id_proc = DocumentIdAssigner().process_data_message
    id_assigned_dm = articles_dm.map(id_proc)
    print("\nid_assigned_dm.count() {}\n".format(id_assigned_dm.count()), file=sys.stderr)

    # printed_dm = id_assigned_dm.map(print_doc_id)
    # print("\nprinted_dm.count() {}\n".format(printed_dm.count()), file=sys.stderr)

    # bad_dm = id_assigned_dm.map(get_bad_dm).filter(lambda x: x is not None).first()
    # with open('/home/chris/bad_dm.pkl', 'w') as f:
    #     pkl.dump(bad_dm, f)

    # Insert data into BigTable
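    # mapPartitions (rather than map) is used here, presumably so that a single
    # BigTableProcessor client is constructed per partition inside
    # process_data_messages, rather than once per record or being shipped from
    # the driver, since the client is unlikely to be picklable.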
    bt_inserted_dm = id_assigned_dm.mapPartitions(bigtable_processor_wrapper.process_data_messages)
    print("\nbt_inserted_dm.count() {}\n".format(bt_inserted_dm.count()), file=sys.stderr)

    # print('num partitions : ', bt_inserted_dm.getNumPartitions())
    # print(bt_inserted_dm.count())