Last active
April 6, 2020 07:57
-
-
Save Ray901/a305cb06746b4a0b0ba1ef71de8eb32c to your computer and use it in GitHub Desktop.
Read MSMQ messages and store them in SQL Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
import win32com.client | |
import os | |
import pythoncom | |
import json | |
import pyodbc | |
from datetime import datetime | |
from datetime import timedelta | |
import time | |
# --- Local SQL Server connection settings (fill in before running) ---
local_driver = '{SQL Server}'
local_server = ''
local_database = 'MSMQ_LOG'
local_username = ''
local_password = ''

# --- Resume state: defaults used when nothing has been stored yet ---
SeqNo, minSeq = 1, 1
minDate = '2000-01-01 00:00:00'

# --- Queue polling settings ---
timeout_sec = 1.0
diffSeconds = 1000  # in the visible code only a commented-out loop condition mentions this

# --- MSMQ source queue ---
# Multiple messages are received without deleting them from the queue.
computer_name = ""
qinfo = win32com.client.Dispatch("MSMQ.MSMQQueueInfo")
qinfo.FormatName = "".join(["direct=TCP:", computer_name, "\\PRIVATE$\\cr"])
# Open(Access, ShareMode): 32 and 0 correspond to the MSMQ constants
# MQ_PEEK_ACCESS and MQ_DENY_NONE (per the MSMQ COM reference).
queue = qinfo.Open(32, 0)
# Connect to the SQL Server database that stores the queue log.
connection = pyodbc.connect(
    'Driver=' + local_driver + ';'
    'Server=' + local_server + ';'
    'Database=' + local_database + ';'
    'uid=' + local_username + ';pwd=' + local_password)
cursor = connection.cursor()

# Build and run one "SELECT MAX(seqNo), MAX(renewDate)" per table for the two
# base tables whose names sort last, glued together with UNION ALL.
SQLQueryMin = ("""
DECLARE @sql nvarchar(max);
SELECT @sql = Coalesce(@sql + ' UNION ALL ', '') + '
    SELECT MAX([seqNo]) AS [seqNo],MAX([renewDate]) AS [renewDate]
    FROM [MSMQ_LOG].[dbo].['+ T.[TName] +']
'
FROM (
    SELECT TOP 2 TABLE_NAME AS [TName]
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_TYPE = 'BASE TABLE' AND TABLE_CATALOG='MSMQ_LOG'
    ORDER BY TABLE_NAME DESC
) T
EXEC(@sql)
""")
cursor.execute(SQLQueryMin)

# When the database has no base tables yet, @sql stays NULL and EXEC(@sql)
# yields no result set; fetching would then raise.  Skip forward to the first
# statement that actually produced rows, if there is one.
has_rows = cursor.description is not None
while not has_rows and cursor.nextset():
    has_rows = cursor.description is not None

# Keep the row with the greatest seqNo.  The row order of a UNION ALL without
# ORDER BY is not guaranteed, so "last non-NULL row wins" could resume from an
# older table and re-insert records that are already stored.
latest = None
if has_rows:
    row = cursor.fetchone()
    while row:
        if row[0] is not None and (latest is None or row[0] > latest[0]):
            latest = row
        row = cursor.fetchone()

if latest is not None:
    minSeq = latest[0]
    SeqNo = latest[0] + 1
    if latest[1] is not None:
        # Leave the default minDate in place if the stored date is NULL.
        minDate = latest[1]

connection.commit()
cursor.close()
_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'


def _table_for(event_time):
    """Return the per-day table name (CR_YYYYMMDD) for a parsed renewDate.

    The name is derived from the parsed datetime rather than from the raw
    message text, so only digits can ever reach the SQL statements.
    """
    return 'CR_' + event_time.strftime('%Y%m%d')


def _create_table_sql(table_name):
    """Return the CREATE TABLE statement for one per-day log table."""
    return (
        'CREATE TABLE ' + table_name +
        """
        (
            [seqNo] [decimal](10, 0) NULL,
            [renewDate] [varchar](19) NULL,
            [label] [varchar](19) NULL,
            [device_id] [varchar](12) NULL,
            [device_model] [varchar](10) NULL,
            [machine_status] [varchar](max) NULL
        ) ON [PRIMARY]
        """)


def _model_and_status(label, js):
    """Return (device_model, machine_status) for one decoded message.

    Messages whose label contains "status" carry 'device_model' and
    'machine_status'; all others carry 'item', 'event_msg' and 'id'.
    The values are serialized with json.dumps so quotes, backslashes,
    booleans and None produce valid JSON in the stored text.
    """
    if "status" in label:
        status = json.dumps(js['machine_status'], ensure_ascii=False)
        return js['device_model'], '{"machine_status":' + status + '}'
    event_msg = json.dumps(js['event_msg'], ensure_ascii=False)
    event_id = json.dumps(js['id'], ensure_ascii=False)
    return js['item'], '{"event_msg":' + event_msg + ',"id":' + event_id + '}'


cursor = connection.cursor()

# NOTE(review): the variable is named timeout_sec but is scaled by 5000, not
# 1000; the original value is kept as-is -- confirm the intended timeout.
peek_timeout_ms = timeout_sec * 5000

# Records that are not newer than this were stored by a previous run.
getMinDate = datetime.strptime(minDate, _DATE_FORMAT)

active_table = None   # table the current SQLCommand inserts into
SQLCommand = None

try:
    while True:  # diffSeconds>0:
        msg = queue.PeekCurrent(pythoncom.Empty, pythoncom.Empty, peek_timeout_ms)

        # A peek that times out hands back no message object at all, so the
        # object itself must be tested before its Label is read.
        if msg is None or msg.Label is None:
            print("No More Messages in Queue")
            # Flush pending inserts before going idle; otherwise they would
            # stay uncommitted until the next daily table is created.
            connection.commit()
            time.sleep(86400)
            continue

        label = msg.Label
        body = msg.Body
        # Full slice kept from the original -- presumably it coerces a
        # buffer-like Body into a string; TODO confirm.
        js = json.loads(body[0:len(body)])
        renewDate = js['renewDate']

        # Parse first: a malformed date now fails before any table is created.
        eDate = datetime.strptime(renewDate, _DATE_FORMAT)
        setTable = _table_for(eDate)

        if setTable != active_table:
            # Check/create the table once per target table, not per message.
            if not cursor.tables(table=setTable, tableType='TABLE').fetchone():
                cursor.execute(_create_table_sql(setTable))
                connection.commit()
                cursor.close()
                cursor = connection.cursor()
            SQLCommand = 'INSERT INTO ' + setTable + ' VALUES (?, ?, ?, ?, ?, ?)'
            active_table = setTable

        if SeqNo % 100000 == 0:
            print("Normal")  # periodic heartbeat

        device_id = js['device_id']
        device_model, machine_status = _model_and_status(label, js)
        dataRow = [SeqNo, renewDate, label, device_id, device_model, machine_status]

        if eDate > getMinDate:
            cursor.execute(SQLCommand, dataRow)
            SeqNo += 1
        else:
            print("repeat record")

        # Advance the queue cursor; the next PeekCurrent reads from there.
        queue.PeekNext(pythoncom.Empty, pythoncom.Empty, peek_timeout_ms)
finally:
    # Reached on Ctrl+C or any error: persist what was inserted, then release
    # the database and queue handles.
    try:
        connection.commit()
    finally:
        cursor.close()
        connection.close()
        queue.Close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment