-
-
Save peralta/e183966e96b6722fc629549bf54519bf to your computer and use it in GitHub Desktop.
Identify individual tracks in a dj set
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from shazamio import Shazam, Serialize | |
import sys | |
import datetime | |
import logging | |
import time | |
import argparse | |
import os | |
import io | |
import csv | |
# --- Dependency Check & Imports for pydub --- | |
try: | |
import pydub | |
from pydub.exceptions import CouldntDecodeError | |
except ImportError: | |
print("Error: 'pydub' library not found...", file=sys.stderr); sys.exit(1) | |
# --- Constants --- | |
# Default spacing, in seconds, between the starts of two sampled clips (--time-step).
DEFAULT_TIME_STEP_SECONDS: int = 60
# Default length, in seconds, of each clip sent for recognition (--duration).
DEFAULT_SEGMENT_DURATION_SECONDS: int = 10
# Minimum time, in seconds, between the starts of two recognition calls.
MIN_API_INTERVAL_SECONDS: float = 1.0
# --- Helper Functions ---
# recognize_song_from_bytes, _extract_spotify_link and process_recognition_result
# are defined directly below; setup_logging and parse_arguments are defined
# near the end of this file, after main().
# --- recognize_song_from_bytes --- | |
async def recognize_song_from_bytes(shazam: Shazam, audio_bytes: bytes) -> dict | None:
    """Run one recognition request on an in-memory audio clip.

    Args:
        shazam: Client object whose awaitable ``recognize`` method is called
            with the raw bytes.
        audio_bytes: Encoded audio to identify.

    Returns:
        Whatever ``shazam.recognize`` returns, or ``None`` when the input is
        empty or the call raises.
    """
    if not audio_bytes:
        logging.warning("Received empty audio bytes...")
        return None

    logging.debug(f"Attempting recognition on {len(audio_bytes)} bytes...")
    began_at = time.monotonic()
    try:
        response = await shazam.recognize(audio_bytes)
        elapsed = time.monotonic() - began_at
        logging.debug(f"Shazam completed in {elapsed:.3f}s.")
    except Exception as exc:
        # Best effort: any failure of the call is logged and reported as "no result".
        logging.error(f"Error during Shazam API call: {exc}", exc_info=False)
        return None
    return response
# --- _extract_spotify_link --- | |
def _extract_spotify_link(track_data: dict) -> str: | |
# (Implementation from previous version - unchanged) | |
if not track_data: return "" | |
try: | |
hub_actions = track_data.get('hub', {}).get('actions', []) | |
for action in hub_actions: | |
if isinstance(action, dict): | |
action_type = action.get('type', '').lower(); uri = action.get('uri') | |
if uri and (action_type == 'spotifyopen' or action_type == 'uri'): | |
if "spotify:track:" in uri or "open.spotify.com/track/" in uri: return uri | |
providers = track_data.get('hub', {}).get('providers', []) | |
for provider in providers: | |
if isinstance(provider, dict) and provider.get('type', '').lower() == 'spotify': | |
actions = provider.get('actions', []) | |
for action in actions: | |
if isinstance(action, dict) and action.get('type', '').lower() == 'uri': | |
uri = action.get('uri') | |
if uri and ("spotify:track:" in uri or "open.spotify.com/track/" in uri): return uri | |
url = track_data.get('url'); | |
if url and "open.spotify.com/track/" in url: return url | |
except Exception as e: logging.warning(f"Error parsing for Spotify link: {e}", exc_info=False) | |
return "" | |
# --- process_recognition_result --- | |
def process_recognition_result(result: dict | None) -> tuple[str | None, str | None, str]: | |
# (Implementation from previous version - unchanged) | |
spotify_link = "" | |
if result is None: return None, "Recognition failed", spotify_link | |
logging.debug(f"Raw recognition result: {result}") | |
if 'track' in result and isinstance(result.get('track'), dict) and result['track']: | |
track_data = result['track']; logging.debug(f"Processing track data: {track_data}") | |
required_keys = ['key', 'title'] | |
if not all(key in track_data for key in required_keys): | |
logging.warning(f"Track data missing required keys ({required_keys})."); return None, "Missing essential details", spotify_link | |
try: | |
track_key = track_data.get('key'); title = track_data.get('title'); artist = track_data.get('subtitle', "Unknown Artist") | |
if track_key and title: | |
display_string = f"{title} - {artist}"; spotify_link = _extract_spotify_link(track_data) | |
logging.info(f"Processed: {display_string}, Spotify: '{spotify_link or 'N/A'}'") | |
return track_key, display_string, spotify_link | |
else: logging.warning(f"Required keys existed but had falsy values?"); return None, "Essential details empty", spotify_link | |
except Exception as e: logging.error(f"Unexpected error processing track data: {e}", exc_info=True); return None, "Error processing track data", spotify_link | |
else: logging.info("No valid 'track' dictionary found."); return None, "No track recognized", spotify_link | |
# --- Modified Analysis Function --- | |
async def analyze_audio_by_time(file_path: str, time_step_s: int, segment_duration_s: int, output_csv_file: str):
    """
    Sample an audio file at fixed time steps and record confirmed track changes.

    Starting at offset 0 and advancing by ``time_step_s`` seconds, a clip of up
    to ``segment_duration_s`` seconds is cut from the audio, exported to WAV
    bytes and passed to ``recognize_song_from_bytes``. Recognition calls are
    started at least ``MIN_API_INTERVAL_SECONDS`` apart.

    A change of track is reported only after two consecutive samples yield the
    same track key (a one-off different result is discarded). The confirmed
    track is printed and appended to the CSV with the timestamp of the first
    of the two samples.

    Args:
        file_path: Path of the audio file to analyse.
        time_step_s: Seconds between the starts of consecutive samples; must be > 0.
        segment_duration_s: Length in seconds of each sampled clip.
        output_csv_file: Path of the CSV file to (over)write. Columns are
            ``Timestamp``, ``Song`` and ``Spotify Link``.

    Raises:
        ValueError: If ``time_step_s`` is not positive (the loop would never end).

    Errors while loading the audio, writing the CSV or running the loop are
    logged and reported on stderr rather than raised.
    """
    if time_step_s <= 0:
        raise ValueError(f"time_step_s must be > 0 (got {time_step_s})")

    print("--- Audio Analysis (Rate-Limited, Filtered, CSV Output) ---")
    print(f"Analyzing: '{os.path.basename(file_path)}'")
    print(f"Output CSV: '{output_csv_file}'")
    print("Loading audio file...")
    try:
        audio = pydub.AudioSegment.from_file(file_path)
    except (CouldntDecodeError, FileNotFoundError) as e:
        # Previously this path returned without telling the user anything.
        logging.error(f"Failed to load audio file '{file_path}': {e}")
        print(f"\nError: could not load audio file '{file_path}': {e}", file=sys.stderr)
        return

    total_duration_s = audio.duration_seconds
    total_duration_ms = total_duration_s * 1000
    logging.info(f"Audio loaded. Duration: {total_duration_s:.2f}s.")
    print(f"Total Duration: {datetime.timedelta(seconds=int(total_duration_s))}")
    print("Starting analysis (will report changes only after confirmation)...")

    shazam = Shazam()
    segment_duration_ms = segment_duration_s * 1000
    # Back-date the "last call" so the first request is not delayed.
    last_api_call_start_time = time.monotonic() - MIN_API_INTERVAL_SECONDS

    # --- State for filtering/debouncing ---
    confirmed_track_key = None      # Key of the last reported track.
    candidate_track_key = None      # Key seen once and awaiting a second sighting.
    candidate_display_info = None   # Display text that belongs to the candidate.
    candidate_timestamp_str = None  # Timestamp at which the candidate was first seen.
    candidate_spotify_link = None   # Spotify link that belongs to the candidate.

    try:
        with open(output_csv_file, 'w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile, quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(['Timestamp', 'Song', 'Spotify Link'])
            logging.info(f"Opened '{output_csv_file}' and wrote header.")

            current_offset_s = 0
            while current_offset_s * 1000 < total_duration_ms:
                # Advance the cursor first so every `continue` below still moves forward.
                offset_s = current_offset_s
                current_offset_s += time_step_s

                start_ms = offset_s * 1000
                end_ms = min(start_ms + segment_duration_ms, total_duration_ms)
                if start_ms >= end_ms:
                    continue

                # --- Segment extraction & export ---
                try:
                    with io.BytesIO() as buffer:
                        audio[start_ms:end_ms].export(buffer, format="wav")
                        segment_bytes = buffer.getvalue()
                except Exception as e:
                    logging.error(f"Error processing segment @{offset_s}s: {e}")
                    continue

                # --- Rate limiting ---
                time_since_last_call = time.monotonic() - last_api_call_start_time
                if time_since_last_call < MIN_API_INTERVAL_SECONDS:
                    delay_needed = MIN_API_INTERVAL_SECONDS - time_since_last_call
                    logging.info(f"Rate limiting: Waiting for {delay_needed:.3f}s")
                    await asyncio.sleep(delay_needed)
                last_api_call_start_time = time.monotonic()

                # --- Recognition & processing ---
                current_time_str = str(datetime.timedelta(seconds=int(offset_s)))
                logging.info(f"--- Analyzing segment @ {current_time_str} ({offset_s}s) ---")
                recognition_result = await recognize_song_from_bytes(shazam, segment_bytes)
                potential_track_key, potential_display_info, potential_spotify_link = process_recognition_result(recognition_result)
                logging.debug(f"State: Confirmed='{confirmed_track_key}', Candidate='{candidate_track_key}', Potential='{potential_track_key}'")

                # --- Filtering and confirmation logic ---
                if potential_track_key == confirmed_track_key:
                    # Same as the confirmed state: any pending candidate was spurious.
                    if candidate_track_key is not None:
                        logging.info(f"Potential ('{potential_display_info or 'None'}') reverted to confirmed state. Discarding candidate ('{candidate_display_info or 'None'}').")
                    candidate_track_key = None
                elif potential_track_key == candidate_track_key:
                    if candidate_track_key is None:
                        # Unrecognised sample while no candidate is pending: nothing changes.
                        logging.debug("Potential matches candidate, but candidate was None. No change to report.")
                    else:
                        # Second consecutive sighting: promote the candidate and report it
                        # with the timestamp of its first sighting.
                        logging.info(f"CONFIRMED change to '{candidate_display_info}'. First seen at {candidate_timestamp_str}.")
                        confirmed_track_key = candidate_track_key
                        rpt_time = candidate_timestamp_str
                        rpt_display = candidate_display_info
                        rpt_spotify = candidate_spotify_link or ""  # Never write None to the CSV.

                        if confirmed_track_key:
                            print(f"[{rpt_time}] NEW SONG: {rpt_display}")
                            csv_writer.writerow([rpt_time, rpt_display, rpt_spotify])
                            logging.info(f"Wrote confirmed change to CSV: {[rpt_time, rpt_display, rpt_spotify]}")
                        else:
                            # NOTE(review): not reachable with the current logic, because a
                            # candidate is only promoted when its key is not None. Kept in
                            # case "unrecognized" is later made a confirmable state.
                            print(f"[{rpt_time}] SONG ENDED / UNRECOGNIZED")
                            logging.info(f"Change to 'Unrecognized' confirmed effective at {rpt_time}.")

                        # The candidate is now the confirmed state; clear it.
                        candidate_track_key = None
                        candidate_display_info = None
                        candidate_timestamp_str = None
                        candidate_spotify_link = None
                else:
                    # Differs from both confirmed and candidate: remember it and wait
                    # for the next sample before reporting anything.
                    logging.info(f"New potential candidate '{potential_display_info or 'None'}' detected @{current_time_str}. Storing.")
                    candidate_track_key = potential_track_key
                    candidate_display_info = potential_display_info
                    candidate_timestamp_str = current_time_str
                    candidate_spotify_link = potential_spotify_link
    except IOError as e:
        logging.critical(f"IOError with CSV '{output_csv_file}': {e}", exc_info=True)
        print(f"\nError interacting with CSV file '{output_csv_file}'.", file=sys.stderr)
    except Exception as e:
        logging.critical(f"Unexpected error during analysis loop: {e}", exc_info=True)
        print(f"\nAn unexpected error occurred: {e}", file=sys.stderr)

    print(f"--- Analysis complete --- (Output: {output_csv_file})")
    logging.info(f"Finished analysis. Output CSV: {output_csv_file}")
# --- Main Execution (including setup_logging, parse_arguments, main) --- | |
async def main():
    """Parse the command line, configure logging, then run the analysis.

    Any exception escaping the analysis is logged at CRITICAL level and
    echoed to stderr instead of propagating.
    """
    cli_values = parse_arguments()
    audio_file, time_step, segment_duration, log_level, output_csv_file = cli_values
    setup_logging(log_level)
    try:
        await analyze_audio_by_time(
            audio_file,
            time_step,
            segment_duration,
            output_csv_file,
        )
    except Exception as exc:
        logging.critical(f"Unhandled exception in main execution: {exc}", exc_info=True)
        print(f"\nCritical error: {exc}", file=sys.stderr)
def setup_logging(log_level=logging.INFO):
    """Configure the root logger's level and message format.

    ``force=True`` replaces any handlers already attached to the root logger.
    Without it ``logging.basicConfig`` does nothing once a handler exists,
    which happens as soon as any ``logging.info(...)``-style call runs before
    this function (such a call configures the root logger implicitly at
    WARNING level). In that case the requested level and format would be
    silently ignored.

    Args:
        log_level: Logging level for the root logger (e.g. ``logging.DEBUG``).
    """
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(levelname)s - [%(funcName)s] %(message)s',
        force=True,
    )
    logging.info(f"Logging level set to: {logging.getLevelName(log_level)}")
def parse_arguments():
    """Parse and validate the command-line arguments.

    Prints a message to stderr and exits with status 1 when the audio file
    does not exist or when the time step or duration is not positive.

    Returns:
        A tuple ``(audio_file, time_step, duration, loglevel, output_csv_file)``.
        ``output_csv_file`` defaults to the audio file's path with its
        extension replaced by ``.csv``.
    """
    parser = argparse.ArgumentParser(
        description="Analyze audio with Shazam, rate-limited, filtered, outputs to CSV.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("audio_file", help="Path to the audio file.")
    parser.add_argument("-t", "--time-step", type=int, default=DEFAULT_TIME_STEP_SECONDS, help="Time step in seconds.")
    parser.add_argument("-d", "--duration", type=int, default=DEFAULT_SEGMENT_DURATION_SECONDS, help="Segment duration (seconds).")
    parser.add_argument("-v", "--verbose", action="store_const", dest="loglevel", const=logging.DEBUG, default=logging.INFO, help="Enable debug logging.")
    parser.add_argument("-o", "--output", help="Output CSV file path. Defaults to '<audio_filename_base>.csv'.")
    args = parser.parse_args()

    if not os.path.exists(args.audio_file):
        print(f"Error: File not found: {args.audio_file}", file=sys.stderr)
        sys.exit(1)
    if args.time_step <= 0:
        print("Error: Time step must be > 0.", file=sys.stderr)
        sys.exit(1)
    if args.duration <= 0:
        print("Error: Duration must be > 0.", file=sys.stderr)
        sys.exit(1)

    output_csv_file = args.output or os.path.splitext(args.audio_file)[0] + ".csv"
    # Deliberately no logging call here: this function runs before logging is
    # configured, and a module-level logging call on an unconfigured root
    # logger installs a default WARNING-level handler that would make a later
    # plain logging.basicConfig(level=...) a no-op.
    return args.audio_file, args.time_step, args.duration, args.loglevel, output_csv_file
# Script entry point: run the async main() and log how the program ended.
if __name__ == "__main__":
    print("NOTE: Requires 'pydub' (pip install pydub) and FFmpeg/Libav (ffmpeg.org).")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nAnalysis interrupted.")
        logging.warning("Interrupted by user.")
    except SystemExit as e:
        logging.info(f"Exited with code {e.code}.")
        # Re-raise so the exit status (e.g. 1 for invalid arguments) reaches
        # the calling shell instead of being replaced by a successful exit.
        raise
    finally:
        logging.info("Application finished.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment