Skip to content

Instantly share code, notes, and snippets.

@candale
Created June 13, 2019 07:37
Show Gist options
  • Save candale/49e26396632405ada9c66b34aa01efd7 to your computer and use it in GitHub Desktop.
Save candale/49e26396632405ada9c66b34aa01efd7 to your computer and use it in GitHub Desktop.
fetch_cassandra_wide.py
def fetch_car_hist_items(limit=None, start=None, end=None, *,
                         page_size=100000, max_retries=None,
                         query_timeout=30):
    """Yield rows of ``sjerlok.car_hist`` by walking the partition-token range.

    Rows are requested in pages of ``page_size`` using
    ``TOKEN(base_url) >= last_token AND TOKEN(base_url) <= stop_token``.  A
    full page may stop part-way through a partition.  The remainder of that
    partition is then read with ``TOKEN(base_url) = token AND crawled_at < X``
    (X = the last ``crawled_at`` seen) before moving on to ``token + 1``.
    Restarting at ``>= token`` instead would yield that partition's leading
    rows a second time.

    NOTE(review): the ``crawled_at <`` continuation visits each row once only
    if rows inside a partition come back ordered by ``crawled_at`` descending
    and ``crawled_at`` is unique per partition.  The original code already
    relied on this for partitions larger than one page; confirm against the
    table schema.

    Args:
        limit: stop after yielding this many rows.  ``None`` (the default)
            means no limit; a value ``<= 0`` yields nothing.
        start: first token to read, inclusive.  Defaults to the smallest
            signed 64-bit value.
        end: last token to read, inclusive.  Defaults to the largest signed
            64-bit value.  Rows with a larger token are never yielded.
        page_size: number of rows requested per query.
        max_retries: how many times a query that raised ``Timeout`` is
            retried before the exception propagates.  ``None`` (the default)
            retries forever, matching the previous behaviour.
        query_timeout: per-query timeout passed to ``ses.execute``.

    Yields:
        Rows as produced by the session.  They are indexed by column name
        (``row['crawled_at']``) and include the extra ``b_token`` column.
    """
    min_token = -9223372036854775808   # -2**63
    max_token = 9223372036854775807    # 2**63 - 1

    ses = get_or_create_session()
    first_token = min_token if start is None else start
    stop_token = max_token if end is None else end

    # NOTE(review): this literal was unterminated in the original
    # ('base_url, crawled_at ...).  It is closed after crawled_at here;
    # confirm no further columns were intended.  Both columns are read below.
    select_fields = 'base_url, crawled_at'

    # Values are bound as query parameters (%s placeholders, presumably the
    # DataStax cassandra-driver simple-statement style) instead of being
    # pasted in with str.format, so a datetime crawled_at becomes a valid CQL
    # literal.  LIMIT stays a formatted integer.
    token_query = (
        'SELECT {fields}, TOKEN(base_url) as b_token '
        'FROM sjerlok.car_hist '
        'WHERE TOKEN(base_url) >= %s AND TOKEN(base_url) <= %s '
        'LIMIT {page:d}'
    ).format(fields=select_fields, page=page_size)
    partition_query = (
        'SELECT {fields}, TOKEN(base_url) as b_token '
        'FROM sjerlok.car_hist '
        'WHERE TOKEN(base_url) = %s AND crawled_at < %s '
        'LIMIT {page:d} ALLOW FILTERING'
    ).format(fields=select_fields, page=page_size)

    def run_query(query, params, label):
        """Run *query* with *params*, retrying on Timeout; return all rows as a list."""
        attempt = 1
        while True:
            started = time.time()
            try:
                rows = list(ses.execute(query, params, timeout=query_timeout))
            except Timeout:
                logger.warning(
                    'WARNING: got timeout on read car_hist {}. Try count: {}'
                    .format(label, attempt)
                )
                if max_retries is not None and attempt > max_retries:
                    raise
                attempt += 1
                continue
            logger.info('Took {} seconds to get {} rows'.format(
                time.time() - started, len(rows)))
            return rows

    def rest_of_partition(token, after_crawled_at):
        """Yield rows of partition *token* whose crawled_at is below *after_crawled_at*."""
        while True:
            rows = run_query(partition_query, (token, after_crawled_at),
                             'crawled_at')
            yield from rows
            if len(rows) < page_size:
                logger.info('Finished moving with crawled_at')
                return
            after_crawled_at = rows[-1]['crawled_at']
            logger.info('Last crawled at: {}'.format(after_crawled_at))

    def walk_tokens(last_token):
        """Yield every row with last_token <= TOKEN(base_url) <= stop_token."""
        while True:
            rows = run_query(token_query, (last_token, stop_token), 'token')
            yield from rows
            if len(rows) < page_size:
                # A short page means the token range is exhausted.
                logger.info(
                    'No more items to fetch. Last token: {}'.format(last_token))
                return
            tail = rows[-1]
            boundary_token = tail['b_token']
            # The page may have ended mid-partition: finish that partition via
            # crawled_at rather than re-reading it from its first row.
            logger.info('Moving with crawled_at')
            yield from rest_of_partition(boundary_token, tail['crawled_at'])
            if boundary_token >= stop_token:
                # Done; this also keeps last_token from stepping past 2**63 - 1.
                return
            last_token = boundary_token + 1
            logger.info('Last token: {}'.format(last_token))

    if limit is not None and limit <= 0:
        return
    yielded = 0
    for row in walk_tokens(first_token):
        yield row
        yielded += 1
        # Check right after yielding so no extra query is issued past the limit.
        if yielded == limit:
            logger.info('Reached limit. Stopping')
            return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment