Skip to content

Instantly share code, notes, and snippets.

@distagon
Forked from kwedr/群益API with pythonnet
Created August 8, 2017 13:36
Show Gist options
  • Save distagon/aa81fa5899daebb018243b7d103bf4a9 to your computer and use it in GitHub Desktop.
Save distagon/aa81fa5899daebb018243b7d103bf4a9 to your computer and use it in GitHub Desktop.
群益API with pythonnet
# -*- coding: utf-8 -*-
import sys, os, clr, time
is_py2 = sys.version[0] == '2'
if is_py2:
import Queue as queue
else:
import queue as queue
from switch import *
import logging
logging.basicConfig(level=logging.DEBUG)
q = queue.Queue ()
class Job:
Quote_EnterMonitor = 1
GetStockByIndex = 2
GetStockByIndexResult = 3
OnNotifyQuote = 4
def __init__ (self, do_type):
self.do_type = do_type
class SKCenterLibEvent:
def __init__(self, parent, bot):
self.parent = parent
self.bot = bot
def Bind(self, obj):
obj.OnTimer += self.OnTimer
def OnTimer(self, nTime):
print ("OnTimer: " + str(nTime))
def OnShowAgreement(self, bstrData):
print ("OnShowAgreement: " + str(bstrData))
class SKReplyLibEvent:
def __init__(self):
self.parent = None
self.bot = None
def OnConnect(bstrUserID, nErrorCode ):
print ("OnConnect: user:" + bstrUserID + " error:" + str(nErrorCode))
class SKQuoteLibEvent:
def __init__(self, parent, bot):
self.parent = parent
self.bot = bot
def Bind(self, obj):
obj.OnConnection += self.OnConnection
obj.OnNotifyQuote += self.OnNotifyQuote
def OnConnection(self, nKind, nCode):
print ("OnConnection: kind:" + str(nKind) + " error:" + str(nCode))
self.bot.SKQuoteLibOnOnConnection (nKind, nCode)
def OnNotifyQuote(self, sMarketNo, sIndex):
j = Job (Job.GetStockByIndexResult)
j.sMarketNo = sMarketNo
j.sIndex = sIndex
q.put (j)
class SKCOMWapper:
def __init__(self, uid, bot):
self.uid = uid
self.bot = bot
sys.path.append(os.path.join(os.getcwd(), "dll", "x64"))
clr.AddReference("Interop.SKCOMLib")
import SKCOMLib as SKCOMLib
self.Template = SKCOMLib
self.SKCenterLib = SKCOMLib.SKCenterLib ()
self.SKCenterLibEvent = SKCenterLibEvent(self, bot)
self.SKCenterLibEvent.Bind (self.SKCenterLib)
self.SKReplyLib = SKCOMLib.SKReplyLib ()
#國內報價物件。
self.SKQuoteLib = SKCOMLib.SKQuoteLib ()
self.SKQuoteLibEvent = SKQuoteLibEvent(self, bot)
self.SKQuoteLibEvent.Bind (self.SKQuoteLib)
self.SKSTOCK = SKCOMLib.SKSTOCK ()
class StockBot:
def __init__(self, stockno):
self.SKCOM = SKCOMWapper (0, self)
self.Stock = {}
self.StockNo = stockno
def DoLogin (self, account, pwd):
ret = self.SKCOM.SKCenterLib.SKCenterLib_Login (account, pwd)
print ("CeterLib Login: " + str (ret))
q.put (Job (Job.Quote_EnterMonitor))
def SKQuoteLibOnOnConnection (self, nKind, nCode):
if nKind == 3003 and nCode == 0:
q.put (Job (Job.GetStockByIndex))
def DoOnNotifyQuote (self, sMarketNo, sIndex):
ret, Stock = self.SKCOM.SKQuoteLib.SKQuoteLib_GetStockByIndex(sMarketNo, sIndex, self.SKCOM.SKSTOCK)
j = Job(Job.OnNotifyQuote)
j.Stock = Stock
q.put (j)
def DoGetStockByIndex (self):
self.SKCOM.SKQuoteLib.SKQuoteLib_RequestStocks (-1, self.StockNo)
def DoEnterMonitor (self):
ret = self.SKCOM.SKQuoteLib.SKQuoteLib_EnterMonitor ()
def DoJob(Bot, x):
for case in switch(x.do_type):
print ("DoJob: " + str (x.do_type))
if case(Job.Quote_EnterMonitor):
Bot.DoEnterMonitor ()
break
if case(Job.GetStockByIndex):
Bot.DoGetStockByIndex ()
break
if case(Job.GetStockByIndexResult):
Bot.DoOnNotifyQuote (x.sMarketNo, x.sIndex)
break
if case(Job.OnNotifyQuote):
print ("OnNotifyQuote: " + x.Stock.bstrStockName + " Close: " + str (x.Stock.nClose))
Bot.Stock[x.Stock.bstrStockNo] = x.Stock
break;
if __name__ == "__main__":
Bot = StockBot("6111")
Bot.DoLogin('帳號', '密碼')
while 1:
while not q.empty():
next_job = q.get()
DoJob (Bot, next_job)
time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment