Skip to content

Instantly share code, notes, and snippets.

@include-yy
Last active September 2, 2023 10:57
Show Gist options
  • Save include-yy/06bb825ff3b5df75a7cc87c7b1a897b3 to your computer and use it in GitHub Desktop.
Save include-yy/06bb825ff3b5df75a7cc87c7b1a897b3 to your computer and use it in GitHub Desktop.
soft serial port implemented in python

使用 Python 实现的软串口

该程序实现的功能是:两个线程通过 UART 串口协议进行通信,发送比特的时间间隔为 1s,虚拟时钟为 4hz。线程在收到数字后加 1 重新发送,直到接收的数字大于 3 为止。

直接使用 Python 运行即可:

python port.py

你会看到:

computer2: start receive
computer1: start send 1
computer1: end send
computer1: start receive
computer2: end receive 1
computer2: start send 2
computer2: end send
computer2: start receive
computer1: end receive 2
computer1: start send 3
computer1: end send
computer1: start receive
computer2: end receive 3
computer2: start send 4
computer2: end send
computer1: end receive 4
import time
import threading
IDLE = 0
RX_START = 1
RECEIVE = 2
RX_STOP = 3
TX_START = 4
SEND = 5
TX_STOP = 6
def sleep250():
time.sleep(0.25)
def sleep750():
time.sleep(0.75)
def sleep1000():
time.sleep(1.0)
def byte2int(array):
acc = 0
for i in range(0, 8):
acc = acc * 2 + array[7 - i]
return acc
def int2byte(val):
arr = [0, 0, 0, 0, 0, 0, 0, 0]
for i in range(0, 8):
arr[i] = val & 1
val = val // 2
return arr
class UART:
def __init__(self, TX, RX):
# 绑定数据线
self.TX = TX
self.RX = RX
def send(self, tx_byte):
state = IDLE
# 时钟与状态机
while state != TX_STOP:
if (state == IDLE):
state = TX_START
elif (state == TX_START):
self.TX[0] = 0
sleep1000()
state = SEND
elif (state == SEND):
self.TX[0] = tx_byte[0]
sleep1000()
self.TX[0] = tx_byte[1]
sleep1000()
self.TX[0] = tx_byte[2]
sleep1000()
self.TX[0] = tx_byte[3]
sleep1000()
self.TX[0] = tx_byte[4]
sleep1000()
self.TX[0] = tx_byte[5]
sleep1000()
self.TX[0] = tx_byte[6]
sleep1000()
self.TX[0] = tx_byte[7]
sleep1000()
state = TX_STOP
else:
raise Exception('wtfsend')
self.TX[0] = 1
sleep1000()
state = IDLE
return
def rec(self):
state = IDLE
RX_cache = self.RX[0]
data = [0, 0, 0, 0, 0, 0, 0, 0]
while state != RX_STOP:
if (state == IDLE):
if (self.RX[0] == 0 and RX_cache == 1):
state = RX_START
else:
RX_cache = self.RX[0]
sleep250()
elif (state == RX_START):
state = RECEIVE
sleep1000()
elif (state == RECEIVE):
sleep250()
data[0] = self.RX[0]
sleep1000()
data[1] = self.RX[0]
sleep1000()
data[2] = self.RX[0]
sleep1000()
data[3] = self.RX[0]
sleep1000()
data[4] = self.RX[0]
sleep1000()
data[5] = self.RX[0]
sleep1000()
data[6] = self.RX[0]
sleep1000()
data[7] = self.RX[0]
sleep750()
state = RX_STOP
else:
raise Exception('wtfrecieve')
sleep1000()
state = IDLE
return data
rx = [1]
tx = [1]
u1 = UART(tx, rx)
u2 = UART(rx, tx)
def computer1():
cnt = 0
while cnt < 3:
cnt = cnt + 1
cntb = int2byte(cnt)
print ("computer1: start send %d" % cnt)
u1.send(cntb)
print ("computer1: end send")
print ("computer1: start receive")
cntb = u1.rec()
cnt = byte2int(cntb)
print ("computer1: end receive %d" % cnt)
def computer2():
cnt = 0
while cnt < 3:
print ("computer2: start receive")
cntb = u2.rec()
cnt = byte2int(cntb)
print ("computer2: end receive %d" % cnt)
cnt = cnt + 1
cntb = int2byte(cnt)
print ("computer2: start send %d" % cnt)
u2.send(cntb)
print ("computer2: end send")
x = threading.Thread(target=computer2)
x.start()
time.sleep(3)
x = threading.Thread(target=computer1)
x.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment