Skip to content

Instantly share code, notes, and snippets.

@mckelvin
Created August 24, 2017 07:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mckelvin/865c337c394e247790bc3ab92bac877f to your computer and use it in GitHub Desktop.
Save mckelvin/865c337c394e247790bc3ab92bac877f to your computer and use it in GitHub Desktop.
# coding: utf-8
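# Scrape daily reports from the China Futures Market Monitoring Center (CFMMC) and compute the net asset value.
# CAPTCHAs are solved with tesserocr.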
import os
import json
import time
import datetime
import logging
import cStringIO as StringIO
from PIL import Image, ImageFilter, ImageOps, ImageEnhance, ImageChops
import tesserocr
import requests
# Expected number of characters in a CAPTCHA; an OCR result of any other
# length is treated as a failed read and a new CAPTCHA is requested.
VERI_CODE_LEN = 6
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def ocr_image(image):
    """Recognise the characters of a CAPTCHA image with Tesseract.

    The image is inverted, converted to greyscale, binarised with a fixed
    threshold, cropped to the bounding box of everything that differs from
    its top-left pixel, and finally read as a single line of text limited
    to ASCII letters and digits.

    Args:
        image: A ``PIL.Image.Image`` in a mode that ``ImageOps.invert``
            accepts.

    Returns:
        The recognised text with surrounding whitespace and embedded spaces
        removed. An empty string is returned when the binarised image is
        uniform, i.e. there is nothing to crop to and nothing to read.
    """
    image = ImageOps.invert(image).convert('L')
    # Binarise in one pass: values below 70 become black, the rest white.
    image = image.point(lambda value: 0 if value < 70 else 255)

    # Treat the top-left pixel as the background colour and locate the
    # region that differs from it.
    background = Image.new(image.mode, image.size, image.getpixel((0, 0)))
    bbox = ImageChops.difference(image, background).getbbox()
    if bbox is None:
        # getbbox() yields None when the whole image equals the background;
        # report "nothing recognised" so the caller can fetch a new CAPTCHA
        # instead of crashing on tuple unpacking.
        return ""
    image = image.crop(bbox)  # bbox is (left, upper, right, lower)

    with tesserocr.PyTessBaseAPI(
        psm=tesserocr.PSM.SINGLE_LINE,
    ) as api:
        api.SetVariable(
            "tessedit_char_whitelist",
            "0123456789"
            "abcdefghijklmnopqrstuvwxyz"
            "ABCDEFGHIJKLMNOPQRSTUVWXYZ",
        )
        api.SetImage(image)
        return api.GetUTF8Text().strip().replace(" ", "")
class CFMMCManager(object):
    """Small client for the CFMMC investor-service mobile site.

    Holds one ``requests.Session`` for the given account, logs in on demand
    (solving the CAPTCHA locally via ``ocr_image``) and fetches daily
    reports as parsed JSON.
    """

    BASE_URL = "https://wapinvestorservice.cfmmc.com"

    def __init__(self, user_id, password):
        """Create the session and request the login page once.

        Args:
            user_id: Account identifier submitted as ``j_username``.
            password: Account password submitted as ``j_password``.
        """
        self.s = requests.Session()
        self.user_id = user_id
        self.password = password
        # Initial GET of the login page; presumably this establishes the
        # session cookies the later requests rely on -- TODO confirm.
        self.s.get(self.BASE_URL + "/login.do")
        self.is_loggedin = False

    def get_validate_code(self):
        """Download a fresh CAPTCHA image and return the OCR'd text."""
        veri_code_rsp = self.s.get(
            self.BASE_URL + "/veriCode.do"
            "?t=%d" % int(time.time() * 1000)
        )
        image = Image.open(StringIO.StringIO(veri_code_rsp.content))
        return ocr_image(image)

    def try_login(self, max_ocr_attempts=10):
        """Make a single login attempt.

        New CAPTCHAs are requested until the OCR result has the expected
        length, then the credentials and that code are posted.

        Args:
            max_ocr_attempts: How many CAPTCHAs to try before giving up.

        Returns:
            True when the response body contains the user id, which this
            code treats as the sign of a successful login; False otherwise.

        Raises:
            RuntimeError: If no CAPTCHA could be read with the expected
                length within ``max_ocr_attempts`` tries.
        """
        vcode = None
        for _ in range(max_ocr_attempts):
            vcode = self.get_validate_code()
            logger.debug("OCR candidate: %r", vcode)
            if len(vcode) == VERI_CODE_LEN:
                logger.info("验证码识别成功(local)")
                break
            logger.warning("验证码识别失败(local): {}".format(vcode))
        else:
            raise RuntimeError("始终无法识别验证码")

        r = self.s.post(
            self.BASE_URL + "/login.do",
            {
                "j_username": self.user_id,
                "j_password": self.password,
                "j_validateCode": vcode,
            }
        )
        return self.user_id in r.text

    def login(self, max_retries=10):
        """Log in unless already logged in, retrying on failure.

        Args:
            max_retries: Maximum number of ``try_login`` attempts.

        Raises:
            RuntimeError: If every attempt fails.
        """
        if self.is_loggedin:
            return
        for retry in range(max_retries):
            if self.try_login():
                logger.info("登录成功 (retry=%d)", retry)
                self.is_loggedin = True
                return
            logger.warning("登录失败 (retry=%d)", retry)
        raise RuntimeError("登录失败")

    def get_daily_report(self, dt):
        """Fetch the daily report for one date.

        Args:
            dt: Object with ``strftime`` (e.g. ``datetime.datetime``); it is
                sent as ``tradeDate`` in ``YYYY-MM-DD`` form.

        Returns:
            The response body decoded from JSON.
        """
        self.login()
        daily_report_url = (
            self.BASE_URL + "/customer/findDailyReport.do"
        )
        res = self.s.post(
            daily_report_url,
            data={
                "tradeDate": dt.strftime("%Y-%m-%d"),
            },
        )
        return res.json()
def calculate_daily_returns(start_dt, end_dt,
                            in_fund_before_trading=True,
                            out_fund_before_trading=False):
    """Print a CSV of daily and cumulative returns from cached reports.

    For every calendar day from ``start_dt`` to ``end_dt`` (inclusive) the
    file ``YYYYMMDD.json`` is read from the current working directory.
    Days whose ``flag`` equals 2 are skipped (presumably days without a
    report, such as non-trading days -- TODO confirm). For every other day
    one line ``date,daily_return,cumulative_product`` is printed below the
    header ``trading_day,pnl,returns``.

    Args:
        start_dt: First day to process (``datetime.datetime``).
        end_dt: Last day to process, inclusive.
        in_fund_before_trading: If true, the day's deposits are added to the
            previous equity before the return is computed.
        out_fund_before_trading: If true, the day's withdrawals are
            subtracted from the previous equity before the return is
            computed.

    Raises:
        ValueError: If a report has negative deposits or withdrawals, or if
            its closing equity does not match previous equity + profit -
            fee + deposits - withdrawals.
        ZeroDivisionError: If the adjusted previous equity is zero while the
            closing equity is not.
    """
    # Absolute slack for the balance reconciliation. Comparing with a
    # tolerance replaces comparing str() of two floats, which only worked
    # through the float-to-string rounding of one interpreter version.
    balance_tolerance = 1e-6
    one_day = datetime.timedelta(days=1)

    curr_dt = start_dt
    cumprod = 1.0
    # Single-argument print(...) gives the same output under the Python 2
    # print statement and the Python 3 print function.
    print("trading_day,pnl,returns")
    while curr_dt <= end_dt:
        out_file = curr_dt.strftime("%Y%m%d.json")
        with open(out_file) as fhandler:
            data = json.load(fhandler)

        if int(data["flag"]) != 2:
            report = data["report"]
            in_fund = report["clientIOTotal"]["INFUND"]
            out_fund = report["clientIOTotal"]["OUTFUND"]
            if in_fund < 0 or out_fund < 0:
                raise ValueError(
                    "%s: negative fund flow (INFUND=%r, OUTFUND=%r)"
                    % (out_file, in_fund, out_fund)
                )

            client_fund = report["clientFund"]
            today_right = client_fund["TODAYRIGHTBYDATE"]
            last_right = client_fund["LASTRIGHTBYDATE"]
            today_profit = client_fund["TODAYPROFITBYDATE"]
            today_fee = client_fund["TRADEFEE"]

            # Sanity check: the reported closing equity must reconcile with
            # the other reported figures.
            expected_today_fund = (
                last_right + today_profit
                - today_fee + in_fund - out_fund
            )
            if abs(expected_today_fund - today_right) > balance_tolerance:
                raise ValueError(
                    "%s: equity mismatch (expected %r, reported %r)"
                    % (out_file, expected_today_fund, today_right)
                )

            last_right_adj = last_right
            if in_fund_before_trading:
                last_right_adj += in_fund
            if out_fund_before_trading:
                last_right_adj -= out_fund

            if last_right_adj == 0 and today_right == 0:
                # Empty account with no change: 0/0 is treated as a flat day
                # rather than crashing the whole run.
                today_return = 0.0
            else:
                today_return = (
                    1.0 * (today_right - last_right_adj) / last_right_adj
                )
            cumprod *= (1 + today_return)
            print("%s,%s,%s" % (
                curr_dt.strftime("%Y-%m-%d"), today_return, cumprod))
        curr_dt += one_day
def main():
    """Download missing daily reports, then print the return series.

    Reports for each day of the hard-coded date range are cached as
    ``YYYYMMDD.json`` in the current working directory; days that already
    have a file are not requested again.
    """
    logging.basicConfig(level=logging.INFO)

    # Placeholder credentials -- replace with a real account before running.
    manager = CFMMCManager("0000000000000", "000000000")

    first_day = datetime.datetime(2017, 5, 15)
    last_day = datetime.datetime(2017, 6, 29)
    step = datetime.timedelta(days=1)

    day = first_day
    while day <= last_day:
        cache_path = day.strftime("%Y%m%d.json")
        if os.path.exists(cache_path):
            # Already cached on a previous run.
            day += step
            continue

        logger.info("Dumping to %s" % cache_path)
        time.sleep(0.1)  # short pause before each request
        payload = manager.get_daily_report(day)
        # NOTE(review): flag 3 looks like an error response from the
        # service -- confirm its exact meaning.
        assert int(payload["flag"]) != 3
        print(day)
        print(payload)
        with open(cache_path, "w") as sink:
            sink.write(json.dumps(payload))
        day += step

    calculate_daily_returns(first_day, last_day)
# Run the scraper only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment