Created
June 13, 2019 07:37
-
-
Save candale/49e26396632405ada9c66b34aa01efd7 to your computer and use it in GitHub Desktop.
fetch_cassandra_wide.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def fetch_car_hist_items(limit=None, start=None, end=None):
    """Yield rows of ``sjerlok.car_hist`` by scanning ``TOKEN(base_url)``.

    Rows are fetched in batches of 100000 with
    ``TOKEN(base_url) >= <last token>``. After a full batch the scan
    continues from the token of the last row in that batch. When that token
    equals the token the batch was queried from (one token fills a whole
    batch), the scan pages inside that token with ``crawled_at < <last
    crawled_at>`` and then moves on to ``token + 1``.

    NOTE(review): the logic presumes rows come back in ascending token order
    and, inside a token, in descending ``crawled_at`` order -- confirm
    against the table definition.

    NOTE(review): because each follow-up batch is queried with ``>=`` the
    last seen token, rows sharing that boundary token are fetched (and
    yielded, and counted against ``limit``) again.

    Args:
        limit: Stop once this many rows have been yielded. ``None`` never
            matches the counter, so the scan is unbounded.
        start: Token to start scanning from. Defaults to
            -9223372036854775808.
        end: Token after which scanning stops. Defaults to
            9223372036854775807. It is compared only against the last row
            of each full batch, after that batch has already been yielded,
            so rows with a token greater than ``end`` can still be produced.

    Yields:
        Rows as returned by ``ses.execute``; they are indexed by column
        name and include the extra ``b_token`` column.
    """

    class StopFetch(Exception):
        """Raised inside the nested generators once ``limit`` is reached."""

    # Single source of truth for the page size used in both queries and in
    # the "was this a full batch?" checks.
    batch_size = 100000

    ses = get_or_create_session()

    state = {
        'last_token': -9223372036854775808,
        'stop_token': 9223372036854775807,
        'last_crawled_at': None,  # not read anywhere in this function
        'count': 0
    }
    if start is not None:
        state['last_token'] = start
    if end is not None:
        state['stop_token'] = end

    # NOTE(review): this literal is cut off in the copy of the source that
    # was reviewed (it ended right after ``crawled_at`` with no closing
    # quote). Restore any further columns the consumers of this generator
    # need.
    select_fields = 'base_url, crawled_at'

    def execute_with_retry(statement, label):
        """Run ``statement`` and return its rows as a list.

        Retries for as long as ``Timeout`` is raised; there is no retry cap
        and no back-off. ``label`` only distinguishes the two call sites in
        the log message.
        """
        try_count = 1
        while True:
            try:
                started = time.time()
                rows = list(ses.execute(statement, timeout=30))
            except Timeout:
                logger.warning(
                    'WARNING: got timeout on read car_hist {}. Try count: {}'
                    .format(label, try_count)
                )
                try_count += 1
            else:
                logger.info(
                    'Took {} seconds to get 100k'.format(time.time() - started))
                return rows

    def get_with_crawled_at(state, last_crawled_at):
        """Page through the rows of ``state['last_token']`` by ``crawled_at``.

        Starts below ``last_crawled_at`` and keeps lowering the bound to the
        ``crawled_at`` of the last row received. On a short batch it bumps
        ``state['last_token']`` by one so the caller resumes after this
        token.
        """
        while True:
            current_count = 0
            # NOTE(review): values are interpolated into the CQL text as-is.
            # If ``crawled_at`` is not a plain number its ``str()`` form may
            # not be a valid CQL literal -- confirm the column type.
            statement = (
                (
                    'SELECT {fields}, TOKEN(base_url) as b_token '
                    'FROM sjerlok.car_hist '
                    'WHERE TOKEN(base_url) = {last_token} AND crawled_at < {last_crawled_at} '
                    'LIMIT {batch_size} ALLOW FILTERING'
                ).format(
                    fields=select_fields, last_token=state['last_token'],
                    last_crawled_at=last_crawled_at, batch_size=batch_size
                )
            )
            rows = execute_with_retry(statement, 'crawled_at')

            for row in rows:
                last_crawled_at = row['crawled_at']
                current_count += 1
                state['count'] += 1
                yield row
                if state['count'] == limit:
                    logger.info('Reached limit. Stopping')
                    raise StopFetch

            logger.info('Last crawled at: {}'.format(last_crawled_at))

            if current_count < batch_size:
                # Short batch: this token is exhausted, move past it.
                state['last_token'] += 1
                logger.info('Finished moving with crawled_at')
                break

    def get_with_token(state):
        """Scan forward from ``state['last_token']`` one batch at a time."""
        while True:
            current_count = 0
            last_crawled_at = None
            statement = (
                (
                    'SELECT {fields}, TOKEN(base_url) as b_token '
                    'FROM sjerlok.car_hist WHERE TOKEN(base_url) >= {last_token} '
                    'LIMIT {batch_size}'
                )
                .format(
                    fields=select_fields, last_token=state['last_token'],
                    batch_size=batch_size
                )
            )
            rows = execute_with_retry(statement, 'token')

            for row in rows:
                b_token = row['b_token']
                last_crawled_at = row['crawled_at']
                current_count += 1
                yield row
                state['count'] += 1
                if state['count'] == limit:
                    logger.info('Reached limit. Stopping')
                    raise StopFetch

            # A short (or empty) batch means nothing is left to read; this
            # check also guarantees ``b_token`` is bound below.
            if current_count < batch_size:
                logger.info(
                    'No more items to fetch. Last token: {}'
                    .format(state['last_token'])
                )
                break

            # we have reached the proposed end
            if state['stop_token'] < b_token:
                break

            if state['last_token'] == b_token:
                # The full batch ended on the token it was queried from, so
                # token-based paging cannot advance; page by crawled_at.
                logger.info('Moving with crawled_at')
                yield from get_with_crawled_at(state, last_crawled_at)
            else:
                state['last_token'] = b_token
                logger.info('Last token: {}'.format(b_token))

    try:
        yield from get_with_token(state)
    except StopFetch:
        # Reaching ``limit`` is a normal way to finish, not an error.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment