Create a gist now

Instantly share code, notes, and snippets.

@MaskRay /a.py
Created Feb 19, 2016

What would you like to do?
wx.qq.com登录
#!/usr/bin/env python3
import asyncio, aiohttp
from argparse import ArgumentParser
from threading import Thread
from urllib.parse import urljoin
import json, logging, os, random, re, requests, signal, sys, tempfile, time
import xml.etree.ElementTree as ET
import aiohttp
from ipdb import set_trace as bp
logger = logging.getLogger('wx')
USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36'
def sigint():
for task in asyncio.Task.all_tasks():
task.cancel()
sys.exit(3)
class ExceptionHook(object):
instance = None
def __call__(self, *args, **kwargs):
if self.instance is None:
from IPython.core import ultratb
self.instance = ultratb.VerboseTB(call_pdb=True)
return self.instance(*args, **kwargs)
def debug(msg, *args):
logger.debug(msg, *args)
def info(msg, *args):
logger.info(msg, *args)
def warning(msg, *args):
logger.warning(msg, *args)
def error(msg, *args):
logger.error(msg, *args)
class WebWeChat:
# "ContactList" field
CONTACTFLAG_SNSBLACKLISTCONTACT = 256
# "EmojiCodeConv" field
def __init__(self):
self.closed = False
async def start(self, loop):
self.loop = loop
self.session = aiohttp.ClientSession(headers={'User-Agent': USER_AGENT})
await self.authenticate()
task = loop.create_task(self.heartbeat())
await self.initContact()
await self.postTextMessage('filehelper', 'meow')
await task
def stop(self):
self.session.close()
self.closed = True
def get(self, url, **kwargs):
return self.session.get(url, **kwargs)
def post(self, url, data=None, json=None, **kwargs):
if json is not None:
kwargs['headers'] = {'Content-Type': 'application/json;charset=UTF-8'}
data = sys.modules['json'].dumps(json)
return self.session.post(url, data=data, **kwargs)
#@API_jsLogin /API_jsLogin:
#@getUUID /getUUID: function() {
async def getUUID(self):
debug('getUUID')
async with self.get('https://login.weixin.qq.com/jslogin', params={
'appid': 'wx782c26e4c19acffb', 'fun': 'new', 'lang': 'zh_CN',
'_': int(time.time()*1000)
}) as r:
text = await r.text()
code, uuid = re.search(r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(.+?)"', text).groups()
assert int(code) == 200, 'Failed to get UUID'
return uuid
#@qrcodeUrl /e.qrcodeUrl =
async def qrcode(self, uuid):
debug('qrcode')
async with self.get('https://login.weixin.qq.com/qrcode/'+uuid, params={
't': 'webwx',
'_': int(time.time()*1000)
}) as r:
content = await r.read()
self.tip = 1
with tempfile.NamedTemporaryFile() as f:
f.write(content)
os.spawnlp(os.P_NOWAIT, 'xdg-open', 'xdg-open', f.name)
print('请使用微信扫描二维码以登录')
# wait 1 second for xdg-open to open the image viewer
await asyncio.sleep(1, loop=self.loop)
#@checkLogin /checkLogin:
async def checkLogin(self, uuid):
debug('checkLogin')
# `=` in uuid should not be escaped
params = '&'.join('{}={}'.format(k, v) for k, v in {
'loginicon': 'true',
'uuid': uuid,
'tip': self.tip,
'r': ~int(time.time()*1000)%2**32
}.items())
async with self.get('https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login',
params=params) as r:
text = await r.text()
code = int(re.search(r'window.code=(\d+)', text).group(1))
if code == 201:
self.tip = 0
info('成功扫描,请在手机上点击确认以登录')
return code
elif code == 408:
self.tip = 0
return code
assert code == 200
self.redirect_uri = re.search(r'window.redirect_uri="(.+?)"', text).group(1)
self.base_uri = self.redirect_uri[:self.redirect_uri.rfind('/')]
# push_uri与base_uri对应关系(排名分先后)(就是这么奇葩..)
services = [
('wx2.qq.com', 'webpush2.weixin.qq.com'),
('qq.com', 'webpush.weixin.qq.com'),
('web1.wechat.com', 'webpush1.wechat.com'),
('web2.wechat.com', 'webpush2.wechat.com'),
('wechat.com', 'webpush.wechat.com'),
('web1.wechatapp.com', 'webpush1.wechatapp.com'),
]
self.push_uri = self.base_uri
for (searchUrl, pushUrl) in services:
if self.base_uri.find(searchUrl) >= 0:
self.push_uri = 'https://%s/cgi-bin/mmwebwx-bin' % pushUrl
break
return 200
#@newLoginPage /t.newLoginPage
async def newLoginPage(self):
debug('newLoginPage')
async with self.get(self.redirect_uri, params={
'fun': 'new', 'version': 'v2', '_': int(time.time()*1000)
}) as r:
root = ET.fromstring(await r.text())
ret = root.find('ret').text
self.pass_ticket = root.find('pass_ticket').text
self.uin = int(root.find('wxuin').text)
self.sid = root.find('wxsid').text
self.skey = root.find('skey').text
self.device_id = 'e{:015}'.format(random.randint(0, 1e15-1))
self.BaseRequest = {
'Uin': self.uin,
'Sid': self.sid,
'Skey': self.skey,
'DeviceID': self.device_id,
}
#@init /init: function
async def webwxinit(self):
debug('webwxinit')
async with self.post(self.base_uri+'/webwxinit', json={
'BaseRequest': self.BaseRequest
}) as r:
json = await r.json()
self.Me = json['User']
#contacts = json['ContactList']
self.SyncKey = json['SyncKey']
#@initContact /initContact: function
async def initContact(self):
debug('initContact')
async with self.get(self.base_uri+'/webwxgetcontact', params={
'r': int(time.time()*1000),
'seq': 0,
'pass_ticket': self.pass_ticket,
'skey': self.skey,
}) as r:
json = await r.json(encoding='utf-8')
# https://github.com/0x5e/wechat-deleted-friends/blob/master/wdf.py SpecialUsers
specials = 'newsapp,fmessage,filehelper,weibo,qqmail,fmessage,tmessage,qmessage,qqsync,floatbottle,lbsapp,shakeapp,medianote,qqfriend,readerapp,blogapp,facebookapp,masssendapp,meishiapp,feedsapp,voip,blogappweixin,weixin,brandsessionholder,weixinreminder,wxid_novlwrv3lqwv11,gh_22b87fa7cb3c,officialaccounts,notification_messages,wxid_novlwrv3lqwv11,gh_22b87fa7cb3c,wxitil,userexperience_alarm,notification_messages'.split(',')
members = []
for member in json['MemberList']:
if member['VerifyFlag'] & 8 != 0: # 公众号/服务号
pass
elif member['UserName'] in specials: # 特殊账号
pass
elif member['UserName'].find('@@') != -1: # 群聊
pass
elif member['UserName'] == self.Me['UserName']: # 自己
pass
else:
members.append(member)
#print(members)
for member in members:
print(member['RemarkName'])
def syncKey(self):
return '|'.join('%s_%s' % (item['Key'], item['Val'])
for item in self.SyncKey['List'])
#@syncCheck /syncCheck: function
async def syncCheck(self):
async with self.get(self.push_uri+'/synccheck', params={
'r': int(time.time()*1000),
'skey': self.skey,
'sid': self.sid,
'uin': self.uin,
'deviceid': self.device_id,
'synckey': self.syncKey(),
}) as r:
code, selector = re.search(r'window.synccheck={retcode:"(\d+)",selector:"(\d+)"}',
await r.text(encoding='utf-8')).groups()
return int(selector)
async def sync(self):
async with self.post(self.push_uri+'/synccheck', params={
'sid': self.sid,
'skey': self.skey,
}, json={
'BaseRequest': self.BaseRequest,
'SyncKey': self.SyncKey,
'rr': ~int(time.time()*1000)%2**32,
}) as r:
json = await r.json()
await json['BaseResponse']['Ret'] == 0
for msg in json['AddMsgList']:
print('msg', msg['MsgType'])
async def heartbeat(self):
try:
while not self.closed:
if await self.syncCheck() != 0:
sync()
await asyncio.sleep(10, loop=self.loop)
except:
pass
async def authenticate(self):
uuid = await self.getUUID()
await self.qrcode(uuid)
await asyncio.sleep(1, loop=self.loop)
while await self.checkLogin(uuid) != 200:
await asyncio.sleep(1, loop=self.loop)
await self.newLoginPage()
await self.webwxinit()
async def postTextMessage(self, to, msg):
local_id = int(time.time()*1000)*1000+random.randint(0, 999)
json = {
'BaseRequest': self.BaseRequest,
'Msg': {
'ClientMsgId': local_id,
'Content': msg,
'FromUserName': self.Me['UserName'],
'LocalId': local_id,
'ToUserName': to,
'Type': 1,
}
}
async with self.post(self.push_uri+'/webwxsendmsg', params={
'pass_ticket': self.pass_ticket,
}, json={
'BaseRequest': self.BaseRequest,
'Msg': {
'ClientMsgId': local_id,
'Content': msg,
'FromUserName': self.Me['UserName'],
'LocalId': local_id,
'ToUserName': to,
'Type': 1,
}
}) as r:
json = await r.json(encoding='utf-8')
print(json)
bp()
def getvoice(self):
return 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxgetvoice?msgid=1841843421481029619&skey=@crypt_e6b01c56_65f3a5c358c4602f5273cfd665b75f71'
# Content-Type:audio/mp3
def main():
ap = ArgumentParser()
ap.add_argument('-q', '--quiet', action='store_const', const=logging.WARN, dest='loglevel')
ap.add_argument('-v', '--verbose', action='store_const', const=logging.DEBUG, dest='loglevel')
ap.add_argument('-t', '--tags', action='store_true')
ap.add_argument('-d', '--debug', action='store_true')
args = ap.parse_args()
if args.debug:
sys.excepthook = ExceptionHook()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, sigint)
if args.tags:
# TODO create tags
with open(__file__) as f:
regex = re.compile(r'#@(\w+) +(/.*)')
tags = []
for line in f.read().splitlines():
m = regex.search(line)
if m:
tags.append(m.groups())
tags.sort()
for name, address in tags:
print(name, 'wx.js', address, sep='\t')
else:
logging.basicConfig(level=args.loglevel or logging.INFO, format='%(asctime)s:%(levelname)s: %(message)s')
wx = WebWeChat()
loop.run_until_complete(wx.start(loop))
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment