Created
December 20, 2021 02:50
-
-
Save bootrino/9c6c82e3243782da23308e50a38686db to your computer and use it in GitHub Desktop.
Rotary phone code for Raspberry Pi zero
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import atexit | |
# sudo apt install python3-gpiozero | |
import gpiozero | |
import math, sys, os, time | |
import subprocess | |
import time | |
from threading import Timer | |
from os import listdir | |
from os.path import isfile, join | |
# GPIO pin numbers handed to gpiozero.Button for the phone's three switches.
pin_rotaryenable, pin_countrotary, pin_hook = (
    23,  # yellow
    18,  # red
    25,
)

# One Button per switch, created in the same order as the pins above.
# NOTE(review): the __main__ section of this file counts pulses on
# `rotaryenable` activations and finalises a digit when `countrotary`
# deactivates, so the two names look swapped relative to their roles --
# confirm against the wiring before renaming anything.
rotaryenable, countrotary, hook = (
    gpiozero.Button(pin)
    for pin in (pin_rotaryenable, pin_countrotary, pin_hook)
)
# debounce function https://jonlabelle.com/snippets/view/python/python-debounce-decorator-function
# debounce function is MPL license Jon LaBelle
def debounce(wait):
    """Postpone a function's execution until calls stop for ``wait`` seconds.

    Each call to the decorated function cancels the previously scheduled
    call (if it has not fired yet) and schedules a new one on a
    ``threading.Timer``. Only the last call in a burst actually runs, and it
    runs on the timer's thread, not the caller's.

    The pending timer is stored on the wrapper function itself, so when a
    method is decorated the debounce window is shared by every instance.
    The wrapped function's return value is discarded; the wrapper always
    returns ``None``.

    :type wait: int or float
    :param wait: Number of seconds of quiet required before the call executes.
    :returns: A decorator that wraps a callable with the debounce behaviour.
    """
    # Local import so this block does not depend on a new file-level import.
    from functools import wraps

    def decorator(fun):
        @wraps(fun)  # keep the wrapped function's __name__/__doc__
        def debounced(*args, **kwargs):
            def call_it():
                fun(*args, **kwargs)

            # Cancel the call scheduled by the previous invocation, if any.
            pending = getattr(debounced, "t", None)
            if pending is not None:
                pending.cancel()

            debounced.t = Timer(wait, call_it)
            debounced.t.start()

        return debounced

    return decorator
# Table pairing a dialled number (as a string) with an audio clip filename.
# NOTE(review): no other code visible in this file reads this table; the Dial
# class instead looks for "<number>.mp3" among the files it lists on disk.
phone_number_to_audioclip_map = dict(
    [
        ('1', 'dialtone.mp3'),
        ('8360775', '8360775.mp3'),
    ]
)
class Dial():
    """State machine driving audio playback from a rotary phone's switches.

    valid states:
      'dial'           -- handset lifted; dial tone plays and digits are accepted
      'hungup'         -- handset down; all audio is stopped
      'call_connected' -- the dialled number matched an audio file, which plays

    Audio is played by spawning ``mpg123`` and stopped with ``killall mpg123``.
    The handset switch is read from the module-level ``hook`` button.
    """

    # Directory scanned for "<phone number>.mp3" clips.
    AUDIO_DIR = '/home/pi/audio'
    # Working directory for mpg123; 'dialtone.mp3' is resolved relative to it.
    WORK_DIR = '/home/pi'

    def __init__(self):
        """Reset state, enter the state matching the hook switch, list clips."""
        print("Initializing...")
        self.reset()
        self.assess_is_handle_lifted()
        try:
            self.audiofiles = [
                f for f in listdir(self.AUDIO_DIR)
                if isfile(join(self.AUDIO_DIR, f))
            ]
        except OSError as e:
            # Missing or unreadable directory: keep running with nothing to dial.
            print(f"could not list {self.AUDIO_DIR}: {e}")
            self.audiofiles = []
        print(self.audiofiles)

    def reset(self):
        """Clear the pulse counter, the dialled number and the player handle."""
        self.pulses = 0
        self.counting = True
        self.accumulated_phone_number = ''
        self.player = None

    def assess_is_handle_lifted(self):
        """Read the hook switch and enter the 'dial' or 'hungup' state."""
        if hook.is_pressed:
            print("handle_is_lifted = True")
            self.handle_is_lifted = True
            self.enter_state_dial()
        else:
            print("handle_is_lifted = False")
            self.handle_is_lifted = False
            self.enter_state_hungup()

    def cancel_all_audio(self):
        """Stop playback by killing every running mpg123 process (best effort)."""
        print("cancel_all_audio")
        try:
            subprocess.call(["/usr/bin/killall", "mpg123"])
        except OSError as e:
            # killall missing or not executable; nothing more can be done here.
            print(f"cancel_all_audio: could not run killall: {e}")

    def _spawn_player(self, *mpg123_args):
        """Start mpg123 with the given arguments and return the Popen handle.

        The child's output is sent to DEVNULL: nothing in this class ever
        reads it, and an unread pipe can fill up and block the child.
        """
        return subprocess.Popen(
            ["mpg123", *mpg123_args],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            cwd=self.WORK_DIR,
        )

    def enter_state_hungup(self):
        """Enter 'hungup': reset dialling state and stop all audio."""
        print("enter_state_hungup")
        self.reset()
        # cancel all audio
        self.cancel_all_audio()
        self.state = 'hungup'

    def enter_state_dial(self):
        """Enter 'dial': reset dialling state and start the dial tone."""
        print("enter_state_dial")
        self.state = 'dial'
        self.reset()
        print('starting dialtone')
        self.play_sound_dialtone()

    def enter_state_call_connected(self, phone_number_as_mp3_filename):
        """Enter 'call_connected' and play the clip for the dialled number.

        :param phone_number_as_mp3_filename: Clip filename (relative to
            AUDIO_DIR) to play.
        """
        print("enter_state_call_connected")
        self.state = 'call_connected'
        self.cancel_all_audio()
        # initial ringback indicates to user that the phone is ringing at the other end
        self.play_sound_ringback()
        self.play_sound_matching_phone_number(phone_number_as_mp3_filename)

    def play_sound_dialtone(self):
        """Stop any audio, then play dialtone.mp3 with mpg123's --loop 20."""
        print("playing dialtone")
        self.cancel_all_audio()
        # play dialtone
        self.player = self._spawn_player("--loop", "20", "-q", 'dialtone.mp3')

    def play_sound_ringback(self):
        """Stop any audio. Ringback playback itself is not implemented yet."""
        # play for a random number of seconds
        print("play_sound_ringback")
        self.cancel_all_audio()
        # play ringback

    def countonepulse(self):
        """Count one pulse towards the digit currently being dialled."""
        self.pulses += 1

    def stopcountingpulses(self):
        """Event-handler placeholder; intentionally does nothing."""
        pass

    def lookup_phone_number(self):
        """Return '<accumulated number>.mp3' if that file was listed, else None."""
        phone_number_as_mp3_filename = f'{self.accumulated_phone_number}.mp3'
        if phone_number_as_mp3_filename in self.audiofiles:
            return phone_number_as_mp3_filename
        return None

    def play_sound_matching_phone_number(self, phone_number_as_mp3_filename):
        """Stop any audio, then play the given clip from AUDIO_DIR.

        :param phone_number_as_mp3_filename: Clip filename to play. A falsy
            value stops the audio and plays nothing.
        """
        self.cancel_all_audio()
        print(f'play_sound_matching_phone_number: {phone_number_as_mp3_filename}')
        if not phone_number_as_mp3_filename:
            # Avoid asking mpg123 to play a non-existent ".../None" path.
            return
        # Use the filename the caller resolved rather than repeating the lookup.
        clip_path = join(self.AUDIO_DIR, phone_number_as_mp3_filename)
        print(["mpg123", "-q", clip_path])
        self.player = self._spawn_player("-q", clip_path)

    def assess_current_phone_number(self):
        """Connect the call if the digits dialled so far match an audio file."""
        phone_number_as_mp3_filename = self.lookup_phone_number()
        if phone_number_as_mp3_filename:
            print(f'phone_number_as_mp3_filename: {phone_number_as_mp3_filename}')
            self.enter_state_call_connected(phone_number_as_mp3_filename)

    def countrotary_when_deactivated(self):
        """Turn the counted pulses into a digit and append it to the number."""
        # if the phone is hung up, ignore the rotary dialler
        # if the phone is currently playing, ignore the rotary dialler
        print('dial')
        if self.state != 'dial':
            return
        # NOTE(review): one more pulse is counted than the digit dialled --
        # presumably an extra edge from this wiring; confirm on the hardware.
        dialled_digit = self.pulses - 1
        self.pulses = 0
        if dialled_digit == 10:
            dialled_digit = 0
        if not 0 <= dialled_digit <= 9:
            # Spurious event (no pulses, or too many): do not corrupt the
            # number with "-1" or a multi-digit value.
            print(f'ignoring invalid dialled digit: {dialled_digit}')
            return
        self.accumulated_phone_number += str(dialled_digit)
        print(f'accumulated_phone_number: {self.accumulated_phone_number}')
        self.assess_current_phone_number()

    def countrotary_when_activated(self):
        """Event-handler placeholder; intentionally does nothing."""
        pass

    @debounce(1)
    def hook_event(self):
        """Handle a hook change once the switch has been quiet for a second."""
        print('hook event')
        # reset everything when hook goes up or down
        self.reset()
        self.assess_is_handle_lifted()

    def cleanup(self):
        """Stop all audio; intended to run at interpreter exit."""
        self.cancel_all_audio()
if __name__ == "__main__":
    dial = Dial()

    # (button, handler on activation, handler on deactivation)
    event_wiring = (
        (countrotary, dial.countrotary_when_activated, dial.countrotary_when_deactivated),
        (rotaryenable, dial.countonepulse, dial.stopcountingpulses),
        # Both hook edges go through the same debounced handler.
        (hook, dial.hook_event, dial.hook_event),
    )
    for button, on_activated, on_deactivated in event_wiring:
        button.when_activated = on_activated
        button.when_deactivated = on_deactivated

    # Stop any playback when the interpreter exits.
    atexit.register(dial.cleanup)

    # Keep the main thread alive; all work happens in the event handlers.
    while True:
        time.sleep(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment