Skip to content

Instantly share code, notes, and snippets.

@duhanebel
Created March 28, 2021 18:25
Show Gist options
  • Save duhanebel/68063b43a2548ec6716262a01cb036a3 to your computer and use it in GitHub Desktop.
Save duhanebel/68063b43a2548ec6716262a01cb036a3 to your computer and use it in GitHub Desktop.
Proxy service that forwards everything from/to the Fanju weather station, intercepting the local data and uploading it to InfluxDB
#!/usr/bin/env python
import argparse
import signal
import logging
import select
import socket
import datetime
from time import gmtime, strftime
from influxdb import InfluxDBClient
from influxdb import SeriesHelper
# InfluxDB connection settings, passed to InfluxDBClient further down.
# NOTE(review): the values look like placeholders ('URL', 'user', 'password');
# presumably they must be replaced with real ones before running -- confirm.
host = 'URL'
port = 8086
user = 'user'
password = 'password'
dbname = 'fanju_weather'
class Measurement:
    """Plain record holding one sensor's id, humidity and battery value."""

    # Class-level defaults, visible on the class itself and used as a
    # fallback for any attribute not set on an instance.
    sensorID = "000"
    humidity = 0
    battery = 0

    def __init__(self, id, humidity, battery):
        """Store the given values on the instance.

        Arguments:
        id -- Sensor identifier, stored as ``sensorID``.
        humidity -- Humidity value.
        battery -- Battery value.
        """
        # The original assigned to a bare local name ``sensorID``, so the
        # identifier was dropped and every instance reported "000".
        self.sensorID = id
        self.humidity = humidity
        self.battery = battery
# Module-level InfluxDB client built from the connection settings above; it is
# the client referenced by FanjuSeries.Meta.
dbclient = InfluxDBClient(host, port, user, password, dbname)
class FanjuSeries(SeriesHelper):
    """Series helper describing the points written for each sensor reading."""

    class Meta:
        # Client used to send the buffered points.
        client = dbclient

        # Series name; it must be a string. Dependent fields/tags can be
        # interpolated by putting them in curly brackets.
        series_name = 'sensordata'

        # Tag keys and field keys carried by every point of this series.
        tags = ['device']
        fields = ['temperature', 'humidity']

        # Write to the wire automatically once bulk_size points are buffered.
        # autocommit must be True whenever bulk_size is used.
        autocommit = True
        bulk_size = 4
# Log line layout: timestamp (left-aligned, at least 15 columns), then the
# message.
FORMAT = '%(asctime)-15s %(message)s'
logging.basicConfig(format=FORMAT)
# Root logger shared by the whole script; main() changes its level according
# to the -q/-v command-line flags.
LOGGER = logging.getLogger()
def remove_header_footer(str):
    """Strip the fixed frame header and footer from a hex-encoded packet.

    Arguments:
    str -- Packet as a hex string. It must start with the 20-character
           header 'aa3c5701009569f05180' and end with the footer 'cc3e'.

    Returns the hex string found between the header and the footer.

    Raises ValueError when the header or the footer is missing.
    """
    header = "aa3c5701009569f05180"
    footer = "cc3e"
    # Explicit checks rather than assert: assert statements are removed when
    # Python runs with -O, which would let malformed packets through and
    # return a meaningless slice.
    if not str.startswith(header):
        raise ValueError("packet does not start with the expected header")
    if not str.endswith(footer):
        raise ValueError("packet does not end with the expected footer")
    return str[len(header):-len(footer)]
def is_local_data_request(str):
    """Tell whether a trimmed hex payload is a local-data request.

    A payload qualifies when it begins with the marker '53300100'.
    """
    local_data_marker = "53300100"
    return str.startswith(local_data_marker)
def is_weather_request(str):
    """Tell whether a trimmed hex payload is a weather request.

    A payload qualifies when it begins with the marker '5231010000008104'.
    """
    weather_marker = "5231010000008104"
    return str.startswith(weather_marker)
def is_stats_request(str):
    """Tell whether a trimmed hex payload is a stats request.

    A payload qualifies when it begins with the marker '5230010000008004'.
    """
    stats_marker = "5230010000008004"
    return str.startswith(stats_marker)
# Hooks applied to each payload just before it is forwarded:
# LOCAL_DATA_HANDLER to data travelling from the source side to the
# destination, REMOTE_DATA_HANDLER to data travelling back. Both currently
# return their argument unchanged.
LOCAL_DATA_HANDLER = lambda req:req
REMOTE_DATA_HANDLER = lambda x:x
# Maximum number of bytes requested from each recv()/recvfrom() call.
BUFFER_SIZE = 2 ** 10 # 1024. Keep buffer size as power of 2.
def ftoc(f):
    """Convert a Fahrenheit temperature to Celsius, rounded to one decimal."""
    celsius = (f - 32) * 5 / 9
    return round(celsius, 1)
def convert_num(num):
    """Remove the 900 offset from a raw value and scale it down by ten.

    NOTE(review): the result is fed to ftoc() by process_local_request, so it
    is presumably a temperature in degrees Fahrenheit -- confirm.
    """
    offset = 900
    scale = 10
    return (num - offset) / scale
def _decode_reading(payload, offset):
    """Return (temperature, humidity) for the reading that starts at offset.

    The temperature occupies four hex characters with the low byte first, so
    the two byte pairs are swapped before parsing. The humidity is the single
    byte that follows.
    """
    raw_temperature = int(
        payload[offset + 2:offset + 4] + payload[offset:offset + 2], 16)
    humidity = int(payload[offset + 4:offset + 6], 16)
    return ftoc(convert_num(raw_temperature)), humidity


def process_local_request(str):
    """Decode a local-data payload and store its four readings in InfluxDB.

    Arguments:
    str -- Trimmed hex payload (header and footer already removed). Readings
           are read at hex offsets 30, 48, 66 and 84, so the payload must be
           at least 90 characters long.

    Raises ValueError when the payload is too short or is not valid hex.
    """
    # Device tag and hex offset of each reading inside the payload.
    sensor_offsets = (
        ("inside", 30),
        ("outside1", 48),
        ("outside2", 66),
        ("outside3", 84),
    )
    required_length = 90
    # Without this check a payload that is a character or two short would be
    # parsed from a truncated slice and silently yield a wrong humidity.
    if len(str) < required_length:
        raise ValueError(
            "local data payload too short: {} hex characters, expected at "
            "least {}".format(len(str), required_length))

    readings = [
        (device,) + _decode_reading(str, offset)
        for device, offset in sensor_offsets
    ]

    # Flatten to (temp, hum, temp, hum, ...) for the log line. The original
    # format string had a misplaced comma ("{},%") after the sensor 2 humidity.
    values = [value for _, temp, hum in readings for value in (temp, hum)]
    LOGGER.debug(
        'Processing request: i: {}C {}%, 1: {}C {}%, 2: {}C {}%, 3: {}C {}%'
        .format(*values))

    for device, temperature, humidity in readings:
        FanjuSeries(device=device, temperature=temperature, humidity=humidity)
    # NOTE(review): FanjuSeries is configured with autocommit and
    # bulk_size = 4, so the fourth point above may already have triggered a
    # write; the explicit commit is kept to preserve the original behaviour.
    FanjuSeries.commit()
def _trimmed_hex(data):
    """Return the hex payload of data without its frame, or None if unframed."""
    try:
        return remove_header_footer(data.hex())
    except (AssertionError, ValueError):
        # The datagram does not carry the expected header/footer.
        return None


def udp_proxy(src, dst):
    """Run UDP proxy.

    Datagrams from the server (dst) are relayed to the client that sent the
    pending request; datagrams from that client are relayed to the server.
    Local-data requests are additionally decoded and uploaded through
    process_local_request(). The loop never returns normally.

    Arguments:
    src -- Source IP address and port string. I.e.: '127.0.0.1:8000'
    dst -- Destination IP address and port. I.e.: '127.0.0.1:8888'
    """
    LOGGER.debug('Starting UDP proxy...')
    LOGGER.debug('Src: {}'.format(src))
    LOGGER.debug('Dst: {}'.format(dst))

    server_address = ip_to_tuple(dst)
    client_address = None

    # The context manager closes the socket when the loop is left through an
    # exception (e.g. KeyboardInterrupt).
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as proxy_socket:
        proxy_socket.bind(ip_to_tuple(src))
        LOGGER.debug('Looping proxy (press Ctrl-Break to stop)...')
        while True:
            data, address = proxy_socket.recvfrom(BUFFER_SIZE)

            # Classify by sender first, so that a stray datagram can never be
            # mistaken for a request and a server datagram is never adopted
            # as the client (which would echo it back to the server).
            if address == server_address:
                if client_address is None:
                    LOGGER.warning(
                        'Dropping server datagram: no pending client request')
                    continue
                payload = _trimmed_hex(data)
                LOGGER.debug("RESPONSE: Received from server: {}".format(
                    payload if payload is not None else data.hex()))
                data = REMOTE_DATA_HANDLER(data)
                proxy_socket.sendto(data, client_address)
                # Forget the client so the next request may come from a new
                # address or port.
                client_address = None
                continue

            if client_address is None:
                client_address = address
            if address != client_address:
                LOGGER.warning('Unknown address: {}'.format(str(address)))
                continue

            payload = _trimmed_hex(data)
            LOGGER.debug("")
            if payload is None:
                LOGGER.debug(
                    "REQUEST (unexpected framing): {}".format(data.hex()))
            else:
                LOGGER.debug("REQUEST: {}".format(payload))
            data = LOCAL_DATA_HANDLER(data)
            if payload is not None and is_local_data_request(payload):
                LOGGER.debug("Received local data request")
                try:
                    process_local_request(payload)
                except Exception:
                    # Recording is a side channel: a decoding or database
                    # failure must not stop the station reaching its server.
                    LOGGER.exception(
                        "Failed to record local data; forwarding anyway")
            proxy_socket.sendto(data, server_address)
# end-of-function udp_proxy
def tcp_proxy(src, dst):
    """Run TCP proxy.

    Accepts a single connection on src, connects to dst and relays data in
    both directions until either side closes its connection, then closes all
    sockets and returns.

    Arguments:
    src -- Source IP address and port string. I.e.: '127.0.0.1:8000'
    dst -- Destination IP address and port. I.e.: '127.0.0.1:8888'
    """
    LOGGER.debug('Starting TCP proxy...')
    LOGGER.debug('Src: {}'.format(src))
    LOGGER.debug('Dst: {}'.format(dst))

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s_src = None
    s_dst = None
    try:
        listener.bind(ip_to_tuple(src))
        listener.listen(1)
        s_src, _ = listener.accept()
        s_dst = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s_dst.connect(ip_to_tuple(dst))
        sockets = [s_src, s_dst]
        while True:
            readable, _, _ = select.select(sockets, [], [])
            for sock in readable:
                data = sock.recv(BUFFER_SIZE)
                if not data:
                    # recv() returns b'' once the peer has closed. Without
                    # this check select() keeps reporting the socket as
                    # readable and the loop spins forever.
                    LOGGER.debug('Connection closed by peer; stopping proxy')
                    return
                if sock is s_src:
                    s_dst.sendall(LOCAL_DATA_HANDLER(data))
                else:
                    s_src.sendall(REMOTE_DATA_HANDLER(data))
    finally:
        # Release every socket that was opened, whatever the exit path.
        for sock in (s_src, s_dst, listener):
            if sock is not None:
                sock.close()
# end-of-function tcp_proxy
def ip_to_tuple(ip):
    """Split an 'address:port' string into an (address, port) tuple.

    Arguments:
    ip -- IP address:port string. I.e.: '127.0.0.1:8000'.

    The port is returned as an int. A string that does not contain exactly
    one ':' raises ValueError, as does a non-numeric port.
    """
    pieces = ip.split(':')
    address, port_text = pieces
    return (address, int(port_text))
# end-of-function ip_to_tuple
def main():
    """Parse the command line and run the requested proxy.

    Exactly one of --tcp / --udp is required, together with --src and --dst.
    -q and -v are mutually exclusive and adjust the root logger's level.
    """
    # The description used to read 'TCP/UPD proxy.' (typo in the help text).
    parser = argparse.ArgumentParser(description='TCP/UDP proxy.')
    # TCP/UDP group: the protocol choice is mandatory and exclusive.
    proto_group = parser.add_mutually_exclusive_group(required=True)
    proto_group.add_argument('--tcp', action='store_true', help='TCP proxy')
    proto_group.add_argument('--udp', action='store_true', help='UDP proxy')
    parser.add_argument('-s', '--src', required=True, help='Source IP and port, i.e.: 127.0.0.1:8000')
    parser.add_argument('-d', '--dst', required=True, help='Destination IP and port, i.e.: 127.0.0.1:8888')
    output_group = parser.add_mutually_exclusive_group()
    output_group.add_argument('-q', '--quiet', action='store_true', help='Be quiet')
    output_group.add_argument('-v', '--verbose', action='store_true', help='Be loud')
    args = parser.parse_args()

    if args.quiet:
        LOGGER.setLevel(logging.CRITICAL)
    if args.verbose:
        # NOTSET on the root logger lets every message through, including
        # the DEBUG traces emitted by the proxies.
        LOGGER.setLevel(logging.NOTSET)

    if args.udp:
        udp_proxy(args.src, args.dst)
    elif args.tcp:
        tcp_proxy(args.src, args.dst)
# end-of-function main
# Start the proxy only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment