Created
June 20, 2019 08:04
-
-
Save maxam2017/deef7eba5b5055c2737d54d24ea625a9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import RPi.GPIO as GPIO | |
import os | |
import time | |
import logging | |
import bluetooth | |
import subprocess | |
import requests | |
import base64 | |
''' | |
<layout> | |
1(o)(o) 2---grey | |
sensor--3 (o) 4 | |
5 (x) 6 | |
red---7 8---orange | |
black---9(x) 10---green | |
yellow--11 12---blue | |
purple--13 (x)14 | |
15 16---black<trig> | |
17 18---white<echo> | |
''' | |
class PiBin:
    """Controller for one motorised rubbish bin driven by a Raspberry Pi.

    The bin drives two wheels through an L293D motor driver, watches for
    obstacles with an HC-SR04 ultrasonic sensor and detects dropped rubbish
    with an IR sensor.  Bins talk to each other over Bluetooth RFCOMM: the bin
    whose adapter address is ``_DISPATCHER_ADDR`` photographs the rubbish, asks
    a web service to classify it and notifies the bin assigned to that class.

    All pin arguments are physical pin numbers (``GPIO.BOARD`` numbering).
    """

    # Address of the bin that shoots the picture and dispatches the others.
    _DISPATCHER_ADDR = 'B8:27:EB:C6:8E:FF'
    # Rubbish class returned by the classifier -> address of the responsible bin.
    _BIN_FOR_CLASS = {
        'glass': 'B8:27:EB:C6:8E:FF',      # maxam
        'plastic': 'B8:27:EB:57:D8:D0',    # joying
        'cardboard': 'B8:27:EB:BF:91:E8',  # dada
        'paper': 'B8:27:EB:BF:91:E8',      # dada
    }
    _CLASSIFIER_URL = 'http://192.168.0.100:5000/api/score_board/record'
    _RFCOMM_PORT = 1
    _SPEED_OF_SOUND = 343   # metres per second
    _ECHO_TIMEOUT = 0.1     # seconds to wait for each edge of the echo pulse

    def __init__(self, en1, en2, in1, in2, in3, in4, ir, trig, echo):
        """Configure the GPIO pins and put the bin in its idle state.

        Args:
            en1, en2: L293D enable pins.
            in1, in2: L293D inputs for the left wheel.
            in3, in4: L293D inputs for the right wheel.
            ir: output pin of the IR sensor.
            trig: HC-SR04 trigger pin.
            echo: HC-SR04 echo pin.

        Raises:
            subprocess.CalledProcessError: if no Bluetooth address can be read
                from ``hciconfig``.
        """
        GPIO.setmode(GPIO.BOARD)
        # Find this bin's own Bluetooth address.
        self.bd_addr = subprocess.check_output(
            "hciconfig | grep -o -E '([[:xdigit:]]{1,2}:){5}[[:xdigit:]]{1,2}'",
            shell=True).decode('utf-8').strip()
        # Snapshot settings.
        self.width = 1024
        self.height = 768
        self.fn = 'test.png'
        self.done = True
        # Timestamps of the last drive start / stop; go_back() replays the
        # difference, so they must exist even before the first drive.
        self.start = 0.0
        self.end = 0.0

        self.__EN1 = en1    # L293D enable1
        self.__EN2 = en2    # L293D enable2
        self.__IN1 = in1    # L293D in1
        self.__IN2 = in2    # L293D in2
        self.__IN3 = in3    # L293D in3
        self.__IN4 = in4    # L293D in4
        self.__IR = ir      # IR sensor outport
        self.__TRIG = trig  # HC-SR04 trigger pin
        self.__ECHO = echo  # HC-SR04 echo pin
        for pin in (self.__EN1, self.__EN2, self.__IN1, self.__IN2,
                    self.__IN3, self.__IN4):
            GPIO.setup(pin, GPIO.OUT)
        GPIO.setup(self.__IR, GPIO.IN)
        GPIO.setup(self.__TRIG, GPIO.OUT)
        GPIO.setup(self.__ECHO, GPIO.IN)
        self.reset()

    @staticmethod
    def _peer_address(peer_info):
        """Return the upper-cased address from what ``accept()`` reported.

        PyBluez reports an RFCOMM peer as an ``(address, port)`` tuple; a bare
        address string is accepted as well.
        """
        if isinstance(peer_info, (tuple, list)):
            peer_info = peer_info[0]
        return str(peer_info).upper()

    def _notify(self, addr, message):
        """Connect to ``addr`` over RFCOMM, send ``message`` and disconnect."""
        sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        try:
            sock.connect((addr, self._RFCOMM_PORT))
            sock.send(message)
        finally:
            sock.close()

    def _wait_for_peer(self):
        """Block until one peer connects and sends data; return its address."""
        server = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        try:
            server.bind(("", self._RFCOMM_PORT))
            server.listen(1)
            # accept() returns (client_socket, address_info).
            client, peer_info = server.accept()
            try:
                client.recv(1024)  # payload is not inspected, only the sender
            finally:
                client.close()
        finally:
            # Release the port so the next call can bind it again.
            server.close()
        return self._peer_address(peer_info)

    def _classify_snapshot(self):
        """Shoot a picture, post it to the classifier and return its answer."""
        subprocess.call([
            'raspistill', '-n', '-t', '1', '-o', self.fn, '-e', 'png',
            '-w', str(self.width), '-h', str(self.height),
        ])
        with open(self.fn, 'rb') as img:
            payload = {'pid': base64.b64encode(img.read())}
        res = requests.post(self._CLASSIFIER_URL, data=payload)
        return res.content.decode('utf-8')

    def _dispatch_rubbish(self):
        """Dispatcher role: classify rubbish until some bin has handled it.

        Returns True when this bin itself must fetch the rubbish, False once
        the notified bin has reported back.
        """
        while True:
            result = self._classify_snapshot()
            target = self._BIN_FOR_CLASS.get(result)
            if target is None:
                continue  # unknown class: shoot again
            if target == self.bd_addr:
                return True
            logging.info('[{}] notify `{}` bin'.format(self.bd_addr, result))
            self._notify(target, 'GOGO')
            logging.info('[{}] wait for `{}` bin'.format(self.bd_addr, result))
            if self._wait_for_peer() == target:
                return False  # work is done

    def _await_dispatch(self):
        """Worker role: report in (once) and wait for the dispatcher's request.

        Returns True when the message received came from the dispatcher.
        """
        if self.done:
            logging.info('[{}] wake up B8:27:EB:C6:8E:FF'.format(self.bd_addr))
            self._notify(self._DISPATCHER_ADDR, 'DONE')
        # NOTE(review): nothing visible here sets `done` back to True, so
        # 'DONE' is only ever sent on the first call - confirm intended.
        self.done = False
        logging.info('[{}] wait for B8:27:EB:C6:8E:FF'.format(self.bd_addr))
        sender = self._wait_for_peer()
        logging.info('[{}] receive a request....'.format(self.bd_addr))
        return sender == self._DISPATCHER_ADDR

    def capture_or_receive(self):
        """Run one round of the dispatch protocol for this bin's role.

        Returns:
            True when this bin should drive out and collect rubbish, False
            otherwise.
        """
        if self.bd_addr == self._DISPATCHER_ADDR:  # CLIENT
            return self._dispatch_rubbish()
        return self._await_dispatch()              # SERVER

    def _drive(self, in1, in2, in3, in4):
        """Set the four L293D inputs (in1/in2 left wheel, in3/in4 right)."""
        GPIO.output(self.__IN1, in1)
        GPIO.output(self.__IN2, in2)
        GPIO.output(self.__IN3, in3)
        GPIO.output(self.__IN4, in4)

    def go_ahead(self):
        """Start driving forward and remember when the drive began."""
        logging.info("Go ahead")
        self._drive(GPIO.HIGH, GPIO.LOW, GPIO.HIGH, GPIO.LOW)
        self.start = time.time()

    def count_distance(self, start, end):
        """Convert an echo pulse (start/end timestamps, seconds) to metres.

        The pulse covers the round trip, hence the division by two.
        """
        return (end - start) * self._SPEED_OF_SOUND / 2

    def is_safe(self, distance):
        """Measure the distance ahead and compare it with ``distance``.

        Args:
            distance: minimum clearance in metres.

        Returns:
            True when the measured distance is greater than ``distance``.
            False when it is not, or when the sensor produced no echo at all
            (fail-safe: an unanswered ping is treated as a blocked path).
        """
        # 10 microsecond trigger pulse starts a measurement.
        GPIO.output(self.__TRIG, GPIO.HIGH)
        time.sleep(0.00001)
        GPIO.output(self.__TRIG, GPIO.LOW)

        # Wait for the rising edge of the echo, but never forever.
        pulse_start = time.time()
        deadline = pulse_start + self._ECHO_TIMEOUT
        while GPIO.input(self.__ECHO) == GPIO.LOW:
            pulse_start = time.time()
            if pulse_start > deadline:
                logging.warning('No echo from HC-SR04; treating path as blocked')
                return False

        # Time the high phase of the echo, again with an upper bound.
        pulse_end = pulse_start
        deadline = pulse_start + self._ECHO_TIMEOUT
        while GPIO.input(self.__ECHO) == GPIO.HIGH:
            pulse_end = time.time()
            if pulse_end > deadline:
                break

        d = self.count_distance(pulse_start, pulse_end)
        logging.info('Obstacle is far from {:.2f}m'.format(d))
        return d > distance

    def go_back(self):
        """Drive backwards for as long as the last forward drive lasted."""
        logging.info("Go back")
        duration = self.end - self.start
        self.start = time.time()
        self._drive(GPIO.LOW, GPIO.HIGH, GPIO.LOW, GPIO.HIGH)
        # Sleep instead of spinning; guard against a negative duration.
        time.sleep(max(duration, 0))
        self.stop()

    def stop(self):
        """Stop both wheels and remember when the drive ended."""
        logging.info("Stop")
        self._drive(GPIO.LOW, GPIO.LOW, GPIO.LOW, GPIO.LOW)
        self.end = time.time()

    def wait_for_garbage_in(self):
        """Return True while the IR sensor pin reads high."""
        return GPIO.input(self.__IR) == GPIO.HIGH

    def reset(self):
        """Stop both wheels and send one trigger pulse to the HC-SR04."""
        self._drive(GPIO.LOW, GPIO.LOW, GPIO.LOW, GPIO.LOW)
        GPIO.output(self.__TRIG, GPIO.HIGH)
        time.sleep(0.00001)
        GPIO.output(self.__TRIG, GPIO.LOW)
def main():
    """Run the bin's control loop until interrupted with Ctrl-C.

    Each round waits for a job; when one arrives the bin drives forward until
    an obstacle is within 0.1 m, stops, waits while the IR sensor reads high,
    then drives back and resets.  GPIO is always cleaned up on exit.
    """
    logging.basicConfig(level=logging.DEBUG)
    # GPIO.BOARD pin numbers, in constructor order:
    # en1, en2, in1, in2, in3, in4, ir, trig, echo.
    pi_bin = PiBin(7, 8, 11, 13, 10, 12, 3, 16, 18)
    try:
        while True:
            if not pi_bin.capture_or_receive():
                # No job for this bin; poll again in 0.5 s.
                time.sleep(.5)
                continue
            # Go ahead until an obstacle is reached, checking every 0.5 s.
            pi_bin.go_ahead()
            while pi_bin.is_safe(.1):
                time.sleep(.5)
            # Danger: stop right now.
            pi_bin.stop()
            # Poll the IR sensor every 0.5 s.
            while pi_bin.wait_for_garbage_in():
                time.sleep(.5)
            # Return to the original position (replays the drive time).
            pi_bin.go_back()
            pi_bin.reset()
    except KeyboardInterrupt:
        logging.warning("Exception: KeyboardInterrupt")
    finally:
        GPIO.cleanup()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment