Spark job script to load data messages from Moreover archive and insert them into BigTable.
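
"""Spark job: load data messages from the Moreover archive and insert them
into BigTable.

Usage:
    archive_to_bigtable.py <PROJECT_ID> <CYTORA_BIGTABLE_PROCESSING_CONFIGFILE_PATH>

Input format (an assumption, inferred from the RDD transforms below): each
line of the GCS input files is a standalone JSON object whose 'data' field
holds a raw Moreover XML article, e.g. {"data": "<article>...</article>"}.
"""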
from __future__ import print_function

import json
import sys
import os
os.environ["THEANO_FLAGS"] = "base_compiledir=/home/csk13/.theano"
import cPickle as pkl
import logging
import random
from time import time

from data_message_schema.transformers import moreover_xml_to_data_message
from data_message_schema.wrapper_interfaces import ObjectNotCompleteError
from streaming_pipeline.utils.loading_utils import load_json_config
from streaming_pipeline.utils.io_utils import GCSIO
from streaming_pipeline.document_id_assignment.id_assigner import DocumentIdAssigner
from streaming_pipeline.bigtable_processing.bigtable_processor import BigTableProcessor

def print_doc_id(dm):
    print(dm.document_id, file=sys.stderr)
    return dm

def create_data_message(xml):
    try:
        return moreover_xml_to_data_message(xml, '1.0.0')
    except ObjectNotCompleteError:
        return None
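
# NOTE: incomplete articles are mapped to None here and dropped by the
# .filter(...) on the RDD further down, rather than failing the whole job.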

# Pick out a problem dm from the document_id
# def get_bad_dm(dm):
#     if dm.document_id == '2015-01-01T22:19:44Z_http://www.mykosmos.gr/loc_mk/news/lifestyle/3004421/hmerhsies-problepseis-gia-ola-ta-zwdia-gia-thn-paraskeuh-21.htm'.encode('utf-8'):
#         return dm
#     else:
#         return None
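
# The JSON config loaded from GCS is assumed to look roughly like this
# (only the keys actually read below are known; anything else is a guess):
# {
#     "bigtable_config": {
#         "instance_id": "...",
#         "document_table_id": "..."
#     }
# }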

class BigTableProcessorWrapper(object):

    def __init__(self, project_id, bigtable_config_path):
        # Load BigTable config file
        self.project_id = project_id
        gcs_io = GCSIO(project_id=project_id)
        config = load_json_config(bigtable_config_path, gcs_io.load)
        bigtable_config = config['bigtable_config']
        self.bigtable_instance_id = bigtable_config['instance_id']
        # self.bigtable_document_table_id = bigtable_config['document_table_id']
        self.bigtable_document_table_id = 'test_historic_table'
        # self.dm = pkl.load(open('/home/csk13/dm.pkl', 'r'))
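
    # NOTE: the processor is constructed inside process_data_messages rather
    # than in __init__, presumably so the Bigtable connection is opened on the
    # Spark executor for each partition instead of being pickled across from
    # the driver.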
    def process_data_messages(self, dms):
        bigtable_processor = BigTableProcessor(
            self.bigtable_document_table_id,
            self.bigtable_instance_id,
            self.project_id
        )
        attempt = 0
        for dm in dms:
            attempt += 1
            print('ATTEMPT {:d} @ {}'.format(attempt, time()), file=sys.stderr)
            try:
                bigtable_processor.process_data_message(dm)
                if bigtable_processor.bigtable_entry_exists(dm.document_id):
                    print('SUCCESS adding {} to BigTable'
                          .format(dm.document_id), file=sys.stderr)
                else:
                    print('FAILED adding {} to BigTable'
                          .format(dm.document_id), file=sys.stderr)
                yield dm
            except Exception as e:
                print('FAILED adding {} to BigTable with EXCEPTION'
                      .format(dm.document_id), file=sys.stderr)
                # gRPC failures surface as _Rendezvous objects whose
                # _state.code holds the status; guard the private attribute
                # so other exception types don't raise AttributeError here.
                state = getattr(e, '_state', None)
                if state is not None:
                    print('_RENDEZVOUS : {}'.format(state.code), file=sys.stderr)
                break

if len(sys.argv) != 3:
    print("Incorrect number of command line args", file=sys.stderr)
    print("Usage:\narchive_to_bigtable.py <PROJECT_ID> <CYTORA_BIGTABLE_PROCESSING_CONFIGFILE_PATH>",
          file=sys.stderr)
    sys.exit(1)
else:
    # Set project id and bigtable configuration path
    project_id = sys.argv[1]
    bigtable_config_path = sys.argv[2]
    # Read project_name and bigtable config path from env variables
    # project_id = os.path.expandvars('$PROJECT_ID')
    # bigtable_config_path = os.path.expandvars('$CYTORA_BIGTABLE_PROCESSING_CONFIGFILE_PATH')

# Create wrapper object for bigtable_processor
bigtable_processor_wrapper = BigTableProcessorWrapper(project_id, bigtable_config_path)

# Uncomment these lines if you want to submit the code as a job via:
# $ gcloud beta dataproc jobs submit pyspark --async --cluster my-cluster-01 my_script.py
import pyspark
sc = pyspark.SparkContext()

# Set input data set
bq_outputs = sc.textFile("gs://cytora-test-bucket-1/moreover_all/xml_20150101/000000000000.json")
# bq_outputs = sc.textFile("gs://cytora-test-bucket-1/moreover_all/xml_2015*")
# bq_outputs = sc.textFile("gs://cytora-test-bucket-1/moreover_all/xml_20150101/*")

# Load raw data messages
articles_raw = bq_outputs.map(lambda record: json.loads(record)).map(lambda record: record['data'])

# Create data messages from raw articles
articles_dm = articles_raw.map(create_data_message).filter(lambda x: x is not None)
print("\narticles_dm.count() {}\n".format(articles_dm.count()), file=sys.stderr)

# Assign ID to historic data messages
id_proc = DocumentIdAssigner().process_data_message
id_assigned_dm = articles_dm.map(id_proc)
print("\nid_assigned_dm.count() {}\n".format(id_assigned_dm.count()), file=sys.stderr)

# printed_dm = id_assigned_dm.map(print_doc_id)
# print("\nprinted_dm.count() {}\n".format(printed_dm.count()), file=sys.stderr)

# bad_dm = id_assigned_dm.map(get_bad_dm).filter(lambda x: x is not None).first()
# with open('/home/chris/bad_dm.pkl', 'w') as f:
#     pkl.dump(bad_dm, f)
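
# NOTE: mapPartitions (rather than map) below means process_data_messages is
# called once per partition, so a single BigTableProcessor and connection is
# created per partition instead of per record.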
# Insert data into BigTable
bt_inserted_dm = id_assigned_dm.mapPartitions(bigtable_processor_wrapper.process_data_messages)
print("\nbt_inserted_dm.count() {}\n".format(bt_inserted_dm.count()), file=sys.stderr)

# print('num partitions : {}'.format(bt_inserted_dm.getNumPartitions()), file=sys.stderr)
# print(bt_inserted_dm.count(), file=sys.stderr)
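
To process the full 2015 archive rather than the single sample shard, swap the input path for one of the commented glob patterns above, then submit the script as in the inline comment, e.g. gcloud beta dataproc jobs submit pyspark --async --cluster my-cluster-01 archive_to_bigtable.py -- <PROJECT_ID> <CONFIG_PATH>. The cluster name is the placeholder from the original comment, and passing the two script arguments after the "--" separator is an assumption about the gcloud invocation, following the script's own usage string.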