Skip to content

Instantly share code, notes, and snippets.

@Fr6jDJF
Created December 14, 2019 11:50
Show Gist options
  • Save Fr6jDJF/be94bd166b01a49ba82fec2a1c9c837f to your computer and use it in GitHub Desktop.
Save Fr6jDJF/be94bd166b01a49ba82fec2a1c9c837f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import threading
import time
import sys
import signal
import configparser
import audioop
import subprocess as sp
import argparse
import os.path
import pymumble.pymumble_py3 as pymumble
import interface
import variables as var
import hashlib
import youtube_dl
import logging
import util
import base64
from PIL import Image
from io import BytesIO
from mutagen.easyid3 import EasyID3
import re
import media.url
import media.file
import media.playlist
import media.radio
import media.system
class MumbleBot:
def __init__(self, args):
signal.signal(signal.SIGINT, self.ctrl_caught)
self.volume = var.config.getfloat('bot', 'volume')
if db.has_option('bot', 'volume'):
self.volume = var.db.getfloat('bot', 'volume')
self.stream_rtmp = var.config.getboolean('bot', 'stream_rtmp')
self.channel = args.channel
FORMAT = '%(asctime)s: %(message)s'
if args.verbose:
logging.basicConfig(format=FORMAT, level=logging.DEBUG, datefmt='%Y-%m-%d %H:%M:%S')
logging.debug("Starting in DEBUG loglevel")
elif args.quiet:
logging.basicConfig(format=FORMAT, level=logging.ERROR, datefmt='%Y-%m-%d %H:%M:%S')
logging.error("Starting in ERROR loglevel")
else:
logging.basicConfig(format=FORMAT, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')
logging.info("Starting in INFO loglevel")
var.playlist = []
var.user = args.user
var.music_folder = var.config.get('bot', 'music_folder')
var.is_proxified = var.config.getboolean("webinterface", "is_web_proxified")
self.exit = False
self.nb_exit = 0
self.media_thread = None
self.is_playing = False
if var.config.getboolean("webinterface", "enabled"):
wi_addr = var.config.get("webinterface", "listening_addr")
wi_port = var.config.getint("webinterface", "listening_port")
interface.init_proxy()
tt = threading.Thread(target=start_web_interface, args=(wi_addr, wi_port))
tt.daemon = True
tt.start()
if args.host:
host = args.host
else:
host = var.config.get("server", "host")
if args.port:
port = args.port
else:
port = var.config.getint("server", "port")
if args.password:
password = args.password
else:
password = var.config.get("server", "password")
if args.certificate:
certificate = args.certificate
else:
certificate = var.config.get("server", "certificate")
if args.tokens:
tokens = args.tokens
else:
tokens = var.config.get("server", "tokens")
tokens = tokens.split(',')
if args.user:
self.username = args.user
else:
self.username = var.config.get("bot", "username")
self.mumble = pymumble.Mumble(host, user=self.username, port=port, password=password, tokens=tokens,
debug=var.config.getboolean('debug', 'mumbleConnection'), certfile=args.certificate)
self.mumble.callbacks.set_callback("text_received", self.message_received)
self.mumble.set_codec_profile("audio")
self.mumble.start() # start the mumble thread
self.mumble.is_ready() # wait for the connection
self.set_comment()
self.mumble.users.myself.unmute() # by sure the user is not muted
if self.channel:
self.mumble.channels.find_by_name(self.channel).move_in()
self.mumble.set_bandwidth(200000)
self.loop()
def ctrl_caught(self, signal, frame):
logging.info("\nSIGINT caught, quitting, {} more to kill".format(2 - self.nb_exit))
self.exit = True
self.stop()
if self.nb_exit > 1:
logging.info("Forced Quit")
sys.exit(0)
self.nb_exit += 1
def message_received(self, text):
message = text.message.strip()
user = self.mumble.users[text.actor]['name']
if var.config.getboolean('command', 'split_username_at_space'):
user = user.split()[0]
if message[0] == var.config.get('command', 'command_symbol'):
message = message[1:].split(' ', 1)
if len(message) > 0:
command = message[0]
parameter = ''
if len(message) > 1:
parameter = message[1]
else:
return
logging.info(command + ' - ' + parameter + ' by ' + user)
if command == var.config.get('command', 'joinme'):
self.mumble.users.myself.move_in(self.mumble.users[text.actor]['channel_id'], token=parameter)
return
if not self.is_admin(user) and not var.config.getboolean('bot', 'allow_other_channel_message') and self.mumble.users[text.actor]['channel_id'] != self.mumble.users.myself['channel_id']:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'not_in_my_channel'))
return
if not self.is_admin(user) and not var.config.getboolean('bot', 'allow_private_message') and text.session:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'pm_not_allowed'))
return
for i in var.db.items("user_ban"):
if user.lower() == i[0]:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'user_ban'))
return
if command == var.config.get('command', 'user_ban'):
if self.is_admin(user):
if parameter:
self.mumble.users[text.actor].send_message(util.user_ban(parameter))
else:
self.mumble.users[text.actor].send_message(util.get_user_ban())
else:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'not_admin'))
return
elif command == var.config.get('command', 'user_unban'):
if self.is_admin(user):
if parameter:
self.mumble.users[text.actor].send_message(util.user_unban(parameter))
else:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'not_admin'))
return
elif command == var.config.get('command', 'url_ban'):
if self.is_admin(user):
if parameter:
self.mumble.users[text.actor].send_message(util.url_ban(self.get_url_from_input(parameter)))
else:
self.mumble.users[text.actor].send_message(util.get_url_ban())
else:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'not_admin'))
return
elif command == var.config.get('command', 'url_unban'):
if self.is_admin(user):
if parameter:
self.mumble.users[text.actor].send_message(util.url_unban(self.get_url_from_input(parameter)))
else:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'not_admin'))
return
if parameter:
for i in var.db.items("url_ban"):
if self.get_url_from_input(parameter.lower()) == i[0]:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'url_ban'))
return
if command == var.config.get('command', 'play_file') and parameter:
music_folder = var.config.get('bot', 'music_folder')
# sanitize "../" and so on
path = os.path.abspath(os.path.join(music_folder, parameter))
if path.startswith(music_folder):
if os.path.isfile(path):
filename = path.replace(music_folder, '')
music = {'type': 'file',
'path': filename,
'user': user}
var.playlist.append(music)
else:
# try to do a partial match
matches = [file for file in util.get_recursive_filelist_sorted(music_folder) if parameter.lower() in file.lower()]
if len(matches) == 0:
self.send_msg(var.config.get('strings', 'no_file'), text)
elif len(matches) == 1:
music = {'type': 'file',
'path': matches[0],
'user': user}
var.playlist.append(music)
else:
msg = var.config.get('strings', 'multiple_matches') + '<br />'
msg += '<br />'.join(matches)
self.send_msg(msg, text)
else:
self.send_msg(var.config.get('strings', 'bad_file'), text)
self.async_download_next()
elif command == var.config.get('command', 'play_url') and parameter:
music = {'type': 'url',
'url': self.get_url_from_input(parameter),
'user': user,
'ready': 'validation'}
var.playlist.append(music)
if media.url.get_url_info():
if var.playlist[-1]['duration'] > var.config.getint('bot', 'max_track_duration'):
var.playlist.pop()
self.send_msg(var.config.get('strings', 'too_long'), text)
else:
for i in var.db.options("url_ban"):
print(i, ' -> ', {var.playlist[-1]["url"]})
if var.playlist[-1]['url'] == i:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'url_ban'))
var.playlist.pop()
return
var.playlist[-1]['ready'] = "no"
self.async_download_next()
else:
var.playlist.pop()
self.send_msg(var.config.get('strings', 'unable_download'), text)
elif command == var.config.get('command', 'play_playlist') and parameter:
offset = 1
try:
offset = int(parameter.split(" ")[-1])
except ValueError:
pass
if media.playlist.get_playlist_info(url=self.get_url_from_input(parameter), start_index=offset, user=user):
self.async_download_next()
elif command == var.config.get('command', 'play_radio') and parameter:
if var.config.has_option('radio', parameter):
parameter = var.config.get('radio', parameter)
music = {'type': 'radio',
'url': self.get_url_from_input(parameter),
'user': user}
var.playlist.append(music)
self.async_download_next()
elif command == var.config.get('command', 'help'):
self.send_msg(var.config.get('strings', 'help'), text)
elif command == var.config.get('command', 'stop'):
self.stop()
elif command == var.config.get('command', 'kill'):
if self.is_admin(user):
self.stop()
self.exit = True
else:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'not_admin'))
elif command == var.config.get('command', 'update'):
if self.is_admin(user):
self.mumble.users[text.actor].send_message("Starting the update")
tp = sp.check_output([var.config.get('bot', 'pip3_path'), 'install', '--upgrade', 'youtube-dl']).decode()
msg = ""
if "Requirement already up-to-date" in tp:
msg += "Youtube-dl is up-to-date"
else:
msg += "Update done : " + tp.split('Successfully installed')[1]
if 'up-to-date' not in sp.check_output(['/usr/bin/env', 'git', 'pull']).decode():
msg += "<br /> I'm up-to-date"
else:
msg += "<br /> I have available updates, need to do it manually"
self.mumble.users[text.actor].send_message(msg)
else:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'not_admin'))
elif command == var.config.get('command', 'stop_and_getout'):
self.stop()
if self.channel:
self.mumble.channels.find_by_name(self.channel).move_in()
elif command == var.config.get('command', 'volume'):
if parameter is not None and parameter.isdigit() and 0 <= int(parameter) <= 100:
self.volume = float(float(parameter) / 100)
self.send_msg(var.config.get('strings', 'change_volume') % (
int(self.volume * 100), self.mumble.users[text.actor]['name']), text)
var.db.set('bot', 'volume', str(self.volume))
else:
self.send_msg(var.config.get('strings', 'current_volume') % int(self.volume * 100), text)
elif command == var.config.get('command', 'current_music'):
if len(var.playlist) > 0:
source = var.playlist[0]["type"]
if source == "radio":
reply = "[radio] {title} on {url} by {user}".format(
title=media.radio.get_radio_title(var.playlist[0]["url"]),
url=var.playlist[0]["title"],
user=var.playlist[0]["user"]
)
elif source == "url" and 'from_playlist' in var.playlist[0]:
reply = "[playlist] {title} (from the playlist <a href=\"{url}\">{playlist}</a> by {user}".format(
title=var.playlist[0]["title"],
url=var.playlist[0]["playlist_url"],
playlist=var.playlist[0]["playlist_title"],
user=var.playlist[0]["user"]
)
elif source == "url":
reply = "[url] {title} (<a href=\"{url}\">{url}</a>) by {user}".format(
title=var.playlist[0]["title"],
url=var.playlist[0]["url"],
user=var.playlist[0]["user"]
)
elif source == "file":
reply = "[file] {title} by {user}".format(
title=var.playlist[0]["title"],
user=var.playlist[0]["user"])
else:
reply = "ERROR"
logging.error(var.playlist)
else:
reply = var.config.get('strings', 'not_playing')
self.send_msg(reply, text)
elif command == var.config.get('command', 'ffwd'):
self.send_msg(var.config.get('strings', 'fast_forward'), text)
elif command == var.config.get('command', 'skip'):
if parameter is not None and parameter.isdigit() and int(parameter) > 0:
if int(parameter) < len(var.playlist):
removed = var.playlist.pop(int(parameter))
self.send_msg(var.config.get('strings', 'removing_item') % (removed['title'] if 'title' in removed else removed['url']), text)
else:
self.send_msg(var.config.get('strings', 'no_possible'), text)
elif self.next():
self.launch_music()
self.async_download_next()
else:
self.send_msg(var.config.get('strings', 'queue_empty'), text)
self.stop()
elif command == var.config.get('command', 'list'):
folder_path = var.config.get('bot', 'music_folder')
files = util.get_recursive_filelist_sorted(folder_path)
if files:
self.send_msg('<br>'.join(files), text)
else:
self.send_msg(var.config.get('strings', 'no_file'), text)
elif command == var.config.get('command', 'queue'):
if len(var.playlist) <= 1:
msg = var.config.get('strings', 'queue_empty')
else:
msg = var.config.get('strings', 'queue_contents') + '<br />'
i = 1
for value in var.playlist[1:]:
msg += '[{}] ({}) {}<br />'.format(i, value['type'], value['title'] if 'title' in value else value['url'])
i += 1
self.send_msg(msg, text)
elif command == var.config.get('command', 'repeat'):
var.playlist.append(var.playlist[0])
else:
self.mumble.users[text.actor].send_message(var.config.get('strings', 'bad_command'))
@staticmethod
def is_admin(user):
list_admin = var.config.get('bot', 'admin').split(';')
if user in list_admin:
return True
else:
return False
@staticmethod
def next():
logging.debug("Next into the queue")
if len(var.playlist) > 1:
var.playlist.pop(0)
return True
elif len(var.playlist) == 1:
var.playlist.pop(0)
return False
else:
return False
def launch_music(self):
uri = ""
logging.debug("launch_music asked" + str(var.playlist[0]))
if var.playlist[0]["type"] == "url":
media.system.clear_tmp_folder(var.config.get('bot', 'tmp_folder'), var.config.getint('bot', 'tmp_folder_max_size'))
if var.playlist[0]["ready"] == "downloading":
return
elif var.playlist[0]["ready"] != "yes":
logging.info("Current music wasn't ready, Downloading...")
self.download_music(index=0)
uri = var.playlist[0]['path']
if os.path.isfile(uri):
audio = EasyID3(uri)
title = ""
if audio["title"]:
title = audio["title"][0]
path_thumbnail = var.playlist[0]['path'][:-4] + '.jpg' # Remove .mp3 and add .jpg
thumbnail_html = ""
if os.path.isfile(path_thumbnail):
im = Image.open(path_thumbnail)
im.thumbnail((100, 100), Image.ANTIALIAS)
buffer = BytesIO()
im.save(buffer, format="JPEG")
thumbnail_base64 = base64.b64encode(buffer.getvalue())
thumbnail_html = '<img - src="data:image/PNG;base64,' + thumbnail_base64.decode() + '"/>'
logging.debug("Thunbail data " + thumbnail_html)
if var.config.getboolean('bot', 'announce_current_music'):
if not self.stream_rtmp:
self.send_msg(var.config.get('strings', 'now_playing') % (title, thumbnail_html))
else:
rtmp_stream = '(@ <a href="{0}{1}">{0}{1}</a> )'.format(var.config.get('bot', 'rtmp_url'), var.config.get('bot', 'rtmp_key'))# rtmp not recognised as url
logging.info("FFmpeg rtmp_stream : " + rtmp_stream)
self.send_msg(rtmp_stream+var.config.get('strings', 'now_playing') % (title, thumbnail_html))
else:
logging.error("Error with the path during launch_music")
pass
elif var.playlist[0]["type"] == "file":
uri = var.config.get('bot', 'music_folder') + var.playlist[0]["path"]
elif var.playlist[0]["type"] == "radio":
uri = var.playlist[0]["url"]
title = media.radio.get_radio_server_description(uri)
var.playlist[0]["title"] = title
if var.config.getboolean('debug', 'ffmpeg'):
ffmpeg_debug = "debug"
else:
ffmpeg_debug = "warning"
if self.stream_rtmp:
try:
var.config.rtmp_url = var.config.get('bot', 'rtmp_url')
var.config.rtmp_key = var.config.get('bot', 'rtmp_key')
var.config.rtmp_fps = var.config.get('bot', 'rtmp_fps')
var.config.rtmp_vbr = var.config.get('bot', 'rtmp_vbr')
var.config.rtmp_logo = var.config.get('bot', 'rtmp_logo')
except NameError:
logging.error("Error; missing rtmp parameters")
else:
video=False
if video:
pass
else:
# stream rtmp
LOGO="logo.png"
SOURCE=uri#"content/Digital_Grapes.mp4" # Source file/UDP stream -qscale is ambigious
FILT_COMPL="[0:v][1:v] overlay=25:25:enable='between(t,0,20)'"
command = ["ffmpeg",
"-re", "-i", uri,
"-acodec", "libmp3lame", "-ar", "48000", "-threads", "6", "-q:a", "3", "-b:a", "712000", "-bufsize", "512k",
"-f", "flv",(var.config.rtmp_url+var.config.rtmp_key)
]
logging.info("FFmpeg command(smtp-audio) : " + " ".join(command))
else:
logging.info("FFmpeg command : " + " ".join(command))
command = ["ffmpeg", '-v', ffmpeg_debug, '-nostdin', '-i', uri, '-ac', '1', '-f', 's16le', '-ar', '48000', '-']
self.media_thread = sp.Popen(command, stdout=sp.PIPE, bufsize=480)
self.is_playing = True
def download_music(self, index):
if var.playlist[index]['type'] == 'url' and var.playlist[index]['ready'] == "validation":
if media.url.get_url_info(index=index):
if var.playlist[index]['duration'] > var.config.getint('bot', 'max_track_duration'):
var.playlist.pop()
logging.info("the music " + var.playlist[index]["url"] + " has a duration of " + var.playlist[index]['duration'] + "s -- too long")
self.send_msg(var.config.get('strings', 'too_long'))
return
else:
var.playlist[index]['ready'] = "no"
else:
var.playlist.pop(index)
logging.error("Error while fetching info from the URL")
self.send_msg(var.config.get('strings', 'unable_download'))
if var.playlist[index]['type'] == 'url' and var.playlist[index]['ready'] == "no":
var.playlist[index]['ready'] = "downloading"
logging.debug("Download index:" + str(index))
logging.debug(var.playlist[index])
url = var.playlist[index]['url']
url_hash = hashlib.md5(url.encode()).hexdigest()
path = var.config.get('bot', 'tmp_folder') + url_hash + ".%(ext)s"
mp3 = path.replace(".%(ext)s", ".mp3")
var.playlist[index]['path'] = mp3
# if os.path.isfile(mp3):
# audio = EasyID3(mp3)
# var.playlist[index]['title'] = audio["title"][0]
ydl_opts = ""
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': path,
'noplaylist': True,
'writethumbnail': True,
'updatetime': False,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192'},
{'key': 'FFmpegMetadata'}]
}
self.send_msg(var.config.get('strings', "download_in_progress") % var.playlist[index]['title'])
logging.info("Information before start downloading :" + str(var.playlist[index]))
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
for i in range(2):
try:
ydl.extract_info(url)
if 'ready' in var.playlist[index] and var.playlist[index]['ready'] == "downloading":
var.playlist[index]['ready'] = "yes"
except youtube_dl.utils.DownloadError:
pass
else:
break
return
def async_download_next(self):
logging.info("Async download next asked")
if len(var.playlist) > 1 and var.playlist[1]['type'] == 'url' and var.playlist[1]['ready'] in ["no", "validation"]:
th = threading.Thread(target=self.download_music, kwargs={'index': 1})
else:
return
logging.info("Start downloading next in thread")
th.daemon = True
th.start()
@staticmethod
def get_url_from_input(string):
if string.startswith('http'):
return string
p = re.compile('href="(.+?)"', re.IGNORECASE)
res = re.search(p, string)
if res:
return res.group(1)
else:
return False
def loop(self):
raw_music = ""
while not self.exit and self.mumble.isAlive():
if not self.stream_rtmp:
while self.mumble.sound_output.get_buffer_size() > 0.5 and not self.exit:
time.sleep(0.01)
if self.media_thread:
raw_music = self.media_thread.stdout.read(480) #! buffer 480 here?
if raw_music:
self.mumble.sound_output.add_sound(audioop.mul(raw_music, 2, self.volume))
else:
time.sleep(0.1)
else:
time.sleep(0.1)
if self.media_thread is None or not raw_music:
if self.is_playing:
self.is_playing = False
self.next()
if len(var.playlist) > 0:
if var.playlist[0]['type'] in ['radio', 'file'] \
or (var.playlist[0]['type'] == 'url' and var.playlist[0]['ready'] not in ['validation', 'downloading']):
self.launch_music()
self.async_download_next()
while self.mumble.sound_output.get_buffer_size() > 0:
time.sleep(0.01)
time.sleep(0.5)
if self.exit:
util.write_db()
def stop(self):
if self.media_thread:
self.media_thread.kill()
self.media_thread = None
var.playlist = []
self.is_playing = False
def set_comment(self):
self.mumble.users.myself.comment(var.config.get('bot', 'comment'))
def send_msg(self, msg, text=None):
if not text or not text.session:
own_channel = self.mumble.channels[self.mumble.users.myself['channel_id']]
own_channel.send_text_message(msg)
else:
self.mumble.users[text.actor].send_message(msg)
def start_web_interface(addr, port):
logging.info('Starting web interface on {}:{}'.format(addr, port))
interface.web.run(port=port, host=addr)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Bot for playing music on Mumble')
# General arguments
parser.add_argument("--config", dest='config', type=str, default='configuration.ini', help='Load configuration from this file. Default: configuration.ini')
parser.add_argument("--db", dest='db', type=str, default='db.ini', help='database file. Default db.ini')
parser.add_argument("-q", "--quiet", dest="quiet", action="store_true", help="Only Error logs")
parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", help="Show debug log")
# Mumble arguments
parser.add_argument("-s", "--server", dest="host", type=str, help="Hostname of the Mumble server")
parser.add_argument("-u", "--user", dest="user", type=str, help="Username for the bot")
parser.add_argument("-P", "--password", dest="password", type=str, help="Server password, if required")
parser.add_argument("-T", "--tokens", dest="tokens", type=str, help="Server tokens, if required")
parser.add_argument("-p", "--port", dest="port", type=int, help="Port for the Mumble server")
parser.add_argument("-c", "--channel", dest="channel", type=str, help="Default channel for the bot")
parser.add_argument("-C", "--cert", dest="certificate", type=str, default=None, help="Certificate file")
args = parser.parse_args()
var.dbfile = args.db
config = configparser.ConfigParser(interpolation=None, allow_no_value=True)
parsed_configs = config.read(['configuration.default.ini', args.config], encoding='latin-1')
db = configparser.ConfigParser(interpolation=None, allow_no_value=True, delimiters='²')
db.read(var.dbfile, encoding='latin-1')
if len(parsed_configs) == 0:
logging.error('Could not read configuration from file \"{}\"'.format(args.config), file=sys.stderr)
sys.exit()
var.config = config
var.db = db
botamusique = MumbleBot(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment