@dansondergaard
Created January 11, 2013 08:55
Easily modify your scripts to run any command in parallel.

q

Many of the command-line tools we use and write daily have no clue about multiple cores or threads. Did you write a script to convert a directory of music files to some other format? You probably did it serially! Did you write a script for downloading a bunch of big files? You probably did that serially too!

These problems are inherently parallelisable. Don't let the cores on your machine sit idle.

q is here to help. It allows you to conveniently build a queue of commands that you wish to run. When you're done adding commands to the queue, you execute the queue. q will automatically figure out how many cores you've got and run your commands in parallel. Here's a generic example:

#!/usr/bin/env bash
for FILE in /my/big/dir/*; do
  q push myscript-queue "... command to run on $FILE ..."
done
q execute myscript-queue
echo "done"

Easy!
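
As a more concrete sketch, here is what the music-conversion case from above might look like. This assumes ffmpeg is installed; the paths and the queue name are only placeholders:

#!/usr/bin/env bash
# Queue one ffmpeg transcode per FLAC file (paths are illustrative).
for FILE in ~/Music/flac/*.flac; do
  q push convert-queue "ffmpeg -i '$FILE' '${FILE%.flac}.mp3'"
done
# Run the queued commands, using one worker per CPU core.
q execute convert-queue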

#!/usr/bin/env python3
#
# Copyright (C) 2013 Dan Søndergaard
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import argparse
import multiprocessing
import os
import os.path
import shlex
import subprocess
import sys


class QueueCommand(object):
    """Base class for queue commands; resolves the on-disk queue file."""

    CONFIG_DIRECTORY = os.path.expanduser('~/.q/')

    def __init__(self, args):
        # Make sure the configuration directory exists before using it.
        os.makedirs(QueueCommand.CONFIG_DIRECTORY, exist_ok=True)
        self.queue = {
            'name': args.queue,
            'path': os.path.join(QueueCommand.CONFIG_DIRECTORY, args.queue)
        }


class PushCommand(QueueCommand):
    """Append a command to the named queue file, one command per line."""

    def __init__(self, args):
        super(PushCommand, self).__init__(args)
        with open(self.queue['path'], 'a+') as fd:
            fd.write(args.command + os.linesep)


class ExecuteCommand(QueueCommand):
    """Run every command in the queue file using a pool of worker processes."""

    def __init__(self, args):
        super(ExecuteCommand, self).__init__(args)
        with open(self.queue['path'], 'r') as fd:
            with multiprocessing.Pool(processes=args.processes) as pool:
                pool.map(self.execute, fd)
                pool.close()
                pool.join()
        # Delete the queue when done processing all items.
        os.remove(self.queue['path'])

    def execute(self, command):
        tokens = shlex.split(command)
        try:
            subprocess.check_call(tokens)
        except subprocess.CalledProcessError as e:
            sys.stderr.write('[error] process exited with {0} ({1})\n'.format(
                e.returncode, command.strip()))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()

    push_parser = subparsers.add_parser('push')
    push_parser.set_defaults(handler=PushCommand)
    # nargs='?' makes the queue name optional so the default is actually used.
    push_parser.add_argument('queue', nargs='?', default='default',
                             help='name that identifies the queue to push to')
    push_parser.add_argument('command',
                             help='command to be pushed to the queue')

    exec_parser = subparsers.add_parser('execute')
    exec_parser.set_defaults(handler=ExecuteCommand)
    exec_parser.add_argument('queue', nargs='?', default='default',
                             help='name that identifies the queue to execute')
    exec_parser.add_argument('-p', '--processes', type=int,
                             default=multiprocessing.cpu_count(),
                             help='number of processes to run simultaneously, '
                                  'defaults to the number of CPU cores')

    args = parser.parse_args()
    if not hasattr(args, 'handler'):
        # No subcommand given; show usage instead of crashing.
        parser.print_usage()
        sys.exit(1)
    args.handler(args)
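
To try it out, one option is to save the script somewhere on your PATH as q and make it executable. A minimal sketch, where the install location, queue name, and URLs are only examples:

# Assumes the script above was saved as ~/bin/q (an arbitrary choice).
chmod +x ~/bin/q
q push downloads "wget https://example.com/big-file-1.iso"
q push downloads "wget https://example.com/big-file-2.iso"
# Download both files at once, up to one worker per CPU core.
q execute downloads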