Skip to content

Instantly share code, notes, and snippets.

@zelon88
Last active May 12, 2019 06:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zelon88/229c1453123ff270de447fcb68d30ff0 to your computer and use it in GitHub Desktop.
Save zelon88/229c1453123ff270de447fcb68d30ff0 to your computer and use it in GitHub Desktop.
xPress File Compressor & Extractor - v1.3.7
# --------------------------------------------------
# xPress.py
# v1.3.7 - 5/12/2019
# Justin Grimes (@zelon88)
# https://github.com/zelon88/xPress
# https://www.HonestRepair.net
# Made on Windows 7 with Python 2.7.12
# This program is a file compressor and extractor that uses
# a new compression algorithm designed and dictated by this
# document. It has an adjustable dictionary value length and
# can decompress files without special extractor preparations.
# Code blocks are prefaced with COMPRESS or EXTRACT to denote
# which function they support, for easier code surfing.
# --------------------------------------------------
# --------------------------------------------------
# VALID ARGUMENTS / PARAMETERS / SWITCHES
# If you combine multiple verbosity or log levels the last specified will be used.
# h - Display help text. 1st argument.
# c - Compress. 1st argument.
# e - Extract. 1st argument.
# <C:\Path\To\Input_File> - 2nd argument.
# <C:\Path\To\Output_File> - 3rd argument
# f - Forced overwrite. Optional. Overwrite any output files. 4th, 5th, or 6th argument.
# v0 - Verbosity 0. Optional. Disable output. 4th, 5th, or 6th argument.
# v1 - Verbosity 1. Optional. Only errors are output. 4th, 5th, or 6th argument.
# v2 - Verbosity 2. Optional. Everything is output. 4th, 5th, or 6th argument.
# l0 - Log level 0. Optional. Disable logging. 4th, 5th, or 6th argument.
# l1 - Log level 1. Optional. Only errors logged. 4th, 5th, or 6th argument.
# l2 - Log level 2. Optional. Everything is logged. 4th, 5th, or 6th argument.
# --------------------------------------------------
# --------------------------------------------------
# EXAMPLE COMMANDS
# xPress.py h
# xPress.py c c:\TestFolder\test.png c:\TestFolder\output.xpr
# xPress.py c c:\TestFolder\test.png c:\TestFolder\test.xpr v1 l1
# xPress.py e c:\TestFolder\test.xpr c:\TestFolder\test.png v0 l2
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS & EXTRACT
# Load required modules and set global variables.
import sys, getopt, datetime, os, psutil, math, pickle, re
# Switch defaults; the driver code replaces them with the values parseArgs() returns.
forced = False
logging = 1
verbosity = 2
# A single timestamp is taken at start-up and reused in every message.
now = datetime.datetime.now()
time = now.strftime("%B %d, %Y, %H:%M")
# Empty placeholders for values that are filled in later in the run.
error = ''
feature = ''
inputFile = ''
inputPath = ''
dictPath = ''
entryPrefix = ''
dictFile = ''
tempPath = ''
tempFile = ''
outputPath = ''
outputFile = ''
# The log file is kept in the same directory as this script.
currentPath = os.path.dirname(__file__)
logFile = os.path.join(currentPath, 'xPress.log')
# Numeric counters all start at zero.
chunkSize = 0
offset = 0
chunkCount = 0
errorCounter = 0
# Default dictionary value length.
dictLength = 12
# Markers written immediately before and after the pickled dictionary in an archive.
dictionaryPrefix = '@!@!DST@!@!'
dictionarySufix = '@!@!DND@!@!'
# Prefix used for regular (non-error) console and log messages.
logPrefix = 'OP-Act: '
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS & EXTRACT
# A function to print output to the console in a consistent manner.
def printGracefully(logPrefix, message):
    """Print ``logPrefix`` followed by ``message`` and a trailing period.

    Both arguments must be strings. Always returns 1.
    """
    line = ''.join((logPrefix, message, '.'))
    print(line)
    return 1
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS & EXTRACT
# A function to kill the program gracefully during unrecoverable error.
# The errorMessage is always printed; callers only invoke this function when verbosity is above 0 (see the v0 switch).
# Note this uses sys.exit(), which not only kills this script but the entire interpreter.
def dieGracefully(errorMessage, errorNumber, errorCounter):
    """Print a formatted fatal error and terminate via ``sys.exit()``.

    The printed line has the form
    ``ERROR-<errorCounter>!!! xPress<errorNumber>: <errorMessage> on <time>!``
    where ``time`` is the module-level start-up timestamp.
    """
    pieces = [
        'ERROR-', str(errorCounter),
        '!!! xPress', str(errorNumber),
        ': ', str(errorMessage),
        ' on ', str(time), '!',
    ]
    print(''.join(pieces))
    sys.exit()
    # Never reached: sys.exit() raises SystemExit. Kept so the signature still
    # nominally returns 1 like the other helpers.
    return 1
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS & EXTRACT
# A function to write an entry to the logFile.
# Do not end log entries with punctuation; " on <time>." is appended automatically.
# Set the errorNumber to 0 for regular prefix (default is "OP-Act").
# If the entry is an error message, set errorNumber to an int greater than 0.
def writeLog(logFile, logEntry, time, errorNumber, errorCounter):
    """Append one entry to ``logFile`` and return 1.

    logFile      -- path of the log file; it is created when it does not exist.
    logEntry     -- message text, without trailing punctuation.
    time         -- timestamp appended to the entry (shadows the module global).
    errorNumber  -- greater than 0 selects the ``ERROR-<n>!!! xPress<code>: ``
                    prefix, otherwise the module-level ``logPrefix`` is used.
    errorCounter -- running error count shown in the error prefix.
    """
    mode = "ab" if os.path.isfile(logFile) else "wb+"
    if errorNumber > 0:
        entryPrefix = ''.join(('ERROR-', str(errorCounter), '!!! xPress', str(errorNumber), ': '))
    else:
        entryPrefix = logPrefix
    entrySufix = ' on '+str(time)+'.'
    # The context manager closes the handle when the block ends.
    with open(logFile, mode) as logData:
        logData.write(entryPrefix+logEntry+entrySufix+"\n")
    return 1
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS & EXTRACT
# Process user supplied arguments/parameters/switches.
def _abortParse(message, errorNumber, logging, verbosity, errorCounter):
    """Report a fatal argument error (log and/or console) and stop the program."""
    errorCounter += 1
    if logging > 0: writeLog(logFile, message, time, errorNumber, errorCounter)
    if verbosity > 0: dieGracefully(message, errorNumber, errorCounter)
    sys.exit()


def parseArgs(logging, verbosity, argv, errorCounter):
    """Validate the command line and derive every path the run needs.

    logging, verbosity -- default levels; overridden by l0-l2 / v0-v2 switches.
    argv               -- argument list handed to getopt (the caller passes
                          sys.argv[1:]). Positional values are read from
                          sys.argv itself.
    errorCounter       -- running error count used in error messages.

    Returns the tuple
    (forced, logging, verbosity, tempFile, tempPath, inputFile, inputPath,
     outputFile, outputPath, dictFile, dictPath).

    Exits the program (errors 89, 97, 108, 126) when the input file is missing
    or does not exist, when the output file is missing, or when the output
    directory does not exist.
    """
    forced = False
    # getopt only validates dash-style options; an unknown one prints the usage text.
    try:
        getopt.getopt(argv, "h")
    except getopt.GetoptError:
        print ('xPress.py <c>or<e> <inputFile> <outputFile>')
        sys.exit(2)
    # Optional switches live in arguments 4, 5 and 6. Slicing never raises, so
    # any number of them (including a single one) is accepted, and every
    # position honours 'f'. When a level is given twice the last one wins.
    logLevels = {'l0': 0, 'l1': 1, 'l2': 2}
    verbosityLevels = {'v0': 0, 'v1': 1, 'v2': 2}
    for switch in sys.argv[4:7]:
        if switch in logLevels:
            logging = logLevels[switch]
        elif switch in verbosityLevels:
            verbosity = verbosityLevels[switch]
        elif switch == 'f':
            forced = True
    # The input file is argument 2.
    # "ERROR-<#>!!! xPress89: No input file was specified on <time>."
    if len(sys.argv) < 3:
        _abortParse('No input file was specified', 89, logging, verbosity, errorCounter)
    inputFile = sys.argv[2]
    inputPath = os.path.dirname(inputFile)
    # "ERROR-<#>!!! xPress97: The input file specified does not exist on <time>."
    if not os.path.exists(inputFile):
        _abortParse('The input file specified does not exist', 97, logging, verbosity, errorCounter)
    # The output file is argument 3.
    # "ERROR-<#>!!! xPress108: No output file was specified on <time>."
    if len(sys.argv) < 4:
        _abortParse('No output file was specified', 108, logging, verbosity, errorCounter)
    outputFile = sys.argv[3]
    outputPath = os.path.dirname(outputFile)
    tempFile = outputFile+'-TEMP.dat'
    tempPath = os.path.dirname(tempFile)
    dictFile = outputFile+'-DICT.dat'
    dictPath = os.path.dirname(dictFile)
    # A bare file name has an empty dirname, which means the current directory.
    # "ERROR-<#>!!! xPress126: The output file specified relies on an invalid directory on <time>."
    if not os.path.exists(outputPath or os.curdir):
        _abortParse('The output file specified relies on an invalid directory', 126, logging, verbosity, errorCounter)
    return forced, logging, verbosity, tempFile, tempPath, inputFile, inputPath, outputFile, outputPath, dictFile, dictPath
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS
# Define the chunkSize based on fileSize and available memory.
# We need to store 2 copies of the offset buffer and the rest of this application.
# By dynamically setting how much of a file to load into memory at a time, xPress should be hardware agnostic.
# Severely limited machines with memory levels measured in hundreds of megabytes may see less compression performance than machines with more memory.
def defineChunkSize(logging, verbosity, inputFile):
    """Return how many bytes of ``inputFile`` to hold in memory at once.

    The result is one hundredth of the memory psutil reports as available,
    clamped so that it never exceeds the size of the file itself.
    Progress messages are logged / printed when the respective level is above 1.
    """
    def report(text):
        if logging > 1: writeLog(logFile, text, time, 0, 0)
        if verbosity > 1: printGracefully(logPrefix, text)

    report('Defining chunkSize with inputFile '+str(inputFile))
    fileSize = int(os.path.getsize(inputFile))
    availableMemory = psutil.virtual_memory().available
    report('Available memory is '+str(availableMemory))
    # Integer division under Python 2; a file smaller than that budget is
    # handled as one chunk of exactly its own size.
    chunkSize = min(int(availableMemory) / 100, fileSize)
    report('ChunkSize is '+str(chunkSize))
    return chunkSize
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS
# Define the file offset and the number of chunks based on fileSize and chunkSize.
# If a file is too big it is divided into small chunks.
# The offset is different from the chunkSize in that it is evenly divisible by the filesize.
# To put it differently, the chunkSize limits global memory usage and the offset allocates an exact quantity of memory for each operation.
def defineOffset(logging, verbosity, inputFile, chunkSize):
    """Split ``inputFile`` into equally sized reads no larger than ``chunkSize``.

    Returns ``(offset, chunkCount, result)``:
      offset     -- number of bytes to read per chunk,
      chunkCount -- number of reads needed to cover the whole file,
      result     -- 1 on success.
    When ``inputFile`` is not a file the tuple is ``('ERROR', 0, 'ERROR')``.

    Both divisions round up, so ``offset * chunkCount`` always covers the file
    and ``offset`` never exceeds ``chunkSize``. Rounding down here would leave
    the last bytes of the file unread.
    """
    offset = result = 'ERROR'
    chunkCount = 0
    # Verify that the inputFile exists.
    if os.path.isfile(inputFile):
        fileSize = int(os.path.getsize(inputFile))
        chunkSize = int(chunkSize)
        message = 'Defining offset with chunkSize of '+str(chunkSize)
        if logging > 1: writeLog(logFile, message, time, 0, 0)
        if verbosity > 1: printGracefully(logPrefix, message)
        # Decide if the file should be chunked or processed as a whole.
        # A non-positive chunkSize cannot be divided by, so it falls back to
        # a single chunk.
        if chunkSize > 0 and fileSize > chunkSize:
            chunkCount = -(-fileSize // chunkSize)  # ceiling division
        else:
            chunkCount = 1
        offset = -(-fileSize // chunkCount)  # ceiling division
        # Keep the read size within the platform integer limit. The cap is
        # applied only to real numbers, never to the 'ERROR' sentinel.
        largestInt = getattr(sys, 'maxint', sys.maxsize)
        if offset > largestInt:
            offset = largestInt // 10
        result = 1
        message = 'Offset is '+str(offset)+', chunkCount is '+str(chunkCount)
        if logging > 1: writeLog(logFile, message, time, 0, 0)
        if verbosity > 1: printGracefully(logPrefix, message)
    return offset, chunkCount, result
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS
# A function for determining a dictLength for the operation.
def defineDictLength(inputFile):
    """Pick a starting dictionary value length for ``inputFile``.

    The length is roughly 0.01% of the file size, with a floor of 2.
    Returns ``(dictLength, threshold)``; ``threshold`` is the constant 0.5.
    Uses the module-level ``logging`` and ``verbosity`` levels for messages.
    """
    announcement = 'Defining dictLength with inputFile '+str(inputFile)
    if logging > 1: writeLog(logFile, announcement, time, 0, 0)
    if verbosity > 1: printGracefully(logPrefix, announcement)
    # Heuristic based on the size of the input file.
    size = os.stat(inputFile).st_size
    dictLength = max(int(size - (0.9999 * size)), 2)
    threshold = 0.5
    summary = 'DictLength is ' + str(dictLength)
    if logging > 1: writeLog(logFile, summary, time, 0, 0)
    if verbosity > 1: printGracefully(logPrefix, summary)
    return dictLength, threshold
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS
# A function to compress a selection of data.
def compressData(dictionary, adjusted, done, logging, verbosity, outputFile, dictLength, counter0, dataLen, lastDataLen, data, filePosition, dictIndexNumber):
    """Run one compression pass over ``data`` and return the updated state.

    Walks ``data`` in steps of ``dictLength``. The first slice of every pass,
    and any later slice that repeats while the previous replacement shrank the
    data, is replaced by a ``#<n>$`` index and recorded in ``dictionary``.
    The first slice that does not qualify ends the pass in one of three ways:
      * adjusted < 3       -- halve dictLength and return for another pass,
      * 3 <= adjusted < 8  -- square dictLength and return for another pass,
      * otherwise          -- append ``data`` to ``outputFile`` and set done.

    All thirteen arguments are returned, in the same order, so the caller can
    feed them straight back in until ``done`` is True.
    """
    def report(text):
        if logging > 1: writeLog(logFile, text, time, 0, 0)
        if verbosity > 1: printGracefully(logPrefix, text)

    if dictLength < 1:
        dictLength = 5
    firstSlice = True
    # range() is evaluated once; later changes to dataLen or dictLength do not
    # alter the positions visited by this pass.
    for start in range(0, dataLen, dictLength):
        report('Initiating a compression loop. Byte '+str(filePosition)+' of '+str(dataLen)+' in chunk '+str(counter0))
        # Fill up the chars buffer.
        chars = data[start:(start+dictLength)]
        dataLen = len(data)
        dataMatch = data.count(chars)
        if firstSlice or (dataMatch > 1 and dataLen < lastDataLen):
            dictIndexNumber += 1
            dictIndex = '#'+str(dictIndexNumber)+'$'
            lastDataLen = dataLen
            data = data.replace(chars, dictIndex)
            dictionary.update({dictIndex : chars})
            firstSlice = False
            continue
        # Decrease the dictLength if results are not forthcoming.
        if adjusted < 3:
            dictLength = dictLength / 2
            if dictLength > int(dataLen / 2):
                dictLength = dictLength / (dictLength / 2)
            report('Decreasing the dictLength, currently ' + str(dictLength))
            adjusted += 1
            break
        # Increase the dictLength if results are still not forthcoming.
        if 3 <= adjusted < 8:
            dictLength = dictLength * dictLength
            if dictLength > int(dataLen / 2):
                dictLength = dictLength / (dictLength / 2)
            report('Increasing the dictLength, currently ' + str(dictLength))
            adjusted += 1
            break
        # Out of adjustments: save the data as it stands to the output file.
        done = True
        report('Skipping bytes '+str(start)+' of '+str(dataLen)+' in chunk '+str(counter0))
        with open(outputFile, "ab") as openFile2:
            openFile2.write(data)
        break
    return dictionary, adjusted, done, logging, verbosity, outputFile, dictLength, counter0, dataLen, lastDataLen, data, filePosition, dictIndexNumber
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS
# A function to iterate through the temp file and build a dictionary for the file.
def buildDictionary(forced, logging, verbosity, outputFile, inputFile, dictFile, dictLength, threshold, dictionaryPrefix, dictionarySufix, errorCounter):
    """Compress ``inputFile`` chunk by chunk and build its dictionary.

    Each chunk is passed to compressData() repeatedly until it reports done,
    which also appends the chunk's compressed form to ``outputFile``. The
    finished dictionary is written as text to ``dictFile``.

    forced -- when False, an existing ``outputFile`` or ``dictFile`` is a fatal
              error (280). When True, a stale ``outputFile`` is deleted first,
              because compressData() only ever appends to it.
    threshold, dictionaryPrefix, dictionarySufix -- accepted but not used here.

    Returns ``(dictionary, data, result)`` where ``data`` is the compressed
    form of the last chunk and ``result`` is 0 on success. Any failure ends the
    program through error 346.
    """
    dictionary = result = data = 'ERROR'
    # Verify that no output file or dict file exists already.
    if (os.path.isfile(outputFile) or os.path.isfile(dictFile)) and forced == False:
        errorCounter += 1
        message = 'The output file or temp files already exist for outputFile '+str(outputFile)
        if logging > 0: writeLog(logFile, message, time, 280, errorCounter)
        if verbosity > 0: dieGracefully(message, 280, errorCounter)
        else: sys.exit()
    elif os.path.isfile(inputFile):
        message = 'Building a dictonary with inputFile '+str(inputFile)
        if logging > 1: writeLog(logFile, message, time, 0, 0)
        if verbosity > 1: printGracefully(logPrefix, message)
        tempChunkSize = defineChunkSize(logging, verbosity, inputFile)
        tempOffset, tempChunkCount, dOffResult = defineOffset(logging, verbosity, inputFile, tempChunkSize)
        if dOffResult != 'ERROR' and tempOffset != 'ERROR' and tempChunkCount > 0:
            dictionary = {}
            result = dictIndexNumber = 0
            # Forced overwrite: start from a clean output file instead of
            # appending compressed data to the previous archive.
            if forced and os.path.isfile(outputFile):
                os.remove(outputFile)
            with open(inputFile, "rb") as openFile:
                for chunkNumber in range(tempChunkCount):
                    # Redefine some variables for each chunk.
                    adjusted = 0
                    done = False
                    filePosition = openFile.tell()
                    data = openFile.read(tempOffset)
                    dataLen = lastDataLen = len(data)
                    # Nothing left to read (e.g. an empty input file).
                    # compressData() never reports done for empty data, so stop
                    # here rather than loop forever.
                    if dataLen == 0:
                        break
                    # dictLength carries over between passes and chunks; every
                    # pass re-reads it from the returned state.
                    while done != True:
                        (dictionary, adjusted, done, logging, verbosity,
                         outputFile, dictLength, _chunk, dataLen, lastDataLen,
                         data, filePosition, dictIndexNumber) = compressData(
                            dictionary, adjusted, done, logging, verbosity,
                            outputFile, dictLength, chunkNumber, dataLen,
                            lastDataLen, data, filePosition, dictIndexNumber)
            with open(dictFile, "wb+") as openFile3:
                openFile3.write(str(dictionary))
    # Verify that a dictionary file was created and no errors were encountered.
    if not os.path.isfile(dictFile) or dictionary == 'ERROR' or result == 'ERROR':
        dictionary = result = data = 'ERROR'
        errorCounter += 1
        message = 'The operation failed to generate a dictionary for inputFile '+str(inputFile)
        if logging > 0: writeLog(logFile, message, time, 346, errorCounter)
        if verbosity > 0: dieGracefully(message, 346, errorCounter)
        else: sys.exit()
    else:
        message = 'Writing a temporary dictFile to '+str(dictFile)
        if logging > 1: writeLog(logFile, message, time, 0, 0)
        if verbosity > 1: printGracefully(logPrefix, message)
    return dictionary, data, result
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS
# A function to combine the compressed data and the newly created dictionary into a cohesive file.
def compressFile(logging, verbosity, outputFile, compressedData, dictionary, dictionaryPrefix, dictionarySufix, errorCounter):
    """Append the pickled ``dictionary`` to ``outputFile`` between its markers.

    The archive layout becomes:
    ``<existing content><dictionaryPrefix><pickle><dictionarySufix>``.
    ``compressedData`` is accepted for symmetry but not written here.

    Returns 1 on success; ends the program through error 373 when the output
    file does not exist afterwards.
    """
    message = 'Writing dictionary data to outputFile '+str(outputFile)
    if logging > 1: writeLog(logFile, message, time, 0, 0)
    if verbosity > 1: printGracefully(logPrefix, message)
    # The context manager closes the archive even if a write or pickling fails.
    with open(outputFile, "ab") as archive:
        archive.write(dictionaryPrefix)
        pickle.dump(dictionary, archive)
        archive.write(dictionarySufix)
    result = 1
    # Verify that an output file was created.
    if not os.path.isfile(outputFile):
        result = 'ERROR'
        errorCounter += 1
        message = 'There was an error writing the dictionary to the outputFile '+str(outputFile)
        if logging > 0: writeLog(logFile, message, time, 373, errorCounter)
        if verbosity > 0: dieGracefully(message, 373, errorCounter)
        else: sys.exit()
    return result
# --------------------------------------------------
# --------------------------------------------------
# EXTRACT
# A function to extract compressed data and reconstruct the dictionary file.
def extractDictionary(logging, verbosity, inputFile, outputFile, dictFile, dictionaryPrefix, dictionarySufix, errorCounter):
    """Split an archive into its compressed data and its dictionary.

    The pickled dictionary sits between ``dictionaryPrefix`` and
    ``dictionarySufix``. It is written to ``dictFile`` and loaded from there.
    Everything outside that section is written to ``outputFile``.

    Returns ``(dictionary, outputRaw, result)`` with ``result`` equal to 1.
    Ends the program through error 390 (output or temp file already exists),
    428 (input missing or markers not found) or 438 (dictFile not written).
    """
    def fail(text, errorNumber, count):
        if logging > 0: writeLog(logFile, text, time, errorNumber, count)
        if verbosity > 0: dieGracefully(text, errorNumber, count)
        sys.exit()

    result = dictionary = outputRaw = 'ERROR'
    extractFailure = 'The operation failed to extract a dictionary from inputFile '+str(inputFile)
    # Perform sanity checks before attempting anything.
    if os.path.isfile(outputFile) or os.path.isfile(dictFile):
        errorCounter += 1
        fail('The output file or temp files already exist for outputFile '+str(outputFile), 390, errorCounter)
    elif os.path.isfile(inputFile):
        message = 'Extracting dictionary from inputFile '+str(inputFile)
        if logging > 1: writeLog(logFile, message, time, 0, 0)
        if verbosity > 1: printGracefully(logPrefix, message)
        # Open the input file and put its contents into memory.
        with open(inputFile, "rb") as inputData:
            inData = inputData.read()
        # The suffix is the last thing written to an archive, so search for it
        # from the end. The same bytes may also occur earlier, inside the data
        # or inside a dictionary value.
        sectionStart = inData.find(dictionaryPrefix)
        sectionEnd = inData.rfind(dictionarySufix)
        payloadStart = sectionStart + len(dictionaryPrefix)
        if sectionStart < 0 or sectionEnd < payloadStart:
            # Not an xPress archive, or the dictionary section is damaged.
            errorCounter += 1
            fail(extractFailure, 428, errorCounter)
        # The payload is taken verbatim; no bytes are stripped out of the pickle.
        dictionaryData = inData[payloadStart:sectionEnd]
        # Write the extracted dictionary data to a temporary dictionary file.
        with open(dictFile, "wb+") as dictData:
            dictData.write(dictionaryData)
        # Everything outside the dictionary section is the compressed data.
        outputRaw = inData[:sectionStart] + inData[sectionEnd+len(dictionarySufix):]
        with open(outputFile, "wb+") as outputData:
            outputData.write(outputRaw)
        # SECURITY: pickle.load() can execute arbitrary code. Only extract
        # archives that come from a trusted source.
        with open(dictFile, "rb") as dictFileOpen:
            dictionary = pickle.load(dictFileOpen)
        result = 1
    else:
        errorCounter += 1
        fail(extractFailure, 428, errorCounter)
    if not os.path.isfile(dictFile):
        result = dictionary = outputRaw = 'ERROR'
        errorCounter += 1
        fail('The operation failed to write the dictionary to outputFile'+str(outputFile), 438, errorCounter)
    return dictionary, outputRaw, result
# --------------------------------------------------
# --------------------------------------------------
# EXTRACT
# A function to loop through the dictionary and compressed data and look for matches.
def dictionaryLoop(logging, verbosity, compressedData, dictionary, errorCounter):
    """Perform one expansion pass over ``compressedData``.

    Every key of ``dictionary`` found in the data is replaced by its value.
    Returns ``(compressedData, dictionary, matchCount, result)`` where
    ``matchCount`` is the number of key occurrences seen during this pass and
    ``result`` is 1. A ``dictionary`` that is not a dict ends the program
    through error 455.
    """
    matchCount = 0
    if isinstance(dictionary, dict):
        result = 1
        message = 'Initiating a decompression loop'
        if logging > 1: writeLog(logFile, message, time, 0, 0)
        if verbosity > 1: printGracefully(logPrefix, message)
        # Visit each key in the dictionary, count and replace its matches.
        for key in dictionary:
            matchCount += compressedData.count(key)
            compressedData = compressedData.replace(key, dictionary[key])
        message = 'Decompressing '+str(matchCount)+' matches'
        if logging > 1: writeLog(logFile, message, time, 0, 0)
        if verbosity > 1: printGracefully(logPrefix, message)
    else:
        result = 'ERROR'
        errorCounter += 1
        message = 'The supplied dictionary is not readable'
        if logging > 0: writeLog(logFile, message, time, 455, errorCounter)
        if verbosity > 0: dieGracefully(message, 455, errorCounter)
        else: sys.exit()
    return compressedData, dictionary, matchCount, result
# --------------------------------------------------
# --------------------------------------------------
# EXTRACT
# A function to decompress an input file.
def decompressFile(logging, verbosity, outputFile, compressedData, dictionary, errorCounter):
    """Expand ``compressedData`` with ``dictionary`` and write it to ``outputFile``.

    dictionaryLoop() is run until a pass finds no more matches, then the
    fully expanded data replaces the contents of ``outputFile``.
    Returns 1 on success; failures end the program through errors 482, 500
    or 512.

    NOTE: the start-up message reads ``inputFile`` from module scope; it is
    not a parameter of this function.
    """
    result = 'ERROR'
    usable = os.path.isfile(outputFile) and compressedData != 'ERROR' and dictionary != 'ERROR'
    if usable:
        result = 1
        message = 'Initiating decompressor on inputFile '+str(inputFile)
        if logging > 1: writeLog(logFile, message, time, 0, 0)
        if verbosity > 1: printGracefully(logPrefix, message)
        # Keep expanding until a pass replaces nothing.
        while True:
            compressedData, dictionary, matches, result = dictionaryLoop(logging, verbosity, compressedData, dictionary, errorCounter)
            if result == 'ERROR' or compressedData == 'ERROR':
                errorCounter += 1
                message = 'The operation failed during decompression of outputFile '+str(outputFile)
                if logging > 0: writeLog(logFile, message, time, 500, errorCounter)
                if verbosity > 0: dieGracefully(message, 500, errorCounter)
                else: sys.exit()
            if matches == 0:
                break
        with open(outputFile, "wb+") as outputData:
            outputData.write(compressedData)
        compressedData = ''
    else:
        errorCounter += 1
        message = 'Could not decompress outputFile '+str(outputFile)
        if logging > 0: writeLog(logFile, message, time, 482, errorCounter)
        if verbosity > 0: dieGracefully(message, 482, errorCounter)
        else: sys.exit()
    # Confirm the output file is present before reporting success.
    if os.path.isfile(outputFile):
        result = 1
    else:
        result = 'ERROR'
        errorCounter += 1
        message = 'The operation failed to create an outputFile '+str(outputFile)
        if logging > 0: writeLog(logFile, message, time, 512, errorCounter)
        if verbosity > 0: dieGracefully(message, 512, errorCounter)
        else: sys.exit()
    return result
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS & EXTRACT
# Display some text to kick things off.
def printWelcome(logging, verbosity):
    """Announce the start of a run in the log and on the console.

    The message is only emitted for levels above 1. Always returns 1.
    """
    greeting = 'Starting xPress Compress on '+str(time)
    if logging > 1:
        writeLog(logFile, greeting, time, 0, 0)
    if verbosity > 1:
        printGracefully(logPrefix, greeting)
    return 1
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS
# Code to compress a specified file.
# Work out which operation was requested. Running without any argument is
# treated like "h" instead of failing with an IndexError.
command = sys.argv[1] if len(sys.argv) > 1 else 'h'
# Display the usage text for the documented "h" switch.
if command == 'h':
    print ('xPress.py <c>or<e> <inputFile> <outputFile>')
# Code to compress a specified file.
if command == 'c':
    forced, logging, verbosity, tempFile, tempPath, inputFile, inputPath, outputFile, outputPath, dictFile, dictPath = parseArgs(logging, verbosity, sys.argv[1:], errorCounter)
    printWelcome(logging, verbosity)
    dictLength, threshold = defineDictLength(inputFile)
    dictionary, compressedData, dictResult = buildDictionary(forced, logging, verbosity, outputFile, inputFile, dictFile, dictLength, threshold, dictionaryPrefix, dictionarySufix, errorCounter)
    if dictResult != 'ERROR':
        compressionResult = compressFile(logging, verbosity, outputFile, compressedData, dictionary, dictionaryPrefix, dictionarySufix, errorCounter)
    else:
        errorCounter += 1
        message = 'The operation failed to create an outputFile '+str(outputFile)
        if logging > 0: writeLog(logFile, message, time, 664, errorCounter)
        if verbosity > 0: dieGracefully(message, 664, errorCounter)
        else: sys.exit()
# --------------------------------------------------
# --------------------------------------------------
# EXTRACT
# Code to extract a specified file.
if command == 'e':
    forced, logging, verbosity, tempFile, tempPath, inputFile, inputPath, outputFile, outputPath, dictFile, dictPath = parseArgs(logging, verbosity, sys.argv[1:], errorCounter)
    printWelcome(logging, verbosity)
    dictionary, compressedData, dictResult = extractDictionary(logging, verbosity, inputFile, outputFile, dictFile, dictionaryPrefix, dictionarySufix, errorCounter)
    if dictResult != 'ERROR':
        decompressionResult = decompressFile(logging, verbosity, outputFile, compressedData, dictionary, errorCounter)
    else:
        errorCounter += 1
        message = 'The operation failed to create an outputFile '+str(outputFile)
        if logging > 0: writeLog(logFile, message, time, 683, errorCounter)
        if verbosity > 0: dieGracefully(message, 683, errorCounter)
        else: sys.exit()
# --------------------------------------------------
# --------------------------------------------------
# COMPRESS & EXTRACT
# Report completion; every fatal error above has already ended the program.
message = 'Operation complete'
if logging > 1: writeLog(logFile, message, time, 0, 0)
if verbosity > 1:
    printGracefully('', message)
    print("\n")
# --------------------------------------------------
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment