Skip to content

Instantly share code, notes, and snippets.

@peralta
Created April 27, 2025 20:15
Show Gist options
  • Save peralta/e183966e96b6722fc629549bf54519bf to your computer and use it in GitHub Desktop.
Save peralta/e183966e96b6722fc629549bf54519bf to your computer and use it in GitHub Desktop.
Identify individual tracks in a DJ set
import asyncio
from shazamio import Shazam, Serialize
import sys
import datetime
import logging
import time
import argparse
import os
import io
import csv
# --- Dependency Check & Imports for pydub ---
try:
    # pydub does the decoding and slicing of the input audio.
    import pydub
    from pydub.exceptions import CouldntDecodeError
except ImportError:
    # Nothing else in this script can run without pydub, so stop right away.
    print("Error: 'pydub' library not found...", file=sys.stderr)
    sys.exit(1)
# --- Constants ---
# Default spacing, in seconds, between the start offsets of analyzed segments
# (default of the --time-step option).
DEFAULT_TIME_STEP_SECONDS = 60
# Default length, in seconds, of each segment sent for recognition
# (default of the --duration option).
DEFAULT_SEGMENT_DURATION_SECONDS = 10
# Minimum time, in seconds, enforced between the starts of two consecutive
# recognition calls in the analysis loop.
MIN_API_INTERVAL_SECONDS = 1.0
# --- Helper Functions ---
# recognize_song_from_bytes, _extract_spotify_link and
# process_recognition_result are defined directly below; setup_logging and
# parse_arguments are defined near the end of the file, after main().
# --- recognize_song_from_bytes ---
async def recognize_song_from_bytes(shazam: Shazam, audio_bytes: bytes) -> dict | None:
    """Submit one encoded audio segment for recognition.

    Args:
        shazam: Client object whose awaitable ``recognize`` method is called
            with the raw bytes.
        audio_bytes: Encoded audio of a single segment.

    Returns:
        The value produced by ``shazam.recognize``, or ``None`` when the
        input is empty or the call raises.
    """
    if not audio_bytes:
        logging.warning("Received empty audio bytes...")
        return None

    logging.debug(f"Attempting recognition on {len(audio_bytes)} bytes...")
    started_at = time.monotonic()
    try:
        response = await shazam.recognize(audio_bytes)
        elapsed = time.monotonic() - started_at
        logging.debug(f"Shazam completed in {elapsed:.3f}s.")
        return response
    except Exception as e:
        # Best effort: a failed lookup is reported to the caller as "no result".
        logging.error(f"Error during Shazam API call: {e}", exc_info=False)
        return None
# --- _extract_spotify_link ---
def _extract_spotify_link(track_data: dict) -> str:
    """Return the first Spotify track link found in ``track_data``, or ``""``.

    The lookup order is:

    1. ``track_data['hub']['actions']`` entries whose ``type`` is
       ``spotifyopen`` or ``uri`` (case-insensitive);
    2. ``actions`` of ``track_data['hub']['providers']`` entries whose
       ``type`` is ``spotify``, restricted to actions of type ``uri``;
    3. ``track_data['url']`` when it is an ``open.spotify.com/track/`` URL.

    Malformed entries (wrong container type, non-string ``type`` or ``uri``)
    are skipped individually, so one bad entry no longer hides a valid link
    that appears later in the data.

    Args:
        track_data: Track dictionary taken from a recognition result.

    Returns:
        The Spotify URI/URL as found in the data, or an empty string.
    """
    if not track_data or not isinstance(track_data, dict):
        return ""

    def is_track_link(value) -> bool:
        # Accept both the "spotify:track:<id>" URI form and the web URL form.
        return isinstance(value, str) and (
            "spotify:track:" in value or "open.spotify.com/track/" in value
        )

    def entry_type(entry: dict) -> str:
        # A missing or non-string type is treated as "no type", not an error.
        raw = entry.get('type')
        return raw.lower() if isinstance(raw, str) else ""

    def dict_entries(value) -> list:
        # Only list-like containers of dicts are meaningful here.
        if not isinstance(value, (list, tuple)):
            return []
        return [item for item in value if isinstance(item, dict)]

    hub = track_data.get('hub')
    if not isinstance(hub, dict):
        hub = {}

    for action in dict_entries(hub.get('actions')):
        uri = action.get('uri')
        if entry_type(action) in ('spotifyopen', 'uri') and is_track_link(uri):
            return uri

    for provider in dict_entries(hub.get('providers')):
        if entry_type(provider) != 'spotify':
            continue
        for action in dict_entries(provider.get('actions')):
            uri = action.get('uri')
            if entry_type(action) == 'uri' and is_track_link(uri):
                return uri

    url = track_data.get('url')
    if isinstance(url, str) and "open.spotify.com/track/" in url:
        return url
    return ""
# --- process_recognition_result ---
def process_recognition_result(result: dict | None) -> tuple[str | None, str | None, str]:
# (Implementation from previous version - unchanged)
spotify_link = ""
if result is None: return None, "Recognition failed", spotify_link
logging.debug(f"Raw recognition result: {result}")
if 'track' in result and isinstance(result.get('track'), dict) and result['track']:
track_data = result['track']; logging.debug(f"Processing track data: {track_data}")
required_keys = ['key', 'title']
if not all(key in track_data for key in required_keys):
logging.warning(f"Track data missing required keys ({required_keys})."); return None, "Missing essential details", spotify_link
try:
track_key = track_data.get('key'); title = track_data.get('title'); artist = track_data.get('subtitle', "Unknown Artist")
if track_key and title:
display_string = f"{title} - {artist}"; spotify_link = _extract_spotify_link(track_data)
logging.info(f"Processed: {display_string}, Spotify: '{spotify_link or 'N/A'}'")
return track_key, display_string, spotify_link
else: logging.warning(f"Required keys existed but had falsy values?"); return None, "Essential details empty", spotify_link
except Exception as e: logging.error(f"Unexpected error processing track data: {e}", exc_info=True); return None, "Error processing track data", spotify_link
else: logging.info("No valid 'track' dictionary found."); return None, "No track recognized", spotify_link
# --- Modified Analysis Function ---
async def analyze_audio_by_time(file_path: str, time_step_s: int, segment_duration_s: int, output_csv_file: str):
    """Scan an audio file at fixed time steps and write confirmed tracks to CSV.

    Every ``time_step_s`` seconds a slice of up to ``segment_duration_s``
    seconds is exported as WAV and sent for recognition, keeping at least
    ``MIN_API_INTERVAL_SECONDS`` between the starts of consecutive calls.

    A track change is only reported once two consecutively analyzed segments
    resolve to the same track key; the reported timestamp is the offset at
    which that track was first seen. A candidate still pending when the audio
    ends is not reported.

    Args:
        file_path: Path of the audio file to analyze.
        time_step_s: Distance in seconds between segment start offsets.
        segment_duration_s: Length in seconds of each analyzed segment.
        output_csv_file: Path of the CSV file to (over)write. Columns are
            ``Timestamp``, ``Song`` and ``Spotify Link``.

    Returns:
        None. Load failures and errors in the analysis loop are logged and
        printed to stderr rather than raised.
    """
    print("--- Audio Analysis (Rate-Limited, Filtered, CSV Output) ---")
    print(f"Analyzing: '{os.path.basename(file_path)}'")
    print(f"Output CSV: '{output_csv_file}'")
    print("Loading audio file...")
    try:
        audio = pydub.AudioSegment.from_file(file_path)
        total_duration_s = audio.duration_seconds
        logging.info(f"Audio loaded. Duration: {total_duration_s:.2f}s.")
        print(f"Total Duration: {datetime.timedelta(seconds=int(total_duration_s))}")
        print("Starting analysis (will report changes only after confirmation)...")
    except (CouldntDecodeError, FileNotFoundError) as e:
        # Previously this returned silently; tell the user why nothing happened.
        logging.error(f"Could not load audio file '{file_path}': {e}")
        print(f"\nError: could not load audio file '{file_path}': {e}", file=sys.stderr)
        return

    shazam = Shazam()
    current_offset_s = 0
    segment_duration_ms = segment_duration_s * 1000
    total_duration_ms = total_duration_s * 1000
    # Back-date the "last call" so the very first request is not delayed.
    last_api_call_start_time = time.monotonic() - MIN_API_INTERVAL_SECONDS

    # --- State for filtering/debouncing ---
    confirmed_track_key = None      # key of the last reported track
    candidate_track_key = None      # key seen once, awaiting a second sighting
    candidate_display_info = None   # display string of the candidate
    candidate_timestamp_str = None  # offset at which the candidate was first seen
    candidate_spotify_link = None   # Spotify link of the candidate

    try:
        with open(output_csv_file, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
            csv_header = ['Timestamp', 'Song', 'Spotify Link']
            csv_writer.writerow(csv_header)
            logging.info(f"Opened '{output_csv_file}' and wrote header.")

            while True:  # Main analysis loop
                start_ms = current_offset_s * 1000
                if start_ms >= total_duration_ms:
                    break  # End of audio
                # Clip the last segment to the end of the audio.
                end_ms = min(start_ms + segment_duration_ms, total_duration_ms)
                if start_ms >= end_ms:
                    current_offset_s += time_step_s
                    continue

                # --- Segment extraction & export ---
                try:
                    segment = audio[start_ms:end_ms]
                    buffer = io.BytesIO()
                    segment.export(buffer, format="wav")
                    segment_bytes = buffer.getvalue()
                    buffer.close()
                except Exception as e:
                    # A segment that cannot be exported is skipped, not fatal.
                    logging.error(f"Error processing segment @{current_offset_s}s: {e}")
                    current_offset_s += time_step_s
                    continue

                # --- Rate limiting ---
                time_since_last_call = time.monotonic() - last_api_call_start_time
                if time_since_last_call < MIN_API_INTERVAL_SECONDS:
                    delay_needed = MIN_API_INTERVAL_SECONDS - time_since_last_call
                    logging.info(f"Rate limiting: Waiting for {delay_needed:.3f}s")
                    await asyncio.sleep(delay_needed)
                last_api_call_start_time = time.monotonic()

                # --- Recognition & processing ---
                current_time_str = str(datetime.timedelta(seconds=int(current_offset_s)))
                logging.info(f"--- Analyzing segment @ {current_time_str} ({current_offset_s}s) ---")
                recognition_result = await recognize_song_from_bytes(shazam, segment_bytes)
                potential_track_key, potential_display_info, potential_spotify_link = process_recognition_result(recognition_result)
                logging.debug(f"State: Confirmed='{confirmed_track_key}', Candidate='{candidate_track_key}', Potential='{potential_track_key}'")

                # --- Filtering and confirmation logic ---
                report_data = None  # set only when a change gets confirmed
                if potential_track_key == confirmed_track_key:
                    # Still the confirmed state: any pending candidate was a blip.
                    if candidate_track_key is not None:
                        logging.info(f"Potential ('{potential_display_info or 'None'}') reverted to confirmed state. Discarding candidate ('{candidate_display_info or 'None'}').")
                    candidate_track_key = None
                elif potential_track_key == candidate_track_key:
                    if candidate_track_key is not None:
                        # Second sighting in a row: promote candidate to confirmed.
                        logging.info(f"CONFIRMED change to '{candidate_display_info}'. First seen at {candidate_timestamp_str}.")
                        confirmed_track_key = candidate_track_key
                        report_data = {
                            "timestamp": candidate_timestamp_str,
                            "display": candidate_display_info,
                            "spotify": candidate_spotify_link,
                        }
                        candidate_track_key = None
                    else:
                        # potential == candidate == None: still unrecognized.
                        logging.debug("Potential matches candidate, but candidate was None. No change to report.")
                else:
                    # Differs from both confirmed and candidate: remember it and
                    # wait for the next segment before reporting anything.
                    logging.info(f"New potential candidate '{potential_display_info or 'None'}' detected @{current_time_str}. Storing.")
                    candidate_track_key = potential_track_key
                    candidate_display_info = potential_display_info
                    candidate_timestamp_str = current_time_str
                    candidate_spotify_link = potential_spotify_link

                # --- Report confirmed change & write CSV ---
                if report_data is not None:
                    rpt_time = report_data["timestamp"]
                    rpt_display = report_data["display"]
                    rpt_spotify = report_data["spotify"] or ""  # never write None
                    if confirmed_track_key:
                        print(f"[{rpt_time}] NEW SONG: {rpt_display}")
                        csv_writer.writerow([rpt_time, rpt_display, rpt_spotify])
                        # Flush so rows already found survive an abrupt stop.
                        csvfile.flush()
                        logging.info(f"Wrote confirmed change to CSV: {[rpt_time, rpt_display, rpt_spotify]}")
                    else:
                        # NOTE(review): currently unreachable. A change is only
                        # confirmed from a non-None candidate key, so
                        # confirmed_track_key is always truthy here. A None
                        # candidate cannot be told apart from "no candidate".
                        print(f"[{rpt_time}] SONG ENDED / UNRECOGNIZED")
                        logging.info(f"Change to 'Unrecognized' confirmed effective at {rpt_time}.")
                    # Clear all candidate state once the confirmation is reported.
                    candidate_track_key = None
                    candidate_display_info = None
                    candidate_timestamp_str = None
                    candidate_spotify_link = None

                # --- Move to the next time step ---
                current_offset_s += time_step_s
    except IOError as e:
        logging.critical(f"IOError with CSV '{output_csv_file}': {e}", exc_info=True)
        print(f"\nError interacting with CSV file '{output_csv_file}'.", file=sys.stderr)
    except Exception as e:
        logging.critical(f"Unexpected error during analysis loop: {e}", exc_info=True)
        print(f"\nAn unexpected error occurred: {e}", file=sys.stderr)

    print(f"--- Analysis complete --- (Output: {output_csv_file})")
    logging.info(f"Finished analysis. Output CSV: {output_csv_file}")
# --- Main Execution (including setup_logging, parse_arguments, main) ---
async def main():
    """Parse the command line, configure logging, then run the analysis."""
    cli_values = parse_arguments()
    audio_file, time_step, segment_duration, log_level, output_csv_file = cli_values
    setup_logging(log_level)
    try:
        await analyze_audio_by_time(
            file_path=audio_file,
            time_step_s=time_step,
            segment_duration_s=segment_duration,
            output_csv_file=output_csv_file,
        )
    except Exception as e:
        # Last-resort boundary: report the failure instead of a raw traceback.
        logging.critical(f"Unhandled exception in main execution: {e}", exc_info=True)
        print(f"\nCritical error: {e}", file=sys.stderr)
def setup_logging(log_level=logging.INFO):
    """Configure the root logger with the script's level and format.

    ``force=True`` replaces any handlers already attached to the root logger.
    Without it ``logging.basicConfig`` does nothing once the root logger has a
    handler, which happens as soon as any ``logging.info(...)``-style call
    runs before this function. The requested level and format would then be
    silently ignored.

    Args:
        log_level: Logging level for the root logger (e.g. ``logging.DEBUG``).
    """
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(levelname)s - [%(funcName)s] %(message)s',
        force=True,
    )
    logging.info(f"Logging level set to: {logging.getLevelName(log_level)}")
def parse_arguments(argv=None):
    """Parse and validate the command-line arguments.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Returns:
        A tuple ``(audio_file, time_step, duration, loglevel, output_csv_file)``.
        ``output_csv_file`` defaults to the audio file path with its extension
        replaced by ``.csv``.

    Exits with status 1 (after printing to stderr) when the audio file does
    not exist or when the time step or duration is not positive.
    """
    parser = argparse.ArgumentParser(
        description="Analyze audio with Shazam, rate-limited, filtered, outputs to CSV.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("audio_file", help="Path to the audio file.")
    parser.add_argument("-t", "--time-step", type=int, default=DEFAULT_TIME_STEP_SECONDS, help="Time step in seconds.")
    parser.add_argument("-d", "--duration", type=int, default=DEFAULT_SEGMENT_DURATION_SECONDS, help="Segment duration (seconds).")
    parser.add_argument("-v", "--verbose", action="store_const", dest="loglevel", const=logging.DEBUG, default=logging.INFO, help="Enable debug logging.")
    parser.add_argument("-o", "--output", help="Output CSV file path. Defaults to '<audio_filename_base>.csv'.")
    args = parser.parse_args(argv)

    if not os.path.exists(args.audio_file):
        print(f"Error: File not found: {args.audio_file}", file=sys.stderr)
        sys.exit(1)
    if args.time_step <= 0:
        print("Error: Time step must be > 0.", file=sys.stderr)
        sys.exit(1)
    if args.duration <= 0:
        print("Error: Duration must be > 0.", file=sys.stderr)
        sys.exit(1)

    output_csv_file = args.output or os.path.splitext(args.audio_file)[0] + ".csv"
    # Deliberately no logging call here: this runs before logging is
    # configured, and a logging call on the unconfigured root logger would
    # install a default handler and pre-empt the later configuration.
    return args.audio_file, args.time_step, args.duration, args.loglevel, output_csv_file
if __name__ == "__main__":
    # Script entry point: run the async main() and report how the run ended.
    print("NOTE: Requires 'pydub' (pip install pydub) and FFmpeg/Libav (ffmpeg.org).")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nAnalysis interrupted.")
        logging.warning("Interrupted by user.")
    except SystemExit as e:
        logging.info(f"Exited with code {e.code}.")
        # Re-raise so the process keeps the requested exit status; swallowing
        # it here made every sys.exit(1) / argparse error end with status 0.
        raise
    finally:
        logging.info("Application finished.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment