Skip to content

Instantly share code, notes, and snippets.

@piotrmaslanka
Created December 30, 2021 14:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save piotrmaslanka/c3b639ce35f17fbfb78bcb975ed8d16c to your computer and use it in GitHub Desktop.
Save piotrmaslanka/c3b639ce35f17fbfb78bcb975ed8d16c to your computer and use it in GitHub Desktop.
KOLEKCJONER. A script used by Tarnobrzeska Spółdzielnia Mieszkaniowa before they had SMOK
# Python 2
# -*- coding: utf-8 -*-
'''Komunikacja MODBUS i kod Kolekcjonera Frisko'''
from __future__ import division
import struct
import socket
from threading import Thread
from threading import Timer
import datetime
import os
class ModbusManager:
def __init__(self):
'''Przygotowuje tablice CRC'''
lst = []
i = 0
while (i<256):
data = i<<1
crc = 0
j = 8
while (j>0):
data >>= 1
if ((data^crc)&0x1):
crc = (crc>>1) ^ 0xA001
else:
crc >>= 1
j -= 1
lst.append (crc)
i += 1
self.table = lst
def cCRC(self,st):
'''Liczy MODBUS-CRC z podanego stringa'''
crc = 0xFFFF
for ch in st:
crc = (crc>>8)^self.table[(crc^ord(ch))&0xFF]
return crc
def getReturnValues(self, sock):
'''Pobiera wartosci zwrocone przez funkcje zwracajace ilosc bajtow(b) i szereg wordow. Wartosci wywala do listy'''
global struct
datei = struct.unpack('>BBB',sock.recv(3))
rest = sock.recv(datei[2]+2)
lamka = list()
while True:
lamka.append(struct.unpack('>h',rest[0:2])[0])
rest = rest[2:]
if len(rest)==2:
break
return lamka
def generateMultipleRead(self, frm, cnt):
'''Generuje zapytanie MODBUS 0x3. frm oznacza adres startowy, cnt oznacza ilosc portow'''
global struct
naglowek = struct.pack('>BBHH',1,3,frm,cnt)
return naglowek + struct.pack('<H',self.cCRC(naglowek))
class QueryThread(Thread):
'''Klasa watku odpytujaca sterownik, zapisujaca do pliku i popelniajaca samobojstwo :)'''
def __init__(self, name, ip, mm):
global socket
print "Odpytuje "+name
Thread.__init__(self)
self.name = name
self.mm = mm
self.ip = ip
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.settimeout(30)
def run(self):
global datetime, os
failure = False
czas = str(datetime.datetime.now().hour) +":"+str(datetime.datetime.now().minute)
try:
self.s.connect((self.ip,9001))
except:
failure = True
print "Zdechlo polaczenie "+self.name
return
folder_name = str(datetime.date.today())
try:
os.mkdir(folder_name)
except:
pass # ok, folder juz istnieje
if (not os.path.exists(folder_name+'/'+self.name+'.txt')):
fl = file(folder_name+'/'+self.name+'.txt', 'a')
fl.write('Wygenerowano automatycznie z systemu SMW dla Tarnobrzeskiej Spoldzielni Mieszkaniowej\n')
fl.write('Czas|CO|CW|Zewnetrzna|MPEC\n')
del fl
try:
fl = file(folder_name+'/'+self.name+'.txt','a')
except:
print "Wyjatek podczas tworzenia pliku - sprawdz uprawnienia! "+self.name
return
if not failure:
try:
self.s.send(self.mm.generateMultipleRead(4065,8))
dane = self.mm.getReturnValues(self.s)
except:
print 'Blizej nieokreslona porazka, skontaktuj sie z tworca programu - '+self.name
failure = True
if not failure:
dane = map(lambda x: 'ERR2' if (int(x)>3000) or (int(x)<-3000) else str(x/10).replace('.',','), dane) # CO CW zew MPEC
fl.write(czas+'|'+dane[4]+'|'+dane[5]+'|'+dane[0]+'|'+dane[7]+'\x0A') # dane plus CRLF - czyli zlam linii
else:
fl.write(czas+'|ERR|ERR\x0A') # dane plus CRLF - czyli zlam linii
del fl
del self.s
print "Wykonano "+self.name
def odpytywanka():
global lolserwery, co_ile
print "Odpalam nowa kolejke odpytywania"
Timer(co_ile,odpytywanka).start()
for x,y in lolserwery.iteritems():
QueryThread(x,y, ModbusManager()).start()
def fajnyParser():
serwerki = {}
x = file('serwery.txt','r')
for z in x.readlines():
if len(z)<5:
continue # takie zabezpieczenie nie wiadomo na co
abc = str.split(z,'|')
if abc[1][-1]=='\n':
abc[1] = abc[1][0:-1]
serwerki[abc[0]] = abc[1]
print "Wczytuje "+abc[0]+" z adresem "+abc[1]
del x
print str(serwerki)
return serwerki
if __name__ == '__main__':
lolserwery = fajnyParser()
try:
co_ile = float(raw_input("Co ile minut? "))*60
except:
print "Tak, i co jeszcze?"
exit()
if co_ile<1:
print "Timeout na sockecie nie wyrobi. Nacisnij enter i wpisz cos innego"
raw_input()
exit()
Timer(co_ile,odpytywanka).start()
print "No to jedziemy z tym koksem"
print "Jak chcesz przerwac, to wcisnij [enter]"
raw_input()
exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment