Skip to content

Instantly share code, notes, and snippets.

@maxam2017
Created June 20, 2019 08:04
Show Gist options
  • Save maxam2017/deef7eba5b5055c2737d54d24ea625a9 to your computer and use it in GitHub Desktop.
Save maxam2017/deef7eba5b5055c2737d54d24ea625a9 to your computer and use it in GitHub Desktop.
import RPi.GPIO as GPIO
import os
import time
import logging
import bluetooth
import subprocess
import requests
import base64
'''
<layout>  Raspberry Pi header, GPIO.BOARD numbering (odd pins left, even pins right)

            1 (o)(o)  2 ---grey
 sensor---  3   (o)   4
            5   (x)   6
    red---  7         8 ---orange
  black---  9 (x)    10 ---green
 yellow--- 11        12 ---blue
 purple--- 13   (x)  14
           15        16 ---black<trig>
           17        18 ---white<echo>
'''
class PiBin:
    """Controller for one motorised rubbish bin driven by a Raspberry Pi.

    The bin drives two wheels through an L293D, measures the distance to the
    nearest obstacle with an HC-SR04 and reads an IR sensor.  Several bins
    cooperate over Bluetooth RFCOMM: the bin whose adapter address equals
    ``_COORDINATOR_ADDR`` photographs the rubbish, asks an HTTP service which
    material it is and tells the bin responsible for that material to move.

    Public attributes:
        bd_addr: Bluetooth address of the local adapter (from ``hciconfig``).
        width, height: resolution passed to ``raspistill``.
        fn: file the captured picture is written to and read back from.
        done: on a non-coordinator bin, True while the coordinator still has
            to be told that this bin is ready.
        start, end: ``time.time()`` stamps written by ``go_ahead``/``go_back``
            and ``stop``; ``go_back`` drives for ``end - start`` seconds.
    """

    # Address of the bin that owns the camera and dispatches the others.
    _COORDINATOR_ADDR = 'B8:27:EB:C6:8E:FF'
    # Classification result -> Bluetooth address of the responsible bin.
    _BIN_BY_MATERIAL = {
        'glass': 'B8:27:EB:C6:8E:FF',      # maxam
        'plastic': 'B8:27:EB:57:D8:D0',    # joying
        'cardboard': 'B8:27:EB:BF:91:E8',  # dada
        'paper': 'B8:27:EB:BF:91:E8',      # dada
    }
    # requests refuses a URL without a scheme, so "http://" must be explicit.
    _CLASSIFIER_URL = 'http://192.168.0.100:5000/api/score_board/record'
    _RFCOMM_CHANNEL = 1
    # Used to turn the echo time into a distance in metres.
    _SPEED_OF_SOUND_M_S = 343
    # Give up on a reading after this long.
    # NOTE(review): chosen as a generous bound for an HC-SR04 echo; confirm
    # against the sensor datasheet.
    _ECHO_TIMEOUT_S = 0.1

    def __init__(self, en1, en2, in1, in2, in3, in4, ir, trig, echo):
        """Configure the GPIO pins (BOARD numbering) and stop the motors.

        Args:
            en1, en2: L293D enable pins.
            in1, in2: L293D inputs for the left wheel.
            in3, in4: L293D inputs for the right wheel.
            ir: IR sensor output pin.
            trig, echo: HC-SR04 trigger and echo pins.

        Raises:
            subprocess.CalledProcessError: if no Bluetooth address can be
                read from ``hciconfig``.
        """
        GPIO.setmode(GPIO.BOARD)
        # Find our own Bluetooth address.  Only the first match is kept so a
        # second adapter cannot turn bd_addr into a multi-line string.
        found = subprocess.check_output(
            "hciconfig | grep -o -E '([[:xdigit:]]{1,2}:){5}[[:xdigit:]]{1,2}'",
            shell=True).decode('utf-8')
        self.bd_addr = found.split()[0]
        # Camera settings.
        self.width = 1024
        self.height = 768
        self.fn = 'test.png'
        self.done = True
        # Defined up front so go_back() before any go_ahead()/stop() simply
        # drives for zero seconds instead of raising AttributeError.
        self.start = 0.0
        self.end = 0.0

        self.__EN1 = en1    # L293D enable1
        self.__EN2 = en2    # L293D enable2
        self.__IN1 = in1    # L293D in1
        self.__IN2 = in2    # L293D in2
        self.__IN3 = in3    # L293D in3
        self.__IN4 = in4    # L293D in4
        self.__IR = ir      # IR sensor output
        self.__TRIG = trig  # HC-SR04 trigger pin
        self.__ECHO = echo  # HC-SR04 echo pin

        # NOTE(review): the enable pins are configured as outputs but nothing
        # in this class ever drives them; confirm the L293D is enabled some
        # other way.
        output_pins = (self.__EN1, self.__EN2, self.__IN1, self.__IN2,
                       self.__IN3, self.__IN4, self.__TRIG)
        for pin in output_pins:
            GPIO.setup(pin, GPIO.OUT)
        GPIO.setup(self.__IR, GPIO.IN)
        GPIO.setup(self.__ECHO, GPIO.IN)
        self.reset()

    def capture_or_receive(self):
        """Decide whether this bin has to drive out now.

        On the coordinator this photographs and classifies the rubbish and
        notifies the responsible bin; on every other bin it blocks until a
        Bluetooth message arrives.

        Returns:
            True when this bin should move, False otherwise.
        """
        if self.bd_addr == self._COORDINATOR_ADDR:
            return self._capture_and_dispatch()
        return self._wait_for_coordinator()

    def _capture_and_dispatch(self):
        """Coordinator side: classify a picture and dispatch the right bin."""
        while True:
            # Take the picture.  An argument list avoids building a shell
            # command line out of instance attributes.
            status = subprocess.call([
                'raspistill', '-n', '-t', '1', '-o', str(self.fn),
                '-e', 'png', '-w', str(self.width), '-h', str(self.height)])
            if status != 0:
                logging.warning('[{}] raspistill exited with status {}'.format(
                    self.bd_addr, status))
                time.sleep(.5)
                continue
            with open(self.fn, 'rb') as img:
                b64 = base64.b64encode(img.read())
            res = requests.post(self._CLASSIFIER_URL, data={'pid': b64})
            result = res.content.decode('utf-8')

            target = self._BIN_BY_MATERIAL.get(result)
            if target is None:
                # Unknown answer: take another picture.
                continue
            if target == self._COORDINATOR_ADDR:
                return True

            logging.info('[{}] notify `{}` bin'.format(self.bd_addr, result))
            self._send(target, 'GOGO')
            logging.info('[{}] wait for `{}` bin'.format(self.bd_addr, result))
            _data, addr = self._receive_once()
            if addr == target:
                return False  # work is done
            # A message from any other bin is ignored and the cycle restarts.

    def _wait_for_coordinator(self):
        """Worker side: announce readiness once, then wait for one message."""
        if self.done:
            logging.info('[{}] wake up B8:27:EB:C6:8E:FF'.format(self.bd_addr))
            self._send(self._COORDINATOR_ADDR, 'DONE')
            # Nothing in this class sets `done` back to True afterwards.
            self.done = False
        logging.info('[{}] wait for B8:27:EB:C6:8E:FF'.format(self.bd_addr))
        _data, addr = self._receive_once()
        logging.info('[{}] receive a request....'.format(self.bd_addr))
        return addr == self._COORDINATOR_ADDR

    def _send(self, addr, message):
        """Send `message` to `addr` over RFCOMM and always close the socket."""
        sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        try:
            sock.connect((addr, self._RFCOMM_CHANNEL))
            sock.send(message)
        finally:
            sock.close()

    def _receive_once(self):
        """Accept one RFCOMM connection; return (data, sender address)."""
        server = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        try:
            server.bind(("", self._RFCOMM_CHANNEL))
            server.listen(1)
            # accept() yields the connected socket plus the peer's
            # (address, channel); recv() yields only the payload.
            client, client_info = server.accept()
            try:
                data = client.recv(1024)
            finally:
                client.close()
        finally:
            # The channel must be released or the next bind() on it fails.
            server.close()
        # Upper-cased so the comparison with the constants above cannot fail
        # on letter case alone.
        return data, str(client_info[0]).upper()

    def go_ahead(self):
        """Drive both wheels forward and remember when the drive started."""
        logging.info("Go ahead")
        # left wheel
        GPIO.output(self.__IN1, GPIO.HIGH)
        GPIO.output(self.__IN2, GPIO.LOW)
        # right wheel
        GPIO.output(self.__IN3, GPIO.HIGH)
        GPIO.output(self.__IN4, GPIO.LOW)
        self.start = time.time()

    def count_distance(self, start, end):
        """Convert an echo that lasted from `start` to `end` into metres.

        The sound travels to the obstacle and back, hence the division by 2.
        """
        round_trip = (end - start) * self._SPEED_OF_SOUND_M_S
        return round_trip / 2

    def is_safe(self, distance):
        """Measure the obstacle distance with the HC-SR04.

        Args:
            distance: minimum clearance in metres.

        Returns:
            True if the measured distance is greater than `distance`.  False
            if it is not, or if no complete echo pulse was seen within
            ``_ECHO_TIMEOUT_S`` (a failed reading is treated as unsafe).
        """
        # 10 microsecond trigger pulse
        GPIO.output(self.__TRIG, GPIO.HIGH)
        time.sleep(0.00001)
        GPIO.output(self.__TRIG, GPIO.LOW)

        # Wait for the echo line to rise ...
        pulse_start = time.time()
        deadline = pulse_start + self._ECHO_TIMEOUT_S
        while GPIO.input(self.__ECHO) == GPIO.LOW:
            pulse_start = time.time()
            if pulse_start > deadline:
                logging.warning('HC-SR04 echo never started')
                return False
        # ... and then to fall again.
        pulse_end = pulse_start
        deadline = pulse_start + self._ECHO_TIMEOUT_S
        while GPIO.input(self.__ECHO) == GPIO.HIGH:
            pulse_end = time.time()
            if pulse_end > deadline:
                logging.warning('HC-SR04 echo never ended')
                return False

        d = self.count_distance(pulse_start, pulse_end)
        logging.info('Obstacle is far from {:.2f}m'.format(d))
        return d > distance

    # NOTE: flagged by the original author as needing discussion.
    def go_back(self):
        """Drive backwards for as long as the last forward drive took."""
        logging.info("Go back")
        duration = self.end - self.start
        self.start = time.time()
        # left wheel
        GPIO.output(self.__IN1, GPIO.LOW)
        GPIO.output(self.__IN2, GPIO.HIGH)
        # right wheel
        GPIO.output(self.__IN3, GPIO.LOW)
        GPIO.output(self.__IN4, GPIO.HIGH)
        # Sleep instead of spinning on time.time().
        remaining = duration - (time.time() - self.start)
        if remaining > 0:
            time.sleep(remaining)
        self.stop()

    def stop(self):
        """Stop both wheels and remember when the drive ended."""
        logging.info("Stop")
        # left wheel
        GPIO.output(self.__IN1, GPIO.LOW)
        GPIO.output(self.__IN2, GPIO.LOW)
        # right wheel
        GPIO.output(self.__IN3, GPIO.LOW)
        GPIO.output(self.__IN4, GPIO.LOW)
        self.end = time.time()

    def wait_for_garbage_in(self):
        """Return True while the IR sensor pin reads high.

        NOTE(review): presumably high means nothing has been dropped in yet;
        confirm against the sensor wiring.
        """
        return GPIO.input(self.__IR) == GPIO.HIGH

    def reset(self):
        """Stop both wheels and pulse the HC-SR04 trigger once."""
        # left wheel
        GPIO.output(self.__IN1, GPIO.LOW)
        GPIO.output(self.__IN2, GPIO.LOW)
        # right wheel
        GPIO.output(self.__IN3, GPIO.LOW)
        GPIO.output(self.__IN4, GPIO.LOW)
        GPIO.output(self.__TRIG, GPIO.HIGH)
        time.sleep(0.00001)
        GPIO.output(self.__TRIG, GPIO.LOW)
def main():
    """Run the bin's control loop until interrupted with Ctrl-C.

    Each cycle waits for a job, drives forward until an obstacle is closer
    than 0.1 m, waits on the IR sensor, then drives back to where it started.
    GPIO state is always released on exit.
    """
    logging.basicConfig(level=logging.DEBUG)
    try:
        # GPIO.BOARD pin numbers: en1, en2, in1, in2, in3, in4, ir, trig, echo.
        # PiBin takes exactly these nine pins; an extra leading argument
        # raises TypeError.  Built inside the try so that a failure half-way
        # through pin setup is still followed by GPIO.cleanup().
        pibin = PiBin(7, 8, 11, 13, 10, 12, 3, 16, 18)
        while True:
            if not pibin.capture_or_receive():
                # not our turn: poll again in 0.5 sec
                time.sleep(.5)
                continue
            # go ahead until bump into obstacle
            pibin.go_ahead()
            # check safety every 0.5 sec
            while pibin.is_safe(.1):
                time.sleep(.5)
            # Danger!!! Need `STOP` right now
            pibin.stop()
            # open garbage bin
            # detect into it every 0.5sec - IR sensor
            while pibin.wait_for_garbage_in():
                time.sleep(.5)
            # close garbage bin
            pibin.go_back()  # go back to original position, timed by start/end
            pibin.reset()
    except KeyboardInterrupt:
        logging.warning("Exception: KeyboardInterrupt")
    finally:
        GPIO.cleanup()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment