Skip to content

Instantly share code, notes, and snippets.

@dodola
Created May 11, 2014 01:45
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dodola/24721c8794e172936b22 to your computer and use it in GitHub Desktop.
Save dodola/24721c8794e172936b22 to your computer and use it in GitHub Desktop.
从 Google Play 下载 APK 包
__author__ = 'dodola'
# encoding: utf-8
import time
import urllib.request
import threading
import contextlib
import queue
import string
import shutil
import re
import unicodedata
import gzip
import base64
import codecs
import tempfile
import sys
import array
import binascii
from urllib.error import URLError, HTTPError, ContentTooShortError
import os
from urllib.parse import (
urlparse, urlsplit, urljoin, unwrap, quote, unquote,
splittype, splithost, splitport, splituser, splitpasswd,
splitattr, splitquery, splitvalue, splittag, to_bytes, urlunparse)
# Market API endpoint that processAsset() POSTs the asset request to.
API_URL = "https://android.clients.google.com/market/api/ApiRequest"
# Base URL of the "fdfe" endpoints; not referenced by the visible code.
FDFE_URL_BASE = "https://android.clients.google.com/fdfe/"
# Prefix (note the trailing slash) prepended to every downloaded APK file name.
SAVE_PATH = "D:/apks/"
def myurlretrieve(url, filename=None, marketda=None, reporthook=None, data=None):
    """Download *url* to a local file, optionally sending a ``MarketDA`` cookie.

    Args:
        url: URL string to fetch.
        filename: Destination path. When omitted, the body is written to a
            named temporary file that is left on disk for the caller.
        marketda: Value of the ``MarketDA`` cookie. When ``None`` no cookie
            header is sent.
        reporthook: Optional callable invoked as
            ``reporthook(block_number, bytes_in_block, total_size)``: once
            with ``(0, 0, total_size)`` before the first read, then after
            every block written. ``total_size`` is ``-1`` when the response
            has no ``Content-Length`` header.
        data: Optional request body handed to ``urllib.request.Request``.

    Returns:
        A ``(filename, headers)`` tuple. For a ``file:`` URL with no
        *filename*, nothing is copied and the normalised local path is
        returned instead.

    Raises:
        ContentTooShortError: If fewer bytes than announced by
            ``Content-Length`` were received.
    """
    # Only the scheme and the text after the first colon are needed, so a
    # plain split replaces the deprecated urllib.parse.splittype().
    scheme, colon, remainder = url.partition(":")
    is_file_url = bool(colon) and scheme.lower() == "file"

    request_headers = {}
    if marketda is not None:
        request_headers["Cookie"] = "MarketDA=" + marketda
    req = urllib.request.Request(url, data, request_headers)

    with contextlib.closing(urllib.request.urlopen(req)) as fp:
        headers = fp.info()

        # A local file with no explicit destination needs no copy.
        if is_file_url and not filename:
            return os.path.normpath(remainder), headers

        if filename:
            tfp = open(filename, 'wb')
        else:
            tfp = tempfile.NamedTemporaryFile(delete=False)
            filename = tfp.name

        with tfp:
            result = filename, headers
            block_size = 1024 * 8
            size = -1
            read = 0
            blocknum = 0
            if "content-length" in headers:
                size = int(headers["Content-Length"])

            if reporthook:
                reporthook(blocknum, 0, size)

            while True:
                block = fp.read(block_size)
                if not block:
                    break
                read += len(block)
                tfp.write(block)
                blocknum += 1
                if reporthook:
                    reporthook(blocknum, len(block), size)

    if size >= 0 and read < size:
        raise ContentTooShortError(
            "retrieval incomplete: got only %i out of %i bytes"
            % (read, size), result)

    return result
def serializeInt32(num):
    """Encode *num* as a base-128 varint, least-significant group first.

    Each returned element holds seven bits of *num*; bit 7 (value 128) is set
    on every element that is followed by another one.

    At most five elements are produced. Values that do not fit in 35 bits,
    and negative values, are therefore cut off after five elements with the
    continuation bit still set on the last one.

    Args:
        num: Integer to encode.

    Returns:
        A list of one to five integers in the range 0-255.
    """
    encoded = []
    for _ in range(5):
        chunk = num & 0x7F
        num >>= 7
        if not num:
            encoded.append(chunk)
            break
        encoded.append(chunk | 0x80)
    return encoded


def serializeData(arr, value, data_type):
    """Append the wire form of *value* to *arr* and return *arr*.

    Args:
        arr: List of byte values; it is extended in place.
        value: The value to serialize.
        data_type: ``"string"`` (varint length prefix followed by the UTF-8
            bytes), ``"int32"`` (varint) or ``"bool"`` (a single 0/1 byte).
            Any other value leaves *arr* untouched.

    Returns:
        The same list object that was passed in as *arr*.
    """
    if data_type == "string":
        payload = value.encode("utf_8")
        # The prefix must count encoded bytes, not characters, otherwise
        # non-ASCII text produces a length that disagrees with the payload.
        arr.extend(serializeInt32(len(payload)))
        arr.extend(payload)
    elif data_type == "int32":
        arr.extend(serializeInt32(value))
    elif data_type == "bool":
        arr.append(1 if value else 0)
    return arr
def login(email, password, deviceId):
    """Request an ``androidsecure`` auth token from the ClientLogin endpoint.

    The credentials are POSTed as a URL-encoded form and the ``Auth=`` line
    of the response body is extracted.

    NOTE(review): Google has retired the ClientLogin endpoint, so this call
    is expected to fail against the live service - confirm before relying
    on it.

    Args:
        email: Account e-mail address.
        password: Account password.
        deviceId: Accepted for interface compatibility; not used.

    Returns:
        The token text that follows ``Auth=``, or ``None`` when the response
        contains no such line.
    """
    URL_LOGIN = "https://www.google.com/accounts/ClientLogin"
    form = {
        "Email": email,
        "Passwd": password,
        "service": "androidsecure",
        "accountType": "HOSTED_OR_GOOGLE",
    }
    body = urllib.parse.urlencode(form).encode(encoding='utf_8', errors='strict')
    req = urllib.request.Request(URL_LOGIN, body)
    with urllib.request.urlopen(req) as res:
        # Decode the bytes; str(bytes) would yield the "b'...'" repr instead.
        text = res.read().decode("utf-8", errors="replace")

    # The body holds several KEY=value lines, so search line by line rather
    # than anchoring at the very start, and accept upper-case characters.
    found = re.search(r'^Auth=([A-Za-z0-9=_\-]+)', text, re.MULTILINE)
    if found:
        return found.group(1)
    return None
def generateAssetRequest(options):
    """Build the base64 request blob for one package from *options*.

    The option values are serialized in a fixed order with ``serializeData``;
    after each value a fixed byte sequence is emitted. Those bytes look like
    protobuf field keys, but that is not verified against a schema here.

    Args:
        options: Mapping with the keys ``authToken``, ``isSecure``,
            ``sdkVersion``, ``deviceId``, ``deviceAndSdkVersion``,
            ``locale``, ``country``, ``operatorAlpha``, ``simOperatorAlpha``,
            ``operatorNumeric``, ``simOperatorNumeric`` and ``packageName``.

    Returns:
        The URL-safe base64 encoding of the message, as ``str``.

    Raises:
        KeyError: If one of the keys above is missing from *options*.
    """
    # (option key, serializer type, bytes emitted right after the value)
    leading_fields = (
        ("authToken", "string", (16,)),
        ("isSecure", "bool", (24,)),
        ("sdkVersion", "int32", (34,)),
        ("deviceId", "string", (42,)),
        ("deviceAndSdkVersion", "string", (50,)),
        ("locale", "string", (58,)),
        ("country", "string", (66,)),
        ("operatorAlpha", "string", (74,)),
        ("simOperatorAlpha", "string", (82,)),
        ("operatorNumeric", "string", (90,)),
    )

    body = []
    for key, data_type, trailer in leading_fields:
        serializeData(body, options[key], data_type)
        body.extend(trailer)

    serializeData(body, options["simOperatorNumeric"], "string")
    # The outer length prefix covers everything up to this point plus the
    # single byte 10 that is placed in front of the body below.
    prefix_length = len(body) + 1

    # Package section: [19, 82], section length, 10, package string, 20.
    # The section length counts the byte 10 plus the serialized string, which
    # equals len(name) + 2 whenever the string's own length fits in one byte.
    package_field = serializeData([], options["packageName"], "string")
    body.extend((19, 82))
    body.extend(serializeInt32(len(package_field) + 1))
    body.append(10)
    body.extend(package_field)
    body.append(20)

    message = [10]
    message.extend(serializeInt32(prefix_length))
    message.append(10)
    message.extend(body)

    # bytes() replaces array.array('B', ...).tostring(), which no longer
    # exists on Python 3.9 and later.
    return base64.urlsafe_b64encode(bytes(message)).decode("utf-8")
def processAsset(asset_query_base64, packageName):
    """Resolve the download URL for a package and save the APK.

    The asset request is POSTed to ``API_URL``; the gzip-compressed reply is
    scanned for a download URL and a ``MarketDA`` cookie value, and the APK
    is then fetched with ``myurlretrieve`` into ``SAVE_PATH``.

    Args:
        asset_query_base64: Request blob produced by generateAssetRequest().
        packageName: Package name; used to build the ``<name>.apk`` file name.

    Raises:
        ValueError: If the reply contains no download URL or no cookie value.
    """
    payload = "version=2&request=%s" % (asset_query_base64)
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    req = urllib.request.Request(API_URL, payload.encode(), headers)
    with urllib.request.urlopen(req) as res:
        compressed = res.read()

    # The reply is a binary message; undecodable bytes are dropped so the
    # two text fragments of interest can be located with regular expressions.
    # It is deliberately not printed: the leftover characters are noise and
    # may not be encodable on every console.
    undata = str(gzip.decompress(compressed), "utf-8", errors='ignore')

    url_match = re.search(r'https?:\/\/[^:]+', undata)
    cookie_match = re.search(r'MarketDAr..(\d+)', undata)
    if url_match is None or cookie_match is None:
        raise ValueError(
            "no download URL / MarketDA cookie in the response for %r"
            % (packageName,))
    downurl = url_match.group(0)
    marketda = cookie_match.group(1)
    print(downurl)
    print(marketda)

    # Strip line breaks and characters that are not allowed in file names so
    # the package name cannot redirect the write outside SAVE_PATH.
    filename = validateTitle(packageName.replace("\n", "").strip()) + ".apk"
    # Start the download.
    myurlretrieve(downurl, SAVE_PATH + filename, marketda)
def validateTitle(title):
    """Return *title* with the characters ``/ \\ : * ? " < > |`` removed."""
    forbidden = re.compile(r"[\/\\\:\*\?\"\<\>\|]")
    return forbidden.sub("", title)
def getOptions(packname):
    """Return the request options for *packname*.

    Every value except ``packageName`` is a constant baked into this
    function.

    NOTE(review): the auth token is a credential stored in source.
    """
    options = dict(
        authToken="DQAAAMAAAADVbtSNxdGLL2_oW3gy_yQVCUSn3Rc_u3qJdpd40VPq15JDUv8_uH7uptE9AH8PgN1AHnT9H7cZ0J1OOMzOmlHe4s0LLO2MuuvuFXqb5BfaB5yVqCMegUxqAUTktgrw3My2MIKpJZgs5MmlhIc1ubAK-Ib1bVsAZZ5UxN2Qxzch8K5LcQxeW9gvlI-1kG1u67OaR28E4DhZOp3VyAYF0iaAa8XA0AYh1Zd_FFCzUnSNP5acQ0HKC2aiw_owaVIuRJM",
        isSecure=True,
        sdkVersion=2009011,
        deviceId="33B84A3DE29617CE",
        deviceAndSdkVersion="passion:15",
        locale="en",
        country="us",
        operatorAlpha="T-Mobile",
        simOperatorAlpha="T-Mobile",
        operatorNumeric="31020",
        simOperatorNumeric="31020",
    )
    options["packageName"] = packname
    return options
# print(generateAssetRequest(options))
# processAsset(str(generateAssetRequest(options)),"com.manuelmaly.hn")
def startDownload():
    """Download every package listed in ``d:/apks.txt``, one name per line.

    Blank lines are skipped. A failure for one package is reported on stdout
    and does not stop the remaining downloads.
    """
    # Read the package list from the text file.
    with open("d:/apks.txt") as listing:
        for raw_line in listing:
            # strip() also removes the "\r" left behind by CRLF line endings.
            package = raw_line.strip()
            if not package:
                continue
            print("下载:" + package)
            options = getOptions(package)
            try:
                processAsset(str(generateAssetRequest(options)), package)
            except Exception as ex:
                # Batch boundary: report the failure and carry on.
                print(package + "下载失败")
                print(ex)
# startDownload()
# Command-line entry point: download the package named by the first argument.
# Guarded so that importing this module neither prints nor starts a download.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        packageName = sys.argv[1]
        options = getOptions(packageName)
        processAsset(str(generateAssetRequest(options)), packageName)
    else:
        print("输入包名")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment