| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- from __future__ import print_function
- import collections
- import re
- import sys
- import gzip
- import zlib
# Escape byte printed in front of a compressed replacement string in the
# MP_MATCH_COMPRESSED() output (see main()), so the runtime can tell
# compressed strings apart from ones stored verbatim.
_COMPRESSED_MARKER = 0xFF
def check_non_ascii(msg):
    # Abort the whole generation run if msg contains any byte outside
    # 7-bit ASCII: the compression schemes below reuse the top bit (0x80)
    # as their escape marker, so non-ascii input cannot be represented.
    for ch in msg:
        if ord(ch) < 0x80:
            continue
        print(
            'Unable to generate compressed data: message "{}" contains a non-ascii character "{}".'.format(
                msg, ch
            ),
            file=sys.stderr,
        )
        sys.exit(1)
# Replace <char><space> with <char | 0x80>.
# Trivial scheme to demo/test.
def space_compression(error_strings):
    # Compress each string in-place in error_strings (a dict mapping the
    # original string to its compressed form).  Every "<char> " pair is
    # replaced by the octal escape of <char> with the top bit set, which is
    # the scheme described in the comment above.
    #
    # Returns None: this scheme needs no shared data table.
    for line in error_strings:
        check_non_ascii(line)
        result = ""
        for i in range(len(line)):
            if i > 0 and line[i] == " ":
                # Drop the space and re-emit the previous char with bit 7
                # set.  BUG FIX: the original emitted ord(line[i - 1])
                # without "| 0x80", so the output was indistinguishable
                # from the plain character and the space was simply lost.
                result = result[:-1]
                result += "\\{:03o}".format(ord(line[i - 1]) | 0x80)
            else:
                result += line[i]
        error_strings[line] = result
    return None
# Replace common words with <0x80 | index>.
# Index is into a table of words stored as aaaaa<0x80|a>bbb<0x80|b>...
# Replaced words are assumed to have spaces either side to avoid having to store the spaces in the compressed strings.
def word_compression(error_strings):
    # Mutates error_strings in-place (original -> compressed) and returns
    # the shared word table, encoded with each word's last char | 0x80 as
    # the terminator.
    word_counts = collections.Counter()
    for line in error_strings.keys():
        check_non_ascii(line)
        word_counts.update(line.split(" "))

    # Order not just by frequency, but by expected saving, i.e. prefer a
    # longer string that is used less frequently.  The word itself breaks
    # ties so that compression is deterministic.
    def sort_key(item):
        word, count = item
        return -((len(word) + 1) * (count - 1)), word

    table = [word for word, _ in sorted(word_counts.items(), key=sort_key)[:128]]
    lookup = {word: idx for idx, word in enumerate(table)}

    for line in error_strings.keys():
        pieces = []
        pending_space = False
        for word in line.split(" "):
            if word in lookup:
                # Spaces around a replaced word are implicit, so none is
                # stored either side of the escape.
                pieces.append("\\{:03o}".format(0b10000000 | lookup[word]))
                pending_space = False
            else:
                if pending_space:
                    pieces.append(" ")
                pending_space = True
                pieces.append(word)
        error_strings[line] = "".join(pieces).strip()

    return "".join(w[:-1] + "\\{:03o}".format(0b10000000 | ord(w[-1])) for w in table)
# Replace chars in text with variable length bit sequence.
# For comparison only (the table is not emitted).
def huffman_compression(error_strings):
    # https://github.com/tannewt/huffman
    import huffman

    codebook = huffman.codebook(collections.Counter("".join(error_strings)).items())

    for line in error_strings:
        # Leading "1" sentinel bit, then the code for each character.
        bits = "1" + "".join(codebook[ch] for ch in line)
        padded_len = len(bits)
        if padded_len % 8:
            padded_len += 8 - padded_len % 8
        encoded = "".join(
            "\\{:03o}".format(int(bits[i : i + 8], 2)) for i in range(0, padded_len, 8)
        )
        # Fall back to the raw string if "compression" made it longer
        # (each escaped byte costs 4 chars in the emitted source).
        if len(encoded) > len(line) * 4:
            encoded = line
        error_strings[line] = encoded

    # TODO: This would be the prefix lengths and the table ordering.
    return "_" * (10 + len(codebook))
# Replace common N-letter sequences with <0x80 | index>, where
# the common sequences are stored in a separate table.
# This isn't very useful, need a smarter way to find top-ngrams.
def ngram_compression(error_strings):
    # Mutates error_strings in-place and returns the ngram table joined
    # into one string (all entries are exactly N chars, so no separator
    # is needed).
    N = 2
    counts = collections.Counter()
    for line in error_strings.keys():
        check_non_ascii(line)
        if len(line) < N:
            continue
        # NOTE(review): the stop value len(line) - N means the final
        # aligned ngram is never counted; preserved as-is since this
        # scheme is for comparison only.
        for pos in range(0, len(line) - N, N):
            counts[line[pos : pos + N]] += 1

    def sort_key(item):
        gram, count = item
        return -(len(gram) * (count - 1))

    table = [gram for gram, _ in sorted(counts.items(), key=sort_key)[:128]]
    lookup = {gram: idx for idx, gram in enumerate(table)}

    for line in error_strings.keys():
        pieces = []
        for pos in range(0, len(line) - N + 1, N):
            gram = line[pos : pos + N]
            if gram in lookup:
                pieces.append("\\{:03o}".format(0b10000000 | lookup[gram]))
            else:
                pieces.append(gram)
        if len(line) % N:
            # Copy the unaligned tail characters through verbatim.
            pieces.append(line[len(line) - len(line) % N :])
        error_strings[line] = "".join(pieces).strip()

    return "".join(table)
def main(collected_path, fn):
    # Read the collected MP_ERROR_TEXT strings from collected_path, run the
    # compression scheme fn over them, and print the generated C fragments
    # plus size statistics.  fn mutates the dict in-place (original string
    # -> compressed string) and returns the shared data table, or None if
    # the scheme has none.
    error_strings = collections.OrderedDict()
    max_uncompressed_len = 0
    num_uses = 0

    # Read in all MP_ERROR_TEXT strings.
    with open(collected_path, "r") as f:
        for raw in f:
            text = raw.strip()
            if not text:
                continue
            num_uses += 1
            error_strings[text] = None
            if len(text) > max_uncompressed_len:
                max_uncompressed_len = len(text)

    # So that objexcept.c can figure out how big the buffer needs to be.
    print("#define MP_MAX_UNCOMPRESSED_TEXT_LEN ({})".format(max_uncompressed_len))

    # Run the compression.
    compressed_data = fn(error_strings)

    # Print the data table.
    print('MP_COMPRESSED_DATA("{}")'.format(compressed_data))

    # Print the replacements; strings the scheme left unchanged get no
    # marker prefix.
    for uncomp, comp in error_strings.items():
        prefix = "" if uncomp == comp else "\\{:03o}".format(_COMPRESSED_MARKER)
        print('MP_MATCH_COMPRESSED("{}", "{}{}")'.format(uncomp, prefix, comp))

    # Used to calculate the "true" length of the (escaped) compressed strings.
    def unescape(s):
        return re.sub(r"\\\d\d\d", "!", s)

    # Stats. Note this doesn't include the cost of the decompressor code.
    uncomp_len = sum(len(s) + 1 for s in error_strings.keys())
    comp_len = sum(1 + len(unescape(s)) + 1 for s in error_strings.values())
    data_len = len(compressed_data) + 1 if compressed_data else 0
    print("// Total input length: {}".format(uncomp_len))
    print("// Total compressed length: {}".format(comp_len))
    print("// Total data length: {}".format(data_len))
    print("// Predicted saving: {}".format(uncomp_len - comp_len - data_len))

    # Somewhat meaningless comparison to zlib/gzip.
    all_input_bytes = "\\0".join(error_strings.keys()).encode()
    print()
    if hasattr(gzip, "compress"):
        gzip_len = len(gzip.compress(all_input_bytes)) + num_uses * 4
        print("// gzip length: {}".format(gzip_len))
        print("// Percentage of gzip: {:.1f}%".format(100 * (comp_len + data_len) / gzip_len))
    if hasattr(zlib, "compress"):
        zlib_len = len(zlib.compress(all_input_bytes)) + num_uses * 4
        print("// zlib length: {}".format(zlib_len))
        print("// Percentage of zlib: {:.1f}%".format(100 * (comp_len + data_len) / zlib_len))
- if __name__ == "__main__":
- main(sys.argv[1], word_compression)
|