-
-
Save kennybytes/4bbbec29df847e94859f1ff28cc24244 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/gateway/src/fake_rx.py b/gateway/src/fake_rx.py | |
index d43b3e0..c9a99b0 100644 | |
--- a/gateway/src/fake_rx.py | |
+++ b/gateway/src/fake_rx.py | |
@@ -1,8 +1,9 @@ | |
import os | |
import errno | |
+import threading | |
import time | |
-def symlink_force(target, link_name): | |
+def symlink_override(target, link_name): | |
try: | |
os.symlink(target, link_name) | |
except OSError as e: | |
@@ -14,19 +15,20 @@ def symlink_force(target, link_name): | |
def valid_packets(): | |
packets = {} | |
- packets['heartbeat'] = "\x7e\x00\x16\x90\x00\x7d\x33\xa2\x00\x40\xe6\x4b\x5e\x03\xfd\x01\x00\x00\xff\xff\xf0\xfa\x23\x00\x2b\x02\xb2" | |
- packets['apple'] = "\x7e\x00\x22\x90\x00\x7d\x33\xa2\x00\x40\x9f\x27\xa7\x29\x6c\x01\x01\x00\xff\xff\x80\x6f\x69\x3d\x06\x0f\x71\x7d\x33\x5a\x8a\x01\x00\x76\x01\x22\x00\x6e\x09\x55" | |
- packets['cranberry'] = "\x7e\x00\x22\x90\x00\x7d\x33\xa2\x00\x41\x25\xe5\x88\x0c\x83\x01\x02\x00\xff\xff\x7c\xf3\x05\x00\xba\x0f\x5c\x08\x05\x00\x20\x73\x3b\x00\xdd\x8b\x01\x00\x7a" | |
- packets['dragonfruit'] = "\x7e\x00\x24\x90\x00\x7d\x33\xa2\x00\x40\xe6\x72\x7d\x5e\x30\x18\x01\x03\x00\xff\xff\x30\xc8\x07\x00\x6b\x0d\xf4\x00\x06\x00\x00\x00\xb6\x72\x37\x00\xfe\x8b\x01\x00\x00" | |
- | |
+ packets['heartbeat'] = b"\x6e\x00\x16\x90\x00\x7d\x33\xa2\x00\x40\xe6\x4b\x5e\x03\xfd\x01\x00\x00\xff\xff\xf0\xfa\x23\x00\x2b\x02\xb2" | |
+ packets['apple'] = b"\x7e\x00\x22\x90\x00\x7d\x33\xa2\x00\x40\x9f\x27\xa7\x29\x6c\x01\x01\x00\xff\xff\x80\x6f\x69\x3d\x06\x0f\x71\x7d\x33\x5a\x8a\x01\x00\x76\x01\x22\x00\x6e\x09\x55" | |
+ packets['cranberry'] = b"\x7e\x00\x22\x90\x00\x7d\x33\xa2\x00\x41\x25\xe5\x88\x0c\x83\x01\x02\x00\xff\xff\x7c\xf3\x05\x00\xba\x0f\x5c\x08\x05\x00\x20\x73\x3b\x00\xdd\x8b\x01\x00\x7a" | |
return packets | |
-master_fd, slave_fd = os.openpty() | |
-symlink_force(os.ttyname(slave_fd), './fakexbee') | |
- | |
+def fake_loop(master_fd): | |
+ packets = valid_packets() | |
+ while True: | |
+ for key in packets: | |
+ os.write(master_fd, bytes(packets[key])) | |
+ time.sleep(1) | |
-packets = valid_packets() | |
-while True: | |
- for key in packets: | |
- os.write(master_fd, bytes(packets[key])) | |
- time.sleep(1) | |
+def start_fake_serial(ptyPath): | |
+ master_fd, slave_fd = os.openpty() | |
+ symlink_override(os.ttyname(slave_fd), ptyPath) | |
+ t = threading.Thread(target=fake_loop, args=[master_fd,], daemon=True) | |
+ t.start() | |
\ No newline at end of file | |
diff --git a/gateway/src/gateway_server.py b/gateway/src/gateway_server.py | |
index 32bac51..9edfbb6 100644 | |
--- a/gateway/src/gateway_server.py | |
+++ b/gateway/src/gateway_server.py | |
@@ -1,17 +1,14 @@ | |
#!/usr/bin/env python | |
# This script is the main gateway server | |
-from xbee import ZigBee | |
import sys | |
import os | |
-import serial | |
import datetime | |
-import struct | |
-import collections | |
import threading | |
-from .decoder import Decoder | |
-from .xbee_gateway import XBeeGateway | |
+from decoder import Decoder | |
+from xbee_gateway import XBeeGateway | |
+from fake_rx import start_fake_serial | |
args = sys.argv | |
@@ -28,6 +25,10 @@ if args[1] == 'auto' or args[1] == 'a': | |
# set port to usb FTDI Device | |
port = '/dev/serial/by-id/usb-FTDI_FT231X_USB_UART_DN01DBGI-if00-port0' | |
+elif args[1] == 'fake': | |
+ port = './ttyFake' | |
+ start_fake_serial(port) | |
+ | |
# if we have no special arguments | |
# port can be accessed by /dev/serial/by-id/<device name> as opposed to /dev/tty/USB0. The latter will never change | |
# use the line of code below when running simulation to manually enter port | |
@@ -74,11 +75,12 @@ while True: | |
decoder.register_callback(decoder.print_dictionary) | |
decoder.register_callback(decoder.write_to_file) | |
- decoder.register_callback(decoder.write_to_db) | |
+ if port != './ttyFake': | |
+ decoder.register_callback(decoder.write_to_db) | |
xbg.register_callback(decoder.decode_data) | |
xbg.setup_xbee(port, baud_rate) | |
- xbg.begin_test(1,t_flag,kill_flag) | |
+ xbg.begin_loop() | |
#newThread = threading.Thread(target=xbg.begin_test, args=(1,t_flag,kill_flag)) | |
#newThread.daemon = True | |
#newThread.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment