-
-
Save distagon/aa81fa5899daebb018243b7d103bf4a9 to your computer and use it in GitHub Desktop.
群益API with pythonnet
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# Module setup: imports, a Python 2/3 compatible ``queue`` module, debug-level
# logging, and the job queue shared by the event handlers and the main loop.
import sys
import os
import clr
import time

# True under Python 2, where the standard queue module is named ``Queue``.
# ``sys.version_info`` is compared numerically instead of slicing the
# human-readable ``sys.version`` string.
is_py2 = sys.version_info[0] == 2
if is_py2:
    import Queue as queue
else:
    import queue as queue
from switch import *
import logging

logging.basicConfig(level=logging.DEBUG)

# Jobs are put here by the event handlers / bot methods and drained by the
# main loop at the bottom of the file.
q = queue.Queue()
class Job:
    """A unit of work passed through the module-level job queue.

    ``do_type`` is one of the class-level type codes below and selects what
    ``DoJob`` does with the job.  Producers may attach a payload through the
    ``sMarketNo`` / ``sIndex`` / ``Stock`` attributes; they default to
    ``None`` so that a job always has them, even when no payload was set.
    """

    # Job type codes (valid values for ``do_type``).
    Quote_EnterMonitor = 1
    GetStockByIndex = 2
    GetStockByIndexResult = 3
    OnNotifyQuote = 4

    def __init__(self, do_type):
        """Create a job of the given type with an empty payload."""
        self.do_type = do_type
        # Payload slots; previously these only existed if a producer
        # happened to assign them after construction.
        self.sMarketNo = None
        self.sIndex = None
        self.Stock = None

    def __repr__(self):
        return "Job(do_type=%r)" % (self.do_type,)
class SKCenterLibEvent:
    """Event sink for an SKCenterLib object.

    Holds references to the owning wrapper (``parent``) and the bot, and
    prints every event it receives.
    """

    def __init__(self, parent, bot):
        self.parent = parent
        self.bot = bot

    def Bind(self, obj):
        """Subscribe this sink's ``OnTimer`` handler to ``obj.OnTimer``."""
        # NOTE(review): OnShowAgreement below is defined but never
        # subscribed here -- confirm whether that is intentional.
        obj.OnTimer += self.OnTimer

    def OnTimer(self, nTime):
        """Print the value delivered with a timer event."""
        print("OnTimer: " + str(nTime))

    def OnShowAgreement(self, bstrData):
        """Print the data delivered with a show-agreement event."""
        print("OnShowAgreement: " + str(bstrData))
class SKReplyLibEvent:
    """Event sink for reply-connection events; prints what it receives."""

    def __init__(self):
        self.parent = None
        self.bot = None

    def OnConnect(self, bstrUserID, nErrorCode):
        """Print the user id and error code delivered with a connect event.

        The original signature lacked ``self``, so calling the handler on an
        instance shifted every argument by one and raised ``TypeError``.
        """
        # str() keeps the concatenation safe if the id is not a plain string.
        print("OnConnect: user:" + str(bstrUserID) + " error:" + str(nErrorCode))
class SKQuoteLibEvent:
    """Event sink for an SKQuoteLib object.

    Connection events are printed and forwarded to the bot; quote
    notifications are turned into jobs on the module-level queue ``q``.
    """

    def __init__(self, parent, bot):
        self.parent = parent
        self.bot = bot

    def Bind(self, obj):
        """Subscribe this sink to ``obj.OnConnection`` and ``obj.OnNotifyQuote``."""
        obj.OnConnection += self.OnConnection
        obj.OnNotifyQuote += self.OnNotifyQuote

    def OnConnection(self, nKind, nCode):
        """Print the connection event and pass it on to the bot."""
        print("OnConnection: kind:" + str(nKind) + " error:" + str(nCode))
        self.bot.SKQuoteLibOnOnConnection(nKind, nCode)

    def OnNotifyQuote(self, sMarketNo, sIndex):
        """Queue a ``GetStockByIndexResult`` job carrying the market number and index."""
        job = Job(Job.GetStockByIndexResult)
        job.sMarketNo = sMarketNo
        job.sIndex = sIndex
        q.put(job)
class SKCOMWapper:
    """Loads the SKCOMLib interop assembly and creates the COM objects used by the bot.

    Attributes set by ``__init__``:
      uid, bot          -- the constructor arguments, stored as given.
      Template          -- the imported ``SKCOMLib`` module.
      SKCenterLib       -- ``SKCOMLib.SKCenterLib()`` with an ``SKCenterLibEvent`` sink bound.
      SKReplyLib        -- ``SKCOMLib.SKReplyLib()`` (no event sink is bound here).
      SKQuoteLib        -- ``SKCOMLib.SKQuoteLib()`` with an ``SKQuoteLibEvent`` sink bound.
      SKSTOCK           -- an ``SKCOMLib.SKSTOCK()`` instance.
    """

    def __init__(self, uid, bot):
        self.uid = uid
        self.bot = bot

        # The interop DLL is looked up in ./dll/x64 relative to the current
        # working directory.  Only add the directory once so that creating
        # several wrappers does not pile up duplicate sys.path entries.
        dll_dir = os.path.join(os.getcwd(), "dll", "x64")
        if dll_dir not in sys.path:
            sys.path.append(dll_dir)

        # The module can only be imported after the assembly reference has
        # been added, hence the import inside the constructor.
        clr.AddReference("Interop.SKCOMLib")
        import SKCOMLib
        self.Template = SKCOMLib

        self.SKCenterLib = SKCOMLib.SKCenterLib()
        self.SKCenterLibEvent = SKCenterLibEvent(self, bot)
        self.SKCenterLibEvent.Bind(self.SKCenterLib)

        self.SKReplyLib = SKCOMLib.SKReplyLib()

        # Domestic quote object.
        self.SKQuoteLib = SKCOMLib.SKQuoteLib()
        self.SKQuoteLibEvent = SKQuoteLibEvent(self, bot)
        self.SKQuoteLibEvent.Bind(self.SKQuoteLib)

        self.SKSTOCK = SKCOMLib.SKSTOCK()
class StockBot:
    """Drives login and quote subscription for one stock number.

    The bot never calls the quote API directly from an event handler;
    instead handlers and bot methods put ``Job`` objects on the module-level
    queue ``q`` and the main loop executes them through ``DoJob``.
    """

    def __init__(self, stockno):
        # Initialise plain state first so it already exists by the time the
        # COM wrapper binds event handlers that hold a reference to this bot.
        self.Stock = {}
        self.StockNo = stockno
        self.SKCOM = SKCOMWapper(0, self)

    def DoLogin(self, account, pwd):
        """Log in through SKCenterLib, print the result and queue an enter-monitor job.

        Returns whatever ``SKCenterLib_Login`` returned.
        """
        ret = self.SKCOM.SKCenterLib.SKCenterLib_Login(account, pwd)
        print("CeterLib Login: " + str(ret))
        q.put(Job(Job.Quote_EnterMonitor))
        return ret

    def SKQuoteLibOnOnConnection(self, nKind, nCode):
        """Queue a ``GetStockByIndex`` job when the quote connection reports 3003 / 0."""
        # NOTE(review): 3003 with code 0 is presumably the "quote service
        # ready" status -- confirm against the SKCOM documentation.
        if nKind == 3003 and nCode == 0:
            q.put(Job(Job.GetStockByIndex))

    def DoOnNotifyQuote(self, sMarketNo, sIndex):
        """Fetch the stock record for a quote notification and queue it as an ``OnNotifyQuote`` job."""
        # The call returns a (code, stock) pair; only the stock is used.
        ret, stock = self.SKCOM.SKQuoteLib.SKQuoteLib_GetStockByIndex(
            sMarketNo, sIndex, self.SKCOM.SKSTOCK)
        job = Job(Job.OnNotifyQuote)
        job.Stock = stock
        q.put(job)

    def DoGetStockByIndex(self):
        """Request quotes for ``self.StockNo``; returns the API call's result."""
        return self.SKCOM.SKQuoteLib.SKQuoteLib_RequestStocks(-1, self.StockNo)

    def DoEnterMonitor(self):
        """Call ``SKQuoteLib_EnterMonitor`` and return its result (previously discarded)."""
        return self.SKCOM.SKQuoteLib.SKQuoteLib_EnterMonitor()
def DoJob(Bot, x):
    """Execute one queued job against the bot.

    Bot -- the ``StockBot`` the job is executed on.
    x   -- the ``Job`` to run; ``x.do_type`` selects the action.

    The job type is printed first, then exactly one action runs:
      Quote_EnterMonitor    -> ``Bot.DoEnterMonitor()``
      GetStockByIndex       -> ``Bot.DoGetStockByIndex()``
      GetStockByIndexResult -> ``Bot.DoOnNotifyQuote(x.sMarketNo, x.sIndex)``
      OnNotifyQuote         -> print the quote and store it in ``Bot.Stock``
    An unrecognised type is logged and otherwise ignored.
    """
    # Plain if/elif dispatch: every branch of the original ended in
    # ``break``, so there was no fall-through to preserve, and this avoids
    # depending on a star-imported ``switch`` helper for constant matching.
    do_type = x.do_type
    print("DoJob: " + str(do_type))
    if do_type == Job.Quote_EnterMonitor:
        Bot.DoEnterMonitor()
    elif do_type == Job.GetStockByIndex:
        Bot.DoGetStockByIndex()
    elif do_type == Job.GetStockByIndexResult:
        Bot.DoOnNotifyQuote(x.sMarketNo, x.sIndex)
    elif do_type == Job.OnNotifyQuote:
        print("OnNotifyQuote: " + x.Stock.bstrStockName + " Close: " + str(x.Stock.nClose))
        # Keep the latest record per stock number.
        Bot.Stock[x.Stock.bstrStockNo] = x.Stock
    else:
        logging.warning("DoJob: unknown job type %r", do_type)
if __name__ == "__main__":
    # Demo entry point: log in, then keep executing queued jobs forever.
    Bot = StockBot("6111")
    Bot.DoLogin('帳號', '密碼')
    while True:
        # Drain everything that is currently queued ...
        while not q.empty():
            next_job = q.get()
            DoJob(Bot, next_job)
        # ... then pause before polling again.  The sleep belongs to the
        # outer loop; the original indentation was lost, and sleeping only
        # inside the inner loop would leave the outer loop busy-spinning
        # whenever the queue is empty.
        time.sleep(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment