Skip to content

Instantly share code, notes, and snippets.

@jaggzh
Last active May 25, 2024 09:41
Show Gist options
  • Save jaggzh/e9a5b31afc218b8d44fd5ddb976c8c96 to your computer and use it in GitHub Desktop.
Save jaggzh/e9a5b31afc218b8d44fd5ddb976c8c96 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# gist-paste -u https://gist.github.com/jaggzh/e9a5b31afc218b8d44fd5ddb976c8c96 bansi.py kbnb.py audiosplit.py
import librosa
import numpy as np
import random
from typing import Generator
import matplotlib.pyplot as plt
from bansi import *
import kbnb
import re
import threading # playbg
import sys
fig = ax = None  # module-level so plt_cb() and gen_clips() share the live figure/axes


def plt_cb():
    """Redraw the shared matplotlib axes and process its pending GUI events.

    Registered with kbnb by pltkbnb_init() as the wait-loop callback.
    Does nothing while no axes exist yet (``ax`` is still None).
    """
    # The original guard tested `plt` (the imported module, always truthy),
    # so a call made before pltkbnb_init() failed on `None.figure`.
    if ax is None:
        return
    canvas = ax.figure.canvas
    canvas.draw()
    canvas.flush_events()
def pltkbnb_init():
    """Create the plot window and hook its redraw into kbnb's wait loop.

    Stores the new figure and axes in the module globals ``fig`` and ``ax``
    and initialises kbnb with ``plt_cb`` as its callback and a 0.05 delay.
    """
    global fig, ax
    plt.ion()  # matplotlib interactive mode
    figure, axes = plt.subplots(figsize=(10, 4))
    fig, ax = figure, axes
    kbnb.init(cb=plt_cb, delay=.05)
def p(*x, **y):
    """Shorthand for ``print``; every argument is forwarded unchanged."""
    print(*x, **y)
def play_audio_segment(au, sr):
    """Play an audio array through simpleaudio and block until it finishes.

    au: numpy array of samples; scaled by 32767 and cast to int16
        (assumes floats in [-1, 1] -- TODO confirm; nothing is clipped here).
    sr: sample rate handed to simpleaudio.
    """
    import simpleaudio as sa  # imported here, so only needed when playing
    pcm16 = (au * 32767).astype(np.int16)
    playback = sa.play_buffer(pcm16, 1, 2, sr)  # 1 channel, 2 bytes per sample
    playback.wait_done()
def play_bg(au, sr):
    """Start play_audio_segment(au, sr) on a new thread and return at once."""
    threading.Thread(target=play_audio_segment, args=(au, sr)).start()
def gen_clips(filename: str,
              sr: int = 16000,
              offset_s: float = 0,
              chunklen_s: float = 13,
              chunkskip_s: float = 0,  # skip secs between clips
              offset_rand: bool = False,  # random offset into file
              maxlen_s: float = 5.5,
              minsil_s: float = 0.3,
              exam_skip_s: float = 0.10,
              exam_len_s: float = 0.10,
              maxamp_discard_frac: float = 0.05,
              silthresh_frac: float = .08,  # fraction between min/max amp sil. tol.
              silthresh_power: float = 1.0,
              plot: bool = False,
              verbose: int = 0,
              ) -> Generator["tuple[np.ndarray, float]", None, None]:
    """Load an audio file and yield it as clips cut at quiet points.

    The audio is examined one `chunklen_s`-second chunk at a time.  The
    chunk's amplitude envelope is scanned backwards starting at `maxlen_s`
    and the clip is cut at the first envelope window found below the silence
    threshold.  The next chunk starts where the previous clip ended.

    Parameters:
        filename: audio file, loaded with ``librosa.load(..., mono=True)``.
        sr: sample rate requested from librosa.
        offset_s: start position in seconds (ignored when `offset_rand`).
        chunklen_s: length in seconds of the window examined per clip.
        chunkskip_s: seconds skipped after each yielded clip.
        offset_rand: pick `offset_s` at random in [0, duration - chunklen_s].
        maxlen_s: position (seconds into the chunk) where the backwards
            search for a split point begins.
        minsil_s, exam_skip_s: accepted but not used by this function.
        exam_len_s: envelope window length in seconds.
        maxamp_discard_frac: fraction of the loudest envelope values ignored
            when estimating the maximum amplitude (via a percentile).
        silthresh_frac, silthresh_power: silence threshold is
            ``min + ((max - min) * silthresh_frac) ** silthresh_power``.
        plot: show each chunk in a matplotlib window and wait for a key.
        verbose: print diagnostics when truthy.

    Yields:
        ``(clip, start_s)``: the clip samples and its start time in seconds.

    Raises:
        ValueError: if `chunklen_s` is smaller than `maxlen_s`.
    """
    # Check for validity of chunklen_s and maxlen_s
    if chunklen_s < maxlen_s:
        raise ValueError("chunklen_s must be greater than or equal to maxlen_s")

    def hlkeys(s):  # highlight prompt keys
        # ex: s='(q)uit, (n)ext, (p)lay view: '
        return re.sub(r'\(([^)]*)\)', lambda m: f"({bmag}{m.group(1)}{rst})", s)

    prompt = None
    if plot:
        pltkbnb_init()
        prompt = hlkeys('(q)uit, (Enter/n)ext, Play (e)NTIRE, Play (Space/p/s)plit, Play (a)fter: ')

    # Load audio file
    audio, _ = librosa.load(filename, sr=sr, mono=True)
    audio_dur_s = len(audio) / sr
    chunk_samples = int(chunklen_s * sr)

    # Determine the start offset
    if offset_rand:
        # Clamp at 0: audio shorter than one chunk would otherwise produce a
        # negative offset, i.e. a negative (wrap-around) start index.
        offset_s = random.uniform(0, max(0.0, audio_dur_s - chunklen_s))

    # Process audio in chunks
    start_sample_au = int(offset_s * sr)
    while start_sample_au + chunk_samples <= len(audio):
        end_sample_au = start_sample_au + chunk_samples
        audio_chunk = audio[start_sample_au:end_sample_au]
        x_values_smoothed, smoothed_audio = \
            calculate_smoothed_envelope(audio_chunk, sr=sr, win=exam_len_s)
        # Samples represented by one envelope value
        x_sm_scale = audio_chunk.shape[0] / smoothed_audio.shape[0]

        # Compute amplitudes and determine thresholds
        amplitudes = smoothed_audio
        max_amplitude = np.percentile(amplitudes, 100 * (1 - maxamp_discard_frac))
        min_amplitude = amplitudes.min()
        threshold = min_amplitude + \
            ((max_amplitude - min_amplitude) * silthresh_frac) ** silthresh_power

        # Find silence point to split: start at the envelope index matching
        # maxlen_s.  Both operands of min() are in samples now (the original
        # compared a sample count with seconds), and the index is clamped so
        # chunklen_s == maxlen_s cannot read one past the envelope.
        end_idx = int(min(audio_chunk.shape[0], maxlen_s * sr) / x_sm_scale)
        end_idx = min(end_idx, len(amplitudes) - 1)
        poten_split_au = None
        for idx in range(end_idx, 0, -1):  # index 0 would give an empty clip
            if amplitudes[idx] < threshold:
                poten_split_au = int(idx * x_sm_scale + start_sample_au)
                if verbose: p(f'Potential split chosen at (idx={idx})*(x_sm_scale={x_sm_scale:4})+(start_sample_au={start_sample_au}) === {poten_split_au}. Min amp: {{{amplitudes[idx]:.6}}} < thr {{{threshold:.6}}}. realmin {{{amplitudes.min():.6}}}')
                break

        if plot:
            ax.clear()
            ax.plot(audio_chunk, label='Original Audio')
            ax.plot(x_values_smoothed, smoothed_audio, label='Smoothed Envelope', marker='o', linestyle='-', alpha=0.7)
            ax.axvline(x=maxlen_s*sr, color='green', linestyle='-', linewidth=6, alpha=0.3, label=f'Max {maxlen_s}s')
            ax.axhline(y=max_amplitude, color='black', linestyle='--', linewidth=1, alpha=0.4)
            ax.axhline(y=amplitudes.min(), color='black', linestyle='--', linewidth=1, alpha=0.4)
            ax.axhline(y=threshold, color='black', linestyle='--', linewidth=2, alpha=0.6)
            if poten_split_au is not None:
                view_splitx = poten_split_au - start_sample_au
                view_splitx_color = 'black'
                view_splitx_label = 'Min Amp. Split'
            else:
                # The plot's x axis is chunk-relative, so no start offset here
                view_splitx = maxlen_s * sr
                view_splitx_color = 'darkgreen'
                view_splitx_label = 'Fallback split at Max Secs'
            if verbose: p(f' [Placing split] {yel}{view_splitx_label}{rst} {view_splitx}')
            ax.axvline(x=view_splitx,
                       color=view_splitx_color,
                       linestyle='--',
                       linewidth=2,
                       alpha=0.7, label=view_splitx_label)
            ax.legend()
            while True:
                ch = kbnb.waitkey(prompt)
                if ch == 'e':
                    p('Play ENTIRE VIEW')
                    play_bg(audio_chunk, sr)
                elif ch in ('s', 'p', ' '):
                    p('Play left split section')
                    left_end = poten_split_au if poten_split_au is not None \
                        else end_sample_au
                    play_bg(audio[start_sample_au:left_end], sr)
                elif ch == 'a':
                    p('Play AFTER split section')
                    # int(): maxlen_s * sr is a float and cannot index a slice
                    after_start = poten_split_au if poten_split_au is not None \
                        else int(maxlen_s * sr) + start_sample_au
                    play_bg(audio[after_start:end_sample_au], sr)
                elif ch in ('n', '\n'):
                    p('Next')
                    break
                elif ch == 'q':
                    p('Quit')
                    sys.exit()
                else:
                    p(f"Unknown key {ch}")

        # Yield and adjust last split based on found silence
        if poten_split_au is not None:
            if verbose: print(f'{red}Found min for split ({start_sample_au}:{poten_split_au}){rst}')
            yield audio[start_sample_au:poten_split_au], start_sample_au / sr
            start_sample_au = poten_split_au
        else:
            # NOTE(review): with no quiet point the whole chunklen_s chunk is
            # yielded, which is longer than maxlen_s, although the plot labels
            # this case as a split at maxlen_s -- confirm which is intended.
            if verbose: print(f'{gre}Using max chunk for split ({start_sample_au}:{end_sample_au}){rst}')
            yield audio[start_sample_au:end_sample_au], start_sample_au / sr
            start_sample_au = end_sample_au

        # Apply chunkskip_s
        start_sample_au += int(chunkskip_s * sr)

        # Check if it's the last possible chunk
        if start_sample_au + int(maxlen_s * sr) > len(audio):
            if start_sample_au < len(audio):  # never yield an empty clip
                if verbose: print(f'{blu}Final chunk is not split ({start_sample_au}:len(audio)=={len(audio)}){rst}')
                yield audio[start_sample_au:len(audio)], start_sample_au / sr
            break
    # NOTE(review): when the loop ends because fewer than chunklen_s seconds
    # remain (but more than maxlen_s), that remaining audio is not yielded.
def calculate_smoothed_envelope(audio_chunk, *, sr, win):
    """
    Calculate a max-per-window amplitude envelope with centered x-values.

    - audio_chunk: 1-D array of samples.
    - sr: Sample rate, used to convert `win` to a sample count.
    - win: Length of smoothing window in seconds (at least one sample is
      always used, so a tiny `win` degrades to the per-sample magnitude).

    Returns:
    - x_values_smoothed: Sample index at the center of each window, clamped
      to the last sample so it always has one entry per envelope value.
    - smoothed_audio: Maximum absolute amplitude within each window.
    """
    audio_chunk_abs = np.abs(audio_chunk)
    n_samples = len(audio_chunk_abs)
    # int(sr * win) can be 0, which would be an invalid arange step
    window_size = max(1, int(sr * win))
    if n_samples == 0:
        return np.arange(0), audio_chunk_abs
    window_starts = np.arange(0, n_samples, window_size)
    smoothed_audio = np.maximum.reduceat(audio_chunk_abs, window_starts)
    # The original built the centers with arange(window_size // 2, n, step),
    # which dropped the x-value of a short trailing window and left x and y
    # with different lengths.  Deriving them from the window starts keeps
    # the two arrays aligned.
    x_values_smoothed = np.minimum(window_starts + window_size // 2, n_samples - 1)
    return x_values_smoothed, smoothed_audio
if __name__ == '__main__':
    # Command-line use: split the given file with the interactive plot enabled.
    if len(sys.argv) < 2:
        print("Give me an audio filename to evaluate splitting.")
        sys.exit(1)  # without this the loop below fails on sys.argv[1]
    # gen_clips yields (clip, start_time_s) pairs, not bare arrays
    for clip, _start_s in gen_clips(sys.argv[1], plot=True):
        print(clip.shape)  # Show the size of each clip
from __future__ import print_function
import sys
# Some color codes for terminals.
# You just print the text and color codes, and print rst to
# send the color reset sequence.
# The color sequence names are in the first bit of code below.
# Usually, just use with something like:
# from jaggz.ansi import * # Convenient! Imports into global namespace
# # wonders like: red, bred, yel, bgblu, etc.
# print(red, "Hi, ", blu, username, rst, sep="")
# This mess by jaggz.h who periodically reads his gmail.com
# I usually import some routines from my utils.py:
# pfp() (print..plain) is print() with sep='', so I don't get spaces
# between my text and ansi sequences
#
# def pf(*x, **y): # Print-flush
# print(*x, **y)
# sys.stdout.flush()
# def pfp(*x, **y): # Print-flush, plain (no separator)
# y.setdefault('sep', '')
# print(*x, **y)
# sys.stdout.flush()
# def pfl(*x, **y): # Print-flush, line (ie. no newline)
# y.setdefault('end', '')
# print(*x, **y)
# sys.stdout.flush()
# def pfpl(*x, **y): # Print-flush, plain, line (no sep, no NL)
# y.setdefault('sep', '')
# y.setdefault('end', '')
# print(*x, **y)
# sys.stdout.flush()
# ANSI escape sequences.  Every name below is a plain string to be printed.
# (A dead `esc="^["` assignment that was overwritten on the next line has
# been dropped.)
esc = "\033"

# Background colours
bgbla = esc + "[40m"
bgred = esc + "[41m"
bggre = esc + "[42m"
bgbro = esc + "[43m"
bgblu = esc + "[44m"
bgmag = esc + "[45m"
bgcya = esc + "[46m"
bggra = esc + "[47m"

# Foreground colours
bla = esc + "[30m"
red = esc + "[31m"
gre = esc + "[32m"
bro = esc + "[33m"
blu = esc + "[34m"
mag = esc + "[35m"
cya = esc + "[36m"
gra = esc + "[37m"

# Bright/bold foreground colours (the same codes with ";1")
bbla = esc + "[30;1m"
bred = esc + "[31;1m"
bgre = esc + "[32;1m"
yel = esc + "[33;1m"
bblu = esc + "[34;1m"
bmag = esc + "[35;1m"
bcya = esc + "[36;1m"
whi = esc + "[37;1m"

rst = esc + "[0;m"      # reset attributes

chide = esc + "[?25l"   # hide cursor
cshow = esc + "[?25h"   # show cursor
cll = esc + "[2K"       # erase the whole line
cllr = esc + "[K"       # erase from cursor to end of line
# NOTE: a function named clsb() is defined later in this file and replaces
# this string once the module has finished loading.
clsb = "\033[J"         # erase from cursor to end of screen
def apfl(*x, **y):
    """Print with no separator and no trailing newline, then flush stdout.

    ``sep`` and ``end`` passed by the caller take precedence over the
    empty-string defaults.
    """
    options = {'sep': '', 'end': ''}
    options.update(y)
    print(*x, **options)
    sys.stdout.flush()
# 256-colour palette index ramps for use with a256fg()/a256bg().
# Each ramp starts at index 16; aseq_norm() never returns that first entry.
aseq_rg = [16, 52, 58, 94, 100, 136, 142, 178, 184, 220]
aseq_rb = [16, 52, 53, 89, 90, 126, 127, 163, 164, 200]
aseq_gb = [16, 22, 23, 29, 30, 36, 37, 43, 44, 50]
aseq_r = list(range(16, 197, 36))   # 16, 52, ..., 196
aseq_g = list(range(16, 47, 6))     # 16, 22, ..., 46
aseq_b = list(range(16, 22))        # 16 .. 21
aseq_gr = list(range(232, 256))     # 232 .. 255
def a256fg(a):
    """Return the escape sequence setting the foreground to palette index `a`."""
    return f"{esc}[38;5;{a}m"
def a256bg(a):
    """Return the escape sequence setting the background to palette index `a`."""
    return f"{esc}[48;5;{a}m"
def aseq_norm(seq, i):  # Takes sequence and a value from [0-1]
    """Pick the entry of `seq` matching the fraction `i`, skipping entry 0.

    i=0 selects seq[1] and i=1 selects the last entry.
    """
    usable_steps = len(seq) - 2
    position = 1 + int(usable_steps * i)
    return seq[position]
def a24fg(r, g, b):
    """Return the 24-bit foreground colour escape sequence for r, g, b."""
    return f"\033[38;2;{r};{g};{b}m"
def a24bg(r, g, b):
    """Return the 24-bit background colour escape sequence for r, g, b."""
    return f"\033[48;2;{r};{g};{b}m"
# Hex versions #rrggbb
def a24fgh(hexrgb):
    """Return the 24-bit foreground sequence for a '#rrggbb' string.

    The first character is skipped without being checked.
    """
    r, g, b = (int(hexrgb[pos:pos + 2], 16) for pos in (1, 3, 5))
    return "\033[38;2;" + ";".join(str(level) for level in (r, g, b)) + "m"
def a24bgh(hexrgb):
    """Return the 24-bit background sequence for a '#rrggbb' string.

    The first character is skipped without being checked.
    """
    r, g, b = (int(hexrgb[pos:pos + 2], 16) for pos in (1, 3, 5))
    return "\033[48;2;" + ";".join(str(level) for level in (r, g, b)) + "m"
def a24fg_rg(v):  # 0-1 for red-green
    """Foreground colour blended from red (v=0) to green (v=1)."""
    red_level = int(255.0 * (1 - v))
    green_level = int(255.0 * v)
    return a24fg(red_level, green_level, 0)
def a24bg_rg(v):  # 0-1 for red-green
    """Background colour blended from red (v=0) to green (v=1)."""
    red_level = int(255.0 * (1 - v))
    green_level = int(255.0 * v)
    return a24bg(red_level, green_level, 0)
def a24fg_ry(v):  # 0-1 for red-yellow
    """Foreground colour from red (v=0) to yellow (v=1): red fixed at 255."""
    green_level = int(255.0 * v)
    return a24fg(255, green_level, 0)
def a24bg_ry(v):  # 0-1 for red-yellow
    """Background colour from red (v=0) to yellow (v=1): red fixed at 255."""
    green_level = int(255.0 * v)
    return a24bg(255, green_level, 0)
def str_colorize(s, values, aseq, bg=False, color=None):
    """Print `s` with each character coloured according to `values`.

    Colours come from the palette ramp `aseq` via aseq_norm(), which skips
    the ramp's first colour (being too dark).

    s:       String input (might work with anything where s[i] is printable).
    values:  List of numerical values, same length as s.  These are the
             magnitudes of the colours; the range is scaled linearly between
             min(values) and max(values).
    aseq:    The ansi colour sequence (ex. aseq_rb).  See those in this file.
    bg:      True to set the background instead of the foreground.
    color:   Optional fixed colour string printed before every character.
             Use the colour variables from this file, like:
             red, bred (brightred), bgred, etc.

    The reset sequence and a newline are printed at the end.
    """
    if not s:
        print(rst)
        return
    minv = min(values)
    maxv = max(values)
    span = maxv - minv
    to_ansi = a256bg if bg else a256fg
    for i, ch in enumerate(s):
        # When every value is equal there is no range to scale over (the
        # original divided by zero); use the middle of the ramp instead.
        norm = (float(values[i]) - minv) / span if span else 0.5
        if color is not None:
            print(color, end='')
        print(to_ansi(aseq_norm(aseq, norm)), ch, sep='', end='')
    print(rst)
def uncolor():
    """Turn every colour constant of this module (and rst) into "".

    Note this rebinds the names inside this module only; names already
    copied elsewhere with ``from ... import *`` keep their old values.
    """
    # The original listed bggre/gre/bgre twice where bgblu/blu/bblu belong,
    # so the three blue constants were never cleared.
    global bgbla, bgred, bggre, bgbro, bgblu, bgmag, bgcya, bggra
    global bla, red, gre, bro, blu, mag, cya, gra
    global bbla, bred, bgre, yel, bblu, bmag, bcya, whi
    global rst
    bgbla, bgred, bggre, bgbro, bgblu, bgmag, bgcya, bggra = [""]*8
    bla, red, gre, bro, blu, mag, cya, gra = [""]*8
    bbla, bred, bgre, yel, bblu, bmag, bcya, whi = [""]*8
    rst = ""
def get_linux_termsize_xy():  # x,y
    """Alias of get_linux_terminal(); returns its (x, y) result unchanged."""
    size_xy = get_linux_terminal()
    return size_xy
def get_linux_terminal():  # x,y
    """Return the terminal size as ``(columns, rows)``.

    Tried in order: a TIOCGWINSZ ioctl on file descriptors 0, 1 and 2; the
    same ioctl on the controlling terminal (os.ctermid()); finally the
    LINES/COLUMNS environment variables, defaulting to 25 x 80.
    """
    import os
    import struct

    def ioctl_GWINSZ(fd):
        """Return (rows, cols) for `fd`, or None when it cannot be queried."""
        try:
            import fcntl
            import termios
            packed = fcntl.ioctl(fd, termios.TIOCGWINSZ,
                                 struct.pack('hh', 0, 0))
            return struct.unpack('hh', packed)
        # Narrowed from a bare except, which also swallowed
        # KeyboardInterrupt and SystemExit.
        except (ImportError, OSError, struct.error):
            return None

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
        except (AttributeError, OSError):
            # no os.ctermid on this platform, or no controlling terminal
            pass
        else:
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                os.close(fd)
    if not cr:
        env = os.environ
        # Use get(key[, default]) instead of a try/catch
        cr = (env.get('LINES', 25), env.get('COLUMNS', 80))
    # cr is (rows, cols); callers get (x, y)
    return int(cr[1]), int(cr[0])
def gy(sy):
    """Move the cursor to row `sy`."""
    apfl(f"{esc}[{sy}H")


def gxy(sx, sy):
    """Move the cursor to column `sx`, row `sy` (arguments in x, y order)."""
    apfl(f"{esc}[{sy};{sx}H")


def gyx(sy, sx):
    """Move the cursor to row `sy`, column `sx` (arguments in y, x order)."""
    apfl(f"{esc}[{sy};{sx}H")
def cls():
    """Emit the erase-entire-screen sequence."""
    apfl(f"{esc}[2J")


def clsb():
    """Emit the erase-from-cursor-to-end-of-screen sequence.

    This definition replaces the string constant of the same name that is
    assigned earlier in this file.
    """
    apfl(f"{esc}[J")
def gright(v=None):
    """Move the cursor right by one column, or by `v` columns when given."""
    # The original compared against `none`, an undefined name, so every call
    # raised NameError.
    if v is None:
        apfl(esc + "[C")
    else:
        apfl(esc + f"[{v}C")
def gleft():
    """Move the cursor one column left."""
    apfl(f"{esc}[D")


def gup():
    """Move the cursor one row up."""
    apfl(f"{esc}[A")


def gdown():
    """Move the cursor one row down."""
    apfl(f"{esc}[B")
def get_num_colors():
    """Return the colour count reported by ``tput colors``.

    On any failure the error is printed to stderr and -1 is returned.
    """
    import subprocess
    import sys
    try:
        # Use 'tput colors' to get the number of supported colors
        raw = subprocess.check_output(['tput', 'colors'])
        count = int(raw.strip())
    except Exception as e:
        print(f"Error querying terminal info: {e}", file=sys.stderr)
        return -1  # Return -1 or any indicator for an error or unsupported terminal
    return count
def is24():
    """Check if the terminal supports 24-bit color."""
    needed = 2 ** 24  # 16777216 colors for 24-bit
    return get_num_colors() >= needed
def is256():
    """Check if the terminal supports at least 256 colors."""
    reported = get_num_colors()
    return reported >= 256
def is256only():
    """Check if the terminal supports at least 256 colors but not 24-bit color."""
    return 256 <= get_num_colors() < 16777216
# kbnb.py
# gist-paste -u https://gist.github.com/jaggzh/0bc0fe23872c6980af29d7bd89d032cf
# Description:
# Python non-blocking input for Unix/Linux with callback support for wait loops
# (The main termios routine lines came from a stackexchange post by swdev)
# Ctrl-c interrupt is not taken over, so ctrl-c should still break out.
# Date: 2017-07-01
# Version: 0.1b
# Author: jaggz.h who happens to be @ gmail.com
# Usage:
# Setup:
# kbnb.init() or kbnb.init(cb=some_function)
# kbnb.reset_flags() when done
# Example callback function:
# def some_function(): plt.pause(0.05)
# This I use to call matplotlib's pyplot's pause() to benefit from its
# event handler (for window updates or mouse events).
# Later, to wait for a key in your own code:
# print("Hit any key")
# ch = kbnb.waitch() or waitch(cb=different_callback)
# To check (and get any pending) key:
# ch = kbnb.getch()
# Returns: "" (an empty string) if no input is pending
# To check (and get any pending) char sequence as a list:
# lst = kbnb.getlist()
# Returns:
# [] If no input is pending, or
# A list of integer values
# To check (and get any pending) char sequence as a string:
# s = kbnb.getstr()
# Returns: "" or "string of input characters"
# Set a new callback post-init(): kbnb.setcb(new_cb_function)
# Full example using waitch() to wait for a kb char input:
# import kbnb, time
# plt = None
# def plt_sleep():
# # waitch()'s default loop will call our function instead of
# # its built-in time.sleep(delay), so we are sleeping, ourselves,
# # because we really don't need high time resolution in our loop.
# if plt: plt.pause(.1) # In case we didn't setup plt
# else: time.sleep(.1)
# kbnb.init(cb=plt_sleep)
# # ...Do something here that needs our callback called even while
# # we're waiting for input.
# # ...In this example, we would have popped up a pyplot window which
# # requires its event handler to be called for window updates and
# # mouse/kb events.
# print("Hit any key to continue")
# kbnb.waitch()
# kbnb.gobble() # Consume remaining pending input before leaving
# # (Useful if a key was hit, like up or down, which
# # Might send a multi-character sequence)
# Full example using getch() to do something while waiting for input:
# import kbnb, time
# kbnb.init()
# while True:
# ch=kbnb.getch()
# if ch == 'q': break
# print("I'm not thrown for a loop! (And 'q' to quit)")
# time.sleep(1)
# kbnb.gobble() # Consume remaining pending input before leaving
# # (Useful if a key was hit, like up or down, which
# # Might send a multi-character sequence)
from __future__ import print_function
import sys, atexit, termios, time, os, signal
# Module state shared by the functions below.
orig_flags = loop_callback = None  # saved termios attributes / wait-loop callback
loop_delay = 0.1  # Init sets this, but we'll put .1 for "safety"
def init(cb=None, delay=.05):
    """Put stdin into non-blocking, no-echo mode and remember how to undo it.

    cb:    optional callable stored as the module's wait-loop callback.
    delay: seconds stored as the module's wait-loop sleep time.

    Also installs reset_sighand as the SIGINT handler.
    """
    # loop_delay was missing from the global statement, so `delay` was only
    # ever assigned to a local and had no effect.
    global orig_flags, loop_callback, loop_delay
    if cb:
        loop_callback = cb
    loop_delay = delay
    # Save the terminal state only once: on a repeated init() the terminal is
    # already modified and must not replace the real original.
    if orig_flags is None:
        orig_flags = termios.tcgetattr(sys.stdin)
    new_flags = termios.tcgetattr(sys.stdin)
    # Disable echo and disable blocking (disable canonical mode);
    # index 3 is the local-modes field of the attribute list.
    new_flags[3] &= ~(termios.ECHO | termios.ICANON)
    # Index 6 is the control-character list: with VMIN=0 and VTIME=0 a read
    # returns immediately with whatever is pending.
    new_flags[6][termios.VMIN] = 0
    new_flags[6][termios.VTIME] = 0
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_flags)
    signal.signal(signal.SIGINT, reset_sighand)
@atexit.register
def atexit_reset():
    """Exit hook: put the terminal back the way init() found it."""
    reset_flags()
def reset_sighand(signum, frame):
    """SIGINT handler: restore the terminal, then interrupt as usual.

    Installed by init().  The original only restored the terminal, which
    silently swallowed Ctrl-C; raising KeyboardInterrupt keeps Ctrl-C
    breaking out of the program as the module header promises.
    """
    reset_flags()
    raise KeyboardInterrupt
def reset_flags():
    """Restore the terminal attributes saved by init(), if any were saved."""
    if not orig_flags:
        return
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, orig_flags)
def setcb(cb):
    """Replace the module's wait-loop callback with `cb`."""
    # Without `global` the assignment created a local and changed nothing.
    global loop_callback
    loop_callback = cb
def setdelay(delay):
    """Replace the module's wait-loop sleep time (seconds) with `delay`."""
    # Without `global` the assignment created a local and changed nothing.
    global loop_delay
    loop_delay = delay
def getch():
    """Read one pending character from stdin; return "" when none is pending.

    A multi-byte UTF-8 character is read completely before decoding (the
    original decoded only its first byte and raised UnicodeDecodeError).
    Bytes that still do not form valid UTF-8 decode to U+FFFD.
    """
    fd = sys.stdin.fileno()
    data = os.read(fd, 1)
    if not data:
        return ""
    lead = data[0]
    # The UTF-8 lead byte tells how many continuation bytes follow
    if lead >= 0xF0:
        missing = 3
    elif lead >= 0xE0:
        missing = 2
    elif lead >= 0xC0:
        missing = 1
    else:
        missing = 0
    while missing > 0:
        more = os.read(fd, missing)
        if not more:  # nothing further pending; decode what we have
            break
        data += more
        missing -= len(more)
    return data.decode(errors="replace")
def getkey():
    """Read one key and translate common sequences into names.

    NOTE: a second ``def getkey`` further down this file replaces this
    definition when the module loads, so this version is not the one
    callers get.

    Returns 'up'/'down'/'right'/'left', 'c-up'/'c-down', 'a-right'/'a-left',
    'F1'..'F4' (ESC O P..S), 'backspace', '^x' style names for control
    characters, or otherwise the character read ("" when nothing is pending).
    """
    ch = getch()
    if ch == "\033":
        next_char = getch()  # This could be '[' for sequences or 'O' for F1-F4 in some terminals
        if next_char == '[':
            # Collect the parameter characters up to the final letter.  The
            # original started this loop with ch still holding ESC, so ESC
            # landed in `nums` and the plain arrow keys never matched.
            nums = ""
            ch = getch()
            while ch and not ch.isalpha():
                nums += ch
                ch = getch()
            if not ch:
                # Input ran out before a final letter: return the raw
                # sequence instead of spinning until more input arrives.
                return "\033[" + nums
            if nums == "":  # Plain codes: esc[L
                ch = {'A': 'up', 'B': 'down',
                      'C': 'right', 'D': 'left'}.get(ch, ch)
            else:
                ch = {('A', '1;5'): "c-up", ('B', '1;5'): "c-down",
                      ('C', '1;3'): "a-right",
                      ('D', '1;3'): "a-left"}.get((ch, nums), ch)
        elif next_char == 'O':  # Older terminals for F1-F4
            ch = getch()
            fkeys = {'P': 'F1', 'Q': 'F2', 'R': 'F3', 'S': 'F4'}
            if ch in fkeys:
                return fkeys[ch]
    elif ch == "\x7f":  # Handle Backspace (often sent as DEL ASCII 127)
        return "backspace"
    elif len(ch) == 1 and ord(ch) < 32:  # ASCII control characters (1-31)
        och = ord(ch)
        if och == 0:
            return "^@"
        return f'^{chr(96+och)}'
    return ch
def getkey():
    """Read one key, translating arrow-key escape sequences into names.

    This is the definition in effect (it replaces the earlier ``getkey`` in
    this file).

    Returns:
        'up', 'down', 'right', 'left' for ESC [ A..D;
        'c-up', 'c-down' for ESC [ 1;5 A/B; 'a-right', 'a-left' for
        ESC [ 1;3 C/D; the final letter of any other complete sequence;
        "\\033" for a lone Escape; the raw text read so far when a sequence
        stops before its final letter; otherwise the character itself
        ("" when no input is pending).
    """
    ch = getch()
    if ch != "\033":
        return ch
    intro = getch()  # skip the [
    if not intro:
        # A lone Escape key.  The original spun in the loop below until some
        # later key happened to supply a letter.
        return ch
    nums = ""
    ch = getch()
    while ch and not ch.isalpha():
        nums += ch
        ch = getch()
    if not ch:
        # Sequence without a final letter (pending input exhausted)
        return "\033" + intro + nums
    if nums == "":  # Plain codes: esc[L
        return {'A': 'up', 'B': 'down', 'C': 'right', 'D': 'left'}.get(ch, ch)
    modified = {('A', '1;5'): "c-up", ('B', '1;5'): "c-down",
                ('C', '1;3'): "a-right", ('D', '1;3'): "a-left"}
    return modified.get((ch, nums), ch)
def getlist():
    """Drain pending input and return it as a list of character codes.

    Returns [] when nothing is pending.
    """
    # iter(getch, "") keeps calling getch() until it returns ""
    return [ord(piece[0]) for piece in iter(getch, "")]
def getstr():
    """Drain pending input and return it as one string ("" when none)."""
    # iter(getch, "") keeps calling getch() until it returns ""
    return "".join(iter(getch, ""))
def gobble():
    """Discard all pending input."""
    for _ in iter(getch, ""):
        pass
def waitkey(prompt="Hit a key to continue", cb='default', keys=True):
    """waitch() with `keys` defaulting to True.

    Keys is for processing complex keystrokes, returning
    strings like 'up', 'down', etc. See getkey()
    """
    pressed = waitch(prompt, cb, keys)
    return pressed
def waitch(prompt="Hit a key to continue", cb='default', keys=False):
    """Wait until a key is available and return it.

    prompt: text printed (without newline) before waiting; falsy for none.
    cb:     callable run on every pass while no key is pending.  The default
            'default' selects the module callback set by init()/setcb().
            When no callback applies, the loop sleeps for the module delay.
    keys:   True to read with getkey() (named keys), False for getch().
    """
    if prompt:
        print(prompt, end="")
        sys.stdout.flush()
    read_key = getkey if keys else getch
    while True:
        key = read_key()
        if key:
            return key
        # The original never called a caller-supplied `cb`; only the module
        # callback was ever used.  Looked up on each pass so a setcb() made
        # while waiting takes effect.
        idle = loop_callback if cb == 'default' else cb
        if idle:
            idle()
        else:
            time.sleep(loop_delay)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment