Skip to content

Instantly share code, notes, and snippets.

@mborgerson
Created February 4, 2020 01:42
Show Gist options
  • Save mborgerson/0d214d78005a1dbffe1a06ebeac42637 to your computer and use it in GitHub Desktop.
Save mborgerson/0d214d78005a1dbffe1a06ebeac42637 to your computer and use it in GitHub Desktop.
Concatenate a lot of files
#!/usr/bin/env python3
# Copyright (c) 2020 Matt Borgerson
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
# REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
# LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
# OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
# PERFORMANCE OF THIS SOFTWARE.
import argparse
import os
import os.path
import time
class SimpleProgress:
    """Minimal text progress reporter used when tqdm is not installed.

    Implements only the subset of the tqdm interface this script relies on:
    construction with ``total=``, use as a context manager, and ``update()``.
    A line of the form ``'P% (count of total)...'`` is printed roughly once
    per percent of progress, and once more when the count reaches the total.
    """

    def __init__(self, total=1):
        """Create a reporter expecting *total* calls' worth of progress."""
        self._total = total
        self._count = 0
        # Starts at -1 so the very first updates are eligible for reporting.
        self._last_reported_count = -1

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Nothing to release; returning None lets exceptions propagate.
        pass

    def update(self, n=1):
        """Advance the counter by *n* and print a progress line when due.

        A line is due when at least 1% of the total has accumulated since
        the previous report, or when the counter has just reached the total
        (so the final 100% line is always shown).
        """
        self._count += n
        due = (self._count - self._last_reported_count) >= (self._total / 100.0)
        finished = self._count == self._total
        if due or finished:
            # A total of 0 would divide by zero; report it as complete.
            if self._total > 0:
                percent = int(self._count / self._total * 100)
            else:
                percent = 100
            print('%d%% (%d of %d)...' % (percent, self._count, self._total))
            self._last_reported_count = self._count
# Use tqdm for progress display when it is installed; otherwise bind the
# same name to SimpleProgress so callers can use either interchangeably.
try:
    from tqdm import tqdm
except ImportError:
    tqdm = SimpleProgress
def _send_all(out_fd, in_fd):
    """Copy from in_fd's current position to end-of-file onto out_fd.

    os.sendfile() may transfer fewer bytes than requested, so it is called
    repeatedly until it returns 0, which signals end-of-file on in_fd.
    Passing None as the offset makes it read from (and advance) in_fd's own
    file position, which is what lets the loop make progress.
    """
    chunk = 1 << 30  # bytes requested per call
    while os.sendfile(out_fd, in_fd, None, chunk):
        pass


def main():
    """Concatenate every file found under a directory tree into one file.

    Command-line arguments:
        outfile: path of the output file (created or truncated).
        indir:   directory to walk recursively; defaults to '.'.

    Files are appended in the order os.walk() yields them. If the output
    file itself lives under indir it is skipped rather than copied into
    itself. Progress is reported through ``tqdm`` (or its fallback).
    """
    parser = argparse.ArgumentParser(description='Concatenate a lot of files')
    parser.add_argument('outfile', help='path to output file')
    parser.add_argument('indir', help='path to input dir (or cwd if blank)', default='.', nargs='?')
    args = parser.parse_args()

    # Explicit 0o666 (before umask): os.open() defaults to 0o777, which
    # would create the output file with execute bits set.
    outfile = os.open(args.outfile, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o666)
    try:
        # Identity of the output file, used to avoid feeding it to itself.
        out_stat = os.fstat(outfile)
        out_id = (out_stat.st_dev, out_stat.st_ino)

        # Count files (after creating the output file, so both walks see
        # the same set of entries).
        num_files = 0
        for _root, _dirs, files in os.walk(args.indir):
            num_files += len(files)

        # Send 'em
        with tqdm(total=num_files) as t:
            for root, _dirs, files in os.walk(args.indir):
                for f in files:
                    path = os.path.join(root, f)
                    infile = os.open(path, os.O_RDONLY)
                    try:
                        in_stat = os.fstat(infile)
                        if (in_stat.st_dev, in_stat.st_ino) != out_id:
                            _send_all(outfile, infile)
                    finally:
                        os.close(infile)
                    t.update()
    finally:
        os.close(outfile)
# Run the tool only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment