Instantly share code, notes, and snippets.

What would you like to do?
Api Key + Security Key + Sign.
# coding=utf-8
"""
:copyright: (c) 2016 by fangpeng.
:license: MIT, see LICENSE for more details.
"""
import hmac
import hashlib
import base64
import redis
_pool = redis.ConnectionPool()
from config import __key__
__all__ = [
def gen_sign(api_key, security_key, timestamp, url, is_hex=False):
    """Build the signature for one API request.

    The signed payload is ``api_key + security_key + str(timestamp) + url``.
    The HMAC key is the module secret ``__key__`` followed by ``api_key``.
    Both are passed to ``encrypt``.

    :param api_key: caller's API key.
    :param security_key: caller's security key.
    :param timestamp: request timestamp; converted with ``str()``.
    :param url: request path being signed.
    :param is_hex: forwarded to ``encrypt`` to select the hex digest.
    :return: whatever ``encrypt`` returns for this key/payload pair.
    """
    payload = api_key + security_key + str(timestamp) + url
    signing_key = __key__ + api_key
    signature = encrypt(signing_key, payload, is_hex)
    return signature
def encrypt(key, msg, is_hex=False):
    """Build an HMAC-SHA256 signature and return it base64-encoded.

    :param key: HMAC key (text, bytes, or any object; non-strings go
        through ``str()``).
    :param msg: message to sign (same coercion as ``key``).
    :param is_hex: when true, base64-encode the hexadecimal digest instead
        of the raw digest bytes.
    :return: the base64 text of the chosen digest form.

    NOTE(review): the ``hmac.new(`` call on this line was garbled in the
    copy under review; it is reconstructed here with ``key`` as the first
    argument -- confirm against the original gist.
    """
    text_type = type(u"")

    def _as_bytes(value):
        # hmac and base64 operate on bytes; encode text as UTF-8 so that
        # non-ASCII input does not raise.
        if not isinstance(value, (bytes, text_type)):
            value = str(value)
        if isinstance(value, text_type):
            return value.encode("utf-8")
        return value

    dig = hmac.new(_as_bytes(key), _as_bytes(msg), digestmod=hashlib.sha256)
    chosen = dig.hexdigest() if is_hex else dig.digest()
    return base64.b64encode(_as_bytes(chosen)).decode()
def check_sign(sign, api_key, security_key, timestamp, url, is_hex=False):
    """Check a client-supplied signature against the expected one.

    The expected value is recomputed with ``gen_sign`` and compared using
    ``hmac.compare_digest`` so the comparison time does not leak how many
    leading characters matched.

    :param sign: signature sent by the client.
    :return: ``True`` when the signatures match, ``False`` otherwise
        (including when ``sign`` is not a string).
    """
    text_type = type(u"")
    if not isinstance(sign, (bytes, text_type)):
        return False

    def _as_bytes(value):
        # compare_digest needs both operands as the same type; use bytes.
        return value.encode("utf-8") if isinstance(value, text_type) else value

    expected = gen_sign(api_key, security_key, timestamp, url, is_hex)
    return hmac.compare_digest(_as_bytes(expected), _as_bytes(sign))
def _md5(slat, msg):
m = hashlib.md5()
m.update(slat + msg)
return m.hexdigest()
def gen_security_key(sid):
    """Generate a private (security) key for ``sid``.

    A fresh salt is built from a random UUID4 (dashes removed) plus the
    current Unix time in whole seconds. The result is ``_md5(salt, sid)``.
    """
    import time
    import uuid

    random_part = uuid.uuid4().hex
    time_part = str(int(time.time()))
    return _md5(random_part + time_part, sid)
# coding=utf-8
"""
:copyright: (c) 2016 by fangpeng.
:license: MIT, see LICENSE for more details.
"""
import logging
import json
import functools
import exceptions
import tornado.gen
import tornado.web
from tornado.escape import json_decode
from tornado.gen import coroutine, Return
from tornado.concurrent import is_future
from ..utility import json_process
from service import APiServiceHandlerRequest
class ServiceException(Exception):
    """Business-level error carrying a code, a message and optional data.

    :param code: application error code (default ``0``).
    :param msg: human-readable message, exposed as ``message``.
    :param data: optional payload, exposed as ``data``.

    The copy under review read ``self.message = msg = data``, which threw
    the message away. It is restored here as two separate attributes.
    """

    def __init__(self, code=0, msg="", data=None):
        super(ServiceException, self).__init__(msg)
        self.code = code
        self.message = msg
        self.data = data

    def __str__(self):
        return repr(self.message)
# Request handler base class combining tornado.web.RequestHandler with
# APiServiceHandlerRequest.
# NOTE(review): indentation was lost in this copy; the defs that follow
# (through yield_valid_count) presumably form this class body -- confirm
# against the original file.
class BaseHandler(tornado.web.RequestHandler, APiServiceHandlerRequest):
def on_finish(self):
    """Reset the cached request parameters to an empty dict."""
    self.json_data = dict()
# Render errors as JSON. For status 500 the exception instance is taken
# from kwargs['exc_info'][1] and inspected by type.
# NOTE(review): lines were lost from this copy. The tornado.gen.Return
# branch has no body, and the two lines after the ServiceException branch
# presumably each sat under a missing `else:` -- restore from the original
# before use.
def write_error(self, status_code, **kwargs):
if status_code == 500:
err = kwargs['exc_info'][1]
if isinstance(err, tornado.gen.Return):
# NOTE(review): branch body missing here.
elif isinstance(err, ServiceException):
self.render_json({'code': 500, 'data': None, 'msg': err.message})
# Passes the exception object itself as 'msg'; serialisation then depends
# on json_process.
self.render_json({'code': 500, 'data': None, 'msg': err})
# Delegates to the parent class's write_error.
super(BaseHandler, self).write_error(status_code, **kwargs)
def return_error(self, code=500, msg=u"请求异常", data=None):
    """Build the standard error envelope.

    :return: dict with the keys ``code``, ``data`` and ``msg``.
    """
    envelope = dict(code=code, data=data, msg=msg)
    return envelope
def raise_error(self, code=500, msg=u'请求异常', data=None):
    """Raise ``tornado.gen.Return`` carrying the error envelope.

    The envelope is built by ``self.return_error(code, msg, data)``.
    """
    envelope = self.return_error(code, msg, data)
    raise tornado.gen.Return(envelope)
def render_json(self, data):
    """Serialise ``data`` to JSON and write it to the response.

    The content type is set to JSON first. The serialised text then goes
    through ``self.jsonp`` before being written.
    """
    self.set_header("Content-Type", "application/json")
    serialised = json.dumps(data, default=json_process)
    body = self.jsonp(serialised)
    self.write(body)
def jsonp(self, data_json):
    """Wrap ``data_json`` in a JSONP callback when the client asks for one.

    The callback name comes from the ``jsonpCallback`` request argument.
    It is attacker-controlled and is emitted as JavaScript, so it is only
    honoured when it is a plain (optionally dotted) identifier. Anything
    else falls back to plain JSON.

    :param data_json: already-serialised JSON text.
    :return: ``data_json`` unchanged, or ``"<callback>(<data_json>)"``.
    """
    import re  # local import: `re` is not among this module's imports

    jsonp_callback = self.get_argument("jsonpCallback", None)
    if not jsonp_callback:
        return data_json

    identifier = r'[A-Za-z_$][0-9A-Za-z_$]*'
    pattern = r'%s(?:\.%s)*\Z' % (identifier, identifier)
    if not re.match(pattern, jsonp_callback):
        # Refuse to echo arbitrary script into the response.
        return data_json

    self.set_header("Content-Type", "application/javascript")
    return "%s(%s)" % (jsonp_callback, data_json)
# Validate the current request and return its parameters, or an error
# envelope from return_error when validation fails.
# NOTE(review): this copy is damaged. The dict comprehension below is cut
# off after `for k, v in`, and nothing visible merges req_data into kwargs
# -- restore from the original before use.
def get_valid_params(self, params=None, **kwargs):
# Parse the request body as JSON.
req_data = json_decode(self.request.body)
# NOTE(review): the iterable of this comprehension is missing.
req_data = {k: ''.join(v) for k, v in
# valid_request returns a falsy value on success, otherwise a message
# string or a tuple that is unpacked into return_error.
valid = self.valid_request(self.request.path, kwargs)
if valid:
if isinstance(valid, tuple):
return self.return_error(*valid)
return self.return_error(1000, msg=valid)
# Every name listed in params must be present in kwargs.
# NOTE(review): params defaults to None, which cannot be iterated here.
for p in params:
if p not in kwargs:
return self.return_error(1003, msg=u'missing param:%s' % p)
# Add the user id when the attribute exists on this handler.
if hasattr(self, 'user_id'):
kwargs['user_id'] = self.user_id
# Cache the parameters; get_req_params returns this attribute.
self.json_data = kwargs
return kwargs
def get_req_params(self):
    """Return the parameters cached on ``self.json_data``."""
    cached_params = self.json_data
    return cached_params
def asyn_return(self, value=None):
    """Raise ``tornado.gen.Return`` carrying ``value``.

    The default used to be a single shared ``{}`` created at definition
    time, so a caller mutating the result would leak state into every
    later default call. A fresh dict is now created per call.

    :param value: result to hand back; ``None`` (the default) yields a new
        empty dict.
    """
    if value is None:
        value = {}
    raise tornado.gen.Return(value)
# Yield `res` and raise ServiceException(code, msg) when the yielded result
# differs from `cond`; otherwise hand the result back via Return.
# NOTE(review): indentation was lost, so the nesting of the lines below
# cannot be determined from this copy. That includes the purpose of the
# `for i in range(2)` loop. The body uses `yield` / `raise Return`, so it
# presumably needs a coroutine decorator that is not visible here --
# confirm against the original.
def yield_valid(self, res, cond=True, msg=u'请求数据异常', code=500):
if is_future(res):
for i in range(2):
result = yield res
if result != cond:
raise ServiceException(code, msg)
raise Return(result)
# Yield `res` and raise ServiceException(code, msg) when the expression
# formed by str(result) followed by `cond` evaluates truthy.
# NOTE(review): indentation was lost, so the nesting and the role of the
# `for i in range(2)` loop cannot be determined from this copy.
# NOTE(review): eval() runs arbitrary Python. `cond` and the yielded result
# must never come from untrusted input; prefer passing a callable predicate.
def yield_valid_count(self, res, cond, msg=u'数据异常', code=500):
if is_future(res):
for i in range(2):
result = yield res
if eval(str(result) + cond):
raise ServiceException(code, msg)
# Decorator factory for handler methods: handler(*required, **extra)
# returns `deal`, which wraps a method so that request parameters are
# validated through self.get_valid_params before the method runs and the
# outcome is collected in a {'code', 'data', 'msg'} dict.
# NOTE(review): this copy is damaged. The `try:` / `else:` lines are
# missing around the except clauses below, and nothing visible returns or
# renders `result` -- restore from the original before use.
def handler(*args, **kwargs):
# NOTE(review): the lines from here down to `raise tornado.gen.Return(ids)`
# look like a usage example from a docstring whose quotes were stripped;
# they are not part of the decorator logic -- confirm.
from tornado_route import route
from crm.handler import BaseHandler, handler
class CommonHandler(BaseHandler):
@handler('name', 'age')
def get(self, *args, **kwargs):
return {'good': 100}
class FutureHandler(BaseHandler):
def get(self, *args, **kwargs):
db = self.settings['db']
ids = []
for x in range(10000):
_id = yield db.crm.insert({
'name': 'fang',
raise tornado.gen.Return(ids)
# Capture the decorator arguments for use inside the wrapper.
params = args
params_kw = kwargs
def deal(fn):
def wrapper(self, *args, **kwargs):
result = {'code': 0}
# Validation returns either the parameters or an error envelope.
req_datas = self.get_valid_params(params, **params_kw)
# A truthy 'code' together with a truthy 'msg' marks an error envelope.
if req_datas.get('code', None) and req_datas.get('msg', ''):
result = req_datas
res = fn(self, *args, **kwargs)
# Futures are awaited; other return values are used directly.
if is_future(res):
result['data'] = yield res
# NOTE(review): the literal 'code' is always truthy, so this condition only
# tests `'msg' in result['data']`; it presumably meant to test both keys.
if result['data'] and 'code' and 'msg' in result['data']:
result = result['data']
result['data'] = res
# Business errors carry their own code and message.
except ServiceException as ex:
result['code'] = ex.code
result['data'] = None
result['msg'] = ex.message
# Any other exception is reported as code 500 with its text in 'data'.
except Exception as ex:
result['code'] = 500
result['data'] = str(ex)
result['msg'] = u'服务异常'
return wrapper
return deal
# coding=utf-8
"""
:copyright: (c) 2016 by fangpeng.
:license: MIT, see LICENSE for more details.
"""
# Shared signing secret; gen_sign prepends it to the api_key to form the
# HMAC key.
# NOTE(review): a secret committed in source should be loaded from the
# environment or a secrets store instead.
__key__ = "abcsd23w3#@xsdf.."
# Imported by the request-validation module; its use is not visible in
# this copy (presumably for decoding the user id from the sid -- confirm).
# NOTE(review): that module also imports ANONYMOUS_URLS from config, which
# is not defined in the lines visible here.
__crypto_uid_key__ = 12233
# coding=utf-8
"""
:copyright: (c) 2016 by fangpeng.
:license: MIT, see LICENSE for more details.
"""
import logging
import base64
import time
import struct
import exceptions
import redis
from functools import wraps, partial
from . import check_sign, _pool
from .config import __crypto_uid_key__, ANONYMOUS_URLS
R = redis.Redis(connection_pool=_pool)
class ServiceException(Exception):
    """Validation/service error carrying a code, a message and optional data.

    :param code: application error code (default ``400``).
    :param msg: human-readable message, exposed as ``message``.
    :param data: optional payload, exposed as ``data``.

    The copy under review read ``self.message = msg = data``, which threw
    the message away and left ``__str__`` returning ``None`` by default.
    The two attributes are stored separately here.
    """

    def __init__(self, code=400, msg="", data=None):
        super(ServiceException, self).__init__(msg)
        self.code = code
        self.message = msg
        self.data = data

    def __str__(self):
        return self.message
def wrapper_log(fn):
    """Decorate ``fn`` so any exception is logged and turned into a message.

    The wrapped callable returns ``fn``'s result on success. On any
    ``Exception`` it logs the failure with its traceback and returns the
    string ``'Invalid request.'`` instead of propagating.

    The copy under review had lost its ``try:`` line and built an error
    string that was never logged; both are restored here.
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as ex:
            # Deliberate catch-all: this is the reporting boundary.
            # str(ex) replaces ex.message, which not every exception has.
            logging.exception("[Err]: %s : %s", fn.__name__, ex)
            return 'Invalid request.'
    return wrapper
# Mixin holding per-request identity fields and the request-validation
# steps (required fields, timestamp, version, security key, signature).
# NOTE(review): indentation was lost in this copy; the defs that follow
# (through valid_version) presumably form this class body -- confirm
# against the original file.
class APiServiceHandlerRequest(object):
def __init__(self):
self.user_id = 0
self.sid = ''
self.url = ''
self.api_key = ''
self.security_key = ''
self.device_type = ''
self.version = ''
# Run the validation steps for one request in order and return the first
# truthy result (an error message or tuple). Falls off the end, returning
# None, when every step passes.
# NOTE(review): this copy is damaged. The first argument of the outer
# req_data.get( call is missing, the list literal is never closed, and the
# extent of the `if url == ...` branch cannot be determined -- restore from
# the original before use.
def valid_request(self, url, req_data):
# Filter out URLs that do not need validation.
if url == '/api/app/version':
self.sid = req_data.get(
req_data.get('sid', '')).replace(' ', '+')
uid = self.get_uid(self.sid)
self.user_id = uid
self.url = url
# Bind each validator to its single argument; the generator is lazy, so a
# validator only runs when the loop below reaches it.
funcs = (partial(f, args) for f, args in [
(self.valid_req_field, req_data), # check that all required request fields are present
(self.valid_timestarp, req_data.get('timestamp', '')),
# check the request timestamp
(self.valid_version, req_data.get('version', '')), # check the device type and version number
(self.valid_security_key, req_data.get('api_key', '')), # check user_id and security_key  # noqa
(self.valid_sign, req_data) # check the request signature
for func in funcs:
valid = func()
if valid:
return valid
def valid_req_field(self, req_data, *args):
    """Check that every required field is present in ``req_data``.

    :param req_data: mapping of request parameters.
    :param args: field names to require; when omitted, the defaults are
        ``api_key``, ``timestamp``, ``sign`` and ``version``.
    :return: ``None`` when nothing is missing, otherwise
        ``'Missing fields a,b'`` listing the absent names in the order
        they were required.
    """
    check_fields = args or ('api_key', 'timestamp', 'sign', 'version')
    # One pass. The earlier map()/all()/zip() chain consumed the map result
    # twice, which yields wrong output when map() returns an iterator.
    missing = [field for field in check_fields if field not in req_data]
    if missing:
        return 'Missing fields %s' % ','.join(missing)
    return None
def valid_timestarp(self, timestamp, expires=10*60):
    """Check that the request timestamp is present, numeric and recent.

    :param timestamp: Unix time in seconds, as an int or numeric string.
    :param expires: maximum allowed distance from now, in seconds.
    :return: ``None`` when acceptable, otherwise an error message.
    """
    if not timestamp:
        return "Invalid Request, Missing timestamp."
    try:
        timestamp = int(timestamp)
    except (TypeError, ValueError, OverflowError):
        # Client-supplied value: report it instead of raising.
        return "Invalid Request, Malformed timestamp."
    now = int(time.time())
    if abs(now - timestamp) > expires:
        return "The request is out of date."
    return None
def valid_security_key(self, api_key):
    """Resolve the user and security key for ``api_key``.

    Spaces in the key are replaced with ``+``, and the result is stored as
    both ``self.api_key`` and ``self.sid``. The user id comes from
    ``self.get_uid``.

    :return: ``None`` on success, an error string, or a
        ``(code, message)`` tuple.

    NOTE(review): the session id lookup is a placeholder (always ``''``),
    so as written every request with a valid uid gets the 400 tuple and
    the checks after it never run.
    """
    if not api_key:
        return 'Missing api_key'

    normalized_key = api_key.replace(' ', '+')
    self.api_key = normalized_key
    self.sid = self.api_key

    resolved_uid = self.get_uid(self.api_key)
    if not resolved_uid:
        return 'UserId not found.'
    self.user_id = resolved_uid

    # Session id ("sid") lookup; meant to be fetched from redis.
    session_id = ''
    if not session_id:
        return (400, u'您的账号已过期, 请重新登录')
    if self.api_key != session_id:
        return (401, u'账号在其他设备上登陆, 如不是您本人操作请修改账号密码')
    # The api_key (AK) maps to the security key (SK) in redis.
    if not R.exists(self.api_key):
        return (403, u'您的账号已过期, 请重新登录')
    self.security_key = R.get(self.api_key)
def valid_sign(self, req_data):
    """Verify the request signature with ``check_sign``.

    Reads ``sign`` and ``timestamp`` from ``req_data`` and uses the api
    key, security key and url stored on ``self``.

    :return: ``None`` when the signature matches, otherwise
        ``'Invalid signature.'``.
    """
    # The hex digest is never requested. A per-device switch (hex for
    # 'ios') exists only as commented-out code in the original.
    use_hex = False
    signature_ok = check_sign(
        req_data['sign'],
        self.api_key,
        self.security_key,
        req_data['timestamp'],
        self.url,
        use_hex,
    )
    if signature_ok:
        return None
    return 'Invalid signature.'
# Map a session id / api key to a user id. Callers in this file treat a
# falsy return value as "user not found".
# NOTE(review): the body of this method is missing from this copy --
# restore it from the original before use.
def get_uid(self, sid):
# Validate the request's version field and record the device type and
# version on self. Returns an error string, or None when accepted.
# NOTE(review): `device_type` and `v` are never assigned in this copy. A
# line that derived them from `version` was presumably lost -- restore it
# from the original before use.
def valid_version(self, version):
if not version:
return 'Missing Request Version.'
# Only these device types are accepted (compared case-insensitively).
if device_type.lower() not in ('android', 'ios', 'pc', 'web', 'api'):
return 'Invalid Version.'
self.device_type = device_type.lower()
self.version = v
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment