Skip to content

Instantly share code, notes, and snippets.

@reo6
Last active January 8, 2023 21:04
Show Gist options
  • Save reo6/ec64f51f101d9324612bcbcc597fabba to your computer and use it in GitHub Desktop.
Save reo6/ec64f51f101d9324612bcbcc597fabba to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
This is a GUI for testing the looper.
Will be replaced with Gtk later.
"""
import tkinter as tk
def tk_gui(on_press):
window = tk.Tk()
btn = tk.Button(text="Click me", command=on_press)
btn.pack()
window.mainloop()
#!/usr/bin/env python3
import numpy
from enum import Enum
import queue
class LoopMode(Enum):
"""
TODO: Create overdub mode.
"""
RECORD = 1
PLAY = 2
class Take:
"""
Represents a take.
"""
def __init__(self):
self.q = queue.Queue()
self.__take = []
def record_blocks(self, blocks):
self.__take.extend(blocks)
self.q = queue.Queue() # Reset queue.
def clear_take(self):
"""
Reset all the values.
"""
self.__take = []
self.q = queue.Queue()
def next(self):
try:
return self.q.get_nowait()
except queue.Empty:
# This means our queue is empty and it needs to be filled
# again.
if self.__take != []:
for b in self.__take:
self.q.put(b)
return self.q.get()
else:
return 0.0 # Return no audio in case the take is empty.
class Looper:
"""
This class will represent a loop channel.
"""
mode = LoopMode.PLAY
def __init__(self):
self.takes = [
Take(),
Take()
]
def get_next_takes(self):
"""
Get next blocks from both takes and return them.
"""
return (self.takes[0].next(), self.takes[1].next())
def record_takes(self, blocks1, blocks2):
self.takes[0].record_blocks(blocks1)
self.takes[1].record_blocks(blocks2)
#!/usr/bin/env python3
import jack
import threading
import numpy
from gui import tk_gui
from looper import LoopMode, Looper
CLIENT_NAME = "foss505"
class Foss505:
def __init__(self, client_name):
self.client = jack.Client(client_name)
self.client.set_process_callback(self.process)
self.initialize_ports()
self.looper = Looper()
def process(self, frames):
current_blocks = [port.get_array() for port in self.input_ports]
if self.looper.mode == LoopMode.PLAY:
for outport, take, block in zip(
self.output_ports,
self.looper.get_next_takes(),
current_blocks):
outport.get_buffer()[:] = take + block
elif self.looper.mode == LoopMode.RECORD:
self.temp_take.append(current_blocks)
for outport, block in zip(self.output_ports, current_blocks):
outport.get_buffer()[:] = block
def initialize_ports(self):
"""
Initialize input-output ports.
"""
self.input_ports = [
self.client.inports.register("input_1"),
self.client.inports.register("input_2"),
]
self.output_ports = [
self.client.outports.register("output_1"),
self.client.outports.register("output_2"),
]
pass
def switch_modes(self):
if self.looper.mode == LoopMode.PLAY:
# Record mode is on.
self.temp_take = []
self.looper.mode = LoopMode.RECORD
elif self.looper.mode == LoopMode.RECORD:
# This means the record is over,
# and we are ready to put it to our take.
blocks1 = list(map(lambda b: b[0], self.temp_take))
blocks2 = list(map(lambda b: b[1], self.temp_take))
self.looper.record_takes(blocks1, blocks2)
self.looper.mode = LoopMode.PLAY
def jack_thread(instance):
event = threading.Event()
with foss505.client:
try:
event.wait()
except KeyboardInterrupt:
print("\nInterrupted.")
if __name__ == "__main__":
foss505 = Foss505(CLIENT_NAME)
jack_thread_obj = threading.Thread(target=jack_thread, args=(foss505,))
tk_gui_obj = threading.Thread(target=tk_gui, args=(foss505.switch_modes,))
jack_thread_obj.start()
tk_gui_obj.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment