Skip to content

Instantly share code, notes, and snippets.

@bootrino
Created December 20, 2021 02:50
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bootrino/9c6c82e3243782da23308e50a38686db to your computer and use it in GitHub Desktop.
Save bootrino/9c6c82e3243782da23308e50a38686db to your computer and use it in GitHub Desktop.
Rotary phone code for Raspberry Pi Zero
#!/usr/bin/python3
import atexit
# sudo apt install python3-gpiozero
import gpiozero
import math, sys, os, time
import subprocess
import time
from threading import Timer
from os import listdir
from os.path import isfile, join
# GPIO pin numbers for the three phone inputs.
# NOTE(review): presumably Broadcom (BCM) numbering, which is what gpiozero
# uses for bare integers by default - confirm against the wiring.
# The trailing colour notes presumably refer to the wire colours - confirm.
pin_rotaryenable = 23 # yellow
pin_countrotary = 18 # red
pin_hook = 25
# Each input is read through a gpiozero Button; the event handlers are
# attached to these objects in the __main__ section at the bottom of the file.
rotaryenable = gpiozero.Button(pin_rotaryenable)
countrotary = gpiozero.Button(pin_countrotary)
hook = gpiozero.Button(pin_hook)
# debounce function https://jonlabelle.com/snippets/view/python/python-debounce-decorator-function
# The debounce function is MPL-licensed, by Jon LaBelle.
def debounce(wait):
    """Postpone a function's execution until calls to it have stopped.

    Every call to the decorated function cancels the timer started by the
    previous call and starts a new one, so only the last call of a burst is
    executed, ``wait`` seconds after it was made, on a ``threading.Timer``
    thread.

    The pending timer is stored on the wrapper as ``debounced.t``. It is
    therefore shared by every caller of the decorated function; when a method
    is decorated, all instances of the class share the same timer.

    :type wait: int or float
    :param wait: The amount of seconds to wait before the call executes.
    :return: A decorator. The decorated function returns ``None`` immediately;
        the wrapped function's own return value is discarded.
    """
    # Imported here so the decorator does not rely on a module-level import.
    from functools import wraps

    def decorator(fun):
        @wraps(fun)  # keep the wrapped function's name and docstring
        def debounced(*args, **kwargs):
            def call_it():
                fun(*args, **kwargs)

            # On the very first call no timer has been stored yet.
            pending = getattr(debounced, 't', None)
            if pending is not None:
                pending.cancel()
            debounced.t = Timer(wait, call_it)
            debounced.t.start()

        return debounced

    return decorator
# Maps a dialled phone number (as a string of digits) to an audio clip filename.
# NOTE(review): nothing in the visible code reads this mapping; Dial resolves
# numbers by looking for "<number>.mp3" in /home/pi/audio instead.
phone_number_to_audioclip_map = {
'1': 'dialtone.mp3',
'8360775': '8360775.mp3',
}
class Dial():
    """State machine for a rotary phone that plays audio clips with mpg123.

    Valid values of ``self.state``:

    * ``'dial'`` - the handle is lifted, the dial tone plays and dialled
      digits are accumulated
    * ``'hungup'`` - the handle is down and all audio has been stopped
    * ``'call_connected'`` - the accumulated number matched an audio file,
      which is being played; the rotary dial is ignored
    """

    # Directory listed for "<phone number>.mp3" clips.
    AUDIO_DIR = '/home/pi/audio'
    # Working directory of the player; 'dialtone.mp3' is resolved against it.
    PLAYER_CWD = '/home/pi'

    def __init__(self):
        """Reset the counters, read the hook position and list the clips."""
        print("Initializing...")
        self.reset()
        self.assess_is_handle_lifted()
        try:
            self.audiofiles = [
                f for f in listdir(self.AUDIO_DIR)
                if isfile(join(self.AUDIO_DIR, f))
            ]
        except OSError as e:
            # Missing or unreadable directory: keep running, but no number
            # can be matched.
            print(f'could not list {self.AUDIO_DIR}: {e}')
            self.audiofiles = []
        print(self.audiofiles)

    def reset(self):
        """Clear the pulse count, the dialled number and the player handle.

        This only forgets the ``subprocess.Popen`` object; it does not stop a
        running player (see ``cancel_all_audio``).
        """
        self.pulses = 0
        self.counting = True
        self.accumulated_phone_number = ''
        self.player = None

    def assess_is_handle_lifted(self):
        """Read the hook input and enter the 'dial' or 'hungup' state."""
        if hook.is_pressed:
            print("handle_is_lifted = True")
            self.handle_is_lifted = True
            self.enter_state_dial()
        else:
            print("handle_is_lifted = False")
            self.handle_is_lifted = False
            self.enter_state_hungup()

    def cancel_all_audio(self):
        """Stop playback by killing every process named ``mpg123``.

        ``killall`` matches by process name, so this also stops mpg123
        processes that were not started by this object. Failure to run
        ``killall`` is reported but never raised: stopping audio is
        best-effort.
        """
        print("cancel_all_audio")
        try:
            subprocess.call(["/usr/bin/killall", "mpg123"])
        except OSError as e:
            print(f'could not run killall: {e}')

    def enter_state_hungup(self):
        """Enter the 'hungup' state: clear counters and stop all audio."""
        print("enter_state_hungup")
        self.reset()
        # cancel all audio
        self.cancel_all_audio()
        self.state = 'hungup'

    def enter_state_dial(self):
        """Enter the 'dial' state: clear counters and start the dial tone."""
        print("enter_state_dial")
        self.state = 'dial'
        self.reset()
        print('starting dialtone')
        self.play_sound_dialtone()

    def enter_state_call_connected(self, phone_number_as_mp3_filename):
        """Enter the 'call_connected' state and play the matched clip.

        :param phone_number_as_mp3_filename: Name of a file inside
            ``AUDIO_DIR`` to play.
        """
        print("enter_state_call_connected")
        self.state = 'call_connected'
        self.cancel_all_audio()
        # initial ringback indicates to user that the phone is ringing at the other end
        self.play_sound_ringback()
        self.play_sound_matching_phone_number(phone_number_as_mp3_filename)

    def _start_player(self, player_args):
        """Start ``mpg123`` with ``player_args`` and keep it in ``self.player``."""
        # The player's output is never read, so it is discarded rather than
        # piped: an unread pipe can fill up and block the child process.
        self.player = subprocess.Popen(
            ["mpg123", *player_args],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            cwd=self.PLAYER_CWD,
        )

    def play_sound_dialtone(self):
        """Stop any current audio and play ``dialtone.mp3`` (``--loop 20``)."""
        print("playing dialtone")
        self.cancel_all_audio()
        # play dialtone
        self._start_player(["--loop", "20", "-q", 'dialtone.mp3'])

    def play_sound_ringback(self):
        """Stop any current audio; no ringback sound is played yet.

        TODO: play a ringback tone for a random number of seconds.
        """
        print("play_sound_ringback")
        self.cancel_all_audio()

    def countonepulse(self):
        """Count one pulse towards the digit currently being dialled."""
        self.pulses += 1

    def stopcountingpulses(self):
        """Event handler that intentionally does nothing."""
        pass

    def lookup_phone_number(self):
        """Return ``'<accumulated number>.mp3'`` if such a clip was listed.

        :return: The filename, or ``None`` when ``self.audiofiles`` has no
            file of that name.
        """
        phone_number_as_mp3_filename = f'{self.accumulated_phone_number}.mp3'
        if phone_number_as_mp3_filename in self.audiofiles:
            return phone_number_as_mp3_filename
        return None

    def play_sound_matching_phone_number(self, phone_number_as_mp3_filename):
        """Stop any current audio and play the given clip from ``AUDIO_DIR``.

        :param phone_number_as_mp3_filename: Name of the file to play. When it
            is empty or ``None`` nothing is started.
        """
        self.cancel_all_audio()
        print(f'play_sound_matching_phone_number: {phone_number_as_mp3_filename}')
        if not phone_number_as_mp3_filename:
            # Without this guard the player would be asked for ".../None".
            return
        # Play the file the caller asked for instead of repeating the lookup.
        clip_path = join(self.AUDIO_DIR, phone_number_as_mp3_filename)
        print(["mpg123", "-q", clip_path])
        self._start_player(["-q", clip_path])

    def assess_current_phone_number(self):
        """Connect the call if the digits dialled so far match a clip."""
        phone_number_as_mp3_filename = self.lookup_phone_number()
        if phone_number_as_mp3_filename:
            print(f'phone_number_as_mp3_filename: {phone_number_as_mp3_filename}')
            self.enter_state_call_connected(phone_number_as_mp3_filename)

    def countrotary_when_deactivated(self):
        """Turn the counted pulses into a digit and append it to the number.

        Does nothing unless the state is 'dial', so the rotary dial is ignored
        while the phone is hung up or a clip is playing.
        """
        print('dial')
        if self.state != 'dial':
            return
        # The digit is the pulse count minus one, and a result of 10 means 0.
        # NOTE(review): presumably the hardware yields one extra pulse per
        # digit - confirm against the dial mechanism.
        dialled_digit = self.pulses - 1
        self.pulses = 0
        if dialled_digit < 0:
            # No pulse was counted (spurious event): do not append "-1".
            print('ignoring rotary event without pulses')
            return
        if dialled_digit == 10:
            dialled_digit = 0
        self.accumulated_phone_number += str(dialled_digit)
        print(f'accumulated_phone_number: {self.accumulated_phone_number}')
        self.assess_current_phone_number()

    def countrotary_when_activated(self):
        """Event handler that intentionally does nothing."""
        pass

    @debounce(1)
    def hook_event(self):
        """Re-read the hook once it has been quiet for one second.

        Debounced, so a burst of hook transitions results in a single
        evaluation of the hook position.
        """
        print('hook event')
        # reset everything when hook goes up or down
        self.reset()
        self.assess_is_handle_lifted()

    def cleanup(self):
        """Stop all audio; registered with ``atexit`` by the main script."""
        self.cancel_all_audio()
if __name__ == "__main__":
    dial = Dial()
    # One row per input: (device, when_activated handler, when_deactivated handler)
    wiring = (
        (countrotary, dial.countrotary_when_activated, dial.countrotary_when_deactivated),
        (rotaryenable, dial.countonepulse, dial.stopcountingpulses),
        (hook, dial.hook_event, dial.hook_event),
    )
    for device, on_activated, on_deactivated in wiring:
        device.when_activated = on_activated
        device.when_deactivated = on_deactivated
    # Make sure playback is stopped when the interpreter exits.
    atexit.register(dial.cleanup)
    # All work happens in the event handlers; just keep the process alive.
    while True:
        time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment