Skip to content

Instantly share code, notes, and snippets.

@lmmx
Last active November 7, 2023 21:37
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save lmmx/0970a01295e12531f6a3f0ac5537e0b8 to your computer and use it in GitHub Desktop.
Save lmmx/0970a01295e12531f6a3f0ac5537e0b8 to your computer and use it in GitHub Desktop.
Python commands to create speaker diarisation
# Convert the input audio to WAV first: ffmpeg -i foo.m4a foo.wav
from pyannote.audio import Pipeline

# Load the pretrained speaker-diarization pipeline from the Hugging Face hub.
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")

# Run diarization on the WAV file.
diarization = pipeline("foo.wav")

# Write the result in RTTM format:
# https://catalog.ldc.upenn.edu/docs/LDC2004T12/RTTM-format-v13.pdf
# NOTE: as pasted, the `with` body was not indented (IndentationError); fixed.
with open("foo.rttm", "w") as rttm:
    diarization.write_rttm(rttm)
import pandas as pd

# Load only the RTTM fields we need (space-separated, no header row):
# column 3 = turn onset ("tbeg"), 4 = duration ("tdur"), 7 = speaker ("stype").
df = pd.read_csv(
    "foo.rttm",
    sep=" ",
    header=None,
    usecols=[3, 4, 7],
    names=["tbeg", "tdur", "stype"],
)
def td_time_format(td):
    """Format a pandas Timedelta as ``[H:]M:SS.mmm`` for display.

    Parameters
    ----------
    td : pd.Timedelta
        A non-negative duration.

    Returns
    -------
    str
        e.g. ``"1:05.000"`` for 65 seconds, ``"1:01:40.000"`` for 3700
        seconds. The original only read ``minutes``/``seconds``/
        ``milliseconds`` from ``td.components``, silently dropping hours
        (and days), so every timestamp past the one-hour mark was wrong.
        Sub-hour output is unchanged.
    """
    parts = td.components
    # Fold days into hours so multi-day durations don't truncate either.
    hours = parts.days * 24 + parts.hours
    if hours:
        return f"{hours}:{parts.minutes:02}:{parts.seconds:02}.{parts.milliseconds:03}"
    return f"{parts.minutes}:{parts.seconds:02}.{parts.milliseconds:03}"
df["tbeg_fmt"] = pd.to_timedelta(df.tbeg, unit="s").apply(td_time_format)
df["tend_fmt"] = pd.to_timedelta(df.tbeg + df.tdur, unit="s").apply(td_time_format)
# numpy was used below but never imported in the original snippet (NameError).
import numpy as np

# Get consecutive speaker runs, or single points
# via https://stackoverflow.com/a/71214440/2668831
speaker_runs = {
    speaker: [
        # Keep just the first and last row index of each consecutive run.
        np.array(grp)[[0, -1]].tolist()
        # Split the group's row indices wherever they are not consecutive.
        for grp in np.split(group, np.where(np.diff(group) != 1)[0] + 1)
    ]
    for speaker, group in df.groupby("stype").agg("tbeg_fmt").groups.items()
}

# 'Roll up' the timestamps over consecutive runs by inverting the dict:
# one {speaker: [start_idx, stop_idx]} record per run, ordered by row index.
speaker_order = sorted(
    [{speaker: run} for speaker, runs in speaker_runs.items() for run in runs],
    key=lambda d: [*d.values()],
)

# Collapse each run into one row spanning from its first to its last turn.
rollup_records = [
    {
        "tbeg": df.tbeg[start_idx],
        "tdur": df.tbeg[stop_idx] + df.tdur[stop_idx] - df.tbeg[start_idx],
        "stype": df.stype[start_idx],
        "tbeg_fmt": df.tbeg_fmt[start_idx],
        "tend_fmt": df.tend_fmt[stop_idx],
    }
    for order in speaker_order
    for speaker, (start_idx, stop_idx) in order.items()
]
rollup_df = pd.DataFrame.from_records(rollup_records)
# Optionally map anonymous speaker labels to real names, e.g.:
# rollup_df["stype"] = rollup_df.stype.replace(
#     {"SPEAKER_00": "Name0", "SPEAKER_01": "Name1",
#      "SPEAKER_02": "Name2", "SPEAKER_03": "Name3"})
@jonathanjfshaw
Copy link

jonathanjfshaw commented Mar 19, 2023

This approach seems simpler and works well too:

import os
from pydub import AudioSegment
from pyannote.audio import Pipeline
from pyannote.core import Segment, Timeline, Annotation

class Diarizer:
    """Thin wrapper around the pyannote speaker-diarization pipeline."""

    def __init__(self):
        """
        Initialize speaker diarization model.

        """
        # NOTE(review): the auth token is hard-coded as a literal here — it
        # should be read from a secret (e.g. an environment variable), not
        # committed; a real token was previously leaked in this snippet.
        self.pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token="HF_TOKEN")
        self.timeline = None
        self.annotation = None

    def diarize(self, audio_path: str):
        """
        Perform speaker diarization on an audio signal.

        Parameters
        ----------
        audio_path : str
            Path to audio file to perform diarization on.

        Returns
        -------
        Annotation
            The pyannote annotation produced by the pipeline (also stored
            on ``self.annotation``; ``self.timeline`` is set as well).

        Raises
        ------
        FileNotFoundError
            If ``audio_path`` does not point to an existing file.

        """
        # Validate with a real exception: the original used `assert`, which
        # is silently stripped when Python runs with -O.
        if not os.path.isfile(audio_path):
            raise FileNotFoundError(f"File not found at path: {audio_path}")
        print("Running diarization model")
        self.annotation = self.pipeline(str(audio_path))
        self.timeline = self.annotation.get_timeline()
        return self.annotation

    def merge_consecutive(self):
        """Merge consecutive segments by the same speaker into one segment.

        Note
        ----
        This will assign any time period when there are overlapping speakers to the speaker who started 
        speaking first. This is an acceptable compromise if overlapping speech is rare, and you're trying to 
        annotate the output of an automatic speech recognition system like Whisper that handles
        overlapping speech poorly.
        """
        print("Merging adjacent speakers in timeline")
        annotation = Annotation()
        previous_label = None
        previous_segment_end = 0
        previous_segment_start = 0
        for segment, _, label in self.annotation.itertracks(yield_label=True):
            # Compare speaker labels by equality, not identity (`is not` on
            # strings is unreliable).
            if label != previous_label:
                # If the current segment lies entirely inside the previous
                # one, ignore it (first speaker keeps the overlap). The
                # original called `next(self.annotation.itertracks)` here,
                # which raises TypeError — `itertracks` is a method, not an
                # iterator; `continue` implements the stated intent.
                if previous_segment_end > segment.end:
                    continue
                # Speaker changed: close out the previous speaker's merged
                # segment. Skip on the very first iteration, when there is
                # no previous speaker (the original emitted a spurious
                # Segment(0, 0) labelled None here).
                if previous_label is not None:
                    annotation[Segment(previous_segment_start, previous_segment_end)] = previous_label
                previous_segment_start = previous_segment_end
                previous_label = label
            previous_segment_end = segment.end

        # Add the last iterated segment to the new annotation (if any).
        if previous_label is not None:
            annotation[Segment(previous_segment_start, previous_segment_end)] = previous_label
        self.annotation = annotation
        return annotation

    def get_list(self):
        """Return the current annotation as a list of plain dicts.

        Returns
        -------
        list[dict]
            One ``{"start", "end", "speaker"}`` dict per diarized segment.
        """
        output = []
        for segment, _, label in self.annotation.itertracks(yield_label=True):
            output.append( {
                "start": segment.start,
                "end": segment.end,
                "speaker": label,
            })
        return output

@lmmx
Copy link
Author

lmmx commented Mar 19, 2023

Hey, heads up — you included your auth token there; you might want to delete the comment and repost!

@jonathanjfshaw
Copy link

Actually not mine, I got it from a space I copied some of the code from. I must remember to sort it in my space. Not sure yet how to work with secrets in HF spaces.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment