Skip to content

Instantly share code, notes, and snippets.

@robertoloja
Last active December 23, 2017 18:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robertoloja/317e7d1c8ef49b6d5e510e3c1082518a to your computer and use it in GitHub Desktop.
Save robertoloja/317e7d1c8ef49b6d5e510e3c1082518a to your computer and use it in GitHub Desktop.
Counts the hive's bee population. Runs in the background and writes the current count to the local MongoDB instance every 5 seconds.
#!/usr/bin/python
from pymongo import MongoClient
import RPi.GPIO as GPIO
import threading
import time
import sys
# A state machine tracks bee ingress/egress through each gate.
# The numeric suffix of a state name is the (inSensor, outSensor) reading
# that the gate shows while in that state. Codes are assigned in the order
# the names are listed, starting at 0 for IDLE.
states = {
    name: code
    for code, name in enumerate((
        'IDLE',
        'EGRESS_1_0',
        'EGRESS_1_1',
        'EGRESS_0_1',
        'INGRESS_0_1',
        'INGRESS_1_1',
        'INGRESS_1_0',
    ))
}
# Everything known about a gate lives in one record: the pin numbers of its
# two sensors ('IN' and 'OUT' are constants, hence capitalized) and its
# current, mutable, state-machine 'state'. Every gate starts out IDLE.
gates = [
    {'IN': in_pin, 'OUT': out_pin, 'state': states['IDLE']}
    for in_pin, out_pin in (
        (16, 18),
        (21, 13),
        (12, 15),
        (29, 31),
        (33, 35),
    )
]
# Address pins using the BOARD numbering scheme.
GPIO.setmode(GPIO.BOARD)

# Flatten the gate records into one list: every gate's 'IN' pin first,
# followed by every gate's 'OUT' pin...
pins = [gate[role] for role in ('IN', 'OUT') for gate in gates]

# ...so that configuring all sensors as inputs is a single pass over it.
for pin in pins:
    GPIO.setup(pin, GPIO.IN)
# Handle to the 'hiveInfo' collection of the 'sensorData' database, reached
# through a MongoClient built with default connection settings.
hiveInfo = MongoClient().sensorData.hiveInfo

# Resume from the previously stored population when there is one.
# find_one() returns None for an empty collection (find()[0] would raise
# IndexError there), so a first run against a fresh database simply starts
# counting from zero instead of crashing.
hI = hiveInfo.find_one() or {}
count = hI.get('population', 0)
# This function asynchronously stores the count in Mongo every 5 seconds
def storeCount():
    """Write the current population to Mongo and re-arm a 5-second timer.

    Reads the module-level ``count`` and stores it as ``population`` on the
    first document of ``hiveInfo``; the upsert creates that document when
    the collection is still empty, so the count is never silently dropped.

    The next run is scheduled in ``finally`` so that one failed write does
    not end the periodic updates. The timer thread is a daemon so that it
    cannot keep the process alive once the main loop has stopped.
    """
    try:
        hiveInfo.update_one(
            {}, {'$set': {'population': count}}, upsert=True)
    finally:
        timer = threading.Timer(5, storeCount)
        timer.daemon = True
        timer.start()


storeCount()
def _advance(state, inSensor, outSensor):
    """Return ``(next_state, delta)`` for one gate after one sensor reading.

    ``state`` is the gate's current code from ``states``; ``inSensor`` and
    ``outSensor`` are truthy while the respective sensor is triggered.
    ``delta`` is +1 when an EGRESS_* sequence completes, -1 when an
    INGRESS_* sequence completes, and 0 otherwise. A sequence that reverses
    steps back one state at a time, and only a full pass changes the count.

    NOTE(review): the EGRESS_* path is the one that increments the count
    ("a bee has entered the hive"), so the state names look swapped relative
    to the counting direction -- confirm against the sensor orientation.
    """
    # Integer state codes are compared with ==, not identity.
    if state == states['IDLE']:
        if inSensor:
            return states['EGRESS_1_0'], 0
        if outSensor:
            return states['INGRESS_0_1'], 0
    elif state == states['EGRESS_1_0']:
        if outSensor:
            return states['EGRESS_1_1'], 0
        if not inSensor:
            return states['IDLE'], 0
    elif state == states['EGRESS_1_1']:
        if not inSensor:
            return states['EGRESS_0_1'], 0
        if not outSensor:
            return states['EGRESS_1_0'], 0
    elif state == states['EGRESS_0_1']:
        if inSensor:
            return states['EGRESS_1_1'], 0
        if not outSensor:
            return states['IDLE'], 1   # A bee has entered the hive.
    elif state == states['INGRESS_0_1']:
        if inSensor:
            return states['INGRESS_1_1'], 0
        if not outSensor:
            return states['IDLE'], 0
    elif state == states['INGRESS_1_1']:
        if not outSensor:
            return states['INGRESS_1_0'], 0
        if not inSensor:
            return states['INGRESS_0_1'], 0
    elif state == states['INGRESS_1_0']:
        if outSensor:
            return states['INGRESS_1_1'], 0
        if not inSensor:
            return states['IDLE'], -1  # A bee has exited the hive.
    return state, 0


# Event loop. Cheap asynchronous behaviour, hence the state machine.
try:
    while True:
        for gate in gates:
            inSensor = not GPIO.input(gate['IN'])  # Compensating for active low
            outSensor = not GPIO.input(gate['OUT'])

            gate['state'], delta = _advance(gate['state'], inSensor, outSensor)
            if delta:
                # No (arithmetically) negative bees.
                count = max(0, count + delta)
                sys.stdout.write('\r%s ' % repr(count))
                # '\r' does not trigger a flush on line-buffered stdout.
                sys.stdout.flush()
        # NOTE(review): the original indentation was lost; the pause is
        # assumed to run once per sweep over all gates.
        time.sleep(0.01)
finally:
    # Release the GPIO channels however the loop ends (Ctrl-C, error).
    GPIO.cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment