# Generate test vectors for Windows LZ77 Huffman compression. # # Copyright (c) 2022 Catalyst IT # # GPLv3+. # # This uses the Python ctypes module to access the lower level RTL # compression functions. import sys import argparse from ctypes import create_string_buffer, byref, windll from ctypes.wintypes import USHORT, ULONG, LONG, PULONG, LPVOID, CHAR NTSTATUS = LONG METHODS = { 'LZNT1': 2, 'XPRESS_PLAIN': 3, 'XPRESS_HUFF': 4, '2': 2, '3': 3, '4': 4 } class RtlError(Exception): pass def ntstatus_check(status, f, args): # 0x117 is STATUS_BUFFER_ALL_ZEROS status &= 0xffffffff if status in (0, 0x117): return status msg = { 0xC0000023: "buffer too small", 0xC0000242: "bad compression data", }.get(status, '') raise RtlError(f'NTSTATUS: {status:08X} {msg}') def wrap(f, result, *args): f.restype = result f.argtypes = args f.errcheck = ntstatus_check return f CompressBuffer = wrap(windll.ntdll.RtlCompressBuffer, NTSTATUS, USHORT, LPVOID, ULONG, LPVOID, ULONG, ULONG, PULONG, LPVOID) GetCompressionWorkSpaceSize = wrap(windll.ntdll.RtlGetCompressionWorkSpaceSize, NTSTATUS, USHORT, PULONG, PULONG) DecompressBufferEx = wrap(windll.ntdll.RtlDecompressBufferEx, NTSTATUS, USHORT, LPVOID, ULONG, LPVOID, ULONG, PULONG, LPVOID) def compress(data, format, effort=0): flags = USHORT(format | effort) workspace_size = ULONG(0) fragment_size = ULONG(0) comp_len = ULONG(0) GetCompressionWorkSpaceSize(flags, byref(workspace_size), byref(fragment_size)) workspace = create_string_buffer(workspace_size.value) output_len = len(data) * 9 // 8 + 260 output_buf = bytearray(output_len) CompressBuffer(flags, (CHAR * 1).from_buffer(data), len(data), (CHAR * 1).from_buffer(output_buf), output_len, 4096, byref(comp_len), workspace) return output_buf[:comp_len.value] def decompress(data, format, target_size=None): flags = USHORT(format) workspace_size = ULONG(0) fragment_size = ULONG(0) decomp_len = ULONG(0) GetCompressionWorkSpaceSize(flags, byref(workspace_size), byref(fragment_size)) workspace = create_string_buffer(workspace_size.value) if target_size is None: output_len = len(data) * 10 else: output_len = target_size output_buf = bytearray(output_len) DecompressBufferEx(format, (CHAR * 1).from_buffer(output_buf), len(output_buf), (CHAR * 1).from_buffer(data), len(data), byref(decomp_len), workspace) return output_buf[:decomp_len.value] def main(): if sys.getwindowsversion().major < 7: print("this probably won't work on your very old version of Windows\n" "but we'll try anyway!", file=sys.stderr) parser = argparse.ArgumentParser() parser.add_argument('-d', '--decompress', action='store_true', help='decompress instead of compress') parser.add_argument('-m', '--method', default='XPRESS_HUFF', choices=list(METHODS.keys()), help='use this compression method') parser.add_argument('-e', '--extra-effort', action='store_true', help='use extra effort to compress') parser.add_argument('-s', '--decompressed-size', type=int, help=('decompress to this size ' '(required for XPRESS_HUFF')) parser.add_argument('-o', '--output', help='write to this file') parser.add_argument('-i', '--input', help='read data from this file') args = parser.parse_args() method = METHODS[args.method] if all((args.decompress, args.decompressed_size is None, method == 4)): print("a size is required for XPRESS_HUFF decompression") sys.exit(1) with open(args.input, 'rb') as f: data = bytearray(f.read()) if args.decompress: output = decompress(data, method, args.decompressed_size) else: effort = 1 if args.extra_effort else 0 output = compress(data, method, effort) with open(args.output, 'wb') as f: f.write(output) main()