-
-
Save MaskRay/3b5b3fcbccfcba3b8f29 to your computer and use it in GitHub Desktop.
wx.qq.com登录
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import asyncio
import json
import logging
import os
import random
import re
import signal
import sys
import tempfile
import time
import xml.etree.ElementTree as ET
from argparse import ArgumentParser
from threading import Thread
from urllib.parse import urljoin

import aiohttp
import requests
from ipdb import set_trace as bp
# Logger shared by the debug()/info()/warning()/error() helpers in this file.
logger = logging.getLogger('wx')
# Browser-like User-Agent installed as a default header on the HTTP session.
USER_AGENT = 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36'
def sigint():
    """Cancel every asyncio task of the running loop, then exit with status 3.

    Meant to be installed with ``loop.add_signal_handler`` so that it runs
    inside the event loop.

    Raises:
        SystemExit: always, with exit code 3.
    """
    # asyncio.Task.all_tasks() was removed in Python 3.9; asyncio.all_tasks()
    # exists since 3.7.  Fall back to the old spelling on older interpreters.
    all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
    for task in all_tasks():
        task.cancel()
    sys.exit(3)
class ExceptionHook(object):
    """Callable for ``sys.excepthook`` that delegates to IPython's VerboseTB.

    IPython is imported only on the first call, so installing the hook costs
    nothing unless an uncaught exception actually occurs.
    """

    # Handler built on first use; stays None until the hook is first called.
    instance = None

    def __call__(self, *args, **kwargs):
        """Forward the arguments to the (lazily created) VerboseTB handler."""
        if self.instance is None:
            from IPython.core import ultratb
            # call_pdb=True is passed so the handler can start a debugger.
            self.instance = ultratb.VerboseTB(call_pdb=True)
        return self.instance(*args, **kwargs)
def debug(msg, *args):
    """Log *msg* (with lazy %-style *args*) at DEBUG level on the 'wx' logger."""
    logger.debug(msg, *args)
def info(msg, *args):
    """Log *msg* (with lazy %-style *args*) at INFO level on the 'wx' logger."""
    logger.info(msg, *args)
def warning(msg, *args):
    """Log *msg* (with lazy %-style *args*) at WARNING level on the 'wx' logger."""
    logger.warning(msg, *args)
def error(msg, *args):
    """Log *msg* (with lazy %-style *args*) at ERROR level on the 'wx' logger."""
    logger.error(msg, *args)
class WebWeChat:
    """Small asyncio/aiohttp client for the web WeChat HTTP endpoints.

    ``start()`` drives the whole flow: QR-code login, a background
    ``heartbeat()`` task, contact listing and one test message.

    Attributes set while running (names mirror the server's JSON fields):
        session, loop      -- aiohttp session and the event loop given to start()
        tip                -- value of the 'tip' query parameter for checkLogin()
        redirect_uri, base_uri, push_uri -- URLs derived from the login reply
        pass_ticket, uin, sid, skey, device_id, BaseRequest -- credentials
        Me, SyncKey        -- 'User' and 'SyncKey' from the webwxinit reply
    """

    # "ContactList" field
    CONTACTFLAG_SNSBLACKLISTCONTACT = 256
    # "EmojiCodeConv" field

    # Built-in accounts that initContact() leaves out of the friend list.
    # https://github.com/0x5e/wechat-deleted-friends/blob/master/wdf.py SpecialUsers
    _SPECIAL_USERS = frozenset('newsapp,fmessage,filehelper,weibo,qqmail,fmessage,tmessage,qmessage,qqsync,floatbottle,lbsapp,shakeapp,medianote,qqfriend,readerapp,blogapp,facebookapp,masssendapp,meishiapp,feedsapp,voip,blogappweixin,weixin,brandsessionholder,weixinreminder,wxid_novlwrv3lqwv11,gh_22b87fa7cb3c,officialaccounts,notification_messages,wxid_novlwrv3lqwv11,gh_22b87fa7cb3c,wxitil,userexperience_alarm,notification_messages'.split(','))

    # Mapping from a substring of base_uri to the matching push host.
    # Order matters: the first substring found wins (yes, it is that quirky).
    _PUSH_SERVICES = (
        ('wx2.qq.com', 'webpush2.weixin.qq.com'),
        ('qq.com', 'webpush.weixin.qq.com'),
        ('web1.wechat.com', 'webpush1.wechat.com'),
        ('web2.wechat.com', 'webpush2.wechat.com'),
        ('wechat.com', 'webpush.wechat.com'),
        ('web1.wechatapp.com', 'webpush1.wechatapp.com'),
    )

    def __init__(self):
        # heartbeat() keeps polling until this becomes True (see stop()).
        self.closed = False

    async def start(self, loop):
        """Log in, start the heartbeat, list contacts and send a test message.

        Args:
            loop: the event loop this coroutine runs on; used to spawn the
                heartbeat task.

        Returns only once the heartbeat task has finished.
        """
        self.loop = loop
        self.session = aiohttp.ClientSession(headers={'User-Agent': USER_AGENT})
        await self.authenticate()
        task = loop.create_task(self.heartbeat())
        await self.initContact()
        await self.postTextMessage('filehelper', 'meow')
        await task

    def stop(self):
        """Mark the client closed and close the HTTP session.

        ``ClientSession.close()`` is a coroutine in current aiohttp releases;
        when it returns one, it is scheduled on the loop instead of being
        dropped un-awaited.
        """
        self.closed = True
        session = getattr(self, 'session', None)
        if session is None:
            return
        pending = session.close()
        if asyncio.iscoroutine(pending):
            asyncio.ensure_future(pending, loop=getattr(self, 'loop', None))

    def get(self, url, **kwargs):
        """Return the session's GET request context manager for *url*."""
        return self.session.get(url, **kwargs)

    def post(self, url, data=None, json=None, **kwargs):
        """Return the session's POST request context manager for *url*.

        Args:
            url: target URL.
            data: raw request body; ignored when *json* is given.
            json: object serialized to a JSON body; also forces the
                JSON Content-Type header.
            **kwargs: forwarded to ``session.post``; caller-supplied
                ``headers`` are preserved (only Content-Type is overridden).
        """
        if json is not None:
            # The `json` parameter shadows the module, so import what we need.
            from json import dumps
            headers = dict(kwargs.get('headers') or {})
            headers['Content-Type'] = 'application/json;charset=UTF-8'
            kwargs['headers'] = headers
            data = dumps(json)
        return self.session.post(url, data=data, **kwargs)

    @staticmethod
    def _newDeviceId():
        """Return a random device id: 'e' followed by 15 zero-padded digits."""
        # Integer bounds: random.randint() rejects float arguments on newer Pythons.
        return 'e{:015}'.format(random.randint(0, 10**15 - 1))

    @classmethod
    def _pushUriFor(cls, base_uri):
        """Return the push URL for *base_uri*, or *base_uri* itself if no host matches."""
        for searchUrl, pushUrl in cls._PUSH_SERVICES:
            if searchUrl in base_uri:
                return 'https://%s/cgi-bin/mmwebwx-bin' % pushUrl
        return base_uri

    #@API_jsLogin /API_jsLogin:
    #@getUUID /getUUID: function() {
    async def getUUID(self):
        """Request a login UUID from the jslogin endpoint and return it.

        Raises:
            RuntimeError: if the reply cannot be parsed or its code is not 200.
        """
        debug('getUUID')
        async with self.get('https://login.weixin.qq.com/jslogin', params={
            'appid': 'wx782c26e4c19acffb', 'fun': 'new', 'lang': 'zh_CN',
            '_': int(time.time()*1000)
        }) as r:
            text = await r.text()
        found = re.search(r'window.QRLogin.code = (\d+); window.QRLogin.uuid = "(.+?)"', text)
        if found is None or int(found.group(1)) != 200:
            raise RuntimeError('Failed to get UUID')
        return found.group(2)

    #@qrcodeUrl /e.qrcodeUrl =
    async def qrcode(self, uuid):
        """Download the login QR code for *uuid* and open it with xdg-open."""
        debug('qrcode')
        async with self.get('https://login.weixin.qq.com/qrcode/'+uuid, params={
            't': 'webwx',
            '_': int(time.time()*1000)
        }) as r:
            content = await r.read()
        self.tip = 1
        with tempfile.NamedTemporaryFile() as f:
            f.write(content)
            # Make sure the image is on disk before the viewer reads it.
            f.flush()
            os.spawnlp(os.P_NOWAIT, 'xdg-open', 'xdg-open', f.name)
            print('请使用微信扫描二维码以登录')
            # wait 1 second for xdg-open to open the image viewer
            # (the temporary file is deleted when this block exits)
            await asyncio.sleep(1)

    #@checkLogin /checkLogin:
    async def checkLogin(self, uuid):
        """Poll the login endpoint once and return its status code.

        Returns:
            201 -- QR code scanned, waiting for confirmation on the phone;
            408 -- not logged in yet (presumably a poll timeout -- confirm);
            200 -- logged in; redirect_uri, base_uri and push_uri are set.

        Raises:
            RuntimeError: on any other status code.
        """
        debug('checkLogin')
        # `=` in uuid should not be escaped, so the query string is built by
        # hand and handed over as a plain string.
        params = '&'.join('{}={}'.format(k, v) for k, v in {
            'loginicon': 'true',
            'uuid': uuid,
            'tip': self.tip,
            'r': (~int(time.time()*1000)) % 2**32
        }.items())
        async with self.get('https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login',
                            params=params) as r:
            text = await r.text()
        code = int(re.search(r'window.code=(\d+)', text).group(1))
        if code == 201:
            self.tip = 0
            info('成功扫描,请在手机上点击确认以登录')
            return code
        if code == 408:
            self.tip = 0
            return code
        if code != 200:
            raise RuntimeError('Unexpected login status code: %d' % code)
        self.redirect_uri = re.search(r'window.redirect_uri="(.+?)"', text).group(1)
        self.base_uri = self.redirect_uri[:self.redirect_uri.rfind('/')]
        self.push_uri = self._pushUriFor(self.base_uri)
        return 200

    #@newLoginPage /t.newLoginPage
    async def newLoginPage(self):
        """Fetch redirect_uri and store the credentials found in its XML reply."""
        debug('newLoginPage')
        async with self.get(self.redirect_uri, params={
            'fun': 'new', 'version': 'v2', '_': int(time.time()*1000)
        }) as r:
            root = ET.fromstring(await r.text())
        ret = root.find('ret').text
        if ret != '0':
            # NOTE(review): '0' is assumed to mean success -- confirm.
            warning('newLoginPage returned ret=%s', ret)
        self.pass_ticket = root.find('pass_ticket').text
        self.uin = int(root.find('wxuin').text)
        self.sid = root.find('wxsid').text
        self.skey = root.find('skey').text
        self.device_id = self._newDeviceId()
        self.BaseRequest = {
            'Uin': self.uin,
            'Sid': self.sid,
            'Skey': self.skey,
            'DeviceID': self.device_id,
        }

    #@init /init: function
    async def webwxinit(self):
        """POST webwxinit and remember the reply's 'User' and 'SyncKey'."""
        debug('webwxinit')
        async with self.post(self.base_uri+'/webwxinit', json={
            'BaseRequest': self.BaseRequest
        }) as r:
            reply = await r.json()
        self.Me = reply['User']
        #contacts = reply['ContactList']
        self.SyncKey = reply['SyncKey']

    def _isFriend(self, member):
        """Return True when *member* is an ordinary contact worth listing."""
        username = member['UserName']
        if member['VerifyFlag'] & 8 != 0:  # official / service account
            return False
        if username in self._SPECIAL_USERS:  # special built-in account
            return False
        if '@@' in username:  # group chat
            return False
        if username == self.Me['UserName']:  # ourselves
            return False
        return True

    #@initContact /initContact: function
    async def initContact(self):
        """Fetch the contact list and print the RemarkName of each ordinary contact."""
        debug('initContact')
        async with self.get(self.base_uri+'/webwxgetcontact', params={
            'r': int(time.time()*1000),
            'seq': 0,
            'pass_ticket': self.pass_ticket,
            'skey': self.skey,
        }) as r:
            reply = await r.json(encoding='utf-8')
        members = [member for member in reply['MemberList'] if self._isFriend(member)]
        for member in members:
            print(member['RemarkName'])

    def syncKey(self):
        """Return SyncKey['List'] serialized as 'Key_Val' pairs joined by '|'."""
        return '|'.join('%s_%s' % (item['Key'], item['Val'])
                        for item in self.SyncKey['List'])

    #@syncCheck /syncCheck: function
    async def syncCheck(self):
        """Poll synccheck on push_uri and return the reply's selector as an int.

        Raises:
            RuntimeError: if the reply does not have the expected shape.
        """
        async with self.get(self.push_uri+'/synccheck', params={
            'r': int(time.time()*1000),
            'skey': self.skey,
            'sid': self.sid,
            'uin': self.uin,
            'deviceid': self.device_id,
            'synckey': self.syncKey(),
        }) as r:
            text = await r.text(encoding='utf-8')
        found = re.search(r'window.synccheck={retcode:"(\d+)",selector:"(\d+)"}', text)
        if found is None:
            raise RuntimeError('Unexpected synccheck reply')
        return int(found.group(2))

    async def sync(self):
        """POST the current SyncKey and print the type of each new message.

        Raises:
            RuntimeError: if the reply's BaseResponse.Ret is not 0.
        """
        # NOTE(review): this posts to the same '/synccheck' path on push_uri
        # that syncCheck() polls, and never stores a new SyncKey; the intended
        # endpoint may differ -- confirm against the web client.
        async with self.post(self.push_uri+'/synccheck', params={
            'sid': self.sid,
            'skey': self.skey,
        }, json={
            'BaseRequest': self.BaseRequest,
            'SyncKey': self.SyncKey,
            'rr': (~int(time.time()*1000)) % 2**32,
        }) as r:
            reply = await r.json()
        ret = reply['BaseResponse']['Ret']
        if ret != 0:
            raise RuntimeError('sync failed with Ret=%s' % ret)
        for msg in reply['AddMsgList']:
            print('msg', msg['MsgType'])

    async def heartbeat(self):
        """Poll syncCheck() every 10 seconds until closed, syncing when needed.

        Failures end the loop and are logged rather than raised, so the
        caller awaiting this task is not crashed by a network error.
        Cancellation is propagated.
        """
        try:
            while not self.closed:
                if await self.syncCheck() != 0:
                    await self.sync()
                await asyncio.sleep(10)
        except asyncio.CancelledError:
            raise
        except Exception:
            logger.exception('heartbeat stopped')

    async def authenticate(self):
        """Run the QR-code login flow until the server reports success."""
        uuid = await self.getUUID()
        await self.qrcode(uuid)
        await asyncio.sleep(1)
        while await self.checkLogin(uuid) != 200:
            await asyncio.sleep(1)
        await self.newLoginPage()
        await self.webwxinit()

    async def postTextMessage(self, to, msg):
        """Send text *msg* to user name *to*; print and return the decoded reply."""
        # Millisecond timestamp times 1000 plus a random 0-999 suffix.
        local_id = int(time.time()*1000)*1000+random.randint(0, 999)
        payload = {
            'BaseRequest': self.BaseRequest,
            'Msg': {
                'ClientMsgId': local_id,
                'Content': msg,
                'FromUserName': self.Me['UserName'],
                'LocalId': local_id,
                'ToUserName': to,
                'Type': 1,
            }
        }
        # NOTE(review): sent via push_uri; confirm this is the right host.
        async with self.post(self.push_uri+'/webwxsendmsg', params={
            'pass_ticket': self.pass_ticket,
        }, json=payload) as r:
            reply = await r.json(encoding='utf-8')
        print(reply)
        return reply

    def getvoice(self):
        """Return a fixed sample webwxgetvoice URL."""
        # NOTE(review): msgid and skey are hard-coded sample values.
        return 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxgetvoice?msgid=1841843421481029619&skey=@crypt_e6b01c56_65f3a5c358c4602f5273cfd665b75f71'
        # Content-Type:audio/mp3
def main():
    """Command-line entry point.

    Options:
        -q/--quiet, -v/--verbose  set the log level (default INFO);
        -d/--debug                install ExceptionHook as sys.excepthook;
        -t/--tags                 print the tag markers found in this file
                                  as tab-separated lines and return.

    Without --tags, runs a WebWeChat client on a fresh event loop until its
    start() coroutine finishes.  Returns None.
    """
    ap = ArgumentParser()
    ap.add_argument('-q', '--quiet', action='store_const', const=logging.WARN, dest='loglevel')
    ap.add_argument('-v', '--verbose', action='store_const', const=logging.DEBUG, dest='loglevel')
    ap.add_argument('-t', '--tags', action='store_true')
    ap.add_argument('-d', '--debug', action='store_true')
    args = ap.parse_args()
    if args.debug:
        sys.excepthook = ExceptionHook()

    if args.tags:
        # TODO create tags
        regex = re.compile(r'#@(\w+) +(/.*)')
        # Explicit encoding: this file contains non-ASCII text.
        with open(__file__, encoding='utf-8') as f:
            matches = (regex.search(line) for line in f.read().splitlines())
            tags = sorted(m.groups() for m in matches if m)
        for name, address in tags:
            print(name, 'wx.js', address, sep='\t')
        return

    logging.basicConfig(level=args.loglevel or logging.INFO, format='%(asctime)s:%(levelname)s: %(message)s')
    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated.  It is only needed in client mode.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.add_signal_handler(signal.SIGINT, sigint)
    wx = WebWeChat()
    try:
        loop.run_until_complete(wx.start(loop))
    finally:
        loop.close()
# Run main() only when executed as a script; its return value (None) becomes
# the process exit status, i.e. 0.
if __name__ == '__main__':
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment