Skip to content

Instantly share code, notes, and snippets.

@atvKumar
Last active August 29, 2015 14:12
Show Gist options
  • Save atvKumar/213d180ec96cd94d7e2e to your computer and use it in GitHub Desktop.
Save atvKumar/213d180ec96cd94d7e2e to your computer and use it in GitHub Desktop.
File Utilities
from __future__ import division
from platform import system
from os import stat
from os.path import basename, splitext, dirname, split as splitpath, \
join as joinpath
from datetime import datetime
from glob import glob
import hashlib
import subprocess
blocksize_bg = 64000000 # 64MB
blocksize_sm = 16384 # 16KB
def filesize(filename, mode=1, precision=1):
"""
Calculate file size
:param filename: File Path
:param mode: 1 = raw bytes, 2 = rounded raw bytes, 3 = size on disk
:param precision: float precision in Mode 2
:return: 1 = int, 2 = float, 3 = string
"""
file_bytes = stat(filename).st_size
if mode == 1:
return file_bytes
if system() == 'Darwin':
file_blocks = stat(filename).st_blocks
if file_bytes < 1024**2: # KB
if mode == 2:
return round(file_bytes / 1000, precision)
elif mode == 3:
return str(round(file_blocks * 512e-3, 1)) + ' KB'
elif file_bytes < 1024**3: # MB
if mode == 2:
return round(file_bytes / 1000**2, precision)
elif mode == 3:
return str(round(file_blocks * 512e-6, 1)) + ' MB'
elif file_bytes < 1024**4: # GB
if mode == 2:
return round(file_bytes / 1000**3, precision)
elif mode == 3:
return str(round(file_blocks * 512e-6 / 1000, 2)) + ' GB'
elif file_bytes < 1024**5: # TB
if mode == 2:
return round(file_bytes / 1000**4, precision)
elif mode == 3:
return str(round(file_blocks * 512e-6 / 1000**2, 2)) + ' TB'
elif system() == 'Windows':
# file_blocks = 0
if file_bytes < 1024**2: # KB
if mode == 2:
return round(file_bytes / 1000, precision)
elif mode == 3:
return str(round(file_bytes / 1024, 1)) + ' KB'
elif file_bytes < 1024**3: # MB
if mode == 2:
return round(file_bytes / 1000**2, precision)
elif mode == 3:
return str(round(file_bytes / 1024**2, 1)) + ' MB'
elif file_bytes < 1024**4: # GB
if mode == 2:
return round(file_bytes / 1000**3, precision)
elif mode == 3:
return str(round(file_bytes / 1024**3, 2)) + ' GB'
elif file_bytes < 1024**5: # TB
if mode == 2:
return round(file_bytes / 1000**4, precision)
elif mode == 3:
return str(round(file_bytes / 1024**4, 2)) + ' TB'
def get_creation_time(filename):
p = subprocess.Popen(['stat', '-f%B', filename],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if p.wait():
raise OSError(p.stderr.read().rstrip())
else:
return int(p.stdout.read())
def filedate_created(filename, as_datetime=False, date_format=None):
if system() == 'Darwin':
modified_date = datetime.fromtimestamp(get_creation_time(filename))
else:
modified_date = datetime.fromtimestamp(stat(filename).st_ctime)
if as_datetime:
return modified_date
if date_format:
return modified_date.strftime(date_format)
if system() == 'Darwin':
return modified_date.strftime("%d/%m/%Y %H:%M:%S %p")
elif system() == 'Windows':
return modified_date.strftime("%d %B %Y, %H:%M:%S %p")
def filedate_modified(filename, as_datetime=False, date_format=None):
modified_date = datetime.fromtimestamp(stat(filename).st_mtime)
if as_datetime:
return modified_date
if date_format:
return modified_date.strftime(date_format)
if system() == 'Darwin':
return modified_date.strftime("%d/%m/%Y %H:%M:%S %p")
elif system() == 'Windows':
return modified_date.strftime("%d %B %Y, %H:%M:%S %p")
def chunk(filename, size=4096):
with open(filename, 'rb') as fp:
for piece in iter(lambda: fp.read(size), ''):
yield piece
def buffered_chunk(filename, blocksize=blocksize_sm):
file_size = filesize(filename)
num_of_chunks = calculate_chunks(filename, blocksize)
chunk_size = blocksize
total_bytes = 0
with open(filename, 'rb') as fp:
for x in xrange(1, num_of_chunks+1):
if x == num_of_chunks:
chunk_size = file_size - total_bytes
data = bytearray(chunk_size)
fp.readinto(data)
total_bytes += len(data)
yield data
del data
def calculate_md5(filename):
md5 = hashlib.md5()
for piece in chunk(filename):
md5.update(piece)
return md5.hexdigest()
def calculate_chunks(filename, chunk_size=blocksize_sm):
file_size = filesize(filename)
num_of_chunks = int(file_size / chunk_size)
if file_size % chunk_size:
num_of_chunks += 1
return num_of_chunks
def split(filename, output_directory=None, blocksize=blocksize_sm, digits=5):
if not output_directory:
output_directory = splitpath(filename)[0]
for i, block in enumerate(chunk(filename, blocksize), 1):
# print i, hashlib.md5(block).hexdigest(), type(block)
with open(joinpath(output_directory,
basename(filename)+'.') + str(i).zfill(digits),
'wb') as fp:
fp.write(block)
fp.flush()
def join(filename, digits=5):
filename, source_directory = basename(filename), dirname(filename)
org_filename = filename[0:-(digits+1)]
num_of_chunks = glob(joinpath(source_directory, org_filename) + '.*')
with open(joinpath(source_directory, org_filename), 'wb') as out_fp:
for i in sorted(num_of_chunks):
with open(i, 'rb') as fp:
# fp.readinto(data)
data = fp.read()
out_fp.write(data)
out_fp.flush()
def generate_index(source_fname, blocksize=blocksize_sm):
location, fname = splitpath(source_fname)
index_filename = splitext(fname)[0] + '.index'
fsize = filesize(source_fname)
fdate_created = filedate_created(source_fname)
fdate_modified = filedate_modified(source_fname)
md5_signature = hashlib.md5()
index_table = list()
for i, block in enumerate(chunk(source_fname, blocksize), 1):
md5_signature.update(block)
row = [i, len(block), hashlib.md5(block).hexdigest()]
index_table.append(row)
file_signature = md5_signature.hexdigest()
# print index_filename
# print location
# print fname
# print ','.join([str(fsize), fdate_created, fdate_modified])
# print file_signature
# print '\n'.join([','.join(str(y) for y in x)
# for x in [i for i in index_table]])
with open(joinpath(location, index_filename), 'w') as fp:
fp.write(location + '\n')
fp.write(fname + '\n')
fp.write(','.join([str(fsize), fdate_created, fdate_modified]) + '\n')
fp.write(file_signature + '\n')
fp.write('\n'.join([','.join(str(y) for y in x)
for x in [i for i in index_table]]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment