Skip to content

Instantly share code, notes, and snippets.

@zvoase
Created October 27, 2008 22:22
Show Gist options
  • Save zvoase/20217 to your computer and use it in GitHub Desktop.
Save zvoase/20217 to your computer and use it in GitHub Desktop.
Do interesting things with files.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# fproc: Do interesting things with files.
# Copyright (c) 2008 Zachary Voase
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
from __future__ import with_statement
from contextlib import nested
import base64
import gzip
import os
import shutil
import subprocess
import sys
class Base64Middleware(object):
flags = ('b64', 'base64')
@staticmethod
def enter(old_fname):
new_fname = os.path.join(os.path.dirname(old_fname),
base64.b64encode(os.path.basename(old_fname)) + '.b64')
with nested(open(old_fname), open(new_fname, 'w')) as (
infile, outfile):
for chunk in chunks(infile, chunk_size=1023):
outfile.write(base64.b64encode(chunk))
os.remove(old_fname)
return new_fname
@staticmethod
def exit(new_fname):
basename = os.path.basename(new_fname)
basename = basename[:-4] if basename.endswith('.b64') else basename
old_fname = os.path.join(os.path.dirname(new_fname),
base64.b64decode(basename))
with nested(open(new_fname), open(old_fname, 'w')) as (
infile, outfile):
for chunk in chunks(infile, chunk_size=1364):
outfile.write(base64.b64decode(chunk))
os.remove(new_fname)
return old_fname
class Base64RenameMiddleware(object):
flags = ('b64rn', 'base64rename')
@staticmethod
def enter(old_fname):
new_fname = os.path.join(os.path.dirname(old_fname),
base64.b64encode(os.path.basename(old_fname)) + '.b64rn')
if os.path.isfile(new_fname):
os.remove(new_fname)
shutil.move(old_fname, new_fname)
return new_fname
@staticmethod
def exit(new_fname):
basename = os.path.basename(new_fname)
basename = basename[:-6] if basename.endswith('.b64rn') else basename
old_fname = os.path.join(os.path.dirname(new_fname),
base64.b64decode(basename))
if os.path.isfile(old_fname):
os.remove(old_fname)
shutil.move(new_fname, old_fname)
return old_fname
class GZipMiddleware(object):
flags = ('gz', 'gzip')
@staticmethod
def enter(old_fname):
new_fname = os.path.join(os.path.dirname(old_fname),
os.path.basename(old_fname) + '.gz')
with nested(open(old_fname), open(new_fname, 'wb')) as (
infile, outfile):
gzout = gzip.GzipFile(
filename=old_fname, mode='wb', fileobj=outfile)
for chunk in chunks(infile):
gzout.write(chunk)
gzout.close()
os.remove(old_fname)
return new_fname
@staticmethod
def exit(new_fname):
basename = os.path.basename(new_fname)
basename = basename[:-3] if basename.endswith('.gz') else basename
old_fname = os.path.join(os.path.dirname(new_fname), basename)
with nested(open(new_fname), open(old_fname, 'wb')) as (
infile, outfile):
gzin = gzip.GzipFile(mode='rb', fileobj=infile)
for chunk in chunks(gzin):
outfile.write(chunk)
gzin.close()
os.remove(new_fname)
return old_fname
class BZip2Middleware(object):
flags = ('bz2', 'bzip2')
@staticmethod
def enter(old_fname):
new_fname = os.path.join(os.path.dirname(old_fname),
os.path.basename(old_fname) + '.bz2')
with open(old_fname) as infile:
outfile = bz2.BZ2File(new_fname, mode='w')
try:
for chunk in chunks(infile):
outfile.write(chunks)
except:
raise
finally:
outfile.close()
os.remove(old_fname)
return new_fname
@staticmethod
def exit(new_fname):
basename = os.path.basename(new_fname)
basename = basename[:-4] if basename.endswith('.bz2') else basename
old_fname = os.path.join(os.path.dirname(new_fname), basename)
with open(new_fname, 'w') as outfile:
infile = bz2.BZ2File(old_fname, mode='r')
try:
for chunk in chunks(infile):
outfile.write(chunk)
except:
raise
finally:
infile.close()
os.remove(new_fname)
return old_fname
global FLAGS_TO_MIDDLEWARE
FLAGS_TO_MIDDLEWARE = {}
for obj in globals().values() + locals().values():
if hasattr(obj, 'flags'):
for flag in obj.flags:
FLAGS_TO_MIDDLEWARE[flag] = obj
def chunks(fp, chunk_size=1024):
chunk = fp.read(chunk_size)
while chunk:
yield chunk
chunk = fp.read(chunk_size)
def main(command, *args):
mwflags, files = args[:args.index('-f')], args[args.index('-f')+1:]
if command not in ('enter', 'exit'):
print 'Error: unknown command %r' % (command,)
return
if command == 'enter':
middleware = map(FLAGS_TO_MIDDLEWARE.get, mwflags)
for filename in files:
if not os.path.isfile(filename):
print 'File not found: %r' % (filename,)
continue
new_filename = filename[:]
for mw in middleware:
new_filename = mw.enter(new_filename)
print '\t'.join((filename, new_filename))
elif command == 'exit':
middleware = map(FLAGS_TO_MIDDLEWARE.get, mwflags)
for filename in files:
if not os.path.isfile(filename):
print 'File not found: %r' % (filename,)
continue
new_filename = filename[:]
for mw in reversed(middleware):
new_filename = mw.exit(new_filename)
print '\t'.join((filename, new_filename))
if __name__ == '__main__':
main(*sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment