@P4UL-M
Last active April 25, 2024 18:15
Changing playback sound speed in realtime with python and multiprocessing

This script changes the playback speed of a sound in real time, for example background music in pygame.

The script generates the raw data for the next chunk of the sound file whenever there is space in the queue. Each chunk is scaled to the desired speed, which you can change through sound_factor.value.
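The arithmetic behind that scaling can be sketched in a few lines. This is only an illustration, not part of the gist: chunk_plan is a hypothetical helper, and it assumes (as the generator code does) that each chunk takes offset * factor milliseconds from the source and that the speed change divides that duration by factor.

```python
def chunk_plan(offset_ms: float, factor: float) -> tuple[float, float]:
    """Return (source_ms_consumed, playback_ms) for one chunk.

    Hypothetical helper, for illustration only.
    """
    if factor <= 0:
        raise ValueError("factor must be positive")
    source_ms = offset_ms * factor    # slice taken from the original track
    playback_ms = source_ms / factor  # duration once the speed change is applied
    return source_ms, playback_ms


for f in (0.5, 1.0, 2.0):
    source_ms, playback_ms = chunk_plan(500, f)
    print(f"factor={f}: reads {source_ms:.0f} ms of source, plays for {playback_ms:.0f} ms")
```

Under these assumptions every buffer plays for roughly the same time whatever the factor is; only the amount of source audio consumed per buffer changes.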

To play the music, you just need to get buffers from the queue and pass them to your music stream. Example for pygame:

# music update
if channel.get_queue() is None:
    _buffer = generate_music.Sounds_buffer.get()
    channel.queue(pygame.mixer.Sound(_buffer))
elif not channel.get_busy() and not channel.get_queue():
    _buffer = generate_music.Sounds_buffer.get()
    channel.play(pygame.mixer.Sound(_buffer))

The script runs the generator function in a separate Process so that it does not overload the main loop.
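The producer/consumer pattern with a small bounded queue can be tried in isolation. The sketch below is a stand-in, not the gist's code: it uses threading and queue.Queue instead of multiprocessing so that it runs as a plain snippet, and the chunk names and the None sentinel are made up for the demonstration.

```python
import queue
import threading


def producer(out: queue.Queue, chunks: int) -> None:
    """Push `chunks` fake buffers, then a None sentinel."""
    for i in range(chunks):
        out.put(f"chunk-{i}")  # blocks while the queue is full
    out.put(None)              # tells the consumer that nothing more is coming


buffers = queue.Queue(maxsize=2)  # same bound as the gist's Sounds_buffer
t = threading.Thread(target=producer, args=(buffers, 5), daemon=True)
t.start()

received = []
while (item := buffers.get()) is not None:  # the "main loop" side
    received.append(item)
t.join()
print(received)
```

Because put() blocks on a full queue, the producer never runs more than two buffers ahead of the consumer, which is what keeps speed changes reactive.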

"""
A script that allows real-time music speed modification, with the computation
done in another Process to avoid overloading the main loop.
"""
from multiprocessing import Process, Value, Queue
from pydub import AudioSegment
import logging


# method from https://stackoverflow.com/questions/51434897/how-to-change-audio-playback-speed-using-pydub
def speed_change(sound: AudioSegment, speed=1.0):
    # Manually override the frame_rate. This tells the computer how many
    # samples to play per second
    sound_with_altered_frame_rate = sound._spawn(sound.raw_data, overrides={
        "frame_rate": int(sound.frame_rate * speed)
    })
    # convert the sound with altered frame rate to a standard frame rate
    # so that regular playback programs will work right. They often only
    # know how to play audio at standard frame rate (like 44.1k)
    return sound_with_altered_frame_rate.set_frame_rate(sound.frame_rate)


musique_original = AudioSegment.from_file("Halloween LOOP.wav")
sound_factor = Value('d', 1)
sound_offset = 500  # length of each chunk, in milliseconds
# maxsize is a trade-off: a smaller queue reacts faster to speed changes
# but is less resilient to lag
Sounds_buffer = Queue(maxsize=2)


def generator(sound_buffer: Queue, factor: Value, offset: int = 500):
    musique_alternate = musique_original  # part of the track still to be played
    player_offset = offset  # length of each chunk (ms)
    max_oscillation = 0.01  # max speed change between two chunks (smaller => smoother transition)
    local_factor = factor.value  # speed factor currently applied
    while True:
        if not sound_buffer.full():  # only generate when the queue has room
            # move the local factor one step toward the requested factor
            if local_factor < factor.value:
                local_factor += max_oscillation
            elif local_factor > factor.value:
                local_factor -= max_oscillation
            chunk_length = player_offset * local_factor  # source audio used by this chunk (ms)
            if len(musique_alternate) > chunk_length:  # enough audio left for a full chunk
                sample, musique_alternate = musique_alternate[:chunk_length], musique_alternate[chunk_length:]
            else:  # otherwise play what is left and loop back to the start
                sample, musique_alternate = musique_alternate, musique_original
            _sample = speed_change(sample, local_factor)
            buffer = bytearray(_sample.raw_data)
            sound_buffer.put(buffer)  # send the buffer to the shared Queue


# The process is only created here, not started: call p.start() from the main program.
p = Process(target=generator, args=(Sounds_buffer, sound_factor, sound_offset), daemon=True)
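The ramp in generator() adds or subtracts a fixed 0.01 per chunk. With floating-point steps the local factor may never land exactly on the requested value, so it can keep hopping just above and below it. One possible variant, sketched here and not part of the gist, clamps each step so it cannot overshoot; step_toward is a hypothetical name.

```python
def step_toward(current: float, target: float, max_step: float = 0.01) -> float:
    """Move `current` toward `target` by at most `max_step`, never past it."""
    if current < target:
        return min(current + max_step, target)
    if current > target:
        return max(current - max_step, target)
    return current


factor = 1.0
steps = 0
while factor != 1.5:  # terminates because the last step is clamped to the target
    factor = step_toward(factor, 1.5)
    steps += 1
print(steps, factor)
```

In the generator this would replace the if/elif pair with local_factor = step_toward(local_factor, factor.value, max_oscillation).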
@18d-shady

Is it possible to check the frame rate to see whether the speed has already been changed?

@P4UL-M
Author

P4UL-M commented Apr 25, 2024

Sorry, I'm not sure I understand what you mean. If you want to read the current factor of the player, that is entirely possible: you just have to access the .value attribute of the Value you passed when creating your Process.

Here is an example of the class I personally used to manage my worker :

from multiprocessing import Process, Value, Manager, Queue
from pydub import AudioSegment
from queue import Empty

class Generator:
    p = None

    def __init__(self, path) -> None:
        self.m = Manager()
        self.Sounds_buffer: Queue = self.m.Queue(maxsize=2)
        self.musique_original = AudioSegment.from_file(path)
        self.speed_factor = Value('d', 1)
        self.sound_offset = 500

    def start(self):
        if self.p is None:
            self.p = Process(target=worker, args=(self.Sounds_buffer, self.speed_factor, self.musique_original, self.sound_offset))
            self.p.start()
        else:
            print(self.p)

    def reset(self):
        if self.p is not None:
            self.p.terminate()
            self.p = None
        self.speed_factor.value = 1
        try:
            while True:
                self.Sounds_buffer.get_nowait()
        except Empty:
            pass
        self.start()
        
mygenerator = Generator("path/to/my/audiofile.wav")

I just create an instance of my class in a variable accessible from my main loop. To read the factor I use mygenerator.speed_factor.value, and to change it I assign mygenerator.speed_factor.value = x. If you want to access the frame_rate of the AudioSegment directly, without going through the factor, I guess you will need to adapt the function a little. Since the worker runs in another process, you will need a Value() to communicate between the two processes.

@18d-shady

I understand, but is there a way to check whether the playback speed is 1, 1.5, or another value?

@P4UL-M
Author

P4UL-M commented Apr 25, 2024

Yes, you just have to read the value of the argument (factor) you pass in the args of the Process. Value, from the multiprocessing library, creates a variable that is accessible from both the original process and the created one. See the multiprocessing documentation if you want more details. Basically, you can access the value through the .value attribute of the instance and work with it as if it were an ordinary variable.
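As a small sketch of that idea, the snippet below reads and updates a Value without starting any process, so it can be pasted into an interpreter. is_speed_changed and its tolerance are hypothetical names introduced only for this example.

```python
from multiprocessing import Value

speed = Value('d', 1.0)  # 'd' = C double, starting at normal speed


def is_speed_changed(v, tolerance: float = 1e-9) -> bool:
    """True when the shared factor differs from normal speed (1.0)."""
    return abs(v.value - 1.0) > tolerance


print(speed.value, is_speed_changed(speed))  # before any change

# A read-modify-write such as += is not atomic across processes,
# so hold the Value's lock while doing it.
with speed.get_lock():
    speed.value += 0.5

print(speed.value, is_speed_changed(speed))  # after the change
```

The same object can be passed in the args of a Process, and both sides then see each other's writes through .value.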

@P4UL-M
Author

P4UL-M commented Apr 25, 2024

Here is an example of a whole script interacting with the playback speed.

from multiprocessing import Process, Value, Manager, Queue
from queue import Empty

import pygame
from pydub import AudioSegment

# method from https://stackoverflow.com/questions/51434897/how-to-change-audio-playback-speed-using-pydub


def speed_change(sound: AudioSegment, speed=1.0):
    # Manually override the frame_rate. This tells the computer how many
    # samples to play per second
    sound_with_altered_frame_rate = sound._spawn(sound.raw_data, overrides={
        "frame_rate": int(sound.frame_rate * speed)
    })

    # convert the sound with altered frame rate to a standard frame rate
    # so that regular playback programs will work right. They often only
    # know how to play audio at standard frame rate (like 44.1k)
    return sound_with_altered_frame_rate.set_frame_rate(sound.frame_rate)


class Generator:
    p = None

    def __init__(self, path) -> None:
        self.m = Manager()
        self.Sounds_buffer: Queue = self.m.Queue(maxsize=2)
        self.musique_original = AudioSegment.from_file(path)
        self.speed_factor = Value('d', 1)
        self.sound_offset = 500  # length of each chunk (ms)

    def start(self):
        if self.p is None:
            self.p = Process(target=worker, args=(self.Sounds_buffer, self.speed_factor, self.musique_original, self.sound_offset))
            self.p.start()
        else:
            print(self.p)  # already running

    def reset(self):
        # stop the worker, restore normal speed, drop stale buffers, restart
        if self.p is not None:
            self.p.terminate()
            self.p = None
        self.speed_factor.value = 1
        try:
            while True:
                self.Sounds_buffer.get_nowait()
        except Empty:
            pass
        self.start()


def worker(buffer, factor, musique_original, sound_offset):
    musique_alternate = musique_original  # part of the track still to be played
    player_offset = sound_offset  # length of each chunk (ms)
    local_factor = factor.value  # speed factor currently applied

    while True:
        if not buffer.full():  # only generate when the queue has room
            # move the local factor a third of the way toward the requested factor
            local_factor += (factor.value - local_factor) / 3

            chunk_length = player_offset * local_factor  # source audio used by this chunk (ms)
            if len(musique_alternate) > chunk_length:  # enough audio left for a full chunk
                sample, musique_alternate = musique_alternate[:chunk_length], musique_alternate[chunk_length:]
            else:  # otherwise play what is left and loop back to the start
                sample, musique_alternate = musique_alternate, musique_original

            _sample = speed_change(sample, local_factor)
            # gain in dB: quieter at low speed, unchanged from a factor of 3 upward
            _sample += min(5 * local_factor - 15, 0)
            _buffer = bytearray(_sample.raw_data)
            buffer.put(_buffer)  # send the buffer to the shared Queue


if __name__ == "__main__":  # required when processes are started with "spawn"
    mygenerator = Generator("path/to/my/audiofile.wav")

    # Assumption: the mixer has to be opened with the same format as the raw
    # buffers (pygame's default sample size is signed 16-bit).
    pygame.mixer.init(frequency=mygenerator.musique_original.frame_rate,
                      channels=mygenerator.musique_original.channels)
    channel = pygame.mixer.Channel(0)

    mygenerator.start()

    # main loop
    while True:
        print(mygenerator.speed_factor.value)
        mygenerator.speed_factor.value += 0.01
        if mygenerator.speed_factor.value > 2:
            break
        # music update
        if channel.get_queue() is None:
            _buffer = mygenerator.Sounds_buffer.get()
            channel.queue(pygame.mixer.Sound(_buffer))
        elif not channel.get_busy() and not channel.get_queue():
            _buffer = mygenerator.Sounds_buffer.get()
            channel.play(pygame.mixer.Sound(_buffer))
        pygame.time.wait(100)  # slow the ramp down so the speed change is audible

    mygenerator.p.terminate()  # the worker is not a daemon, so stop it explicitly
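The worker above no longer uses a fixed step: each chunk moves local_factor a third of the way toward the requested factor. To get a feel for how fast that settles, the update can be isolated in a small function. This is an illustrative sketch; smooth, chunks_to_settle and the 0.01 tolerance are hypothetical and not part of the gist.

```python
def smooth(current: float, target: float, divisor: float = 3.0) -> float:
    """One update of the worker's rule: close 1/divisor of the remaining gap."""
    if divisor < 1:
        raise ValueError("divisor must be >= 1, otherwise the update overshoots")
    return current + (target - current) / divisor


def chunks_to_settle(start: float, target: float,
                     tolerance: float = 0.01, divisor: float = 3.0) -> int:
    """Count the updates needed before the factor is within `tolerance` of the target."""
    current, n = start, 0
    while abs(target - current) > tolerance:
        current = smooth(current, target, divisor)
        n += 1
    return n


print(chunks_to_settle(1.0, 2.0))
```

Going from 1.0 to 2.0 takes 12 updates with these settings. A larger divisor gives a slower, smoother transition; a divisor of 1 jumps straight to the target.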
