Skip to content

Instantly share code, notes, and snippets.

@d0zingcat
Last active August 14, 2016 17:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save d0zingcat/ebaf97ceba5b865ee0cd369d191fa4ad to your computer and use it in GitHub Desktop.
Save d0zingcat/ebaf97ceba5b865ee0cd369d191fa4ad to your computer and use it in GitHub Desktop.
The main features of a Python-based backend for a WeChat Official Account (powered by LeanEngine)
# This code is for my post: Using Flask and LeanCloud to build a backend of Wechat Official Account
# welcome to visit the post: https://blog.d0zingcat.xyz/2016/07/24/Python/Using%20Flask%20and%20LeanCloud%20to%20build%20a%20backend%20of%20Wechat%20Official%20Account/#more
# coding: utf-8
from datetime import datetime
from flask import render_template, Flask, request, make_response, jsonify, abort
from flask_sockets import Sockets
from views.todos import todos_view
import xml.etree.ElementTree as ET
import funcs
import leancloud
import requests
import json
# Reply used whenever a message matches no known command
# (the text reads "Not a valid command!").
error_message = "不是合法的命令!"

# Welcome text sent in response to a "subscribe" event.
subsribe_message = "欢迎关注 Townes! 本公众号秉承开源共享的理念,旨在解决5位数字和4位数字之间的映射关系问题。发送5位数字即可查询对应的4位数!我们不保证此数字之间的关联性和可靠性,同样我们也不保证此处数字和其他任何地方以任何形式出现的数字之间的联系!详细的可以参考科学家们做的实验——无限猴子定理: https://zh.wikipedia.org/zh/%E7%84%A1%E9%99%90%E7%8C%B4%E5%AD%90%E5%AE%9A%E7%90%86 "

# Prompt asking the user to register.
reg_message = "Please register first through this link:"

# Passive text-reply envelope. The four %s slots are, in order:
# ToUserName, FromUserName, CreateTime, Content.
reply_template = (
    "<xml>"
    "<ToUserName><![CDATA[%s]]></ToUserName>"
    "<FromUserName><![CDATA[%s]]></FromUserName>"
    "<CreateTime>%s</CreateTime>"
    "<MsgType><![CDATA[text]]></MsgType>"
    "<Content><![CDATA[%s]]></Content>"
    "<FuncFlag>0</FuncFlag>"
    "</xml>"
)

# Names that are allowed to register through the "reg+{name}" command.
valid_names = [
    '汤力',
    '窦书明',
    '高涛波',
    '邵青',
    '张涛',
    '安番玉',
    'test',
]

# Attendance record types: index 0 is check-in, index 1 is check-out.
types = ['CHECKIN', 'CHECKOUT']

# The ten ASCII digit characters, used to validate numeric commands.
tuple_numbers = tuple('0123456789')
# The Flask application object; every @app.route view below registers on it.
app = Flask(__name__)
# WebSocket support (flask_sockets) bound to the same application.
sockets = Sockets(app)
# Dynamic routing: mount the todos blueprint under the /todos prefix.
app.register_blueprint(todos_view, url_prefix='/todos')
# In-memory sample data served by the /todo/api/v1.0/tasks endpoints.
tasks = [
    {
        'id': 1,
        'title': u'Buy groceries',
        'description': u'Milk, Cheese, Pizza, Fruit, Tylenol',
        'done': False,
    },
    {
        'id': 2,
        'title': u'Learn Python',
        'description': u'Need to find a good Python tutorial on the web',
        'done': False,
    },
]
@app.route('/todo/api/v1.0/tasks', methods=['GET'])
def get_tasks():
    """Return every entry of the module-level ``tasks`` list as JSON."""
    payload = {'status': 200, 'message': 'success', 'tasks': tasks}
    return jsonify(payload)
@app.route('/todo/api/v1.0/tasks/<int:task_id>', methods=['GET'])
def get_task(task_id):
    """Return the task whose ``id`` equals ``task_id`` as JSON.

    Aborts with 404 when no task in the module-level ``tasks`` list matches.
    """
    # next() stops at the first match and, unlike len()/indexing on the
    # result of filter(), works on both Python 2 and Python 3.
    task = next((t for t in tasks if t['id'] == task_id), None)
    if task is None:
        abort(404)
    return jsonify({'status': 200, 'message': 'success', 'task': task})
@app.errorhandler(404)
def not_found(error):
    """Render 404 errors as a JSON body instead of the default HTML page.

    The response carries the real 404 status, both as the HTTP status code
    and in the ``status`` field, so clients cannot mistake it for success.
    """
    body = jsonify({'status': 404, 'message': 'error', 'error': 'Not found!'})
    return make_response(body, 404)
@app.route('/', methods=['GET'])
def index():
    """Serve a one-line script that sends the browser to the bike site."""
    redirect_script = '<script>window.location.href = "http://bike.littermonster.net";</script>'
    return make_response(redirect_script)
# Seconds to wait for the code/password HTTP API before giving up.
# NOTE(review): WeChat is understood to discard passive replies that take
# longer than roughly five seconds -- confirm against the platform docs.
_UPSTREAM_TIMEOUT = 4

# Base URL of the code/password HTTP API used by the "a" and query commands.
_CODE_PASS_API = 'http://code_pass.fairvalue.cc/api/v1.0'

# LeanCloud error code meaning the queried class has not been created yet.
_CLASS_NOT_FOUND = 101

_NOT_REGISTERED_MESSAGE = 'Sorry, No such user has been registered! Please register first!([reg+yourname])'


def _to_text(value):
    """Return ``value`` as a unicode string, decoding UTF-8 byte strings."""
    if isinstance(value, bytes):
        return value.decode('utf-8')
    return value


def _element_text(root, tag):
    """Return the text of child ``tag`` of ``root``, or u'' if absent/empty."""
    node = root.find(tag)
    if node is None or node.text is None:
        return u''
    return _to_text(node.text)


def _text_reply(to_user, from_user, created, content):
    """Build the XML text-reply response addressed to ``to_user``."""
    # Keep the payload from terminating the CDATA section early.
    safe_content = _to_text(content).replace(u']]>', u']]]]><![CDATA[>')
    body = _to_text(reply_template) % (
        _to_text(to_user), _to_text(from_user), created, safe_content)
    # Work in unicode throughout and encode once at the I/O boundary.
    response = make_response(body.encode('utf-8'))
    response.content_type = 'application/xml'
    return response


def _count(query):
    """Return how many rows ``query`` matches; 0 if the class does not exist."""
    try:
        return len(query.find())
    except leancloud.LeanCloudError as e:
        if e.code == _CLASS_NOT_FOUND:
            return 0
        raise


def _registered_name(openid):
    """Return the name registered for ``openid``, or None if there is none."""
    Guys = leancloud.Object.extend('Guys')
    query = Guys.query.equal_to('openidfromuser', openid)
    try:
        # The original sorted on 'createAt'; the built-in LeanCloud timestamp
        # field is 'createdAt'.
        matches = query.descending('createdAt').find()
    except leancloud.LeanCloudError as e:
        if e.code == _CLASS_NOT_FOUND:
            return None
        raise
    # An empty result (user never registered) is not an error from LeanCloud,
    # so it has to be checked explicitly instead of indexing [0] blindly.
    if not matches:
        return None
    return matches[0].get('name')


def _attendance_count(name, record_type, moment):
    """Count ``record_type`` records stored for ``name`` on ``moment``'s day."""
    Checkincheckout = leancloud.Object.extend('Checkincheckout')
    query = Checkincheckout.query
    # 'intime' is stored as str(datetime), so "YYYY-MM-DD " (with the trailing
    # space) matches every record written on that calendar day.
    query.contains('intime', moment.strftime('%Y-%m-%d '))
    query.equal_to('name', name)
    query.equal_to('type', record_type)
    return _count(query)


def _save_attendance(name, record_type, moment, parts):
    """Store one check-in/check-out record, with the user's optional comment."""
    Checkincheckout = leancloud.Object.extend('Checkincheckout')
    record = Checkincheckout()
    record.set('name', name)
    record.set('type', record_type)
    record.set('intime', str(moment))
    if len(parts) >= 2:
        # Re-join so that a comment containing '+' is kept intact.
        record.set('statements', '+'.join(parts[1:]))
    else:
        record.set('statements', 'abbreviation')
    record.save()


def _handle_register(parts, sender, receiver):
    """Handle "reg+{name}" and return the reply text."""
    if len(parts) < 2 or not parts[1]:
        return error_message
    name = parts[1]
    if name not in [_to_text(candidate) for candidate in valid_names]:
        return 'Your name is not in the list!'
    Guys = leancloud.Object.extend('Guys')
    # Each access to Guys.query is relied on to give a fresh query, as the
    # original code did when it "reset" its query between the two checks.
    if _count(Guys.query.equal_to('name', name)):
        return 'Your name has already been used to registered!'
    # One WeChat account may register only once, whatever name it picks.
    if _count(Guys.query.equal_to('openidfromuser', sender)):
        return 'You have already registered!'
    guy = Guys()
    guy.set('openidfromuser', sender)
    guy.set('openidtouser', receiver)
    guy.set('name', name)
    guy.save()
    return name + u', Happy to tell you Register Success!'


def _handle_check_in(parts, sender):
    """Handle "s[+comment]" (check in) and return the reply text."""
    name = _registered_name(sender)
    if name is None:
        return _NOT_REGISTERED_MESSAGE
    now = datetime.now()
    if _attendance_count(name, types[0], now):
        return "Sorry! You can not check in for more than two times within a day! "
    _save_attendance(name, types[0], now, parts)
    return "Congratulations, %s! You've checked in successfully!" % name


def _handle_check_out(parts, sender):
    """Handle "e[+comment]" (check out) and return the reply text."""
    name = _registered_name(sender)
    if name is None:
        return _NOT_REGISTERED_MESSAGE
    now = datetime.now()
    if _attendance_count(name, types[1], now):
        return "Sorry! You can not check out for more than two times within a day! "
    # Checking out requires a check-in earlier the same day.
    if not _attendance_count(name, types[0], now):
        return 'Sorry, You have not been checked in! Please checkin first!([s+abbreviation])'
    _save_attendance(name, types[1], now, parts)
    return 'Congratulations, %s! You have been checked out successfully!' % name


def _is_digits(text, length):
    """Return True if ``text`` is exactly ``length`` ASCII digits."""
    # tuple_numbers rather than str.isdigit(): only the ten ASCII digits count.
    return len(text) == length and all(ch in tuple_numbers for ch in text)


def _handle_add_code(parts):
    """Handle "a+{5-digit}+{4-digit}" and return the reply text."""
    if len(parts) < 3:
        return error_message
    code, password = parts[1], parts[2]
    if not (_is_digits(code, 5) and _is_digits(password, 4)):
        return error_message
    url = '%s/code_pass_setter/%s/%s' % (_CODE_PASS_API, code, password)
    try:
        result = requests.get(url, timeout=_UPSTREAM_TIMEOUT).json()
    except (requests.RequestException, ValueError):
        # Network failure or a non-JSON body: report it as a failed add.
        return '添加失败!请重新尝试添加!\n'
    if isinstance(result, dict) and result.get('status') in (200, 205):
        return '添加成功!\n'
    return '添加失败!请重新尝试添加!\n'


def _handle_query(content):
    """Look up the 4-digit candidates for a 5-digit message; return the reply."""
    if not _is_digits(content, 5):
        return error_message
    url = '%s/code_pass_query/%s' % (_CODE_PASS_API, content)
    try:
        upstream = requests.get(url, timeout=_UPSTREAM_TIMEOUT)
        results = funcs.password_getter(upstream.json()) if upstream.status_code == 200 else []
    except (requests.RequestException, ValueError):
        results = []
    # A failed lookup is reported like an empty one instead of sending back a
    # header with nothing after it.
    if not results:
        return '没有查询到相关结果!'
    header = _to_text("5位数字是: ") + content + _to_text(", 4位数字可能是(按权值排名): \n")
    return header + u''.join(u'%s ' % (item,) for item in results)


@app.route('/weixin', methods=['GET', 'POST'])
def weixinBot():
    """Endpoint for the WeChat Official Account server.

    GET is the signature-validation handshake: when
    ``funcs.check_signatrue`` accepts the signature/timestamp/nonce query
    parameters, ``echostr`` is echoed back; otherwise a fixed JSON string is
    returned.

    POST carries one XML message. Its ``Content`` is split on '+' and the
    first part selects the command:

    * ``reg+{name}``            register the sender under an allowed name
    * ``s[+comment]``           check in (once per day)
    * ``e[+comment]``           check out (once per day, after checking in)
    * ``a+{5-digit}+{4-digit}`` store a code/password pair
    * ``{5-digit}``             query the passwords stored for a code

    A ``subscribe`` event gets the welcome message; anything else gets
    ``error_message``. Every POST reply is an XML text message. Malformed XML
    aborts with 400.

    NOTE(review): POST bodies are not signature-checked here -- confirm
    whether that is intentional.
    """
    if request.method == 'GET':
        # The shared token is not needed here; presumably
        # funcs.check_signatrue holds it -- verify against that module.
        query = request.args
        signature = query.get('signature', '')
        timestamp = query.get('timestamp', '')
        nonce = query.get('nonce', '')
        echostr = query.get('echostr', '')
        if funcs.check_signatrue(signature, timestamp, nonce):
            return make_response(echostr)
        return make_response('{"status":200, "message": "success"}')

    # Imported locally because this module defines a view function named
    # `time`, which would shadow a module-level import of the same name.
    import time
    created = int(time.time())

    # NOTE(review): request.data is untrusted and xml.etree is not hardened
    # against entity-expansion attacks; consider a hardened parser.
    try:
        xml_recv = ET.fromstring(request.data)
    except ET.ParseError:
        abort(400)

    receiver = _element_text(xml_recv, 'ToUserName')
    sender = _element_text(xml_recv, 'FromUserName')
    content = _element_text(xml_recv, 'Content')
    message_type = _element_text(xml_recv, 'MsgType')
    message_event = _element_text(xml_recv, 'Event')

    def reply(text):
        # The reply goes back to the sender, so the two user names swap roles.
        return _text_reply(sender, receiver, created, text)

    # Commands are '+'-separated; the first part names the command.
    parts = content.split('+')
    command = parts[0]

    if command == 'reg':
        return reply(_handle_register(parts, sender, receiver))
    if command == 's':
        return reply(_handle_check_in(parts, sender))
    if command == 'e':
        return reply(_handle_check_out(parts, sender))
    # First-time subscribers get the welcome message.
    if message_type == 'event' and message_event == 'subscribe':
        return reply(subsribe_message)
    if command == 'a':
        return reply(_handle_add_code(parts))
    return reply(_handle_query(content))
@app.route('/home')
def home():
    """Render the ``index.html`` template."""
    template_name = 'index.html'
    return render_template(template_name)
@app.route('/time')
def time():
    """Return the current local date and time as a string.

    Note that this view's name shadows the standard ``time`` module at
    module scope.
    """
    current = datetime.now()
    return str(current)
@app.route('/hello/')
@app.route('/hello/<name>')
def hello(name=None):
    """Render ``hello.html``; ``name`` is None when the URL omits it."""
    context = {'name': name}
    return render_template('hello.html', **context)
# Rewrite and format the Monster Apis [GETTER]
@app.route('/api/v1.0/code_pass_query/<code>', methods=['GET'])
def get_code(code):
    """Proxy a lookup for ``code`` to bike.littermonster.net.

    Returns the upstream body unchanged when upstream answers 200. Any other
    upstream status, or a network failure, yields a JSON error with HTTP 502
    instead of returning None (which Flask turns into a 500).
    """
    try:
        # `params` URL-encodes `code`, so a crafted path segment cannot inject
        # extra query parameters; the list of pairs keeps the original order.
        r = requests.get(
            'http://bike.littermonster.net/get.php',
            params=[('src', 'app'), ('bike_code', code)],
            timeout=5,
        )
    except requests.RequestException:
        r = None
    if r is None or r.status_code != 200:
        body = jsonify({'status': 502, 'message': 'error', 'error': 'Upstream request failed!'})
        return make_response(body, 502)
    return make_response(r.content)
# Rewrite and format the Monster Apis [SETTER]
@app.route('/api/v1.0/code_pass_setter/<code>/<password>', methods=['GET'])
def set_code(code, password):
    """Proxy storing ``password`` for ``code`` to bike.littermonster.net.

    Returns the upstream body unchanged when upstream answers 200. Any other
    upstream status, or a network failure, yields a JSON error with HTTP 502
    instead of returning None (which Flask turns into a 500).
    """
    try:
        # `params` URL-encodes both values, so a crafted path segment cannot
        # inject extra query parameters; the pair list keeps the original order.
        r = requests.get(
            'http://bike.littermonster.net/add.php',
            params=[('src', 'app'), ('bike_password', password), ('bike_code', code)],
            timeout=5,
        )
    except requests.RequestException:
        r = None
    if r is None or r.status_code != 200:
        body = jsonify({'status': 502, 'message': 'error', 'error': 'Upstream request failed!'})
        return make_response(body, 502)
    return make_response(r.content)
@sockets.route('/echo')
def echo_socket(ws):
    """Echo every message received on the WebSocket back to the client.

    The loop ends when the socket is closed instead of running forever.
    """
    while not ws.closed:
        message = ws.receive()
        # NOTE(review): receive() is understood to return None once the peer
        # has disconnected -- confirm; there is nothing to echo in that case.
        if message is None:
            break
        ws.send(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment