scrapy HBase cache storage
from datetime import datetime
import happybase
import pymysql
from impala.dbapi import connect as hive_connect
from impala.util import as_pandas
from pymysql.cursors import DictCursor
from retrying import retry
from thriftpy.transport import TTransportException
from beehive.settings import HBASE_CONFIG
from beehive.utils import decode_bytes, convert_to_string


class HBaseClient(object):

    def __init__(self, hive_config):
        self.hbase_pool = happybase.ConnectionPool(**hive_config)

    def write(self, table_name, row_key, data):
"""Directly write data to hbase
Raises:
TTransportException: When shit goes wrong with thrift, there's nothing
we can do about it, thrift is a piece of shit.
"""
        with self.hbase_pool.connection() as con:
            table = con.table(table_name)
            table.put(row_key, data)

    @staticmethod
    def get_modified_time(data):
        """Get max timestamp from one hbase row.

        Args:
            data (dict): raw hbase row from `table.row`, must include timestamps.

        Returns:
            datetime: python datetime object.

        >>> HBaseClient.get_modified_time({'name': ('some restaurant', 1489147687495)})
        datetime.datetime(2017, 3, 10, 20, 8, 7, 495000)
        """
        unix_timestamp = max([t[1] for t in data.values()]) * 0.001
        return datetime.fromtimestamp(unix_timestamp)

    @staticmethod
    def prepare_data(data, column_family):
        """Prepend the column family to dict keys, and ensure all values are
        either bytes or str; numbers among the values are converted to str.

        >>> HBaseClient.prepare_data({'name': '良好'}, 'r')
        {'r:name': '良好'}
        >>> HBaseClient.prepare_data({'average_cost': 95.0}, 'r')
        {'r:average_cost': '95.0'}
        """
        return dict(
            ('{}:{}'.format(column_family, k), convert_to_string(v))
            for k, v in data.items()
        )

    @staticmethod
    def unprepare_data(data, column_family):
        """Remove column_family and timestamps (if present) from dict keys, and
        try to decode values using utf-8 (not guaranteed to decode though).

        >>> HBaseClient.unprepare_data({b'column_family:average_cost': (b'95.0', 1489147687495)}, 'column_family')
        {'average_cost': '95.0'}
        >>> HBaseClient.unprepare_data({b'column_family:average_cost': b'95.0'}, 'column_family')
        {'average_cost': '95.0'}
        """
        if not data:
            return data
        prefix_length = len(column_family) + 1
        random_value = next(iter(data.values()))
        if isinstance(random_value, tuple):
            return {decode_bytes(k[prefix_length:]): decode_bytes(t[0]) for k, t in data.items()}
        return {decode_bytes(k[prefix_length:]): decode_bytes(v) for k, v in data.items()}

    @retry(stop_max_attempt_number=2, retry_on_exception=lambda e: isinstance(e, TTransportException))
    def get_row(self, table_name, row_key, **kwargs):
        """Get a single row from hbase, any kwargs will get passed to `table.row`"""
        with self.hbase_pool.connection() as con:
            table = con.table(table_name)
            row = table.row(row_key, **kwargs)
        return row

    def scan(self, table_name, **kwargs):
        # yield inside the pool context so the connection stays checked out for
        # the whole scan instead of returning to the pool before iteration starts
        with self.hbase_pool.connection() as con:
            table = con.table(table_name)
            for row in table.scan(**kwargs):
                yield row

    def delete(self, table_name, row_key, **kwargs):
        with self.hbase_pool.connection() as con:
            table = con.table(table_name)
            table.delete(row_key, **kwargs)


hbase = HBaseClient(HBASE_CONFIG)
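
`HBASE_CONFIG` is forwarded verbatim to `happybase.ConnectionPool`, so it is simply that pool's keyword arguments. Below is a minimal sketch of what the configuration and a round trip through the client above might look like; the pool size, host, port, table name and column family are illustrative assumptions rather than values from this gist, and it assumes the same Python 2 era environment the rest of the code targets, with the table already created.

# Hypothetical beehive/settings.py values -- keyword arguments for happybase.ConnectionPool.
HBASE_CONFIG = {
    'size': 3,            # number of pooled Thrift connections (assumed)
    'host': 'localhost',  # HBase Thrift server host (assumed)
    'port': 9090,         # default HBase Thrift server port
}

# Round trip through the module-level client, against an assumed table
# 'beehive_cache' that already has a column family 'p'.
row = HBaseClient.prepare_data({'name': 'some restaurant'}, 'p')
hbase.write('beehive_cache', b'example-row', row)
fetched = hbase.get_row('beehive_cache', b'example-row')
print(HBaseClient.unprepare_data(fetched, 'p'))  # {'name': 'some restaurant'}
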
# -*- coding: utf-8 -*-
from scrapy.responsetypes import responsetypes
from scrapy.utils.python import to_bytes
from scrapy.utils.request import request_fingerprint
from six.moves import cPickle as pickle
from beehive.db import hbase
class HBaseCacheStorage(object):
"""Store cache in HBase, TTL is managed by HBase itself (specify TTL when
create HBase table)
literal blocks::
create 'beehive_cache', { NAME => 'p', COMPRESSION => 'SNAPPY' , TTL => 259200}
"""

    def __init__(self, settings):
        self.table_name = settings['HBASE_CACHE_TABLE_NAME']
        self.column_family = settings['HBASE_CACHE_COLUMN_FAMILY']

    def open_spider(self, spider):
        self.db = hbase

    def close_spider(self, spider):
        pass

    def retrieve_response(self, spider, request):
        key = self._request_key(request)
        spider.logger.debug('Retrieving cache key %s', key)
        hbase_data = self.db.get_row(self.table_name, key)
        if not hbase_data:
            return  # not cached
        data = pickle.loads(self.db.unprepare_data(hbase_data, self.column_family)['data'])
        url = data['url']
        status = data['status']
        headers = data['headers']
        body = data['body']
        respcls = responsetypes.from_args(headers=headers, url=url)
        spider.logger.debug('Request cache %s: body type %s, header %s, respcls %s',
                            request, type(data['body']), headers, respcls)
        try:
            response = respcls(url=url, headers=headers, status=status, body=body)
        except TypeError as e:
            spider.logger.exception(e)
            # drop into an interactive scrapy shell to inspect the bad cache
            # entry, then abort the crawl
            from scrapy.shell import Shell
            import sys
            sh = Shell(spider.crawler)
            sh.vars['data'] = data
            sh.vars['responsetypes'] = responsetypes
            sh.start(url=data['url'], request=request, spider=spider)
            sys.exit(1)
        return response

    def store_response(self, spider, request, response):
        """Writing a pickled object into hbase is probably not a good idea, but
        serializing scrapy request/response objects any other way is incredibly
        difficult."""
        key = self._request_key(request)
        data = {
            'status': response.status,
            'url': response.url,
            'headers': response.headers,
            'body': response.body,
        }
        wrapped = {'data': pickle.dumps(data, protocol=2)}
        hbase_data = self.db.prepare_data(wrapped, self.column_family)
        self.db.write(self.table_name, key, hbase_data)

    def _request_key(self, request):
        return to_bytes(request_fingerprint(request))
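
To plug this storage into Scrapy's HTTP cache, the project settings need to enable the cache and point `HTTPCACHE_STORAGE` at this class; the class then reads the two `HBASE_CACHE_*` keys in `__init__`. A sketch of those settings, assuming the module path `beehive.hbase_cache` (the gist does not name this file) and the table created with the shell command from the class docstring:

# Hypothetical additions to the Scrapy project's settings.py.
HTTPCACHE_ENABLED = True
# Module path is an assumption; point it at wherever HBaseCacheStorage actually lives.
HTTPCACHE_STORAGE = 'beehive.hbase_cache.HBaseCacheStorage'

# Keys read by HBaseCacheStorage.__init__; they must match the HBase table created with:
#   create 'beehive_cache', { NAME => 'p', COMPRESSION => 'SNAPPY', TTL => 259200 }
HBASE_CACHE_TABLE_NAME = 'beehive_cache'
HBASE_CACHE_COLUMN_FAMILY = 'p'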