Created
December 30, 2021 14:37
-
-
Save piotrmaslanka/c3b639ce35f17fbfb78bcb975ed8d16c to your computer and use it in GitHub Desktop.
KOLEKCJONER. A script used by Tarnobrzeska Spółdzielnia Mieszkaniowa before they had SMOK
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Python 2 | |
# -*- coding: utf-8 -*- | |
'''Komunikacja MODBUS i kod Kolekcjonera Frisko''' | |
from __future__ import division | |
import struct | |
import socket | |
from threading import Thread | |
from threading import Timer | |
import datetime | |
import os | |
class ModbusManager:
    '''MODBUS RTU helper: CRC-16 calculation, request building and reply parsing.'''

    def __init__(self):
        '''Build the 256-entry CRC lookup table (reflected polynomial 0xA001).'''
        table = []
        for byte in range(256):
            crc = byte
            for _ in range(8):
                if crc & 0x1:
                    crc = (crc >> 1) ^ 0xA001
                else:
                    crc >>= 1
            table.append(crc)
        self.table = table

    def cCRC(self, st):
        '''Return the MODBUS CRC-16 of st as an integer.

        st may be a Python 2 str, a Python 3 bytes/bytearray or a text string;
        items that are not already integers are converted with ord().
        '''
        crc = 0xFFFF
        for ch in st:
            # Iterating bytes yields ints on Python 3 and 1-char strings on Python 2.
            byte = ch if isinstance(ch, int) else ord(ch)
            crc = (crc >> 8) ^ self.table[(crc ^ byte) & 0xFF]
        return crc

    def _recv_exact(self, sock, size):
        '''Read exactly size bytes from sock; raise IOError if the peer closes early.'''
        chunks = []
        remaining = size
        while remaining > 0:
            # recv() may legally return fewer bytes than requested.
            chunk = sock.recv(remaining)
            if not chunk:
                raise IOError('connection closed with %d of %d bytes missing'
                              % (remaining, size))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def getReturnValues(self, sock, verify_crc=False):
        '''Read one reply of the form address, function, byte count, words, CRC.

        sock       -- connected socket-like object providing recv()
        verify_crc -- when true, compare the trailing CRC with the computed one

        Returns the payload as a list of signed 16-bit big-endian integers.
        Raises ValueError for an exception reply (function code with bit 0x80
        set), for a byte count that is zero or odd, and - only when verify_crc
        is true - for a CRC mismatch. Raises IOError when the connection is
        closed before the whole frame has arrived.
        '''
        header = self._recv_exact(sock, 3)
        function, count = struct.unpack('>BBB', header)[1:]
        if function & 0x80:
            # In an exception reply the third byte is the exception code and
            # only the CRC follows, so there is no payload to wait for.
            raise ValueError('MODBUS exception reply, code %d' % count)
        if count == 0 or count % 2:
            raise ValueError('malformed MODBUS reply: byte count %d' % count)
        rest = self._recv_exact(sock, count + 2)  # payload + 2 CRC bytes
        payload = rest[:count]
        if verify_crc:
            received = struct.unpack('<H', rest[count:])[0]
            if received != self.cCRC(header + payload):
                raise ValueError('MODBUS reply CRC mismatch')
        return list(struct.unpack('>%dh' % (count // 2), payload))

    def generateMultipleRead(self, frm, cnt):
        '''Build a MODBUS function 0x03 request addressed to unit 1.

        frm is the start address and cnt the number of registers to read.
        The CRC is appended low byte first.
        '''
        naglowek = struct.pack('>BBHH', 1, 3, frm, cnt)
        return naglowek + struct.pack('<H', self.cCRC(naglowek))
class QueryThread(Thread):
    '''Worker thread: polls one controller once, appends the reading to the
    per-day log file and finishes.'''

    def __init__(self, name, ip, mm):
        '''name -- controller name, also used as the log file name
        ip   -- controller address; the connection is made to TCP port 9001
        mm   -- object providing generateMultipleRead() and getReturnValues()
        '''
        print("Odpytuje " + name)
        Thread.__init__(self)
        self.name = name
        self.mm = mm
        self.ip = ip
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.settimeout(30)

    def run(self):
        '''Poll the controller and log the result; the socket is always closed.'''
        # One clock read, so hour and minute always come from the same instant.
        now = datetime.datetime.now()
        czas = str(now.hour) + ":" + str(now.minute)
        try:
            self._poll_and_log(czas)
        finally:
            self.s.close()

    def _poll_and_log(self, czas):
        '''Connect, open today's log file and append one row stamped with czas.'''
        try:
            self.s.connect((self.ip, 9001))
        except Exception:
            # Thread boundary: any connection problem is reported, not raised.
            print("Zdechlo polaczenie " + self.name)
            return
        folder_name = str(datetime.date.today())
        try:
            os.mkdir(folder_name)
        except OSError:
            pass  # folder already exists; any other problem surfaces in open() below
        path = folder_name + '/' + self.name + '.txt'
        is_new = not os.path.exists(path)
        try:
            fl = open(path, 'a')
        except (IOError, OSError):
            print("Wyjatek podczas tworzenia pliku - sprawdz uprawnienia! " + self.name)
            return
        try:
            if is_new:
                fl.write('Wygenerowano automatycznie z systemu SMW dla Tarnobrzeskiej Spoldzielni Mieszkaniowej\n')
                fl.write('Czas|CO|CW|Zewnetrzna|MPEC\n')
            dane = self._read_values()
            if dane is None:
                # NOTE: the error row has two ERR fields although the header
                # names four data columns; kept unchanged so existing log
                # files and their consumers see the same format.
                fl.write(czas + '|ERR|ERR\x0A')  # row terminated with LF
            else:
                # Column order: CO, CW, outside temperature, MPEC.
                fl.write(czas + '|' + dane[4] + '|' + dane[5] + '|' + dane[0] + '|' + dane[7] + '\x0A')
        finally:
            fl.close()
        print("Wykonano " + self.name)

    def _read_values(self):
        '''Query 8 registers starting at 4065; return formatted strings, or None on failure.'''
        try:
            self.s.sendall(self.mm.generateMultipleRead(4065, 8))
            raw = self.mm.getReturnValues(self.s)
            if len(raw) < 8:
                # Indexes up to 7 are used when the row is written.
                raise ValueError('expected 8 values, got %d' % len(raw))
        except Exception:
            print('Blizej nieokreslona porazka, skontaktuj sie z tworca programu - ' + self.name)
            return None
        # Readings outside +/-3000 are logged as ERR2; the rest are divided
        # by 10 and written with a decimal comma.
        return ['ERR2' if (int(x) > 3000) or (int(x) < -3000)
                else str(x / 10.0).replace('.', ',')
                for x in raw]
def odpytywanka():
    '''Start one polling round and schedule the next one.

    Reads the module-level lolserwery (name -> address mapping) and co_ile
    (interval in seconds). A QueryThread is started for every controller.
    '''
    print("Odpalam nowa kolejke odpytywania")
    nastepna = Timer(co_ile, odpytywanka)
    # Daemon timer: a pending re-arm must not keep the process alive after the
    # main thread exits. Query threads stay non-daemon, so running polls finish.
    nastepna.daemon = True
    nastepna.start()
    # The manager only holds the read-only CRC table, so one instance per
    # round can be shared by all query threads.
    mm = ModbusManager()
    for nazwa, adres in lolserwery.items():
        QueryThread(nazwa, adres, mm).start()
def fajnyParser():
    '''Load the controller list from 'serwery.txt' in the current directory.

    Every line has the form name|address; fields after the second are ignored.
    Lines shorter than 5 characters (line ending included) and lines without
    a '|' separator are skipped. Returns a dict mapping name to address.
    '''
    serwerki = {}
    plik = open('serwery.txt', 'r')
    try:
        for linia in plik:
            if len(linia) < 5:
                continue  # blank or too short to hold a name|address pair
            pola = linia.rstrip('\r\n').split('|')
            if len(pola) < 2:
                continue  # no separator, so there is no address to read
            nazwa, adres = pola[0], pola[1]
            serwerki[nazwa] = adres
            print("Wczytuje " + nazwa + " z adresem " + adres)
    finally:
        plik.close()
    print(str(serwerki))
    return serwerki
if __name__ == '__main__':
    # Interactive entry point: load the controller list, ask for the polling
    # interval in minutes, start the polling timer and wait for [enter].
    try:
        czytaj = raw_input  # Python 2
    except NameError:
        czytaj = input  # Python 3
    lolserwery = fajnyParser()
    try:
        co_ile = float(czytaj("Co ile minut? ")) * 60
    except (ValueError, EOFError):
        print("Tak, i co jeszcze?")
        raise SystemExit
    # Written as "not >=" so that a NaN interval is rejected as well.
    if not co_ile >= 1:
        print("Timeout na sockecie nie wyrobi. Nacisnij enter i wpisz cos innego")
        czytaj()
        raise SystemExit
    pierwsza = Timer(co_ile, odpytywanka)
    # Daemon timer: a pending timer must not keep the process alive after the
    # operator presses [enter].
    pierwsza.daemon = True
    pierwsza.start()
    print("No to jedziemy z tym koksem")
    print("Jak chcesz przerwac, to wcisnij [enter]")
    czytaj()
    raise SystemExit
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment