Skip to content

Instantly share code, notes, and snippets.

@yoonbae81
Created March 24, 2018 13:47
Show Gist options
  • Save yoonbae81/89de4661c2eba4a25f5ab139de6b1e00 to your computer and use it in GitHub Desktop.
Save yoonbae81/89de4661c2eba4a25f5ab139de6b1e00 to your computer and use it in GitHub Desktop.
yQuant prototype using ZeroMQ
#!/usr/bin/python3
import datetime
import json
import time
import zmq
import random
import operator
import re
import time
import sys
import pymysql
from collections import defaultdict
from os import getpid
from threading import Thread
from multiprocessing import Process, Pool
from urllib.request import urlopen
from urllib.error import URLError
from logger import get_logger
def feed_dummy_price(start_date, end_date):
    """Placeholder price feed for backtesting (not implemented yet).

    The original note sketches the data source as the query:
        SELECT * FROM price_intraday
        WHERE datetime BETWEEN start_date AND end_date

    Args:
        start_date: Lower bound of the replay window (currently unused).
        end_date: Upper bound of the replay window (currently unused).

    Returns:
        None. The function has no effect yet.
    """
    return None
def feed_price(port):
    """Publish price ticks on a ZeroMQ PUB socket bound to 127.0.0.1.

    Every INTERVAL seconds one worker thread per quote URL downloads the
    page, logs how long the download and the regex parse took, and then
    publishes a single ``symbol=price`` message.

    NOTE: prototype behaviour is kept as-is -- the values parsed from the
    page are not published yet; each message carries a randomly chosen
    dummy symbol and price.

    Args:
        port: TCP port on 127.0.0.1 to bind the PUB socket to.

    The loop runs until KeyboardInterrupt. The socket and context are
    always closed on the way out.
    """
    # Only Thread is imported at file level, so Lock is imported locally.
    from threading import Lock

    get_logger().debug(
        "Process[feed_price] is started (pid:{})".format(getpid()))

    TIMEOUT = 1   # seconds allowed for one page download
    INTERVAL = 3  # seconds between polling rounds
    URLs = ('http://finance.daum.net/xml/xmlallpanel.daum?stype=P&type=S',
            'http://finance.daum.net/xml/xmlallpanel.daum?stype=Q&type=S')

    # The payload keys are unquoted, so it is not valid JSON and is parsed
    # with a regular expression instead.
    # example : , {code:"095570",name :"AJ네트웍스",cost :"34,650",updn
    # :"▲100",rate :"+0.29%"}
    rep = re.compile(r'code:"(.+)",name :"(.+)",cost :"(.+)",updn')

    # ZeroMQ sockets are not thread-safe; every send from a worker thread
    # goes through this lock.
    send_lock = Lock()

    def parse(URL, socket):
        """Download one quote page, parse it, and publish one tick."""
        start = time.time()
        try:
            response = urlopen(URL, timeout=TIMEOUT).read().decode('UTF-8')
        except OSError:
            # URLError and socket timeouts are OSError subclasses. The
            # failure happens in this worker thread, so it has to be
            # handled here; the caller's try block would never see it.
            get_logger().debug('웹페이지 수신 {}초내 수신 실패'.format(TIMEOUT))
            return
        get_logger().debug(
            '웹페이지 수신 ({0:.5g} seconds)'.format(time.time() - start))

        start = time.time()
        for line in response.splitlines():
            # TODO: a line containing '장종료' marks the market as closed;
            # the original note says prices should then be stored one
            # last time. Nothing acts on that marker yet.
            if 'code' not in line:
                continue
            match = rep.search(line)
            if match is None:
                # Contains 'code' but is not a quote record.
                continue
            # Parsed so the timing below is realistic; not published yet.
            symbol = match.group(1)
            name = match.group(2)
            price = int(match.group(3).replace(',', ''))
        get_logger().debug(
            "종목코드/가격 파싱 ({0:.5g} seconds)".format(time.time() - start))

        # Prototype: publish a dummy tick instead of the parsed values.
        symbol = random.choice(['삼성전자', '한국전력', '현대차'])
        price = random.randrange(10000, 90000, 1000)
        with send_lock:
            socket.send_string("{}={}".format(symbol, price))
        get_logger().debug(
            "feed_price sent a message ({}={})".format(symbol, price))

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    jobs = []
    try:
        socket.bind("tcp://127.0.0.1:{}".format(port))
        while True:
            # Forget finished workers so the list does not grow forever.
            jobs = [job for job in jobs if job.is_alive()]
            for URL in URLs:
                # Must be a Thread: the socket is handed to the worker and
                # the main process must be able to stop everything.
                # Measurements showed no slowdown versus a Process.
                job = Thread(target=parse, args=(URL, socket), daemon=True)
                job.start()
                jobs.append(job)
            time.sleep(INTERVAL)
    except KeyboardInterrupt:
        pass
    finally:
        # Threads cannot be terminated; give in-flight workers a moment to
        # finish before the socket is closed (daemon threads never block
        # interpreter exit).
        for job in jobs:
            job.join(TIMEOUT)
        socket.close()
        context.term()
def store_price(port):
    """Subscribe to every price tick and insert each one into MySQL.

    Connection settings are read from the ``mysql`` section of
    ``config.json`` (keys: host, port, user, password, db). Each received
    ``symbol=price`` message becomes one committed row in
    ``price_intraday``.

    Args:
        port: TCP port on 127.0.0.1 where the price publisher is bound.

    Returns:
        Total number of rows inserted, once the loop is stopped with
        KeyboardInterrupt.

    Raises:
        pymysql.err.OperationalError: re-raised after logging when the
            database reports an operational failure.
    """
    get_logger().debug(
        "Process[store_price] is started (pid:{})".format(getpid()))

    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    conn = None       # stays None if connecting fails, so cleanup can tell
    stored_rows = 0
    try:
        socket.setsockopt_string(zmq.SUBSCRIBE, '')
        socket.connect("tcp://127.0.0.1:{}".format(port))

        with open('config.json') as file:
            config = json.load(file)['mysql']

        start = time.time()
        conn = pymysql.connect(host=config['host'],
                               port=config['port'],
                               user=config['user'],
                               password=config['password'],
                               db=config['db'])
        cursor = conn.cursor()
        get_logger().debug(
            'Database 연결 ({0:.5g} seconds)'.format(time.time() - start))

        while True:
            message = socket.recv_string()
            get_logger().debug(
                "store_price recevied a message ({})".format(message))

            # Messages are published as "symbol=price"; split on the last
            # '=' because the price part never contains one.
            symbol, _, raw_price = message.rpartition('=')
            try:
                price = int(raw_price)
            except ValueError:
                price = None
            if not symbol or price is None:
                get_logger().warning(
                    "store_price ignored a malformed message ({})".format(
                        message))
                continue

            start = time.time()
            # Parameterized query: the driver escapes the values, so the
            # SQL text is never built from message content.
            cursor.execute(
                "INSERT INTO price_intraday (symbol, price) VALUES (%s, %s)",
                (symbol, price))
            conn.commit()
            affected_rows = cursor.rowcount
            stored_rows += affected_rows
            get_logger().debug("Database {0}건 저장 ({1:.5g} seconds)".format(
                affected_rows, time.time() - start))
    except pymysql.err.OperationalError:
        get_logger().warning('Database 연결 실패')
        raise
    except KeyboardInterrupt:
        pass
    finally:
        socket.close()
        context.term()
        if conn is not None:
            conn.close()
    return stored_rows
def monitor_price(port, symbol, price, op=operator.lt):
    """Watch the ticks of one symbol and log when the price condition holds.

    Args:
        port: TCP port on 127.0.0.1 where the price publisher is bound.
        symbol: Symbol to watch; ticks arrive as ``symbol=price`` strings.
        price: Monitoring (threshold) price.
        op: Two-argument predicate called as ``op(price, current_price)``.
            With the default ``operator.lt`` the condition holds when the
            threshold is below the current price.

    The loop runs until the process is interrupted. The socket and
    context are always closed on the way out.
    """
    get_logger().debug(
        "Process[monitor_price] started (pid:{})".format(getpid()))

    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        # A ZeroMQ subscription is a prefix filter. Including the '='
        # separator keeps out other symbols that merely start with the
        # same characters.
        socket.setsockopt_string(zmq.SUBSCRIBE, symbol + '=')
        socket.connect("tcp://127.0.0.1:{}".format(port))

        while True:
            message = socket.recv_string()
            get_logger().debug(
                "monitor_price recevied a message ({})".format(message))

            received_symbol, _, raw_price = message.partition('=')
            try:
                current_price = int(raw_price)
            except ValueError:
                # One bad tick must not stop a long-running monitor.
                get_logger().debug(
                    "monitor_price ignored a malformed message ({})".format(
                        message))
                continue

            if op(price, current_price):
                get_logger().debug("{} reached more than the monitoring price, {} (current: {})".format(
                    received_symbol, price, current_price))
                # Send an order message to portfolio or broker process
    finally:
        socket.close()
        context.term()
def main():
    """Start the feed and store workers and wait until they stop.

    Each worker runs in its own pool process. Ctrl-C terminates the pool.
    An exception raised inside a worker is logged instead of being lost.
    """
    get_logger().debug("Process[main] is started (pid:{})".format(getpid()))

    tasks = (
        (feed_price, (50001,)),
        (store_price, (50001,)),
        # (monitor_price, (50001, '현대차', 65000)),
    )

    def report_failure(error):
        # apply_async discards worker exceptions unless a callback is set.
        get_logger().error("Worker process failed: {!r}".format(error))

    # The workers never return on their own, so each needs a dedicated
    # process. The default pool size is the CPU count, which on a
    # single-core host would leave the second task queued forever.
    pool = Pool(processes=len(tasks))
    try:
        for func, args in tasks:
            pool.apply_async(func, args=args, error_callback=report_failure)
        pool.close()
        # Ctrl-C normally arrives while waiting here, so the wait has to
        # sit inside the try block for the handler below to run.
        pool.join()
    except KeyboardInterrupt:
        pool.terminate()
        pool.join()
# Start the prototype only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment