Skip to content

Instantly share code, notes, and snippets.

@subinabr
Created March 7, 2014 07:35
Show Gist options
  • Save subinabr/9407033 to your computer and use it in GitHub Desktop.
Save subinabr/9407033 to your computer and use it in GitHub Desktop.
from pymodbus.constants import Endian
#from pymodbus.payload import BinaryPayloadDecoder
from struct import pack, unpack
from pymodbus.constants import Endian
from pymodbus.utilities import pack_bitstring
from pymodbus.utilities import unpack_bitstring
from pymodbus.exceptions import ParameterException
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
from pymodbus.transaction import ModbusSocketFramer as ModbusFramer
import csv
import datetime
import time
import subprocess
import sys
import os
import logging
import logging.handlers
import gc
from os.path import join
# USB vendor / product ids of the USB-serial adapter the meter is reached
# through. They are hex strings because find_tty_usb() compares them against
# the text read from the sysfs idVendor / idProduct files.
# Used by CONNECT_TO_METER() to locate the serial port.
# NOTE(review): 0403:6001 is presumably an FTDI FT232-family adapter - confirm.
ID_VENDOR='0403'
ID_PRODUCT='6001'
class BinaryPayloadDecoder(object):
    ''' A utility that helps decode payload messages from a modbus
    response message. It really is just a simple wrapper around
    the struct module, however it saves time looking up the format
    strings. What follows is a simple example::

        decoder = BinaryPayloadDecoder(payload)
        first = decoder.decode_8bit_uint()
        second = decoder.decode_16bit_uint()

    Every ``decode_*`` call consumes bytes from the current read
    position and advances it; use :meth:`reset` to start over.
    '''

    def __init__(self, payload, endian=Endian.Little):
        ''' Initialize a new payload decoder

        :param payload: The payload (byte string) to decode with
        :param endian: The endianness of the payload; it is used as the
            byte-order prefix of every struct format string
        '''
        self._payload = payload
        self._pointer = 0x00
        self._endian = endian

    @classmethod
    def fromRegisters(klass, registers, endian=Endian.Little):
        ''' Initialize a payload decoder with the result of
        reading a collection of registers from a modbus device.

        The registers are treated as a list of 2 byte values.
        We have to do this because of how the data has already
        been decoded by the rest of the library.

        :param registers: The register results to initialize with
        :param endian: The endianness of the payload
        :returns: An initialized PayloadDecoder
        :raises ParameterException: if registers is not a list
        '''
        if isinstance(registers, list):
            # Repack into one flat byte string, each register as a
            # big-endian 16-bit word. The words must be joined with an
            # EMPTY separator: any separator byte would be decoded as data
            # and shift every value after the first register.
            payload = b''.join(pack('>H', x) for x in registers)
            return klass(payload, endian)
        raise ParameterException('Invalid collection of registers supplied')

    @classmethod
    def fromCoils(klass, coils, endian=Endian.Little):
        ''' Initialize a payload decoder with the result of
        reading a collection of coils from a modbus device.

        The coils are treated as a list of bit(boolean) values.

        :param coils: The coil results to initialize with
        :param endian: The endianness of the payload
        :returns: An initialized PayloadDecoder
        :raises ParameterException: if coils is not a list
        '''
        if isinstance(coils, list):
            payload = pack_bitstring(coils)
            return klass(payload, endian)
        raise ParameterException('Invalid collection of coils supplied')

    def reset(self):
        ''' Reset the decoder pointer back to the start
        '''
        self._pointer = 0x00

    def _take(self, size):
        ''' Advance the read position by ``size`` bytes and return the
        bytes that were skipped over.

        The position is advanced before slicing, so it moves on even when
        fewer than ``size`` bytes remain (the slice is then short).
        '''
        self._pointer += size
        return self._payload[self._pointer - size:self._pointer]

    def _unpack(self, code, size):
        ''' Consume ``size`` bytes and decode them with struct format
        character ``code`` using the decoder's byte order.

        :raises struct.error: if fewer than ``size`` bytes were left
        '''
        return unpack(self._endian + code, self._take(size))[0]

    def decode_8bit_uint(self):
        ''' Decodes a 8 bit unsigned int from the buffer
        '''
        return self._unpack('B', 1)

    def decode_bits(self):
        ''' Decodes a byte worth of bits from the buffer

        :returns: whatever pymodbus' ``unpack_bitstring`` yields for the
            single byte consumed
        '''
        return unpack_bitstring(self._take(1))

    def decode_16bit_uint(self):
        ''' Decodes a 16 bit unsigned int from the buffer
        '''
        return self._unpack('H', 2)

    def decode_32bit_uint(self):
        ''' Decodes a 32 bit unsigned int from the buffer
        '''
        return self._unpack('I', 4)

    def decode_64bit_uint(self):
        ''' Decodes a 64 bit unsigned int from the buffer
        '''
        return self._unpack('Q', 8)

    def decode_8bit_int(self):
        ''' Decodes a 8 bit signed int from the buffer
        '''
        return self._unpack('b', 1)

    def decode_16bit_int(self):
        ''' Decodes a 16 bit signed int from the buffer
        '''
        return self._unpack('h', 2)

    def decode_32bit_int(self):
        ''' Decodes a 32 bit signed int from the buffer
        '''
        return self._unpack('i', 4)

    def decode_64bit_int(self):
        ''' Decodes a 64 bit signed int from the buffer
        '''
        return self._unpack('q', 8)

    def decode_32bit_float(self):
        ''' Decodes a 32 bit float from the buffer
        '''
        return self._unpack('f', 4)

    def decode_64bit_float(self):
        ''' Decodes a 64 bit float(double) from the buffer
        '''
        return self._unpack('d', 8)

    def decode_string(self, size=1):
        ''' Decodes a string from the buffer

        :param size: The size of the string to decode
        :returns: the next ``size`` raw bytes of the payload
        '''
        return self._take(size)
def CONNECT_TO_METER():
    '''Locate the meter's USB serial port and open a Modbus RTU client on it.

    The port is looked up with find_tty_usb(ID_VENDOR, ID_PRODUCT) and the
    client is configured for 9600 baud, 8 data bits, even parity, 1 stop
    bit and a 0.3 s timeout.

    :returns: the client after connect() has been called on it, or None
        when the adapter cannot be found or any step raises (the error is
        printed, never propagated)
    '''
    try:
        meter_port = find_tty_usb(ID_VENDOR, ID_PRODUCT)
        if meter_port is None:
            # Without this check a None port would be handed to the client
            # and only fail later with a much less helpful error.
            print("Error while connecting client: \n"
                  + "no ttyUSB device found for USB id %s:%s"
                  % (ID_VENDOR, ID_PRODUCT))
            return None
        client = ModbusClient(method='rtu', port=meter_port, baudrate=9600,
                              stopbits=1, parity='E', timeout=0.3, bytesize=8)
        client.connect()
        return client
    except Exception as e:
        # Best-effort: the caller polls in a loop and retries, so report
        # the problem and signal failure with None instead of raising.
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Error while connecting client: \n" + str(e))
        return None
def find_tty_usb(idVendor, idProduct, sysfs_root='/sys/bus/usb/devices'):
    """find_tty_usb('067b', '2302') -> '/dev/ttyUSB0'

    Scan the USB devices listed under ``sysfs_root`` for the first one whose
    ``idVendor`` / ``idProduct`` files match the given strings, and return
    the ``/dev`` path of a ``ttyUSB*`` entry found in one of its interface
    sub-directories (those named ``<device>:...``).

    :param idVendor: vendor id as the text found in sysfs, e.g. '0403'
    :param idProduct: product id as the text found in sysfs, e.g. '6001'
    :param sysfs_root: directory holding one entry per USB device; the
        default is the standard Linux sysfs location
    :returns: e.g. '/dev/ttyUSB0', or None when nothing matches
    """
    # Note: if searching for a lot of pairs, it would be much faster to search
    # for the entire lot at once instead of going over all the usb devices
    # each time.
    for dnbase in os.listdir(sysfs_root):
        dn = join(sysfs_root, dnbase)
        # Entries without an idVendor file (e.g. interface nodes) are not
        # devices we can match against.
        if not os.path.exists(join(dn, 'idVendor')):
            continue
        # Read the id files under `with` so the handles are closed
        # deterministically instead of being left to the garbage collector.
        with open(join(dn, 'idVendor')) as vendor_file:
            idv = vendor_file.read().strip()
        if idv != idVendor:
            continue
        with open(join(dn, 'idProduct')) as product_file:
            idp = product_file.read().strip()
        if idp != idProduct:
            continue
        for subdir in os.listdir(dn):
            if subdir.startswith(dnbase + ':'):
                for subsubdir in os.listdir(join(dn, subdir)):
                    if subsubdir.startswith('ttyUSB'):
                        return join('/dev', subsubdir)
    return None
# Modbus unit (slave) id addressed by every request in this script.
_METER_UNIT = 146

# Pause before reconnecting after a failed poll, so an unplugged or silent
# meter does not turn the loop into a busy spin of error messages.
_RECONNECT_DELAY_SECONDS = 1

# Holding register of the phase A voltage (raw uint16, scaled by 0.01).
_PHASE_A_VOLTAGE_REGISTER = 0

# (label, holding-register address) of every per-circuit phase current, in
# the order they are printed. Addresses run 10..21: three phases (A, B, C)
# for each of the four circuits.
# NOTE(review): the original read address 19 for "Ckt 3 Phase C" - the same
# register as "Ckt 4 Phase A" - while 18 was never read. That looks like a
# copy/paste slip, so 18 is used here; confirm against the meter's register map.
_CURRENT_REGISTERS = (
    ("Ckt 1 Phase A current : ", 10),
    ("Ckt 2 Phase A current : ", 13),
    ("Ckt 1 Phase B current : ", 11),
    ("Ckt 2 Phase B current : ", 14),
    ("Ckt 1 Phase C current : ", 12),
    ("Ckt 2 Phase C current : ", 15),
    ("Ckt 3 Phase A current : ", 16),
    ("Ckt 4 Phase A current : ", 19),
    ("Ckt 3 Phase B current : ", 17),
    ("Ckt 4 Phase B current : ", 20),
    ("Ckt 3 Phase C current : ", 18),
    ("Ckt 4 Phase C current : ", 21),
)

# Other holding registers the original author had probed (those reads were
# commented out in the original script), with the scaling used there:
#   1, 2   phase B / phase C voltage       uint16 * 0.01
#   5      line AB voltage                 uint16 * 0.01
#   9      frequency                       uint16 * 0.01
#   26     ckt 1 phase A active power      -1 * int16
#   46     ckt 1 phase A reactive power    -1 * int16
#   66     ckt 1 phase A power factor      -1 * int16 * 0.001
#   78     ckt 1 total power factor        -1 * int16 * 0.001
#   82     ckt 1 phase A apparent power    uint16


def _read_register(client, address):
    '''Read one holding register from the meter and return a big-endian
    decoder positioned at its start.'''
    response = client.read_holding_registers(address=address, count=1,
                                             unit=_METER_UNIT)
    return BinaryPayloadDecoder.fromRegisters(response.registers,
                                              endian=Endian.Big)


def _decode_current(decoder):
    '''Turn one raw current register into the value the script prints.

    NOTE(review): this conversion is carried over unchanged from the
    original script; the meter's documented scaling is not visible here,
    so confirm it against the register map.
    '''
    after_decimal = decoder.decode_16bit_uint()
    # Re-read from the start: the signed high byte of the same register is
    # used as the integer part.
    decoder.reset()
    before_decimal = decoder.decode_8bit_int()
    fraction = after_decimal % 1000
    if fraction > 500:
        before_decimal -= 1
    return before_decimal + fraction / 1000.0


def _close_quietly(client):
    '''Release a client that is about to be replaced, ignoring errors.'''
    if client is None:
        return
    try:
        client.close()
    except Exception:
        # The connection is already considered broken; a failing close
        # must not prevent the reconnect attempt.
        pass


def main():
    '''Poll the meter forever, printing phase A voltage and the phase
    currents of circuits 1-4 on every pass.

    Any error during a pass (including a missing connection) is printed,
    the current client is closed, and a new connection is attempted after
    a short pause. The loop only ends when the process is interrupted.
    '''
    client = CONNECT_TO_METER()
    while True:
        try:
            volt = 0.01 * _read_register(
                client, _PHASE_A_VOLTAGE_REGISTER).decode_16bit_uint()
            # "%s %s" reproduces the spacing of the original Python 2
            # `print a, b` statements and also runs on Python 3.
            print("%s %s" % ("Phase A voltage :", volt))
            for label, address in _CURRENT_REGISTERS:
                current = _decode_current(_read_register(client, address))
                print("%s %s" % (label, current))
        except Exception as e:
            print("Internal Exception: Meter: " + '\n' + str(e))
            # Free the serial port before opening it again; the original
            # simply dropped the old client without closing it.
            _close_quietly(client)
            client = None
            time.sleep(_RECONNECT_DELAY_SECONDS)
            client = CONNECT_TO_METER()
# Script entry point: start polling only when executed directly, not when
# this file is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment