#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -----------------------------------------------------------------------------------------
# This is a Python 3 port of compressed_rtf at https://github.com/delimitry/compressed_rtf,
# which is MIT licensed.
# -----------------------------------------------------------------------------------------
"""
Compressed Rich Text Format (RTF) worker

Based on Rich Text Format (RTF) Compression Algorithm
https://msdn.microsoft.com/en-us/library/cc463890(v=exchg.80).aspx
"""

import struct
from io import BytesIO

from crc32 import crc32

__all__ = ['decompress', 'compress']

# Initial (pre-loaded) sliding dictionary mandated by [MS-OXRTFCP].
INIT_DICT = (
    b'{\\rtf1\\ansi\\mac\\deff0\\deftab720{\\fonttbl;}{\\f0\\fnil \\froman \\'
    b'fswiss \\fmodern \\fscript \\fdecor MS Sans SerifSymbolArialTimes New '
    b'RomanCourier{\\colortbl\\red0\\green0\\blue0\r\n\\par \\pard\\plain\\'
    b'f0\\fs20\\b\\i\\u\\tab\\tx'
)
INIT_DICT_SIZE = 207
MAX_DICT_SIZE = 4096

# COMPTYPE magic values from the stream header
COMPRESSED = b'LZFu'
UNCOMPRESSED = b'MELA'


def decompress(data):
    """
    Decompress RTF `data` (bytes) packed per [MS-OXRTFCP].

    Returns the uncompressed RTF document as bytes.
    Raises Exception on short input, CRC mismatch, or unknown COMPTYPE.
    """
    # Sliding dictionary: INIT_DICT space-padded to 4096 entries.
    # Kept as a list of ints (byte values) so reads/writes are
    # consistent under Python 3 (padding with str ' ' would make
    # `bytes([char])` fail on padded entries).
    init_dict = list(INIT_DICT)
    init_dict += [0x20] * (MAX_DICT_SIZE - INIT_DICT_SIZE)
    if len(data) < 16:
        raise Exception('Data must be at least 16 bytes long')
    write_offset = INIT_DICT_SIZE
    output_buffer = BytesIO()
    # make stream
    in_stream = BytesIO(data)
    # read compressed RTF header: COMPSIZE, RAWSIZE, COMPTYPE, CRC
    # (32-bit little-endian fields, COMPTYPE is a 4-byte magic)
    comp_size = struct.unpack('<I', in_stream.read(4))[0]
    raw_size = struct.unpack('<I', in_stream.read(4))[0]
    comp_type = in_stream.read(4)
    crc_value = struct.unpack('<I', in_stream.read(4))[0]
    # COMPSIZE counts everything after itself, so the payload is the
    # remaining comp_size - 12 bytes (RAWSIZE + COMPTYPE + CRC = 12)
    contents = BytesIO(in_stream.read(comp_size - 12))
    if comp_type == COMPRESSED:
        # check CRC of the compressed payload
        if crc_value != crc32(contents.read()):
            raise Exception('CRC is invalid! The file is corrupt!')
        contents.seek(0)
        end = False
        while not end:
            val = contents.read(1)
            if not val:
                break
            # control byte holds 8 token-type flags, LSB first:
            # 1 = dictionary reference, 0 = literal byte
            control = '{0:08b}'.format(ord(val))
            # check bits from LSB to MSB
            for i in range(1, 9):
                if control[-i] == '1':
                    # token is reference (16 bit)
                    val = contents.read(2)
                    if not val:
                        break
                    token = struct.unpack('>H', val)[0]  # big-endian
                    # extract [12 bit offset][4 bit length]
                    offset = (token >> 4) & 0b111111111111
                    length = token & 0b1111
                    # a reference to the current write position is the
                    # end-of-stream indicator
                    if write_offset == offset:
                        end = True
                        break
                    actual_length = length + 2  # lengths are stored -2
                    for step in range(actual_length):
                        read_offset = (offset + step) % MAX_DICT_SIZE
                        char = init_dict[read_offset]
                        output_buffer.write(bytes([char]))
                        init_dict[write_offset] = char
                        write_offset = (write_offset + 1) % MAX_DICT_SIZE
                else:
                    # token is literal (8 bit)
                    val = contents.read(1)
                    if not val:
                        break
                    output_buffer.write(val)
                    init_dict[write_offset] = val[0]
                    write_offset = (write_offset + 1) % MAX_DICT_SIZE
    elif comp_type == UNCOMPRESSED:
        # stored mode: the payload is the document itself
        return contents.read(raw_size)
    else:
        raise Exception('Unknown type of RTF compression!')
    return output_buffer.getvalue()


def compress(data, compressed=True):
    """
    Compress `data` (bytes) with `compressed` flag.

    With ``compressed=True`` emits an LZFu-compressed stream, otherwise
    a MELA (stored) container.  Returns the complete stream as bytes,
    header included.
    """
    # accumulate output as bytes (str would break under Python 3)
    output_buffer = b''
    # Sliding dictionary as a list of ints (byte values), INIT_DICT
    # space-padded to 4096 entries — mirrors the decompressor's state.
    init_dict = list(INIT_DICT + b' ' * (MAX_DICT_SIZE - INIT_DICT_SIZE))
    write_offset = INIT_DICT_SIZE
    if compressed:
        comp_type = COMPRESSED
        # make stream
        in_stream = BytesIO(data)
        # control byte gathers 8 token-type flags, LSB first
        control_byte = 0
        control_bit = 1
        token_offset = 0
        token_buffer = b''
        longest_match = 0
        while True:
            # find longest dictionary match at the stream position
            dict_offset, longest_match, write_offset = \
                _find_longest_match(init_dict, in_stream, write_offset)
            char = in_stream.read(longest_match if longest_match > 1 else 1)
            # EOF input stream
            if not char:
                # emit the end marker: a reference whose offset equals
                # the current write position
                control_byte |= 1 << (control_bit - 1)
                control_bit += 1
                token_offset += 2
                dict_ref = (write_offset & 0xfff) << 4
                token_buffer += struct.pack('>H', dict_ref)
                # flush the final (possibly partial) run
                output_buffer += struct.pack('B', control_byte)
                output_buffer += token_buffer[:token_offset]
                break
            if longest_match > 1:
                # dictionary reference token (flag bit = 1)
                control_byte |= 1 << (control_bit - 1)
                control_bit += 1
                token_offset += 2
                # [12 bit offset][4 bit length]; lengths stored -2
                dict_ref = (dict_offset & 0xfff) << 4 | \
                    (longest_match - 2) & 0xf
                token_buffer += struct.pack('>H', dict_ref)
            else:
                # literal token (flag bit = 0, so control_byte unchanged)
                if longest_match == 0:
                    # character is not found in dictionary — add it
                    init_dict[write_offset] = char[0]
                    write_offset = (write_offset + 1) % MAX_DICT_SIZE
                control_bit += 1
                token_offset += 1
                token_buffer += char
            longest_match = 0
            if control_bit > 8:
                # 8 tokens accumulated — flush the run
                output_buffer += struct.pack('B', control_byte)
                output_buffer += token_buffer[:token_offset]
                # reset params
                control_byte = 0
                control_bit = 1
                token_offset = 0
                token_buffer = b''
    else:
        # if uncompressed — copy data to output verbatim
        comp_type = UNCOMPRESSED
        output_buffer = data
    # write compressed RTF header (little-endian 32-bit fields);
    # COMPSIZE counts everything after itself: 12 header bytes + payload
    comp_size = struct.pack('<I', len(output_buffer) + 12)
    raw_size = struct.pack('<I', len(data))
    # CRC covers the compressed payload only; stored mode uses 0
    crc_value = struct.pack('<I', crc32(output_buffer) if compressed else 0)
    return comp_size + raw_size + comp_type + crc_value + output_buffer


def _find_longest_match(init_dict, stream, write_offset):
    """
    Find the longest run at the current stream position that matches
    bytes already present in the sliding dictionary.

    As the best match grows, the scanned bytes are appended to the
    dictionary (keeping the compressor's dictionary in sync with what
    the decompressor will rebuild).  The stream is rewound so the
    caller can re-read the matched bytes.

    Returns a tuple ``(dict_offset, longest_match_len, write_offset)``.
    """
    dict_index = 0
    dict_offset = 0
    match_len = 0
    longest_match_len = 0
    prev_write_offset = write_offset
    # read the first char
    char = stream.read(1)
    if not char:
        return dict_offset, longest_match_len, write_offset
    while True:
        # compare the next stream byte against the (circular) dictionary;
        # char[0] is the int byte value, matching the int dict entries
        if init_dict[dict_index % MAX_DICT_SIZE] == char[0]:
            match_len += 1
            # a reference encodes lengths 2..17 in 4 bits, so never
            # grow the recorded match beyond 17
            if match_len <= 17 and match_len > longest_match_len:
                dict_offset = dict_index - match_len + 1
                # add to dictionary and update longest match
                init_dict[write_offset] = char[0]
                write_offset = (write_offset + 1) % MAX_DICT_SIZE
                longest_match_len = match_len
            # read the next char
            char = stream.read(1)
            if not char:
                # EOF: rewind to the start of the match
                stream.seek(stream.tell() - match_len, 0)
                return dict_offset, longest_match_len, write_offset
        else:
            # mismatch: rewind and retry from the next dictionary slot
            stream.seek(stream.tell() - match_len - 1, 0)
            match_len = 0
            # read the first char
            char = stream.read(1)
            if not char:
                break
        dict_index += 1
        # no point scanning past everything written so far
        if dict_index >= prev_write_offset + longest_match_len:
            break
    stream.seek(stream.tell() - match_len - 1, 0)
    return dict_offset, longest_match_len, write_offset