Skip to content

Instantly share code, notes, and snippets.

@distagon
Created January 6, 2019 16:09
Show Gist options
  • Save distagon/6ce5c152d27151367a5062f615c78529 to your computer and use it in GitHub Desktop.
Save distagon/6ce5c152d27151367a5062f615c78529 to your computer and use it in GitHub Desktop.
群益證券報價 API 實驗程式
import os
import json
import math
import time
import signal
import asyncio
import threading
import pythoncom
import comtypes.client
import comtypes.gen.SKCOMLib as sk
# Vendor documentation section 4-1, p. 16.
# Module-wide SKCenterLib COM object; used below to set the log path, log in
# and translate return codes into messages.
skC = comtypes.client.CreateObject(sk.SKCenterLib, interface=sk.ISKCenterLib)
# Vendor documentation section 4-4, p. 77.
# Module-wide SKQuoteLib COM object; used below to enter/leave the quote
# monitor, subscribe to products and read quote snapshots.
skQ = comtypes.client.CreateObject(sk.SKQuoteLib, interface=sk.ISKQuoteLib)
# Ctrl+C state flag: set to True by ctrl_c() and polled by the loops in
# QuoteReceiver and main() to know when to stop.
app_done = False
class QuoteReceiver(threading.Thread):
    """Thread that logs in, starts the quote monitor and prints quote events.

    The instance is also registered as the event sink for the module-level
    ``skC`` and ``skQ`` COM objects, so the ``On*`` methods below are the
    callbacks invoked for their events.

    Configuration is read from ``quicksk.json`` in the current working
    directory; the keys used are ``log_path``, ``account``, ``password`` and
    ``products`` (a list of product codes).
    """

    # Seconds between checks of the ready flag while waiting for the monitor.
    READY_POLL_SECONDS = 0.25
    # Seconds between checks of the stop flag once everything is set up.
    IDLE_POLL_SECONDS = 1
    # nKind value reported through OnConnection() that marks the monitor ready.
    # NOTE(review): presumably the vendor's "stocks ready" code - confirm
    # against the SKCOM documentation.
    CONNECTION_READY_KIND = 3003

    def __init__(self):
        """Load ``quicksk.json`` and resolve the configured log directory.

        Raises:
            OSError: if the configuration file cannot be opened.
            json.JSONDecodeError: if the file is not valid JSON.
            KeyError: if ``log_path`` is missing from the configuration.
        """
        super().__init__()
        self.ready = False
        with open('quicksk.json', 'r') as cfgfile:
            self.config = json.load(cfgfile)
        self.log_path = os.path.realpath(self.config['log_path'])

    async def wait_for_ready(self):
        """Wait until ``self.ready`` is set or shutdown has been requested.

        Returns as soon as either ``self.ready`` or the module-level
        ``app_done`` flag becomes true, so a Ctrl+C that arrives before the
        ready event cannot leave this thread polling forever.
        """
        while not self.ready and not app_done:
            # Yield to the event loop instead of blocking it with time.sleep().
            await asyncio.sleep(self.READY_POLL_SECONDS)

    async def monitor_quote(self):
        """Log in, enter the quote monitor, subscribe, then idle until stopped.

        Steps: set the log path, log in with the configured account, call
        ``SKQuoteLib_EnterMonitor`` up to three times until it returns 0, wait
        for the ready event, request the configured products, and finally
        idle until ``app_done`` is set. Failures are reported on stdout; the
        coroutine does not raise.
        """
        try:
            #skC.SKCenterLib_ResetServer('morder1.capital.com.tw')
            skC.SKCenterLib_SetLogPath(self.log_path)
            print('登入', flush=True)
            retq = -1
            retc = skC.SKCenterLib_Login(self.config['account'], self.config['password'])
            if retc == 0:
                print('啟動行情監控', flush=True)
                retry = 0
                while retq != 0 and retry < 3:
                    if retry > 0:
                        print('嘗試再啟動 {}'.format(retry))
                    retq = skQ.SKQuoteLib_EnterMonitor()
                    retry += 1
            else:
                msg = skC.SKCenterLib_GetReturnCodeMessage(retc)
                print('登入失敗: #{} {}'.format(retc, msg))

            if retq == 0:
                print('等待行情監控器啟動完成')
                await self.wait_for_ready()
                # wait_for_ready() also returns on shutdown; only subscribe
                # when the monitor really reported that it is ready.
                if self.ready:
                    print('設定商品', flush=True)
                    list_with_comma = ','.join(self.config['products'])
                    skQ.SKQuoteLib_RequestStocks(0, list_with_comma)
            else:
                print('無法監看報價: #{}'.format(retq))

            # Keep the thread (and with it the event connections held by
            # run()) alive until Ctrl+C is pressed.
            while not app_done:
                await asyncio.sleep(self.IDLE_POLL_SECONDS)
        except Exception as exc:
            # Last-resort boundary of the worker thread: report what happened
            # instead of silently discarding the error details.
            print('init() 發生不預期狀況: {!r}'.format(exc), flush=True)

    def run(self):
        """Thread entry point: hook up COM events and run ``monitor_quote``."""
        # The objects returned by GetEvents() represent the event
        # connections; they are held in locals for as long as the coroutine
        # runs so the callbacks stay registered.
        ehC = comtypes.client.GetEvents(skC, self)
        ehQ = comtypes.client.GetEvents(skQ, self)
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(self.monitor_quote())
        finally:
            loop.close()
            del ehC, ehQ

    def OnTimer(self, nTime):
        """Event callback: print the reported ``nTime`` value."""
        print('OnTimer(): {}'.format(nTime), flush=True)

    def OnConnection(self, nKind, nCode):
        """Event callback: print the connection event and track readiness.

        Sets ``self.ready`` when ``nKind`` equals ``CONNECTION_READY_KIND``.
        """
        print('OnConnection(): nKind={}, nCode={}'.format(nKind, nCode), flush=True)
        if nKind == self.CONNECTION_READY_KIND:
            self.ready = True

    def OnNotifyQuote(self, sMarketNo, sStockidx):
        """Event callback: fetch the quoted product and print a summary line.

        Looks the product up with ``SKQuoteLib_GetStockByIndex`` and prints
        its number, name, ``nTQty`` and ``nClose`` divided by
        ``10 ** sDecimal``.
        """
        pStock = sk.SKSTOCK()
        skQ.SKQuoteLib_GetStockByIndex(sMarketNo, sStockidx, pStock)
        # nClose appears to be an integer scaled by sDecimal decimal places
        # (TODO confirm against the SKSTOCK documentation).
        msg = '{} {} 總量: {} 收盤: {}'.format(
            pStock.bstrStockNo,
            pStock.bstrStockName,
            pStock.nTQty,
            pStock.nClose / math.pow(10, pStock.sDecimal)
        )
        print(msg)
