Skip to content

Instantly share code, notes, and snippets.

@dannymichel
Created December 6, 2016 16:19
Show Gist options
  • Save dannymichel/a4f1abfde9d7f7748dc2732607d83356 to your computer and use it in GitHub Desktop.
Save dannymichel/a4f1abfde9d7f7748dc2732607d83356 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
from __future__ import print_function
__doc__ = '''This script is designed to make uploads to Gazelle private trackers
easier and faster.
It will:
0. create a torrent file if necessary
1. inspect audio files
2. extract the AcoustID
3. lookup the files on Musicbrainz to extract metadata
4. look for duplicates on a Gazelle instance
5. prompt for missing information
6. upload given torrent file on upload.php
You can either pass an existing torrent file (``--torrent``) or ask
the script to generate one for you (``--mktorrent``). If both are
provided, the torrent will be overwritten with the new settings. This
is to make sure you reupload existing what.cd torrents if you have
them. Use the --announce argument to point to your personal announce
URL as specified in upload.php. This replaces the announce URL in
existing torrent files, so you can reuse your what.cd torrents.
This processes a single Album at a time. The rationale is that file
layout varies and it is difficult to find the right folder hierarchy
that should be shipped in a torrent file.
Only music is supported for now.
Known issues:
* user interface is very rough: unicode-encoded strings may be
unreadable, but should still be submitted correctly. similarly, you
will need to convert MB media formats into something Gazelle knows about
* parsing a large number of files for AcoustID is slow
* can't upload pre-1982 CDs: we need to support multiple releases and
this is currently broken
* duplication detector sometimes matches too much, and doesn't know
trumping rules
In general, do things the way upload.php does it.
'''
# Copyright 2011, Adrian Sampson.
# Copyright 2016, Fuda Fafa
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
# relevant documentation:
# https://github.com/WhatCD/Gazelle/wiki/JSON-API-Documentation#torrent
# http://click.pocoo.org/
# http://docs.python-requests.org/
# https://python-musicbrainzngs.readthedocs.io/
# http://musicbrainz.org/development/mmd
# https://musicbrainz.org/doc/Release#Status
# changelog:
# 1.0: first release
# 1.1: automatically modify the announce URL when provided with --announce
# 1.2: torrent generation --mktorrent, --version support
# 1.2.1: fixed 1.1 and 1.2 features so they actually work.
# 1.2.2: fixed tags so they are correctly comma-separated
# 1.2.3: fixed rare artist parsing problem (with "feat." artists)
# 1.2.4: report exception when acoustid fails, thanks frizkie
# 1.3.0: format / bitrate detection
# 1.3.1: fix issues with unicode filenames and mktorrent
# 1.3.2: fix handling of errors in exiftool
# 1.3.3: fix error handling, try to handle failures better
# 1.3.4: handle missing exiftool
# 1.3.5: small bugfixes
# 1.4.0: retry metadata on upload failures, remove duplicate / in
# URLs, deal better with garbage all over
# 1.5.0: source support for PTH, better error handling again
# 1.6.0: refactoring, no overwrite by default, --mktorrent and
# --torrent supported together
# 1.6.1: deal with more corner cases, put version and name in release_desc
__version__ = '1.6.1'
import errno
import json
import logging
import operator
import os
import os.path
import re
import subprocess
import sys
__prog__ = os.path.basename(sys.argv and sys.argv[0] or __file__)
import acoustid
import click
from musicbrainzngs import get_recording_by_id, get_image_list, set_useragent, get_release_by_id
from musicbrainzngs.musicbrainz import ResponseError
import requests
import requests.utils
# API key for this demo script only. Get your own API key at the
# Acoustid Web for your application.
# http://acoustid.org/
API_KEY = 'cSpUJKpD'
def dump(*kargs, **kwargs):
    '''Serialize the given object to pretty-printed JSON.

    Thin wrapper around ``json.dumps``: all positional and keyword
    arguments are passed through. The output is indented by two spaces
    unless the caller provides its own ``indent`` keyword.
    '''
    # use a default instead of a hard-coded keyword: passing indent=...
    # explicitly used to fail with "multiple values for keyword argument"
    kwargs.setdefault('indent', 2)
    return json.dumps(*kargs, **kwargs)
def dir_size(path='.'):
    '''Return the cumulative size, in bytes, of every file below ``path``.

    The tree is traversed with ``os.walk`` and each regular entry listed
    in a directory is measured with ``os.path.getsize``.
    '''
    return sum(os.path.getsize(os.path.join(root, name))
               for root, _subdirs, names in os.walk(path)
               for name in names)
def make_torrent(directory, announce):
    '''Create ``<directory>.torrent`` for the given directory.

    ``announce`` is the announce URL handed to ``make_meta_file``.

    Returns the path of the torrent file, or ``None`` when the
    BitTorrent module cannot be imported (an error is logged).
    '''
    try:
        from BitTorrent.btmakemetafile import make_meta_file
    except ImportError as e:
        logging.error("can't generate torrent: %s", e)
        return None

    total = dir_size(directory)
    target = directory + '.torrent'
    logging.warning('found %s bytes in %s, torrent %s',
                    total, directory, target)
    # the progress bar is sized in bytes and driven by the callback that
    # make_meta_file invokes while it works
    with click.progressbar(label='creating torrent', length=total) as bar:
        make_meta_file(directory.encode('utf-8'),
                       announce.encode('utf-8'),
                       target=target,
                       progress=bar.update,
                       progress_percent=False)
    return target
def rewrite_torrent_data(torrent, announce=False, source=False):
    '''Return the bencoded content of ``torrent`` with updated settings.

    The file at path ``torrent`` is decoded, the announce URL and the
    ``source`` tag are replaced when provided, the private flag and the
    encoding are always set, and the result is encoded again. The file
    itself is not modified.
    '''
    try:
        from BitTorrent import bencode as codec
    except ImportError:
        import bencode as codec

    with open(torrent, 'rb') as handle:
        raw = handle.read()
    meta = codec.bdecode(raw)

    if announce:
        meta['announce'] = str(announce)
    info = meta['info']
    if source:
        info['source'] = str(source)
    info['private'] = 1  # for good measure
    meta['encoding'] = 'UTF-8'
    return codec.bencode(meta)
def tracker_login(tracker, username, password):
    '''Log into a Gazelle tracker and return the authenticated session.

    ``tracker`` is the base URL of the site, with or without a trailing
    slash. The login form is fetched first, then the credentials are
    posted to ``login.php``.

    Raises ``requests.HTTPError`` when the login request returns an HTTP
    error status or is not answered with a redirect (which is how a
    rejected password shows up).
    '''
    # make sure we have a trailing slash: URLs below are built by plain
    # string concatenation
    tracker = tracker.rstrip('/') + '/'
    session = requests.Session()
    ua = '%s (%s/%s)' % (requests.utils.default_user_agent(),
                         __prog__, __version__)
    session.headers['User-Agent'] = ua
    r = session.get(tracker + 'login.php')
    logging.debug('headers: %s', r.headers)
    if r.status_code != requests.codes.ok:
        # best effort: report the problem but still attempt the login
        logging.error('could not access site %s', tracker)
    creds = {'username': username, 'password': password}
    # redirects are not followed so the 302 answer can be inspected
    r = session.post(tracker + 'login.php',
                     data=creds, allow_redirects=False)
    logging.debug('headers: %s', r.headers)
    logging.debug('body: %s', r.text)
    r.raise_for_status()
    if r.status_code != requests.codes.found:
        raise requests.HTTPError('wrong password?')
    logging.info('logged into tracker %s', tracker)
    return session
def identify_format(paths):
    '''Yield a ``(format, bitrate)`` tuple for each audio file in ``paths``.

    The files are inspected in a single ``exiftool`` run. Files without
    a detected type, and a few well-known non-audio types, are skipped.

    Nothing is yielded when ``exiftool`` is not installed or when its
    output cannot be parsed; a warning is logged instead. Other
    ``OSError`` conditions are propagated.
    '''
    command = ['exiftool', '-json', '-FileType', '-LameVBRQuality',
               '-AudioBitrate']
    command.extend(paths)
    try:
        content = subprocess.check_output(command)
    except subprocess.CalledProcessError as e:
        logging.info('exiftool complained: %s', e)
        # this will happen on .cue files and so on
        content = e.output
    except OSError as e:
        if e.errno == errno.ENOENT:
            logging.warning('exiftool not found, cannot identify bit rate')
            return
        raise
    if isinstance(content, bytes):
        content = content.decode('utf-8', 'replace')
    try:
        metas = json.loads(content or '')
    except ValueError as e:
        # a failed exiftool run may leave us with empty or partial output
        logging.warning('could not parse exiftool output: %s', e)
        return
    for meta in metas:
        fmt = meta.get('FileType', None)
        if fmt is None or fmt in ['M2TS', 'JPEG', 'HTML', 'PDF']:
            continue
        # covered: 192, 256, 320, Vx (VBR), FLAC (Lossless)
        # not covered: APS (VBR), APX (VBR), q8.x (VBR), 24bit Lossless, Other
        # reset for every file, otherwise an unrecognized format inherits
        # the bitrate of the file processed just before it
        bitrate = 'Other'
        if fmt == 'MP3':
            quality = meta.get('LameVBRQuality')
            if quality is not None:
                bitrate = 'V%s (VBR)' % quality
            else:
                bitrate = '%s' % meta.get('AudioBitrate', 'Other')
                bitrate = bitrate.replace(' kbps', '')
        elif fmt == 'FLAC':
            bitrate = 'Lossless'
        yield fmt, bitrate
def process_album(files, tracker, session):
    '''Identify the release the given files belong to.

    Every file is fingerprinted with ``match_recording``; the matching
    recordings are looked up on MusicBrainz and the score of each match
    is added to every release the recording appears on. The metadata of
    the best release is then built by ``check_releases``.

    When both ``tracker`` and ``session`` are set, the tracker is
    searched for possible duplicates, which are only reported through
    log messages.

    Returns the metadata dictionary, or an empty dictionary when no
    release could be found.
    '''
    logging.debug('checking files %s', files)
    releases_meta = {}  # copy of the release metadata returned by recordings
    releases_scores = {}
    with click.progressbar(files, label='analyzing files') as bar:
        for path in bar:
            logging.debug('checking %s', path)
            for score, recid, title, artist in match_recording(path.encode('utf-8')):
                logging.debug('matched with recording %s %s %s %s',
                              score, recid, title, artist)
                includes = ['artists', 'releases']
                try:
                    recording = get_recording_by_id(recid,
                                                    includes=includes)
                except ResponseError as e:
                    if '404' not in str(e):
                        raise
                    logging.warning('recording id %s not found', recid)
                    continue
                logging.debug('releases: %s', dump(recording))
                # a recording is not necessarily attached to a release
                for release in recording['recording'].get('release-list', []):
                    rid = release['id']
                    if rid not in releases_meta:
                        releases_scores[rid] = 0.0
                        # the release entry has no artist of its own here,
                        # borrow the one from the recording
                        release['artist-credit'] = recording['recording']['artist-credit']
                        releases_meta[rid] = release
                    releases_scores[rid] += score
    if not releases_meta:
        logging.warning('could not find release on Musicbrainz!')
        return {}
    output = check_releases(releases_meta, releases_scores)
    if tracker and session:
        try:
            dupes = find_duplicates(tracker, session, output)
        except (ValueError, requests.HTTPError):
            logging.warning("warning: invalid response, couldn't check for duplicates")
            dupes = False
        if dupes:
            logging.warning('warning: duplicates found')
            results = [(result['groupId'], result['torrents'])
                       for result in dupes]
            logging.info('results: %s %s', results, dupes)
            for group, torrents in results:
                for torrent in torrents:
                    torrent['tracker'] = tracker
                    torrent['groupId'] = group
                    # the link parameter is "torrentid"; it used to be
                    # misspelled, which produced links ignoring the torrent
                    logging.warning('''{media} {format} {encoding} log: {hasLog} {logScore} remastered: {remastered} {remasterYear} {remasterCatalogueNumber} {remasterTitle}
scene: {scene} files: {fileCount} size: {size} D/S/L: {snatches} {seeders} {leechers}
{tracker}torrents.php?id={groupId}&torrentid={torrentId}'''.format(**torrent))
    return output
def match_recording(filename):
    '''Yield ``(score, recording id, title, artist)`` matches for a file.

    The file is fingerprinted and looked up on the AcoustID web service.
    Nothing is yielded when the fingerprint cannot be computed or when
    the web service fails; a warning is logged instead.
    '''
    try:
        # consume the results inside the try block: acoustid.match()
        # returns a lazy iterator, so errors found while parsing the
        # response would otherwise be raised outside of these handlers
        results = list(acoustid.match(API_KEY, filename))
    except (acoustid.FingerprintGenerationError, EOFError) as e:
        logging.warning("fingerprint could not be calculated on %s: %s",
                        filename, e)
        return
    except acoustid.WebServiceError as exc:
        logging.warning("web service request failed: %s",
                        getattr(exc, 'message', exc))
        return
    for score, recid, title, artist in results:
        logging.info('%s (%s - %s, %f%%)',
                     recid, artist, title, score * 100)
        yield score, recid, title, artist
def cover_url(rid):
    '''Return the URL of the large front cover thumbnail of release ``rid``.

    The image list of the release is fetched with ``get_image_list`` and
    the first approved image of type "Front" wins. Returns ``None`` when
    there is no such image.
    '''
    images = get_image_list(rid)["images"]
    fronts = (img["thumbnails"]["large"]
              for img in images
              if "Front" in img["types"] and img["approved"])
    return next(fronts, None)
def find_duplicates(tracker, session, output):
    '''Search the tracker for torrent groups matching the given release.

    The first artist in ``output['artists[]']`` and ``output['title']``
    are used in a ``browse`` request on the tracker's ``ajax.php``.

    Returns the list of results, or ``False`` when nothing was found or
    when there is no artist to search for. Raises ``requests.HTTPError``
    on HTTP errors and ``ValueError`` when the answer is not JSON.
    '''
    artists = output.get('artists[]') or []
    if not artists:
        # nothing to search with; do not crash on artists[0] below
        logging.warning('no artist found, cannot search for duplicates')
        return False
    if len(artists) > 1:
        logging.warning('more than one artists found, duplicate search may fail')
    params = {'action': 'browse',
              'artistname': artists[0],
              'groupname': output['title']}
    r = session.get(tracker + 'ajax.php', params=params)
    logging.debug('headers: %s', r.headers)
    logging.debug('content: %s', r.text)
    logging.debug('status: %s', r.status_code)
    r.raise_for_status()
    answer = r.json()
    results = (answer.get('response') or {}).get('results')
    if answer.get('status') == 'success' and results:
        return results
    return False
def check_releases(releases_meta, releases_scores):
    '''Build the upload metadata of the best scoring release.

    ``releases_meta`` maps release ids to the release metadata found in
    recordings, ``releases_scores`` maps the same ids to their
    cumulative score. The release with the highest score is selected,
    more details are fetched with ``get_release_by_id`` and the cover
    art is looked up with ``cover_url``.

    Returns a dictionary of metadata; the ``image`` key is only present
    when a cover was found. ``ResponseError`` is propagated, except for
    a missing (404) cover.
    '''
    release_id = max(releases_scores, key=releases_scores.get)
    s = sorted(releases_scores.items(), key=operator.itemgetter(1))
    logging.info('releases_scores: %s', dump(s))
    release = releases_meta[release_id]
    logging.debug('full release metadata: %s', dump(release))
    # delete useless metadata from output:
    output = {k: v for k, v in release.items()
              if k in ['date', 'id', 'status', 'title']}
    output['year'] = output.get('date', '').split('-')[0]
    output['score'] = '%f' % max(releases_scores.values())
    output['release-url'] = 'https://musicbrainz.org/release/%s' % release_id
    # only dictionary entries of the artist credit carry an artist
    output['artists[]'] = [a['artist']['name']
                           for a in release['artist-credit']
                           if isinstance(a, dict)]
    includes = ['labels', 'discids', 'tags', 'media',
                'release-groups', 'recordings']
    more_meta = get_release_by_id(release_id, includes=includes)['release']
    logging.debug('more release metadata: %s', dump(more_meta))
    # this should be converted between MB and Gazelle formats:
    # https://musicbrainz.org/doc/Release/Format
    media_list = more_meta.get('medium-list', [])
    medias = [m.get('format', 'CD') for m in media_list]
    output['media'] = " ".join(medias)
    label_infos = more_meta.get('label-info-list', [])
    labels = [l.get('label', {}).get('name', '') for l in label_infos]
    output['record_label'] = " ".join(labels)
    catalogs = [l.get('catalog-number', '') for l in label_infos]
    output['catalog_number'] = " ".join(catalogs)
    for field in ['barcode', 'asin', 'country']:
        output[field] = more_meta.get(field, '')
    output['release_group_id'] = more_meta['release-group']['id']
    output['releasetype'] = more_meta['release-group'].get('type')
    output['tags'] = [tag['name'].replace(' ', '.')
                      for tag in more_meta['release-group'].get('tag-list', [])]
    output['tags'] = ", ".join(output['tags'])
    # flatten the tracks of all media once, for the count and the listing
    tracks = [t for m in media_list
              for t in m.get('track-list', [])]
    output['tracknum'] = len(tracks)
    output['album_desc'] = '''
[url=https://musicbrainz.org/release-group/{release_group_id}]MusicBrainz[/url]
[url=http://www.amazon.com/exec/obidos/ASIN/{asin}]Amazon[/url]
Country: {country}
Barcode: {barcode}
Tracks: {tracknum}
Track list:
'''.format(**output)
    output['album_desc'] += ''.join('[#]' + track['recording']['title'] + "\n"
                                    for track in tracks)
    if 'release_desc' not in output:
        marker = 'uploaded using %s %s' % (__prog__, __version__)
        output['release_desc'] = marker
    # should be taken from the above release group info, probably
    try:
        image = cover_url(release_id)
    except ResponseError as e:
        # a release without cover art is not an error
        if '404' not in str(e):
            raise
    else:
        if image is not None:
            # str.replace returns a new string: the result must be
            # stored, otherwise the plain http URL is kept
            output['image'] = image.replace('http://', 'https://', 1)
    return output
def confirm_data(output):
    '''Interactively confirm the metadata and return the upload form data.

    Every known field is prompted for, using the value found in
    ``output`` as a default when there is one. Answering "none" blanks a
    field that had a default. The release type is translated to the
    numeric code expected in the form, falling back to Album.

    Note that ``output['releasetype']`` may be rewritten in place.
    '''
    data = {'type': 'Music',
            'importance[]': '1',  # Main, hardcoded
            }
    artists = click.prompt('confirm artists', output.get('artists[]', []))
    if type(artists) is not list:
        artists = artists.split(',')
    data['artists[]'] = artists

    field_list = ('title',
                  'year',
                  'record_label',
                  'catalog_number',
                  # remaster, remaster_year,
                  # remaster_record_label,
                  # remaster_catalog_number...
                  # 'scene',
                  'media',
                  'format',  # missing
                  'bitrate',  # missing
                  # vbr ("other bitrates"?)
                  # 'logfiles[]'
                  'tags',
                  'album_desc',
                  'release_desc',  # missing
                  'image')
    logging.info('formats: MP3, FLAC, Ogg Vorbis, AAC, AC3, DTS')
    logging.info('bitrate: 192, APS (VBR), V2 (VBR), V1 (VBR), 256, APX (VBR), V0 (VBR), q8.x (VBR), 320, Lossless, 24bit Lossless, Other')
    logging.warning('confirm metadata, use "none" to avoid using the default value if not checked')

    for field in field_list:
        # seed the form with the detected value, if any, then ask
        if field in output:
            data[field] = output[field]
        logging.debug('old value: %s', repr(data.get(field)))
        if field in data:
            answer = click.prompt('confirm %s' % field, data[field])
            data[field] = '' if answer.lower() == 'none' else answer
        else:
            data[field] = click.prompt('enter %s' % field, '')
        logging.debug('new value: %s', repr(data.get(field)))

    types = {'Album': 1,
             'Soundtrack': 3,
             'EP': 5,
             'Anthology': 6,
             'Compilation': 7,
             'Single': 9,
             'Live album': 11,
             'Remix': 13,
             'Bootleg': 14,
             'Interview': 15,
             'Mixtape': 16,
             'Unknown': 21}
    # reverse lookup, from numeric code back to the name
    rtypes = dict((code, label) for label, code in types.items())
    click.echo('release types: %s' % types.keys())
    if 'releasetype' not in output:
        releasetype = click.prompt('enter release type')
    else:
        current = output['releasetype']
        if current not in types:
            # coming from MB
            output['releasetype'] = rtypes.get(current)
        releasetype = click.prompt('confirm release type',
                                   output['releasetype'])
    code = types.get(releasetype, False)
    if not code:
        logging.warning('unknown release type: %s, defaulting to Album',
                        releasetype)
        code = 1
    data['releasetype'] = code
    return data
@click.command(epilog=__doc__)
@click.version_option(version=__version__)
@click.argument('directory')
@click.option('--loglevel', 'loglevel',
              help='show only warning messages',
              type=click.Choice(['WARNING', 'INFO', 'DEBUG']),
              flag_value='WARNING', default=True)
@click.option('-v', '--verbose', 'loglevel', help='be more verbose',
              flag_value='INFO')
@click.option('-d', '--debug', 'loglevel', help='even more verbose',
              flag_value='DEBUG')
@click.option('--tracker', help='use Gazelle instance at URL')
@click.option('--username', prompt=True,
              help='username to login with, default: prompted')
@click.password_option(confirmation_prompt=False, help='default: prompted')
@click.option('--torrent', help='torrent file to upload',
              type=click.Path(exists=True, readable=True))
@click.option('--announce', help='announce URL to use in torrent')
@click.option('--source', help='source tag to add to the torrent, e.g. "PTH"')
@click.option('--mktorrent', show_default=True, is_flag=True,
              help='create a torrent file with the given directory '
              'or rewrite provided torrent')
def identify(directory, loglevel, tracker, username, password,
             torrent, announce, source, mktorrent):
    '''upload given directory and torrent to Gazelle'''
    # NOTE: the docstring above is kept short on purpose, it is the text
    # shown by --help
    #
    # Steps: create the torrent if requested, log into the tracker and
    # grab the upload form token, prepare the torrent data, identify the
    # album, confirm the metadata with the user and finally post the
    # upload form, asking for corrections as long as the tracker
    # refuses it.
    logging.basicConfig(format='%(message)s', level=loglevel)
    # required by btmakemetafile
    directory = os.path.abspath(directory.rstrip('/'))
    if mktorrent:
        if not tracker:
            raise click.UsageError('--mktorrent needs --tracker')
        if not announce:
            raise click.UsageError('--mktorrent needs --announce')
        # with --torrent the existing file is rewritten below rather
        # than generated from scratch
        if not torrent:
            torrent = make_torrent(directory, announce)

    auth_token = None
    # must be defined even without --tracker: process_album() gets it
    session = None
    if tracker:
        tracker = tracker.rstrip('/') + '/'
        try:
            session = tracker_login(tracker, username, password)
        except requests.HTTPError as e:
            session = None
            if torrent or mktorrent:
                # the message must be formatted here: the second
                # argument of UsageError is the click context
                raise click.UsageError('login failed: %s' % e)
            logging.warning('login failed: %s', e)
        else:
            r = session.get(tracker + 'upload.php')
            # <input type="hidden" name="auth" value="<32 char hex string>">
            m = re.search(r'<input\s+type="hidden"\s+name="auth"\s+value="(\w+)"\s*/\s*>',
                          r.text)
            if m:
                auth_token = m.group(1)
                logging.info('found authentication token')
            elif torrent or mktorrent:
                raise click.UsageError('could not parse upload form, are you logged in?')
            else:
                logging.warning('no form token found, upload impossible')

    torrent_data = None
    # without a torrent file there is nothing to read or rewrite
    if auth_token and torrent:
        if announce or source:
            torrent_data = rewrite_torrent_data(torrent,
                                                announce=announce,
                                                source=source)
            if mktorrent:
                with open(torrent, 'wb') as torrentfile:
                    torrentfile.write(torrent_data)
        else:
            with open(torrent, 'rb') as torrentfile:
                torrent_data = torrentfile.read()

    set_useragent(__prog__, __version__)
    files = []
    for dirpath, _, filenames in os.walk(directory):
        for f in filenames:
            files.append(os.path.join(dirpath, f))
    if not files:
        logging.warning('no files provided, aborting')
        return

    # identify_format() is a generator: it has to be consumed before we
    # can tell whether anything was detected
    fmt = bitrate = None
    formats = set(identify_format(files))
    if formats:
        # XXX: just take last file
        fmt, bitrate = formats.pop()
    output = process_album(files, tracker, session)
    if fmt is not None:
        output['format'] = fmt
        output['bitrate'] = bitrate

    if not torrent:
        logging.warning('no torrent provided, nothing to upload')
        return
    elif not auth_token:
        logging.warning('invalid auth token, aborting')
        return

    confirmed = False
    data = output
    click.echo('found release: %s' % output.get('release-url'))
    while not confirmed:
        data = confirm_data(data)
        click.echo(dump(data))
        confirmed = click.confirm('metadata ok?', default=True)
    if not click.confirm('upload torrent %s' % (torrent), default=True):
        return

    uploaded = False
    while not uploaded:
        data['auth'] = auth_token
        data['submit'] = 'Upload torrent'
        # hardcode torrent name because requests crashes on unicode filenames
        upload_files = {'file_input': ('torrent.torrent', torrent_data)}
        logging.debug('data: %s', data)
        logging.debug('files: %s', upload_files)
        click.echo('uploading...')
        r = session.post(tracker + 'upload.php', data=data,
                         files=upload_files, allow_redirects=False)
        logging.debug('headers: %s', r.headers)
        logging.debug('body: %s', r.text)
        r.raise_for_status()
        if r.status_code == requests.codes.found:
            # a redirect is the answer to a successful upload
            uploaded = True
            click.echo('uploaded: %s%s' % (tracker,
                                           r.headers['Location']))
            continue
        m = re.search(r'<h1>Warning</h1>.*?<strong>(Your torrent has been uploaded;.*?)</strong>',
                      r.text, re.DOTALL)
        if m:
            uploaded = True
            click.echo('uploaded, but warning: %s' % m.group(1))
            continue
        m = re.search(r'<p style="color: red; text-align: center;">(.*?)</p>',
                      r.text, re.DOTALL)
        if m:
            logging.warning('upload failed: %s', m.group(1))
        else:
            logging.warning('upload failed!')
        # drop the form internals before asking the user for corrections
        del data['auth']
        del data['submit']
        data = confirm_data(data)
if __name__ == '__main__':
    # script entry point: known failures are reported as a short message
    # instead of a traceback, and signalled to the calling shell with a
    # non-zero exit status
    try:
        identify()
    except acoustid.NoBackendError:
        logging.error("chromaprint library/tool not found")
        sys.exit(1)
    except requests.HTTPError as e:
        logging.error('error talking with tracker: %s', e)
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment