Skip to content

Instantly share code, notes, and snippets.

@fduxiao
Last active May 14, 2017 13:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fduxiao/4413f4e9d6d3dbd421511bba24cd2a77 to your computer and use it in GitHub Desktop.
Save fduxiao/4413f4e9d6d3dbd421511bba24cd2a77 to your computer and use it in GitHub Desktop.
wxa server authorization
//app.js
App({
  /**
   * Launch hook: records the launch time.
   * Prepends the current timestamp to the `logs` array kept in local storage.
   */
  onLaunch: function () {
    // Read the existing log from the local cache (empty list when nothing is stored).
    var logs = wx.getStorageSync('logs') || [];
    logs.unshift(Date.now());
    wx.setStorageSync('logs', logs);
  },

  /**
   * Delivers the user's profile to `cb`.
   * Uses the copy cached in `globalData.userInfo` when present; otherwise
   * runs `wx_login` and then `wx.getUserInfo`, caching the result.
   *
   * @param {Function} [cb] - called with the userInfo object; ignored if not a function.
   */
  getUserInfo: function (cb) {
    var that = this;
    if (this.globalData.userInfo) {
      if (typeof cb === 'function') {
        cb(this.globalData.userInfo);
      }
      return;
    }
    // Call the login API first, then fetch the profile.
    that.wx_login(function () {
      wx.getUserInfo({
        success: function (res) {
          that.globalData.userInfo = res.userInfo;
          if (typeof cb === 'function') {
            cb(that.globalData.userInfo);
          }
        }
      });
    });
  },

  // Shared state: cached profile, server-issued credentials and the last wx.login code.
  globalData: {
    userInfo: null,
    openid: '',
    hash: '',
    code: ''
  },

  /**
   * Runs `wx.login` and stores the returned code in `globalData.code`.
   *
   * @param {Function} [cb] - called with the `wx.login` success result.
   */
  wx_login: function (cb) {
    var that = this;
    wx.login({
      success: function (res) {
        that.globalData.code = res.code;
        if (typeof cb === 'function') {
          cb(res);
        }
      }
    });
  },

  /**
   * Posts the current login code to the server and stores the returned
   * `openid` / `hash` pair in local storage.
   * If `wx.checkSession` fails, `wx_login` is run first to obtain a new code.
   *
   * @param {Function} [cb] - called with no arguments after the credentials are stored.
   */
  server_login: function (cb) {
    var that = this;
    // Kept as a local closure (the original stored it on the app object as
    // `that.complete`, where overlapping calls could overwrite each other's
    // continuation and therefore run the wrong callback).
    var exchangeCode = function () {
      wx.request({
        url: 'https://xiao-ny.duckdns.org/pcs/',
        data: {
          code: that.globalData.code
        },
        header: {
          'content-type': 'application/json'
        },
        method: 'POST',
        success: function (res) {
          wx.setStorageSync('openid', res.data.openid);
          wx.setStorageSync('hash', res.data.hash);
          if (typeof cb === 'function') {
            cb();
          }
        },
        fail: function (res) {
          console.log(res);
        }
      });
    };
    wx.checkSession({
      success: exchangeCode,
      fail: function () {
        that.wx_login(exchangeCode);
      }
    });
  },

  /**
   * Validates the stored `openid` / `hash` with the server, then calls `cb`.
   * When the server answers with an `errcode`, falls back to `server_login`
   * and calls `cb` only once that has finished. Before `cb` runs, the stored
   * credentials are copied into `globalData`.
   *
   * @param {Function} [cb] - called with no arguments once credentials are in `globalData`.
   */
  check_login: function (cb) {
    var that = this;
    var storedOpenid = wx.getStorageSync('openid');
    var storedHash = wx.getStorageSync('hash');
    var onReady = function () {
      that.globalData.openid = wx.getStorageSync('openid');
      that.globalData.hash = wx.getStorageSync('hash');
      return typeof cb === 'function' && cb();
    };
    wx.request({
      url: 'https://xiao-ny.duckdns.org/pcs/',
      data: {
        openid: storedOpenid,
        hash: storedHash,
        code: that.globalData.code
      },
      header: {
        'content-type': 'application/json'
      },
      method: 'POST',
      success: function (res) {
        if (res.data.errcode) {
          // Pass the function itself. Invoking it here (as the original did)
          // ran the callback before re-login and never after it.
          that.server_login(onReady);
          return;
        }
        wx.setStorageSync('openid', res.data.openid);
        wx.setStorageSync('hash', res.data.hash);
        onReady();
      },
      fail: function (res) {
        console.log(res);
      }
    });
  },

  /**
   * `wx.request` wrapper that first runs `check_login` and then sends the
   * request with `openid` and `hash` added to its data.
   *
   * @param {Object} v - request options.
   * @param {string} v.url - target URL.
   * @param {Object} [v.data] - payload; `openid` and `hash` are written onto it
   *   (a fresh object is used when omitted).
   * @param {string} [v.method='GET'] - HTTP method.
   * @param {Function} [v.success] - forwarded to `wx.request`.
   * @param {Function} [v.fail] - forwarded to `wx.request`.
   * @param {Function} [v.complete] - forwarded to `wx.request`.
   */
  request: function (v) {
    var that = this;
    var method = v.method || 'GET';
    this.check_login(function () {
      // Tolerate callers that pass no data object at all.
      var data = v.data || {};
      data.openid = that.globalData.openid;
      data.hash = that.globalData.hash;
      wx.request({
        url: v.url,
        data: data,
        header: { 'content-type': 'application/json' },
        method: method,
        success: v.success,
        fail: v.fail,
        complete: v.complete
      });
    });
  }
});
from flask import *
import os
import requests
import hashlib
from functools import wraps
from pymongo import MongoClient
# Flask application object; `application` is a second name bound to the same object.
app = Flask(__name__)
application = app

# NOTE(review): the session-signing key is a hard-coded placeholder.
app.config['SECRET_KEY'] = 'SOME KEY'

# WeChat credentials and the secret word mixed into session hashes are read
# from the environment, each falling back to the literal 'DEFAULT'.
wx_appid = os.getenv('WXAPPID', 'DEFAULT')
wx_appsec = os.getenv('WXAPPSEC', 'DEFAULT')
random_word = os.getenv('RANDOM', 'DEFAULT')
def get_items(d, keys):
    """Return a new dict with only the entries of ``d`` whose key is in ``keys``.

    Entries keep the order they have in ``d``; keys listed in ``keys`` but
    absent from ``d`` are simply not present in the result.
    """
    picked = {}
    for key, value in d.items():
        if key in keys:
            picked[key] = value
    return picked
def get_user_info(code, timeout=10):
    """Exchange a client login ``code`` for session info at WeChat's jscode2session endpoint.

    The query string is built by ``requests`` from a ``params`` dict so that
    ``code`` (which arrives from the client) is URL-encoded and cannot inject
    additional query parameters.

    :param code: the login code sent by the client.
    :param timeout: seconds to wait for the HTTP call before ``requests`` raises.
    :return: the decoded JSON body of the response. Callers in this file treat
        a body containing ``'errcode'`` as a failure and otherwise read ``'openid'``.
    """
    response = requests.get(
        'https://api.weixin.qq.com/sns/jscode2session',
        params={
            'appid': wx_appid,
            'secret': wx_appsec,
            'js_code': code,
            'grant_type': 'authorization_code',
        },
        timeout=timeout,
    )
    return response.json()
# NOTE: the original author recommends adding a salt to this scheme.
def calculate_session_hash(openid):
    """Return the hex SHA-1 digest of ``random_word`` concatenated with ``openid``."""
    digest = hashlib.sha1()
    digest.update((random_word + openid).encode())
    return digest.hexdigest()
def connect_db():
    """Create and return a ``MongoClient`` constructed with no arguments."""
    return MongoClient()
def get_db():
    """Return the ``pcswxa`` database, cached on Flask's ``g`` object.

    The first call within a given ``g`` creates a client through
    ``connect_db`` and stores its ``pcswxa`` database as ``g.mongo_db``;
    later calls return that stored object.
    """
    try:
        return g.mongo_db
    except AttributeError:
        g.mongo_db = connect_db().pcswxa
        return g.mongo_db
def check_user(f):
    """Decorator that authenticates a view from the JSON request body.

    The body must be a JSON object. If it carries an ``openid`` / ``hash``
    pair that matches ``calculate_session_hash``, the request is accepted.
    Otherwise the body's ``code`` is exchanged through ``get_user_info`` and
    a new hash is computed. On success ``session['openid']`` and
    ``session['hash']`` are set before the wrapped view runs.

    Early returns (the wrapped view is not called):
      * body missing or not a JSON object -> the plain-text refusal string;
      * invalid credentials and no ``code`` -> the plain-text refusal string;
      * ``get_user_info`` result contains ``'errcode'`` -> that result as JSON.

    NOTE(review): when ``application.debug`` is true, authentication is
    skipped entirely and the session is filled with the literal ``'test'``.
    """
    import hmac  # function-scope import: only needed for the constant-time compare

    def _credentials_match(openid, h):
        """True when ``h`` is the expected hash for ``openid``."""
        if openid is None or h is None:
            return False
        try:
            expected = calculate_session_hash(openid)
            # Constant-time comparison so response timing does not leak
            # how much of a guessed hash is correct.
            return hmac.compare_digest(expected.encode(), h.encode())
        except (TypeError, AttributeError, UnicodeError):
            # Non-string openid/hash supplied by the client: treat as invalid
            # credentials instead of failing the request with an exception.
            return False

    @wraps(f)
    def wrapper(*args, **kwargs):
        if application.debug:
            session['openid'] = 'test'
            session['hash'] = 'test'
            return f(*args, **kwargs)

        # silent=True yields None for a missing or malformed JSON body
        # rather than raising.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return 'Fuck off'

        h = payload.get('hash', None)
        openid = payload.get('openid', None)
        if not _credentials_match(openid, h):  # wrong session
            code = payload.get('code', None)  # get code then re-login
            if code is None:
                return "Fuck off"
            info = get_user_info(code)
            if 'errcode' in info:
                return jsonify(info)
            openid = info['openid']
            h = calculate_session_hash(openid)

        # Set on both paths so the wrapped view can always read them.
        session['openid'] = openid
        session['hash'] = h
        return f(*args, **kwargs)
    return wrapper
@app.route('/', methods=['GET', 'POST'])
@check_user
def index():
    """Return the ``openid`` and ``hash`` held in the session, with ``code`` 0, as JSON."""
    payload = {
        'code': 0,
        'openid': session['openid'],
        'hash': session['hash'],
    }
    return jsonify(payload)
@app.route('/outside', methods=['POST'])
@check_user
def outside():
    """Store one record in the ``outside`` collection for the session's user.

    A user (identified by ``session['openid']``) may have at most five
    documents in the collection; at five or more the request is refused with
    ``code`` -1. Otherwise the whitelisted fields of the JSON body are saved
    together with the ``openid`` and ``finished=False``, and ``code`` 0 is
    returned.

    NOTE(review): the count and the insert are separate operations, so
    concurrent requests from one user could exceed the limit.
    """
    openid = session['openid']
    db = get_db()
    # count_documents replaces Cursor.count(), which PyMongo 4 removed.
    count = db.outside.count_documents({'openid': openid})
    if count >= 5:
        return jsonify({"code": -1, "err": '请求过多 Too many requests'})

    # In debug mode check_user does not validate the body, so it may be
    # missing or not a JSON object; fall back to an empty record.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    info = get_items(payload, ['telephone', 'name', 'stunum', 'address', 'time', 'desc'])
    info['openid'] = openid
    info['finished'] = False
    db.outside.insert_one(info)
    return jsonify({"code": 0})
if __name__ == '__main__':
    # Running this file directly starts Flask's built-in server in debug mode.
    # NOTE(review): check_user bypasses authentication whenever the app's
    # debug flag is on.
    debug_mode = True
    app.run(debug=debug_mode)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment