tqdm thread subprocess ffmpeg
import random
import re
import subprocess
import time


def example_func(*args):
    """Dummy task that just sleeps and yields its progress in percent."""
    for i in range(1, 101):
        s = random.choice([0.1, 0.3, 0.5, 0.7, 0.9])
        time.sleep(s)
        yield i


def to_ms(s=None, des=None, **kwargs):
    """Convert a timestamp to milliseconds.

    s is a string in ffmpeg's HH:MM:SS.ms format; alternatively pass
    hour/min/sec/ms as keyword arguments. des rounds the result to that
    number of digits.
    """
    if s:
        hour = int(s[0:2])
        minute = int(s[3:5])
        sec = int(s[6:8])
        ms = int(s[9:11])
    else:
        hour = int(kwargs.get('hour', 0))
        minute = int(kwargs.get('min', 0))
        sec = int(kwargs.get('sec', 0))
        ms = int(kwargs.get('ms', 0))
    result = (hour * 60 * 60 * 1000) + (minute * 60 * 1000) + (sec * 1000) + ms
    if des and isinstance(des, int):
        return round(result, des)
    return result
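
# A quick sanity check for to_ms (illustrative only, not part of the original
# gist); both forms should give 90050 for the timestamp "00:01:30.50":
# to_ms('00:01:30.50')                 -> 90050
# to_ms(hour=0, min=1, sec=30, ms=50)  -> 90050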


def stream_ffmpeg(filename, url, *args):
    """Run ffmpeg against url and yield the progress as a percentage."""
    dur = None
    dur_regex = re.compile(r'Duration: (?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})')
    time_regex = re.compile(r'\stime=(?P<hour>\d{2}):(?P<min>\d{2}):(?P<sec>\d{2})\.(?P<ms>\d{2})')
    # Pass the command as a list so it works with shell=False and the output
    # filename does not need manual quoting.
    cmd = ['ffmpeg', '-i', url, '-n', '-vcodec', 'copy', '-acodec', 'ac3', '%s.mkv' % filename]
    process = subprocess.Popen(cmd,
                               shell=False,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=True)
    for line in process.stderr:
        # ffmpeg writes its progress to stderr.
        if dur is None and dur_regex.search(line):
            dur = to_ms(**dur_regex.search(line).groupdict())
        result = time_regex.search(line)
        if dur and result and result.group('hour'):
            elapsed_time = to_ms(**result.groupdict())
            yield elapsed_time / float(dur) * 100
    # Since we might miss the last tick.
    yield 100
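
# Minimal standalone usage sketch (assumes ffmpeg is on your PATH and the
# source is reachable; the filename and URL below are placeholders, not part
# of the original gist):
# for pct in stream_ffmpeg('out', 'http://example.com/stream.ts'):
#     print('%.1f%%' % pct)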


def multi_progress_thread(tasks, func=None, workers=None):
    """tqdm nested-bar helper for when we don't know the length of the
    iterable and are using threads.

    The idea is pretty simple: each task reports its progress to a queue,
    we read the queue and increment the progress bars.

    Args:
        tasks (list): list of tuples, used as args for func.
        func: the function to execute in the threads; it must yield its
              progress as an int (0-100).
        workers (int): number of worker threads, defaults to len(tasks).
    """
    from functools import partial
    try:
        from queue import Queue as queue  # Python 3
    except ImportError:
        from Queue import Queue as queue  # Python 2
    import workerpool
    import tqdm

    class JOBZ(workerpool.Job):
        def __init__(self, q, task_number, func=None):
            """Args:
                q: the shared queue.
                task_number: index of the task, used to pick the right bar.
                func: generator that yields the task's progress.
            """
            self.q = q
            self.n = task_number
            self.func = func

        def run(self):
            for i in self.func():
                self.q.put((self.n, i))
            # Signal that this task is done.
            self.q.put((self.n, 'done'))

    assert callable(func)
    q = queue()
    len_tasks = len(tasks)
    w = len_tasks if workers is None else workers
    pool = workerpool.WorkerPool(size=w)
    bars = []
    main_bar = tqdm.tqdm(total=len_tasks, position=0)
    for i, task in enumerate(tasks):
        # Increment the position since we want the
        # main_bar to be on top.
        pos = i + 1
        wrap_func = partial(func, *task)
        j = JOBZ(q, i, wrap_func)
        pool.put(j)
        f = partial(tqdm.tqdm, total=100, position=pos, desc='T %s' % i)
        bars.append(f())

    try:
        exit = 0
        progress = {}
        while True:
            # Check if we should exit the thread and update the bars.
            if exit == len_tasks:
                # Update sub-bars because of the exit.
                for bb in bars:
                    bb.n = 0
                    bb.update(100)
                # Update the main bar.
                main_bar.n = 0
                main_bar.update(len_tasks)
                break
            item = q.get()
            if item is not None:
                t, i = item
                if i == 'done':
                    exit += 1
                    continue
                b = bars[t]
                b.n = 0
                b.update(i)
                progress[str(t)] = i
                main_bar_progress = sum(progress.values()) / 100
                main_bar.n = 0
                main_bar.update(main_bar_progress)
    except KeyboardInterrupt:
        pass

    pool.shutdown()
    pool.wait()


if __name__ == '__main__':
    multi_progress_thread([('x', i) for i in range(10)], func=example_func)
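
# The same helper can drive the ffmpeg generator above, e.g. (a hedged sketch;
# the filenames and URLs are placeholders, not from the original gist):
# multi_progress_thread([('out%d' % i, 'http://example.com/stream%d.ts' % i)
#                        for i in range(3)], func=stream_ffmpeg)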