@Tjorriemorrie
Created July 21, 2016 16:26
gevent io: export a BigQuery table to Cloud Storage, then index the exported files into Solr with a gevent pool and resource-aware backoff
import gevent.monkey
gevent.monkey.patch_all()  # patch the stdlib first, before anything else imports sockets or threads

import argparse
import logging
import logging.config
import random
import signal
import time

import gevent
import gevent.pool
import gevent.queue
import gevent.signal
import psutil

from config import *
from bigquery import BigQuery
from storage import Storage
from solr import Solr

logger = logging.getLogger('mc.loader')

# SIGQUIT kills every greenlet; adding BaseException to SYSTEM_ERROR makes any
# unhandled exception in a greenlet stop the whole process instead of being logged
gevent.signal(signal.SIGQUIT, gevent.killall)
gevent.get_hub().SYSTEM_ERROR += (BaseException,)


class Loader(object):

    durations = []  # per-file completion timestamps, maintained by eta()
    solr_cluster_size = 40  # max concurrent indexing greenlets

    def __init__(self):
        self.bqs = BigQuery()
        self.gcs = Storage()
        self.solr = Solr()

    def run(self, dataset, table, server, collection, bucket, truncate):
        time_start = time.time()
        logger.info('table: {}'.format(table))
        logger.info('dataset: {}'.format(dataset))
        logger.info('server: {}'.format(server))
        logger.info('collection: {}'.format(collection))
        logger.info('truncate: {}'.format(truncate))

        data_type = self.bqs.max_value(dataset, table, 'type')
        schema = self.bqs.schema(dataset, table)
        multi_values = self.solr.multi_valued_fields(schema)

        if not truncate:
            # incremental load: only pull rows newer than the latest delta already in Solr
            delta = self.solr.delta(server, collection, data_type)
            if delta:
                query = 'SELECT * FROM {0}.{1} WHERE delta > "{2}"'.format(dataset, table, delta)
                dataset, table = 'tmp', '{}_{}'.format(dataset, table)
                self.bqs.query(dataset, table, query)
            else:
                logger.warning('Loader:run no delta found, turning on truncate')
                truncate = True

        self.bqs.export_storage(dataset, table, bucket)
        destination_files = self.gcs.list_bucket(bucket, '{}/{}'.format(dataset, table))
        if not destination_files:
            logger.warning('{}: no new data, bucket empty'.format(table))
            return

        try:
            if truncate:
                self.solr.truncate_type(server, collection, data_type)

            logger.info('Loader:index_files {} files to server: {} collection: {}'.format(
                len(destination_files), server, collection))
            solr_url = 'http://{}:8983/solr/{}/update?commit=false&softCommit=false&optimize=false&overwrite=true{}'.format(
                server, collection, multi_values)
            logger.debug('Url: {}'.format(solr_url))

            # one zero placeholder per file; eta() replaces them with completion timestamps
            self.durations = [0] * len(destination_files)
            queue = gevent.queue.Queue(items=[(f, bucket, solr_url) for f in destination_files])
            pool = gevent.pool.Pool(size=self.solr_cluster_size)
            backoff = 1
            while not queue.empty():
                cpu_percent = psutil.cpu_percent()
                memory = psutil.virtual_memory()
                if pool.full():
                    logger.warning('pool full, waiting...')
                    backoff += 1
                elif cpu_percent < 90 and memory.available > (1 << 31):
                    # headroom available (CPU under 90%, more than 2 GiB free): schedule the next file
                    pool.apply_async(self.index_file, queue.get(), callback=self.eta)
                    backoff = max(1, backoff - 1)
                else:
                    logger.warning('VM highly utilised [CPU: {:.0f} MEM: {:.0f}%], waiting...'.format(
                        cpu_percent, memory.percent))
                    backoff += 2
                gevent.sleep(backoff * 10)
            logger.info('Queue done')
            pool.join()
            logger.info('{} files indexed'.format(len(destination_files)))
        finally:
            self.gcs.clear_bucket(bucket, destination_files)
            logger.info('Total time: {:.0f}m'.format((time.time() - time_start) / 60))

    def index_file(self, destination_file, bucket, solr_url):
        # each greenlet builds its own clients rather than sharing connections
        gcs = Storage()
        media = gcs.media(bucket, destination_file)
        solr = Solr()
        # retry up to 5 times, waiting a minute longer after each failure
        for attempt in range(5):
            try:
                solr.index(solr_url, media, destination_file)
                return
            except RuntimeError:
                if attempt == 4:
                    raise
                gevent.sleep(attempt * 60)

    def eta(self, greenlet):
        # replace the oldest zero placeholder with this file's completion time
        self.durations.append(time.time())
        self.durations.pop(0)
        durations_finished = [v for v in self.durations if v > 0]
        if len(durations_finished) > 3:
            # average spacing of the last 50 completions approximates the per-file duration
            window = durations_finished[-50:]
            avg = (max(window) - min(window)) / float(len(window))
            remaining = (len(self.durations) - len(durations_finished)) * avg / 60.
            completed = len(durations_finished) / float(len(self.durations)) * 100.
            logger.info('{:.0f}% completed - ETA {:.0f}m'.format(completed, remaining))

    def test(self):
        logger.info('start')
        queue = gevent.queue.Queue(items=range(10))
        logger.info('created queue of {} items'.format(queue.qsize()))
        pool = gevent.pool.Pool(size=3)
        logger.info('created pool')
        backoff = 1
        while not queue.empty():
            cpu_percent = psutil.cpu_percent()
            memory = psutil.virtual_memory()
            logger.info('CPU: {:.0f} MEM: {:.0f}%'.format(cpu_percent, memory.percent))
            if pool.full():
                logger.warning('pool is full, waiting...')
            elif cpu_percent < 90 and memory.available > (1 << 31):
                pool.apply_async(self.test_task, (queue.get(),), callback=self.test_cb)
            else:
                logger.warning('vm is at max, waiting...')
                backoff += 1
            gevent.sleep(backoff)
        logger.info('queue done')
        pool.join()
        logger.info('end')

    def test_task(self, task):
        logger.info('start {}'.format(task))
        n = random.randint(2, 5)
        gevent.sleep(n)
        logger.info('end {}'.format(task))

    def test_cb(self, greenlet):
        logger.info('tick')


def main(*args):
    loader = Loader()
    loader.run(*args)
    # loader.test()


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('dataset')
    parser.add_argument('table')
    parser.add_argument('-s', '--server', default=SOLR_SERVER)
    parser.add_argument('-c', '--collection', choices=['customers', 'details'])
    parser.add_argument('-b', '--bucket', default=BUCKET_EXPORT)
    parser.add_argument('-t', '--truncate', action='store_true')
    args = parser.parse_args()

    dataset = args.dataset
    table = args.table
    server = args.server
    bucket = args.bucket
    truncate = args.truncate
    collection = args.collection if args.collection else ('customers' if table == 'customer_items' else 'details')

    log_path = '/var/log/insights/{}'.format(PROJECT_ID)
    LOGGING_CONFIG['handlers']['rotate_file']['filename'] = '{}/{}_{}.log'.format(log_path, dataset, table)
    logging.config.dictConfig(LOGGING_CONFIG)

    main(dataset, table, server, collection, bucket, truncate)
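
Usage sketch (the filename loader.py and the dataset/table names below are hypothetical; config.py is assumed to define SOLR_SERVER, BUCKET_EXPORT, PROJECT_ID and LOGGING_CONFIG):

    python loader.py my_dataset customer_items --server solr01 --truncate

This would export my_dataset.customer_items to the export bucket and fan the exported files out to the indexing pool, truncating the existing type in Solr first.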