Skip to content

Instantly share code, notes, and snippets.

@Ray901
Last active April 6, 2020 07:57
Show Gist options
  • Save Ray901/a305cb06746b4a0b0ba1ef71de8eb32c to your computer and use it in GitHub Desktop.
Save Ray901/a305cb06746b4a0b0ba1ef71de8eb32c to your computer and use it in GitHub Desktop.
get MSMQ data to SQL SERVER
# coding=utf-8
import win32com.client
import os
import pythoncom
import json
import pyodbc
from datetime import datetime
from datetime import timedelta
import time
# local sql
local_server = ''
local_database = 'MSMQ_LOG'
local_username = ''
local_password = ''
local_driver = '{SQL Server}'
# 多筆資料接收不刪除
qinfo=win32com.client.Dispatch("MSMQ.MSMQQueueInfo")
computer_name = ""
qinfo.FormatName="direct=TCP:"+computer_name+"\\PRIVATE$\\cr"
queue=qinfo.Open(32, 0) # Open a ref to queue to read(1)
SeqNo = 1
minSeq = 1
minDate = '2000-01-01 00:00:00'
timeout_sec = 1.0
diffSeconds = 1000
connection = pyodbc.connect(
'Driver='+ local_driver +';'
'Server='+ local_server +';'
'Database='+ local_database +';'
'uid='+ local_username +';pwd=' + local_password)
cursor = connection.cursor()
SQLQueryMin = ("""
DECLARE @sql nvarchar(max);
SELECT @sql = Coalesce(@sql + ' UNION ALL ', '') + '
SELECT MAX([seqNo]) AS [seqNo],MAX([renewDate]) AS [renewDate]
FROM [MSMQ_LOG].[dbo].['+ T.[TName] +']
'
FROM (
SELECT TOP 2 TABLE_NAME AS [TName]
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_CATALOG='MSMQ_LOG'
ORDER BY TABLE_NAME DESC
) T
EXEC(@sql)
""")
cursor.execute(SQLQueryMin)
row = cursor.fetchone()
while row:
if row[0] is not None :
minSeq = row[0]
SeqNo = row[0] + 1
minDate = row[1]
row = cursor.fetchone()
connection.commit()
cursor.close()
cursor = connection.cursor()
SQLCommand = ("""
INSERT INTO CR_MSMQ_TEST
VALUES (?, ?, ?, ?, ?, ?)
""")
while True:#diffSeconds>0:
msg = queue.PeekCurrent(pythoncom.Empty, pythoncom.Empty, timeout_sec * 5000)
if msg.Label is not None :
#print "SeqNo:",SeqNo
#print( msg.Label )
js = json.loads(msg.Body[0:len(msg.Body)])
renewDate = js['renewDate']
#print(renewDate)
setTable = 'CR_' + renewDate[0:10].replace("-","")
if cursor.tables(table=setTable, tableType='TABLE').fetchone():
#print('Table Exist')
if SeqNo % 100000 == 0:
print("Normal")
else :
SQLCreateTable = (
'CREATE TABLE ' + setTable +
"""
(
[seqNo] [decimal](10, 0) NULL,
[renewDate] [varchar](19) NULL,
[label] [varchar](19) NULL,
[device_id] [varchar](12) NULL,
[device_model] [varchar](10) NULL,
[machine_status] [varchar](max) NULL
) ON [PRIMARY]
""")
cursor.execute(SQLCreateTable)
connection.commit()
cursor.close()
cursor = connection.cursor()
SQLCommand = (
'INSERT INTO ' + setTable +
"""
VALUES (?, ?, ?, ?, ?, ?)
""")
device_id = js['device_id']
if "status" in msg.Label:
device_model = js['device_model']
machine_status = '{"machine_status":'+str(js['machine_status']).replace("'",'"')+'}'
else:
device_model = js['item']
machine_status = '{"event_msg":"'+ js['event_msg'] + '","id":"' + js['id'] + '"}'
dataRow = [SeqNo, renewDate, msg.Label, device_id, device_model, machine_status]
eDate = datetime.strptime(renewDate,'%Y-%m-%d %H:%M:%S')
getMinDate = datetime.strptime(minDate,'%Y-%m-%d %H:%M:%S')
if eDate>getMinDate:
cursor.execute(SQLCommand, dataRow)
SeqNo += 1
else:
print("repeat record")
msg = queue.PeekNext(pythoncom.Empty, pythoncom.Empty, timeout_sec * 5000)
else:
print("No More Messages in Queue")
time.sleep(86400)
connection.commit()
cursor.close()
connection.close()
queue.Close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment