Skip to content

Instantly share code, notes, and snippets.

@philippkraft
Last active March 25, 2021 08:15
Show Gist options
  • Save philippkraft/f941a7fe2332158724236ecccb25edaf to your computer and use it in GitHub Desktop.
Save philippkraft/f941a7fe2332158724236ecccb25edaf to your computer and use it in GitHub Desktop.
"""
A simple and ugly tool to program SDI12 sensors
@author: philippkraft
"""
# Serial port to open, e.g. 'COM6' on Windows or '/dev/ttyUSB1' on Linux.
# None means: use the MockSerial stand-in instead of a real port.
port = None
# Timeout in seconds for the serial port; MockSerial.read sleeps this long too
timeout = 0.3
import contextlib
import time
import tkinter as tk
from typing import Iterator, Optional

import serial
class MockSerial:
    """
    Mocks a serial port for testing purposes

    Only the part of the serial port interface used by this tool is
    provided: the ``port`` attribute, ``write`` and ``read``. The answer
    returned by ``read`` is derived from the last written command and is
    returned again on every further ``read`` until the next ``write``.

    Parameters
    ----------
    delay : float, optional
        Seconds ``read`` sleeps before answering. If None (the default)
        the module level ``timeout`` is used.
    """
    port = 'COMock'
    # Adress and remainder of the last written command. Both start empty,
    # so a read before the first write does not fail.
    adress = ''
    value = ''

    def __init__(self, delay=None):
        self.delay = delay

    def write(self, data: bytes):
        """
        Stores the command, split into adress (first character) and the
        rest, and echoes it to stdout
        """
        content = data.decode()
        # Slices instead of indices: an empty write must not raise
        self.adress = content[:1]
        self.value = content[1:]
        print(content)

    def read(self, n: int = 0):
        """
        Returns the mocked answer to the last written command as bytes

        ``n`` is only accepted to match the signature of a real port, the
        answer is never truncated to ``n`` bytes.
        """
        time.sleep(timeout if self.delay is None else self.delay)
        if not self.adress:
            # Nothing has been written yet, there is nothing to answer
            return b''
        if self.adress == '?':
            # Adress query: the mock device always sits on adress 0
            return '0'.encode()
        if self.value.startswith('I'):
            return f'{self.adress}MOCKDevice.ILR.000'.encode()
        if self.value.startswith('A'):
            # Change adress: answer with the new adress (empty if missing)
            return self.value[1:2].encode()
        if self.value.startswith('R'):
            return (self.adress + '+10.3-17.4').encode()
        # Any other command is echoed back
        return (self.adress + self.value).encode()
class SDI12:
    """
    A simple wrapper to create SDI12 commands

    Parameters
    ----------
    stream :
        The connection to the bus. It needs a ``write(bytes)`` method, a
        ``read(int)`` method returning bytes and a ``port`` attribute.
    """
    def __init__(self, stream):
        self.stream = stream

    def __call__(self, adress: str, *command: str):
        """
        Sends a command to the bus and reads the response

        Parameters
        ----------
        adress : str
            The adress of the sensor, put in front of the command
        *command : str
            The parts of the command, joined without a separator. The
            terminating '!' is appended automatically.

        Returns
        -------
        (str, str)
            The command as it was sent and the response without
            surrounding whitespace

        Raises
        ------
        UnicodeEncodeError
            If adress or command contain non ASCII characters
        """
        body = ''.join(command)
        cmd = f'{adress}{body}!'.encode('ascii')
        self.stream.write(cmd)
        response = self.stream.read(100)
        # The response may contain bytes that are no valid text (e.g. line
        # noise). Replace those instead of raising a UnicodeDecodeError.
        return cmd.decode(), response.decode(errors='replace').strip()

    def __repr__(self):
        # f-string instead of '+': works also if the stream has no port name
        return f'SDI12@{self.stream.port}'
class Application(tk.Frame):
    """
    The graphic user interface

    Creates its own Tk root window and offers buttons to query the adress
    of a sensor, to show its info string, to change its adress, to run any
    other command and to read the pending bytes of the stream.

    Parameters
    ----------
    sdi12 : SDI12
        The command wrapper used to talk to the bus
    """
    def __init__(self, sdi12: 'SDI12'):
        self.master = tk.Tk()
        self.master.title(str(sdi12))
        super().__init__(self.master)
        # Assigned before the widgets are built, so that every callback
        # can rely on it
        self.sdi12 = sdi12
        self.pack()
        self.create_widgets()

    def grid_layout(self, widgets):
        """
        Places the widgets on the grid

        Widgets is a list of list of tk widgets, the outer list holds the
        rows, the inner lists the columns. Empty entries are skipped.
        """
        for row, row_list in enumerate(widgets):
            for col, widget in enumerate(row_list):
                if widget:
                    widget.grid(column=col, row=row, ipadx=5, pady=5, sticky=tk.W)

    def create_widgets(self):
        """Creates the variables, buttons, entries and labels of the window"""
        self.adress = tk.StringVar()
        self.new_adress = tk.StringVar()
        self.command = tk.StringVar()
        self.response_label = tk.Label(self)
        # Remember the default colors of a label to restore them in log().
        # A hard coded 'SystemButtonFace' is only known to Tk on Windows.
        self._default_colors = {
            'bg': self.response_label.cget('bg'),
            'fg': self.response_label.cget('fg'),
        }
        self.info_label = tk.Label(self)
        self.widgets = [
            [
                tk.Button(self, text='adress:', command=self.scan_bus),
                tk.Entry(self, width=3, textvariable=self.adress),
            ],
            [
                tk.Button(self, text='info', command=self.info),
                self.info_label,
            ],
            [
                tk.Button(self, text='change adress', command=self.change_adress),
                tk.Entry(self, width=3, textvariable=self.new_adress),
            ],
            [
                tk.Button(self, text='run other command', command=self.do),
                tk.Entry(self, width=10, textvariable=self.command),
            ],
            [
                tk.Button(self, text='Read buffer', command=self.read_buffer),
            ],
        ]
        self.grid_layout(self.widgets)
        # The response line spans both columns below the last row
        self.response_label.grid(column=0, row=len(self.widgets), columnspan=2)

    def _show(self, text, bg, fg):
        """Shows the space joined text parts on the response label"""
        self.response_label['bg'] = bg
        self.response_label['fg'] = fg
        self.response_label['text'] = ' '.join(text)

    def log(self, *text):
        """Writes a message on the window, using the default label colors"""
        self._show(text, **self._default_colors)

    def warning(self, *text):
        """Writes a warning on the window (white on red)"""
        self._show(text, bg='red', fg='white')

    def scan_bus(self):
        """Scans all adresses, use only with a single device"""
        c, r = self.sdi12('', '?')
        self.adress.set(r)
        self.log(c, '->', r)

    def info(self):
        """Run the SDI12 (I)nfo command and print the information"""
        a = self.adress.get()
        if not a:
            self.warning('No adress!')
            return
        c, r = self.sdi12(a, 'I')
        self.log(c, '->', r)
        self.info_label['text'] = r

    def change_adress(self):
        """Change the adress of the sensor to the content of the new adress entry"""
        a = self.adress.get()
        if not a:
            self.warning('No adress!')
            return
        n = self.new_adress.get()
        if not n:
            self.warning('No new adress!')
            return
        c, r = self.sdi12(a, 'A', n)
        self.log(c, '->', r)
        # The response becomes the current adress
        self.adress.set(r)
        self.new_adress.set('')
        # The shown info belonged to the old adress
        self.info_label['text'] = ''

    def do(self):
        """Perform any command"""
        a = self.adress.get()
        if not a:
            self.warning('No adress!')
            return
        c = self.command.get()
        if not c:
            self.warning('No command!')
            return
        c, r = self.sdi12(a, c)
        self.log(c, ' -> ', r)

    def read_buffer(self):
        """Reads up to 1000 bytes from the stream and shows them"""
        # Undecodable bytes are replaced instead of raising in the callback
        r = self.sdi12.stream.read(1000).decode(errors='replace')
        self.log(r)
@contextlib.contextmanager
def open_sdi12(port: Optional[str], timeout: float) -> Iterator[SDI12]:
    """
    Opens a serial port (or a MockSerial if no port is given) and yields
    an SDI12 object. Use it in a with statement.

    Parameters
    ----------
    port : str or None
        The port name. None (or an empty string) for an offline test case,
        on Windows something like 'COM6' and on linux / MacOS
        eg. '/dev/ttyUSB0'
    timeout : float
        Time to scan for a response, passed to the serial port. Not used
        for the offline test case.

    Yields
    ------
    SDI12
        A new SDI12 object with an open serial connection
    """
    if not port:
        # Offline test case, there is no connection to close afterwards
        yield SDI12(MockSerial())
        return
    with serial.Serial(port, timeout=timeout) as connection:
        yield SDI12(connection)
    # Reached after the with statement of the caller ended without an
    # exception and the serial port has been closed
    print(f'{port} closed')
if __name__ == '__main__':
    # Script entry point: the connection configured by the module level
    # port and timeout stays open until the window is closed
    with open_sdi12(port, timeout) as bus:
        Application(bus).mainloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment