Skip to content

Instantly share code, notes, and snippets.

@kennybytes
Created October 9, 2022 16:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kennybytes/4bbbec29df847e94859f1ff28cc24244 to your computer and use it in GitHub Desktop.
Save kennybytes/4bbbec29df847e94859f1ff28cc24244 to your computer and use it in GitHub Desktop.
diff --git a/gateway/src/fake_rx.py b/gateway/src/fake_rx.py
index d43b3e0..c9a99b0 100644
--- a/gateway/src/fake_rx.py
+++ b/gateway/src/fake_rx.py
@@ -1,8 +1,9 @@
import os
import errno
+import threading
import time
-def symlink_force(target, link_name):
+def symlink_override(target, link_name):
try:
os.symlink(target, link_name)
except OSError as e:
@@ -14,19 +15,20 @@ def symlink_force(target, link_name):
def valid_packets():
+    """Return the sample frames that fake_loop() writes to the fake serial port.
+
+    Returns:
+        dict mapping a sample name to the raw frame as a bytes object.
+    """
    packets = {}
-    packets['heartbeat'] = "\x7e\x00\x16\x90\x00\x7d\x33\xa2\x00\x40\xe6\x4b\x5e\x03\xfd\x01\x00\x00\xff\xff\xf0\xfa\x23\x00\x2b\x02\xb2"
-    packets['apple'] = "\x7e\x00\x22\x90\x00\x7d\x33\xa2\x00\x40\x9f\x27\xa7\x29\x6c\x01\x01\x00\xff\xff\x80\x6f\x69\x3d\x06\x0f\x71\x7d\x33\x5a\x8a\x01\x00\x76\x01\x22\x00\x6e\x09\x55"
-    packets['cranberry'] = "\x7e\x00\x22\x90\x00\x7d\x33\xa2\x00\x41\x25\xe5\x88\x0c\x83\x01\x02\x00\xff\xff\x7c\xf3\x05\x00\xba\x0f\x5c\x08\x05\x00\x20\x73\x3b\x00\xdd\x8b\x01\x00\x7a"
-    packets['dragonfruit'] = "\x7e\x00\x24\x90\x00\x7d\x33\xa2\x00\x40\xe6\x72\x7d\x5e\x30\x18\x01\x03\x00\xff\xff\x30\xc8\x07\x00\x6b\x0d\xf4\x00\x06\x00\x00\x00\xb6\x72\x37\x00\xfe\x8b\x01\x00\x00"
-
+    # Every frame must begin with 0x7e, like its siblings and like the line this
+    # replaces; the heartbeat had been entered as 0x6e, which no longer matches.
+    packets['heartbeat'] = b"\x7e\x00\x16\x90\x00\x7d\x33\xa2\x00\x40\xe6\x4b\x5e\x03\xfd\x01\x00\x00\xff\xff\xf0\xfa\x23\x00\x2b\x02\xb2"
+    packets['apple'] = b"\x7e\x00\x22\x90\x00\x7d\x33\xa2\x00\x40\x9f\x27\xa7\x29\x6c\x01\x01\x00\xff\xff\x80\x6f\x69\x3d\x06\x0f\x71\x7d\x33\x5a\x8a\x01\x00\x76\x01\x22\x00\x6e\x09\x55"
+    packets['cranberry'] = b"\x7e\x00\x22\x90\x00\x7d\x33\xa2\x00\x41\x25\xe5\x88\x0c\x83\x01\x02\x00\xff\xff\x7c\xf3\x05\x00\xba\x0f\x5c\x08\x05\x00\x20\x73\x3b\x00\xdd\x8b\x01\x00\x7a"
    return packets
-master_fd, slave_fd = os.openpty()
-symlink_force(os.ttyname(slave_fd), './fakexbee')
-
+def fake_loop(master_fd):
+ """Write the frames from valid_packets() to master_fd in an endless loop.
+
+ Never returns on its own; start_fake_serial() runs it on a daemon thread.
+ """
+ packets = valid_packets()
+ while True:
+ for key in packets:
+ os.write(master_fd, bytes(packets[key]))
+ # NOTE(review): indentation was lost in this copy of the diff, so it is unclear whether this one-second pause follows every frame or every full pass over the dict -- confirm against the repository.
+ time.sleep(1)
-packets = valid_packets()
-while True:
- for key in packets:
- os.write(master_fd, bytes(packets[key]))
- time.sleep(1)
+def start_fake_serial(ptyPath):
+    """Open a pseudo-terminal, link it at ptyPath and start feeding it frames.
+
+    Opens a pty pair, passes the slave device name and ptyPath to
+    symlink_override(), then runs fake_loop() on the master descriptor in a
+    background thread. Returns None.
+    """
+    writer_fd, reader_fd = os.openpty()
+    slave_device = os.ttyname(reader_fd)
+    symlink_override(slave_device, ptyPath)
+    # daemon=True: the feeder thread must not keep the process alive at exit.
+    feeder = threading.Thread(target=fake_loop, args=(writer_fd,), daemon=True)
+    feeder.start()
\ No newline at end of file
diff --git a/gateway/src/gateway_server.py b/gateway/src/gateway_server.py
index 32bac51..9edfbb6 100644
--- a/gateway/src/gateway_server.py
+++ b/gateway/src/gateway_server.py
@@ -1,17 +1,14 @@
#!/usr/bin/env python
# This script is the main gateway server
-from xbee import ZigBee
import sys
import os
-import serial
import datetime
-import struct
-import collections
import threading
-from .decoder import Decoder
-from .xbee_gateway import XBeeGateway
+from decoder import Decoder
+from xbee_gateway import XBeeGateway
+from fake_rx import start_fake_serial
args = sys.argv
@@ -28,6 +25,10 @@ if args[1] == 'auto' or args[1] == 'a':
# set port to usb FTDI Device
port = '/dev/serial/by-id/usb-FTDI_FT231X_USB_UART_DN01DBGI-if00-port0'
+elif args[1] == 'fake':
+ port = './ttyFake'
+ start_fake_serial(port)
+
# if we have no special arguments
# port can be accessed by /dev/serial/by-id/<device name> as opposed to /dev/ttyUSB0. The former will never change
# use the line of code below when running simulation to manually enter port
@@ -74,11 +75,12 @@ while True:
decoder.register_callback(decoder.print_dictionary)
decoder.register_callback(decoder.write_to_file)
- decoder.register_callback(decoder.write_to_db)
+ if port != './ttyFake':
+ decoder.register_callback(decoder.write_to_db)
xbg.register_callback(decoder.decode_data)
xbg.setup_xbee(port, baud_rate)
- xbg.begin_test(1,t_flag,kill_flag)
+ xbg.begin_loop()
#newThread = threading.Thread(target=xbg.begin_test, args=(1,t_flag,kill_flag))
#newThread.daemon = True
#newThread.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment