Created July 21, 2016 16:26
gevent io
import argparse
import gevent
import gevent.monkey
import gevent.pool
import gevent.queue
import gevent.signal
import random
import signal
import time
import logging
import logging.config
import psutil

from config import *
from bigquery import BigQuery
from storage import Storage
from solr import Solr

logger = logging.getLogger('mc.loader')

gevent.signal(signal.SIGQUIT, gevent.killall)
gevent.get_hub().SYSTEM_ERROR += (BaseException,)
gevent.monkey.patch_all()
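
# Loader: exports a BigQuery table to Cloud Storage, then streams the
# exported files into Solr, throttling concurrency on host CPU and memory.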
class Loader():
    durations = []
    solr_cluster_size = 40

    def __init__(self):
        self.bqs = BigQuery()
        self.gcs = Storage()
        self.solr = Solr()
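
    # End-to-end pipeline: resolve a delta (or truncate), export the table
    # to the bucket, then index every exported file into Solr.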
    def run(self, dataset, table, server, collection, bucket, truncate):
        time_start = time.time()
        logger.info('table: {}'.format(table))
        logger.info('dataset: {}'.format(dataset))
        logger.info('server: {}'.format(server))
        logger.info('collection: {}'.format(collection))
        logger.info('truncate: {}'.format(truncate))

        data_type = self.bqs.max_value(dataset, table, 'type')
        schema = self.bqs.schema(dataset, table)
        multi_values = self.solr.multi_valued_fields(schema)

        if not truncate:
            delta = self.solr.delta(server, collection, data_type)
            if delta:
                query = 'SELECT * FROM {0}.{1} WHERE delta > "{2}"'.format(dataset, table, delta)
                dataset, table = 'tmp', '{}_{}'.format(dataset, table)
                self.bqs.query(dataset, table, query)
            else:
                logger.warning('Loader:run no delta found, turning on truncate')
                truncate = True

        self.bqs.export_storage(dataset, table, bucket)
        destination_files = self.gcs.list_bucket(bucket, '{}/{}'.format(dataset, table))
        if not destination_files:
            logger.warning('{}: no new data, bucket empty'.format(table))
            return
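
        # Fan-out: keep the pool fed while CPU and free memory allow,
        # and back off when the host is saturated.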
        try:
            if truncate:
                self.solr.truncate_type(server, collection, data_type)

            logger.info('Loader:index_files {} files to server: {} collection: {}'.format(
                len(destination_files), server, collection))
            solr_url = 'http://{}:8983/solr/{}/update?commit=false&softCommit=false&optimize=false&overwrite=true{}'.format(
                server, collection, multi_values)
            logger.debug('Url: {}'.format(solr_url))

            self.durations = [0] * len(destination_files)
            queue = gevent.queue.Queue(items=[(f, bucket, solr_url) for f in destination_files])
            pool = gevent.pool.Pool(size=self.solr_cluster_size)

            backoff = 1
            while not queue.empty():
                cpu_percent = psutil.cpu_percent()
                memory = psutil.virtual_memory()
                if pool.full():
                    logger.warning('pool full, waiting...')
                    backoff += 1
                elif cpu_percent < 90 and memory.available > 1 << 31:  # more than 2 GiB free
                    pool.apply_async(self.index_file, queue.get(), callback=self.eta)
                    backoff = max(1, backoff - 1)
                else:
                    logger.warning('VM highly utilised [CPU: {:.0f} MEM: {:.0f}%], waiting...'.format(
                        cpu_percent, memory.percent))
                    backoff += 2
                gevent.sleep(backoff * 10)

            logger.info('Queue done')
            pool.join()
            logger.info('{} files indexed'.format(len(destination_files)))
        finally:
            self.gcs.clear_bucket(bucket, destination_files)
            logger.info('Total time: {:.0f}m'.format((time.time() - time_start) / 60))
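
    # Fetch one exported file from Cloud Storage and POST it to Solr,
    # retrying up to five times on RuntimeError with a linear backoff.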
    def index_file(self, destination_file, bucket, solr_url):
        gcs = Storage()
        media = gcs.media(bucket, destination_file)
        solr = Solr()
        for attempt in range(5):
            try:
                solr.index(solr_url, media, destination_file)
                return
            except RuntimeError:
                if attempt == 4:
                    raise
                gevent.sleep(attempt * 60)  # waits 0s, 60s, 120s, 180s between attempts
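
    # Greenlet callback: records a completion timestamp and logs progress
    # with an ETA averaged over the last 50 completions.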
    def eta(self, greenlet):
        self.durations.append(time.time())
        self.durations.pop(0)
        durations_finished = [v for v in self.durations if v > 0]
        window = durations_finished[-50:]
        avg = (max(window) - min(window)) / float(len(window))
        remaining = (len(self.durations) - len(durations_finished)) * avg / 60.
        completed = len(durations_finished) / float(len(self.durations)) * 100.
        if len(durations_finished) > 3:
            logger.info('{:.0f}% completed - ETA {:.0f}m'.format(completed, remaining))
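
    # Standalone harness for the same queue/pool throttling pattern,
    # using dummy sleep tasks instead of Solr indexing.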
    def test(self):
        logger.info('start')
        queue = gevent.queue.Queue(items=range(10))
        logger.info('created queue of {} items'.format(queue.qsize()))
        pool = gevent.pool.Pool(size=3)
        logger.info('created pool')
        backoff = 1
        while not queue.empty():
            cpu_percent = psutil.cpu_percent()
            memory = psutil.virtual_memory()
            logger.info('CPU: {:.0f} MEM: {:.0f}%'.format(cpu_percent, memory.percent))
            if pool.full():
                logger.warning('pool is full, waiting...')
            elif cpu_percent < 90 and memory.available > 1 << 31:
                pool.apply_async(self.test_task, (queue.get(),), callback=self.test_cb)
            else:
                logger.warning('vm is at max, waiting...')
                backoff += 1
            gevent.sleep(backoff)
        logger.info('queue done')
        pool.join()
        logger.info('end')

    def test_task(self, task):
        logger.info('start {}'.format(task))
        n = random.randint(2, 5)
        gevent.sleep(n)
        logger.info('end {}'.format(task))

    def test_cb(self, greenlet):
        logger.info('tick')

def main(*args):
    loader = Loader()
    loader.run(*args)
    # loader.test()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('dataset')
    parser.add_argument('table')
    parser.add_argument('-s', '--server', default=SOLR_SERVER)
    parser.add_argument('-c', '--collection', choices=['customers', 'details'])
    parser.add_argument('-b', '--bucket', default=BUCKET_EXPORT)
    parser.add_argument('-t', '--truncate', action='store_true')
    args = parser.parse_args()

    dataset = args.dataset
    table = args.table
    server = args.server
    bucket = args.bucket
    truncate = args.truncate
    collection = args.collection if args.collection else ('customers' if table == 'customer_items' else 'details')

    log_path = '/var/log/insights/{}'.format(PROJECT_ID)
    LOGGING_CONFIG['handlers']['rotate_file']['filename'] = '{}/{}_{}.log'.format(log_path, dataset, table)
    logging.config.dictConfig(LOGGING_CONFIG)

    main(dataset, table, server, collection, bucket, truncate)
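
A typical invocation, assuming the script is saved as loader.py (the gist does not name the file; my_dataset and my_table are placeholders):

    python loader.py my_dataset my_table -s solr-01 -t

Server and bucket default to SOLR_SERVER and BUCKET_EXPORT from config; when -c is omitted the collection is inferred from the table name.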