def ctrl_c(sig, frm):
    """SIGINT handler: leave the quote monitor and request shutdown.

    Args:
        sig: signal number passed by the ``signal`` module (unused).
        frm: current stack frame passed by the ``signal`` module (unused).

    ``app_done`` is set even when ``SKQuoteLib_LeaveMonitor`` raises, so the
    polling loops in the receiver thread and in ``main`` can always stop; the
    exception itself still propagates to the interrupted code.
    """
    global app_done
    print('Ctrl+C detected.')
    try:
        skQ.SKQuoteLib_LeaveMonitor()
    finally:
        app_done = True
def main():
    """Install the Ctrl+C handler, start the receiver and pump messages.

    Runs until ``app_done`` is set by ``ctrl_c``. The main thread repeatedly
    calls ``pythoncom.PumpWaitingMessages()`` so that messages queued for
    this thread get dispatched.
    """
    signal.signal(signal.SIGINT, ctrl_c)
    qrcv = QuoteReceiver()
    qrcv.start()
    print('Main thread: #{}'.format(threading.get_ident()), flush=True)
    print('Receiver thread: #{}'.format(qrcv.ident), flush=True)
    # Messages queued for this thread are only dispatched while pumping, so
    # the pause between pumps bounds both callback latency and how quickly
    # the loop notices app_done. Keep it short; a multi-second pause makes
    # events arrive in delayed bursts.
    pump_interval = 0.05
    while not app_done:
        pythoncom.PumpWaitingMessages()
        time.sleep(pump_interval)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment