Last active
August 14, 2016 17:04
-
-
Save d0zingcat/ebaf97ceba5b865ee0cd369d191fa4ad to your computer and use it in GitHub Desktop.
The main module of a Python-based backend for a WeChat official account (powered by LeanEngine)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8
# This code accompanies my post: Using Flask and LeanCloud to build a backend of Wechat Official Account
# You are welcome to read the post: https://blog.d0zingcat.xyz/2016/07/24/Python/Using%20Flask%20and%20LeanCloud%20to%20build%20a%20backend%20of%20Wechat%20Official%20Account/#more
import json
import xml.etree.ElementTree as ET
from datetime import datetime

import leancloud
import requests
from flask import render_template, Flask, request, make_response, jsonify, abort
from flask_sockets import Sockets

import funcs
from views.todos import todos_view
# Reply sent when a message is not a recognised command.
error_message = "不是合法的命令!"
# Welcome text sent in answer to a ``subscribe`` event.
subsribe_message = "欢迎关注 Townes! 本公众号秉承开源共享的理念,旨在解决5位数字和4位数字之间的映射关系问题。发送5位数字即可查询对应的4位数!我们不保证此数字之间的关联性和可靠性,同样我们也不保证此处数字和其他任何地方以任何形式出现的数字之间的联系!详细的可以参考科学家们做的实验——无限猴子定理: https://zh.wikipedia.org/zh/%E7%84%A1%E9%99%90%E7%8C%B4%E5%AD%90%E5%AE%9A%E7%90%86 "
# Registration prompt (not referenced by the code visible in this file).
reg_message = "Please register first through this link:"
# Passive text-reply XML; the four slots are, in order: recipient,
# sender, creation timestamp and message content.
reply_template = "<xml><ToUserName><![CDATA[%s]]></ToUserName><FromUserName><![CDATA[%s]]></FromUserName><CreateTime>%s</CreateTime><MsgType><![CDATA[text]]></MsgType><Content><![CDATA[%s]]></Content><FuncFlag>0</FuncFlag></xml>"
# Names accepted by the ``reg+{name}`` command.
valid_names = ['汤力', '窦书明', '高涛波', '邵青', '张涛', '安番玉', 'test']
# Record types stored in the ``type`` field: index 0 is check-in, 1 is check-out.
types = ['CHECKIN', 'CHECKOUT']
# The characters accepted as digits when validating codes.
tuple_numbers = ('0', '1', '2', '3', '4', '5', '6', '7', '8', '9')
# The Flask application object; Sockets is constructed around it so that
# routes can also be declared through ``sockets.route``.
app = Flask(__name__)
sockets = Sockets(app)
# Dynamic routes: mount the todos blueprint under the /todos prefix.
app.register_blueprint(todos_view, url_prefix='/todos')
# In-memory sample data served by the /todo/api/v1.0/tasks endpoints.
tasks = [
    {
        'id': 1,
        'title': u'Buy groceries',
        'description': u'Milk, Cheese, Pizza, Fruit, Tylenol',
        'done': False
    },
    {
        'id': 2,
        'title': u'Learn Python',
        'description': u'Need to find a good Python tutorial on the web',
        'done': False
    }
]
@app.route('/todo/api/v1.0/tasks', methods=['GET'])
def get_tasks():
    """Return the whole module-level ``tasks`` list wrapped in a JSON envelope."""
    envelope = {'status': 200, 'message': 'success', 'tasks': tasks}
    return jsonify(envelope)
@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['GET'])
def get_task(task_id):
    """Return the task whose ``id`` equals ``task_id`` as JSON.

    Aborts with 404 when no entry of ``tasks`` has that id.
    """
    # A list comprehension always yields a list, so the emptiness test and
    # the [0] lookup do not depend on filter() returning a list.
    matches = [task for task in tasks if task['id'] == task_id]
    if not matches:
        abort(404)
    return jsonify({'status': 200, 'message': 'success', 'task': matches[0]})
@app.errorhandler(404)
def not_found(error):
    """Render 404 errors as a JSON body instead of the default HTML page.

    The response carries HTTP status 404 and the same code in the body's
    ``status`` field, so clients can detect the failure either way.
    """
    body = jsonify({'status': 404, 'message': 'error', 'error': 'Not found!'})
    return make_response(body, 404)
@app.route('/', methods=['GET'])
def index():
    """Serve a one-line script that sends the browser to bike.littermonster.net."""
    redirect_snippet = '<script>window.location.href = "http://bike.littermonster.net";</script>'
    return make_response(redirect_snippet)
# Seconds to wait for the upstream code/password service before giving up.
_UPSTREAM_TIMEOUT = 10
_ADD_FAILED = '添加失败!请重新尝试添加!\n'
_NOT_REGISTERED = 'Sorry, No such user has been registered! Please register first!([reg+yourname])'


def _element_text(root, tag):
    """Return the text of child element ``tag``, or '' when it is missing or empty."""
    node = root.find(tag)
    if node is None or node.text is None:
        return ''
    return node.text


def _find_or_empty(query):
    """Run ``query.find()``; LeanCloud error 101 is treated as "no rows".

    Code 101 is the error handled throughout this file for a class that has
    not been created on the server yet. Any other error propagates.
    """
    try:
        return query.find()
    except leancloud.LeanCloudError as e:
        if e.code == 101:
            return []
        raise


def _registered_name(openid):
    """Return the name registered for ``openid``, or None when there is none."""
    Guys = leancloud.Object.extend('Guys')
    query = Guys.query.equal_to('openidfromuser', openid).descending('createdAt')
    rows = _find_or_empty(query)
    if not rows:
        return None
    return rows[0].get('name')


def _has_record_today(name, record_type):
    """Tell whether ``name`` already has a ``record_type`` record dated today."""
    Checkincheckout = leancloud.Object.extend('Checkincheckout')
    query = Checkincheckout.query
    # 'intime' is stored as str(datetime.now()); its first 11 characters are
    # the date plus the separating space, so a substring match selects today.
    query.contains('intime', str(datetime.now())[:11])
    query.equal_to('name', name)
    query.equal_to('type', record_type)
    return len(_find_or_empty(query)) > 0


def _save_record(name, record_type, commands):
    """Store one check-in/check-out record for ``name``."""
    Checkincheckout = leancloud.Object.extend('Checkincheckout')
    record = Checkincheckout()
    record.set('name', name)
    record.set('type', record_type)
    record.set('intime', str(datetime.now()))
    # Exactly one extra field is taken as the user's comment; anything else
    # gets the placeholder.
    if len(commands) == 2:
        record.set('statements', commands[1])
    else:
        record.set('statements', 'abbreviation')
    record.save()


def _handle_register(commands, from_user, to_user):
    """Process ``reg+{name}`` and return the (not yet encoded) reply text."""
    # A bare "reg" has no name field at all; answer it like an unknown name.
    if len(commands) < 2:
        return 'Your name is not in the list!'
    name = commands[1]
    # valid_names holds UTF-8 encoded strings, so encode before comparing.
    if name.encode('utf-8') not in valid_names:
        return 'Your name is not in the list!'
    Guys = leancloud.Object.extend('Guys')
    if _find_or_empty(Guys.query.equal_to('name', name)):
        return 'Your name has already been used to registered!'
    # The same WeChat user must not register again under another valid name.
    if _find_or_empty(Guys.query.equal_to('openidfromuser', from_user)):
        return 'You have already registered!'
    guy = Guys()
    guy.set('openidfromuser', from_user)
    guy.set('openidtouser', to_user)
    guy.set('name', name)
    guy.save()
    return name + u', Happy to tell you Register Success!'


def _handle_checkin(commands, from_user):
    """Process ``s[+comment]`` and return the reply text."""
    name = _registered_name(from_user)
    if name is None:
        return _NOT_REGISTERED
    if _has_record_today(name, types[0]):
        return "Sorry! You can not check in for more than two times within a day! "
    _save_record(name, types[0], commands)
    return "Congratulations, %s! You've checked in successfully!" % name


def _handle_checkout(commands, from_user):
    """Process ``e[+comment]`` and return the reply text."""
    name = _registered_name(from_user)
    if name is None:
        return _NOT_REGISTERED
    if _has_record_today(name, types[1]):
        return "Sorry! You can not check out for more than two times within a day! "
    # Checking out requires a check-in on the same day.
    if not _has_record_today(name, types[0]):
        return 'Sorry, You have not been checked in! Please checkin first!([s+abbreviation])'
    _save_record(name, types[1], commands)
    return 'Congratulations, %s! You have been checked out successfully!' % name


def _is_code(text, length):
    """Tell whether ``text`` is exactly ``length`` characters from ``tuple_numbers``."""
    return len(text) == length and all(ch in tuple_numbers for ch in text)


def _handle_add(commands):
    """Process ``a+{5-digit}+{4-digit}`` and return the reply text."""
    if len(commands) < 3:
        return error_message
    if not (_is_code(commands[1], 5) and _is_code(commands[2], 4)):
        return error_message
    url = 'http://code_pass.fairvalue.cc/api/v1.0/code_pass_setter/%s/%s' % (
        commands[1], commands[2])
    try:
        payload = requests.get(url, timeout=_UPSTREAM_TIMEOUT).json()
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON body: report it as a failed addition.
        return _ADD_FAILED
    # The service's ``status`` field decides success: 200 or 205.
    if isinstance(payload, dict) and payload.get('status') in (200, 205):
        return '添加成功!\n'
    return _ADD_FAILED


def _handle_query(content):
    """Look up a 5-digit code and return the reply text."""
    if not _is_code(content, 5):
        return error_message
    header = "5位数字是: " + content + ", 4位数字可能是(按权值排名): \n"
    try:
        upstream = requests.get(
            'http://code_pass.fairvalue.cc/api/v1.0/code_pass_query/%s' % content,
            timeout=_UPSTREAM_TIMEOUT)
        if upstream.status_code != 200:
            return header
        payload = upstream.json()
    except (requests.RequestException, ValueError):
        # Treated like a non-200 answer: only the header line is returned.
        return header
    passwords = funcs.password_getter(payload)
    if len(passwords) == 0:
        return '没有查询到相关结果!'
    return header + ''.join(str(item) + ' ' for item in passwords)


@app.route('/weixin', methods=['GET', 'POST'])
def weixinBot():
    """Entry point for the WeChat official-account callback.

    GET is used for signature validation: when ``funcs.check_signatrue``
    accepts the request, ``echostr`` is echoed back; otherwise a fixed JSON
    string is returned.

    POST carries an XML message. ``Content`` is split on '+' and the first
    field selects the command, tested in this order:

    * ``reg+{name}``              register the sender under a valid name
    * ``s[+comment]``             check in (once per day)
    * ``e[+comment]``             check out (once per day, after check-in)
    * a ``subscribe`` event       send the welcome message
    * ``a+{5 digits}+{4 digits}`` add a code/password pair
    * anything else               treat the content as a 5-digit lookup

    Every POST answer is the text-reply XML built from ``reply_template``.
    A body that is not well-formed XML is rejected with HTTP 400.
    """
    if request.method == 'GET':
        args = request.args
        verified = funcs.check_signatrue(
            args.get('signature', ''), args.get('timestamp', ''), args.get('nonce', ''))
        # Compared with True exactly: the helper's return type is not visible
        # from this file, and plain truthiness could accept more values.
        if verified == True:  # noqa: E712
            return make_response(args.get('echostr', ''))
        return make_response('{"status":200, "message": "success"}')

    # Imported here because the module-level name ``time`` is the /time view.
    import time
    stamp = str(int(time.time()))

    # NOTE(review): request.data is untrusted and the stdlib ElementTree
    # parser is not hardened against malicious XML; POST bodies are also not
    # signature-checked here - confirm whether that is handled elsewhere.
    try:
        root = ET.fromstring(request.data)
    except ET.ParseError:
        abort(400)

    to_user = _element_text(root, 'ToUserName')
    from_user = _element_text(root, 'FromUserName')
    content = _element_text(root, 'Content')
    msg_type = _element_text(root, 'MsgType')
    msg_event = _element_text(root, 'Event')

    def reply(text):
        # The answer goes back to the sender, so the two user fields swap.
        response = make_response(reply_template % (from_user, to_user, stamp, text))
        response.content_type = 'application/xml'
        return response

    commands = content.split('+')
    command = commands[0]

    if command == 'reg':
        # Registration replies may contain the user's name, so they are
        # encoded to UTF-8 before being placed in the template.
        return reply(_handle_register(commands, from_user, to_user).encode('utf-8'))
    if command == 's':
        return reply(_handle_checkin(commands, from_user))
    if command == 'e':
        return reply(_handle_checkout(commands, from_user))
    if msg_type == 'event' and msg_event == 'subscribe':
        return reply(subsribe_message)
    if command == 'a':
        return reply(_handle_add(commands))
    return reply(_handle_query(content))
@app.route('/home')
def home():
    """Render the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/time')
def time():
    """Return the current ``datetime.now()`` value rendered with ``str``."""
    current = datetime.now()
    return str(current)
@app.route('/hello/')
@app.route('/hello/<name>')
def hello(name=None):
    """Render ``hello.html``; ``name`` is None when the URL carries no name."""
    context = {'name': name}
    return render_template('hello.html', **context)
# Rewrite and format the Monster Apis [GETTER]
@app.route('/api/v1.0/code_pass_query/<code>', methods=['GET'])
def get_code(code):
    """Proxy a code lookup to bike.littermonster.net and relay its body.

    Aborts with 502 when the upstream request fails or does not answer 200,
    so the view never returns None.
    """
    try:
        # ``params`` lets requests escape ``code`` instead of splicing the raw
        # path segment into the query string.
        r = requests.get(
            'http://bike.littermonster.net/get.php',
            params=[('src', 'app'), ('bike_code', code)],
            timeout=10)
    except requests.RequestException:
        abort(502)
    if r.status_code != 200:
        abort(502)
    return make_response(r.content)
# Rewrite and format the Monster Apis [SETTER]
@app.route('/api/v1.0/code_pass_setter/<code>/<password>', methods=['GET'])
def set_code(code, password):
    """Proxy a code/password submission to bike.littermonster.net.

    Relays the upstream body on a 200 answer; aborts with 502 when the
    upstream request fails or answers anything else, so the view never
    returns None.
    """
    try:
        # ``params`` lets requests escape both values instead of splicing the
        # raw path segments into the query string.
        r = requests.get(
            'http://bike.littermonster.net/add.php',
            params=[('src', 'app'), ('bike_password', password), ('bike_code', code)],
            timeout=10)
    except requests.RequestException:
        abort(502)
    if r.status_code != 200:
        abort(502)
    return make_response(r.content)
@sockets.route('/echo')
def echo_socket(ws):
    """Echo every received WebSocket message back until the socket closes."""
    while not ws.closed:
        message = ws.receive()
        # NOTE(review): receive() is expected to yield None once the peer has
        # gone away - confirm against the installed gevent-websocket version.
        if message is None:
            break
        ws.send(message)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment