Skip to content

Instantly share code, notes, and snippets.

@danricho
Created January 1, 2021 05:04
Show Gist options
  • Save danricho/d3f406ff48f7cd428eafcf169f6bc3a4 to your computer and use it in GitHub Desktop.
Save danricho/d3f406ff48f7cd428eafcf169f6bc3a4 to your computer and use it in GitHub Desktop.
Arduino Python Comms
This is a simple communication scheme between python and Arduino over Serial.
int TO_RPI = 4;
int FR_RPI = 5;
// NON BLOCKING SERIAL!
int millisSince(unsigned long start){
return (millis()-start);
}
void waitUntil(int millis, unsigned long start){
delay(millis - millisSince(start));
}
int headerFromMillis(int millis){
if ((millis > 85) || (millis < 15)){ return 0; }
else if ((millis > 15) && (millis < 25)) { return 1; }
else if ((millis > 35) && (millis < 45)) { return 2; }
else if ((millis > 55) && (millis < 65)) { return 3; }
else if ((millis > 75) && (millis < 85)) { return 4; }
}
void writeBit(bool state){
digitalWrite(4, LOW);
if (state){ delay(10); } else { delay(5); }
digitalWrite(4, HIGH);
if (state){ delay(5); } else { delay(10); }
}
int message = 1;
long centiVolts;
byte encodedVolts;
void setup() {
pinMode(LED_BUILTIN, OUTPUT);
pinMode(TO_RPI, OUTPUT);
pinMode(FR_RPI, INPUT);
}
void loop() {
unsigned long loop_start = millis();
digitalWrite(LED_BUILTIN, HIGH);
digitalWrite(TO_RPI, LOW);
delay(20 * (message));
digitalWrite(TO_RPI, HIGH);
if (message == 1){
delay(20); // make sure RPi is ready
// STANDIN - will be a analogue read
centiVolts = random(0, 1000);
encodedVolts = map(centiVolts,0,1000,0,255);
// Serial.print("Data binary (big-endian): ");
for (byte mask = 00000001; mask>0; mask <<= 1){
if (encodedVolts & mask) {
// Serial.print("1");
writeBit(1);
} else {
// Serial.print("0");
writeBit(0);
}
}
}
Serial.print("Sent ");
Serial.print(message);
Serial.print(", Pulse Length: ");
Serial.print(20 * (message));
if (message == 1){
Serial.print(", Volts: ");
Serial.print(float(centiVolts)/100.0);
// Serial.print(", Data: ");
// Serial.print(encodedVolts);
}
Serial.println("");
digitalWrite(LED_BUILTIN, LOW);
waitUntil(950, loop_start);
int previous_state = digitalRead(FR_RPI);
int this_state;
unsigned long fall_start = 0;
unsigned long pulseLength = 0;
int header = 0;
while (millisSince(loop_start) < 1950){
this_state = digitalRead(FR_RPI);
if (fall_start == 0){
if (previous_state) {
if (!this_state) { // FALLING EDGE
fall_start = millis();
}
}
}
else {
if (this_state) { // RISING EDGE
pulseLength = millisSince(fall_start);
header = headerFromMillis(pulseLength);
break;
}
}
}
Serial.print("Got ");
Serial.print(header);
Serial.print(", Pulse Length: ");
Serial.print(pulseLength);
Serial.println("");
Serial.println("");
message = (message % 4) + 1;
waitUntil(2000, loop_start);
}
import RPi.GPIO as GPIO
from datetime import datetime
import random
import time
import struct
TO_RPI = 4
FR_RPI = 5
def myround(x, base=5):
return base * round(x/base)
def delay(millis):
time.sleep(max(0,millis/1000))
def millisSince(start):
return round((datetime.now() - start).microseconds / 100) / 10
def waitUntil(millis, start):
delay(millis - millisSince(start));
def headerFromMillis(millis):
if millis > 85 or millis < 15:
return None # NOT A MESSAGE TYPE HEADER - PROBABLY END OF DATA
else:
return round(myround(millis, 20) / 20)
def getBit(pin):
GPIO.wait_for_edge(pin, GPIO.FALLING, timeout=1000)
rise = datetime.now()
GPIO.wait_for_edge(pin, GPIO.RISING, timeout=1000)
return round((datetime.now() - rise).microseconds / 100) / 10 > 7.5
GPIO.setmode(GPIO.BCM)
GPIO.setup(TO_RPI, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(FR_RPI, GPIO.OUT)
try:
while(1):
GPIO.wait_for_edge(TO_RPI, GPIO.FALLING, timeout=1000)
loop_start = datetime.now() # UPS SEND SYNC.
GPIO.wait_for_edge(TO_RPI, GPIO.RISING, timeout=1000)
pulseLength = millisSince(loop_start)
header = headerFromMillis(pulseLength)
if header:
voltage = None
if header == 1:
number = 0
for i in range(0,8):
number += (getBit(4) << i)
# print("Data binary (little-endian): {:08b}".format(number))
voltage = round(number / 255.0 * 1000) / 100.0
print("Got " + str(header) + ", Pulse Length: " + str(pulseLength) + ", Volts: {:.2f}".format(voltage));
else:
print("Got " + str(header) + ", Pulse Length: " + str(pulseLength));
waitUntil(1000, loop_start);
GPIO.output(FR_RPI, GPIO.LOW)
time.sleep(((header * 20))/1000)
GPIO.output(FR_RPI, GPIO.HIGH)
print("Sent " + str(header) + ", Pulse Length: " + str(pulseLength));
print("")
except KeyboardInterrupt:
None
GPIO.cleanup() # clean up GPIO on normal exit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment