Skip to content

Instantly share code, notes, and snippets.

@leetking
Created August 3, 2019 13:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save leetking/7fc64ee800bb6f1170f3bbf1b915f5fa to your computer and use it in GitHub Desktop.
Save leetking/7fc64ee800bb6f1170f3bbf1b915f5fa to your computer and use it in GitHub Desktop.
A little Python script to scrape Chengdu Metro information
#!/usr/bin/env python
import hashlib
import sqlite3
import os
from functools import reduce
import requests
class DbCache:
    """Small sqlite-backed store mapping an integer key to compressed text.

    Rows live in the `cdmetro` table; `time` is the primary key and `data`
    holds the gzip-compressed, encoded text. Items are accessed with the
    subscript syntax: ``cache[t] = text`` and ``cache[t]``.

    The object can also be used as a context manager, which closes the
    underlying connection on exit.
    """
    # TODO implement get data by a range

    def __init__(self, dbpath):
        """Open (creating if needed) the database file at *dbpath*.

        Any missing parent directory of *dbpath* is created first.
        """
        directory = os.path.dirname(dbpath)
        # dirname() is "" for a bare filename (and for ":memory:");
        # os.makedirs("") would raise, so only create a real directory.
        if directory:
            os.makedirs(directory, exist_ok=True)
        self._db = sqlite3.connect(dbpath)
        # The connection context manager commits on success and rolls back
        # on error.
        with self._db:
            self._db.execute("""create table if not exists `cdmetro`(
                `time` integer not null unique,
                `data` blob not null,
                primary key(`time`))""")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def __getitem__(self, time):
        """Return the text stored under *time*, or "" if there is no row."""
        import gzip
        cur = self._db.cursor()
        try:
            cur.execute("select `data` from `cdmetro` where `time` = ?", (time,))
            row = cur.fetchone()
        finally:
            # Release the cursor even when the query itself fails.
            cur.close()
        return gzip.decompress(row[0]).decode() if row else ""

    def __setitem__(self, time, data):
        """Store the string *data* under *time*, replacing any existing row."""
        import gzip
        blob = gzip.compress(data.encode())
        # "insert or replace" gives normal mapping semantics: assigning to an
        # existing key overwrites it instead of raising IntegrityError.
        with self._db:
            self._db.execute(
                "insert or replace into `cdmetro`(`time`,`data`) values(?, ?)",
                (time, blob))

    def close(self):
        """Close the underlying sqlite connection."""
        self._db.close()
def get_data():
    """POST to the metro API endpoint and return the response body as text.

    The request carries a set of app-style headers plus a `sign` header,
    which is the MD5 hex digest of the other header fields (see
    ``generate_sign``).

    Returns:
        The response text, or '' when the request times out or fails with
        any other ``requests`` error, so callers can treat '' as "no data".
    """
    from time import time

    HOST = "http://webapp.cocc.cdmetro.cn:10080"
    URL = "/api/realDmyjdSearch"
    t_out = 7   # unit: second

    headers = {
        'platformType': 'android',
        'Accept-APIVersion': '1.0',
        'mobileBrand': 'OnePlus',
        'appVersionNo': '77',
        'tokenId': '',
        'userId': '',
        # Placeholder value; it is hashed with the other fields and then
        # overwritten by the computed digest below.
        'sign': 'p@ssw0rd',
        'platformVersion': '8.0.0',
        'mobileStandard': 'WIFI',
        'callTime': str(int(time()*1000)),
    }

    def generate_sign(fields):
        """Return the MD5 hex digest of "k=v" pairs, sorted by key, joined by '&'."""
        # str.join replaces the old reduce() that tested `x is ''`; identity
        # comparison against a literal is implementation-dependent.
        payload = '&'.join(key + '=' + fields[key] for key in sorted(fields))
        return hashlib.md5(payload.encode()).hexdigest()

    headers['sign'] = generate_sign(headers)
    # These two are added after signing, so they are not part of the digest.
    headers['Content-Type'] = 'application/x-www-form-urlencoded'
    headers['User-Agent'] = 'okhttp/3.4.1'

    try:
        res = requests.post(HOST+URL, headers=headers, timeout=t_out)
    except requests.exceptions.Timeout:
        print("request timeout.")
        return ''
    except requests.exceptions.RequestException as err:
        # Connection errors, DNS failures etc. must not kill the polling
        # loop; report and let the caller retry on the next tick.
        print("request failed: {}".format(err))
        return ''
    return res.text
def main():
    """Poll the API about once per `gap` seconds during the daily time window.

    Between 06:05 and 23:30 local time each iteration fetches one response
    via ``get_data()`` and, if it is non-empty, stores it in the sqlite file
    at `DBPATH` keyed by the millisecond timestamp taken just before the
    request. Outside that window the loop only sleeps. The loop runs until
    interrupted (Ctrl-C), after which a farewell message is printed.
    """
    from time import sleep, time
    import datetime

    DBPATH = "./cdmetro.db3"
    start = datetime.time(6, 5)     # 6:05
    end = datetime.time(23, 30)     # 23:30
    gap = 60    # unit: second

    try:
        while True:
            now = datetime.datetime.now()
            # the train is running
            # Compare time-of-day from a single clock reading so the check
            # cannot straddle midnight between separate today()/now() calls.
            if start <= now.time() <= end:
                db = DbCache(DBPATH)
                try:
                    tm = int(time()*1000)
                    data = get_data()
                    # Truthiness test instead of `is not ''`.
                    if data:
                        db[tm] = data
                        print("get a datum.")
                finally:
                    # Always release the connection, even if the fetch or
                    # the insert raised.
                    db.close()
            elapsed = (datetime.datetime.now() - now).total_seconds()
            # sleep() rejects negative values; clamp when an iteration
            # overran the polling interval.
            sleep(max(0.0, gap - elapsed))
    except KeyboardInterrupt:
        # Fall through to the farewell message, which was unreachable before.
        pass
    print("Oops, I have to quit!")
# Run the polling loop only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment