Skip to content

Instantly share code, notes, and snippets.

@cristian-bicheru
Last active April 11, 2024 02:05
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save cristian-bicheru/2375befeac75b01d095f3da8727f979b to your computer and use it in GitHub Desktop.
Save cristian-bicheru/2375befeac75b01d095f3da8727f979b to your computer and use it in GitHub Desktop.
WinAPI Compression and Decompression in Python
import ctypes
# Bind the ntdll exports that compress() and decompress() call below.
# No argtypes/restype are declared on these function objects in this section,
# so ctypes' default argument conversion and return type apply.
# NOTE: ctypes.windll is only available on Windows; these lookups run at
# import time.
RtlDecompressBuffer = ctypes.windll.ntdll.RtlDecompressBuffer
RtlCompressBuffer = ctypes.windll.ntdll.RtlCompressBuffer
RtlGetCompressionWorkSpaceSize = ctypes.windll.ntdll.RtlGetCompressionWorkSpaceSize
# Compression format identifiers, wrapped as 16-bit unsigned ctypes values.
Formats = {
    name: ctypes.c_uint16(code)
    for name, code in (
        ("COMPRESSION_FORMAT_LZNT1", 0x0002),
        ("COMPRESSION_FORMAT_XPRESS", 0x0003),
        ("COMPRESSION_FORMAT_XPRESS_HUFF", 0x0004),
    )
}

# Compression engine flags; compress() ORs one of these into the format value.
Engines = {
    name: ctypes.c_uint16(flag)
    for name, flag in (
        ("COMPRESSION_ENGINE_STANDARD", 0x0000),
        ("COMPRESSION_ENGINE_MAXIMUM", 0x0100),
    )
}

# Chunk size handed to RtlCompressBuffer by compress().
UncompressedChunkSize = ctypes.c_uint16(0x1000)
#################################################
# Status Code Format As Defined In ntstatus.h:  #
#######################################################################
#  Values are 32 bit values laid out as follows:                      #
#                                                                     #
#   3 3 2 2 2 2 2 2 2 2 2 2 1 1 1 1 1 1 1 1 1 1                       #
#   1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0 9 8 7 6 5 4 3 2 1 0   #
#  +---+-+-+-----------------------+-------------------------------+  #
#  |Sev|C|R|     Facility          |               Code            |  #
#  +---+-+-+-----------------------+-------------------------------+  #
#                                                                     #
#  where                                                              #
#                                                                     #
#      Sev - is the severity code                                     #
#                                                                     #
#          00 - Success                                               #
#          01 - Informational                                         #
#          10 - Warning                                               #
#          11 - Error                                                 #
#                                                                     #
#      C - is the Customer code flag                                  #
#                                                                     #
#      R - is a reserved bit                                          #
#                                                                     #
#      Facility - is the facility code                                #
#                                                                     #
#      Code - is the facility's status code                           #
#######################################################################
# Maps a '0b'-prefixed 32-bit binary NTSTATUS string to its symbolic name.
# The keys use the same textual form that ConvertStatusCode() produces for
# the status values WinDLLCall() looks up.
StatusCodes = dict((
    ('0b11000000000000000000000000100011', 'STATUS_BUFFER_TOO_SMALL'),         # 0xC0000023
    ('0b11000000000000000000000010111011', 'STATUS_NOT_SUPPORTED'),            # 0xC00000BB
    ('0b11000000000000000000001001011111', 'STATUS_UNSUPPORTED_COMPRESSION'),  # 0xC000025F
    ('0b11000000000000000000000000001101', 'STATUS_INVALID_PARAMETER'),        # 0xC000000D
    ('0b11000000000000000000001001000010', 'STATUS_BAD_COMPRESSION_BUFFER'),   # 0xC0000242
    ('0b00000000000000000000000100010111', 'STATUS_BUFFER_ALL_ZEROS'),         # 0x00000117
))
def ConvertStatusCode(Code):
    """Return an NTSTATUS value as a '0b'-prefixed 32-digit binary string.

    Args:
        Code: The status as an integer, either signed (a negative value such
            as a signed 32-bit return produces for error statuses) or
            unsigned.

    Returns:
        A string of the form '0b' followed by exactly 32 binary digits, so
        that index 2 is bit 31 and index 3 is bit 30 (the two severity bits).
    """
    # Masking folds negative two's-complement values into the unsigned 32-bit
    # range; zero-padding keeps non-negative codes at 32 digits instead of
    # gaining a spurious leading '1' from adding 2**32.
    return '0b' + format(Code & 0xFFFFFFFF, '032b')
def WinDLLCall(func, *args):
    """Call an NTSTATUS-returning function and raise if it reports an error.

    Args:
        func: Callable returning an integer NTSTATUS value.
        *args: Positional arguments forwarded to ``func`` unchanged.

    Raises:
        SystemError: If the two severity bits (31 and 30) of the returned
            status are both set. The message carries the symbolic name from
            StatusCodes when the status is listed there, otherwise the
            status in hexadecimal.
    """
    raw = func(*args)
    # Normalise to unsigned so the severity test works whether the status
    # arrived as a negative signed value or as an unsigned one.
    unsigned = raw & 0xFFFFFFFF
    if unsigned >> 30 != 0b11:
        return
    # Fall back to the hex value rather than raising KeyError for statuses
    # that are missing from the lookup table.
    name = StatusCodes.get(ConvertStatusCode(raw), "0x{:08X}".format(unsigned))
    raise SystemError("Function call failed with code: " + name)
def compress(UncompressedBuffer, UncompressedBufferSize, Format, Engine,
             CompressedBufferSize=None):
    """Compress a ctypes buffer with RtlCompressBuffer.

    Args:
        UncompressedBuffer: ctypes object holding the input bytes (passed to
            the API by reference).
        UncompressedBufferSize: Number of input bytes to compress.
        Format: ctypes.c_uint16 compression format (see ``Formats``).
        Engine: ctypes.c_uint16 compression engine (see ``Engines``).
        CompressedBufferSize: Capacity in bytes of the output buffer.
            Defaults to ``UncompressedBufferSize``; pass a larger value for
            input that may not shrink, otherwise the call can fail with
            STATUS_BUFFER_TOO_SMALL.

    Returns:
        A ``(CompressedBuffer, FinalCompressedSize)`` tuple: the output
        ctypes byte array and a ctypes.c_uint32 holding the number of bytes
        actually written to it.

    Raises:
        SystemError: Propagated from WinDLLCall when either API call returns
            an error status.
    """
    if CompressedBufferSize is None:
        CompressedBufferSize = UncompressedBufferSize

    CompressedBuffer = (ctypes.c_ubyte * CompressedBufferSize)()
    CompressionFormatAndEngine = ctypes.c_uint16(Format.value | Engine.value)
    CompressBufferWorkSpaceSize = ctypes.c_uint32()
    CompressFragmentWorkSpaceSize = ctypes.c_uint32()
    FinalCompressedSize = ctypes.c_uint32()

    WinDLLCall(RtlGetCompressionWorkSpaceSize,
               CompressionFormatAndEngine,
               ctypes.byref(CompressBufferWorkSpaceSize),
               ctypes.byref(CompressFragmentWorkSpaceSize))

    # The workspace can only be sized after the query above has filled in the
    # sizes; RtlCompressBuffer needs the *buffer* workspace size, not the
    # fragment one.
    WorkSpace = (ctypes.c_ubyte * CompressBufferWorkSpaceSize.value)()

    WinDLLCall(RtlCompressBuffer,
               CompressionFormatAndEngine,
               ctypes.byref(UncompressedBuffer),
               ctypes.c_uint32(UncompressedBufferSize),
               ctypes.byref(CompressedBuffer),
               ctypes.c_uint32(CompressedBufferSize),
               UncompressedChunkSize,
               ctypes.byref(FinalCompressedSize),
               ctypes.byref(WorkSpace))
    return CompressedBuffer, FinalCompressedSize
def decompress(CompressedBuffer, CompressedBufferSize, UncompressedBufferSize, Format):
    """Decompress a ctypes buffer with RtlDecompressBuffer.

    Args:
        CompressedBuffer: ctypes object holding the compressed bytes (passed
            to the API by reference).
        CompressedBufferSize: Number of compressed bytes to read.
        UncompressedBufferSize: Capacity in bytes of the output buffer that
            is allocated here.
        Format: Compression format value forwarded as the first API argument.

    Returns:
        A tuple of the output ctypes byte array and a ctypes.c_uint32 that
        the API fills with the final uncompressed size.

    Raises:
        SystemError: Propagated from WinDLLCall on an error status.
    """
    destination_type = ctypes.c_ubyte * UncompressedBufferSize
    destination = destination_type()
    bytes_written = ctypes.c_uint32()

    destination_capacity = ctypes.c_uint32(UncompressedBufferSize)
    source_length = ctypes.c_uint32(CompressedBufferSize)

    WinDLLCall(
        RtlDecompressBuffer,
        Format,
        ctypes.byref(destination),
        destination_capacity,
        ctypes.byref(CompressedBuffer),
        source_length,
        ctypes.byref(bytes_written),
    )
    return destination, bytes_written
def run_test():
    """Round-trip a small payload through compress() and decompress().

    Uses the LZNT1 format with the standard engine and prints
    "Test Passed." when the decompressed bytes match the input.

    Raises:
        AssertionError: If the reported decompressed size or the decompressed
            content differs from the original payload.
    """
    payload = b"test" * 10
    size = len(payload)
    data = (size * ctypes.c_ubyte).from_buffer_copy(payload)
    lznt1 = Formats["COMPRESSION_FORMAT_LZNT1"]

    compressed, compressedsize = compress(
        data, size, lznt1, Engines["COMPRESSION_ENGINE_STANDARD"])
    uncompressed, uncompressedsize = decompress(
        compressed, compressedsize.value, size, lznt1)

    # Raise explicitly instead of using `assert`, which is stripped under -O
    # and would let the success message print without any check.
    if uncompressedsize.value != size:
        raise AssertionError(
            "Decompressed size {} does not match original size {}".format(
                uncompressedsize.value, size))
    if bytearray(uncompressed) != bytearray(payload):
        raise AssertionError("Decompressed data does not match original data")
    print("Test Passed.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment