diff options
-rw-r--r-- | tools/binman/cbfs_util.py | 9 | ||||
-rw-r--r-- | tools/binman/comp_util.py | 88 | ||||
-rw-r--r-- | tools/binman/entry.py | 3 | ||||
-rw-r--r-- | tools/binman/etype/section.py | 3 | ||||
-rw-r--r-- | tools/binman/ftest.py | 9 | ||||
-rw-r--r-- | tools/patman/tools.py | 79 |
6 files changed, 102 insertions, 89 deletions
diff --git a/tools/binman/cbfs_util.py b/tools/binman/cbfs_util.py index 00664bcf432..2b4178a6854 100644 --- a/tools/binman/cbfs_util.py +++ b/tools/binman/cbfs_util.py @@ -20,6 +20,7 @@ import io import struct import sys +from binman import comp_util from binman import elf from patman import command from patman import tools @@ -240,9 +241,9 @@ class CbfsFile(object): """Handle decompressing data if necessary""" indata = self.data if self.compress == COMPRESS_LZ4: - data = tools.Decompress(indata, 'lz4', with_header=False) + data = comp_util.Decompress(indata, 'lz4', with_header=False) elif self.compress == COMPRESS_LZMA: - data = tools.Decompress(indata, 'lzma', with_header=False) + data = comp_util.Decompress(indata, 'lzma', with_header=False) else: data = indata self.memlen = len(data) @@ -361,9 +362,9 @@ class CbfsFile(object): elif self.ftype == TYPE_RAW: orig_data = data if self.compress == COMPRESS_LZ4: - data = tools.Compress(orig_data, 'lz4', with_header=False) + data = comp_util.Compress(orig_data, 'lz4', with_header=False) elif self.compress == COMPRESS_LZMA: - data = tools.Compress(orig_data, 'lzma', with_header=False) + data = comp_util.Compress(orig_data, 'lzma', with_header=False) self.memlen = len(orig_data) self.data_len = len(data) attr = struct.pack(ATTR_COMPRESSION_FORMAT, diff --git a/tools/binman/comp_util.py b/tools/binman/comp_util.py new file mode 100644 index 00000000000..541e1919dd6 --- /dev/null +++ b/tools/binman/comp_util.py @@ -0,0 +1,88 @@ +# SPDX-License-Identifier: GPL-2.0+ +# Copyright 2022 Google LLC +# +"""Utilities to compress and decompress data""" + +import struct +import tempfile + +from patman import tools + +def Compress(indata, algo, with_header=True): + """Compress some data using a given algorithm + + Note that for lzma this uses an old version of the algorithm, not that + provided by xz. + + This requires 'lz4' and 'lzma_alone' tools. It also requires an output + directory to be previously set up, by calling PrepareOutputDir(). + + Care is taken to use unique temporary files so that this function can be + called from multiple threads. + + Args: + indata: Input data to compress + algo: Algorithm to use ('none', 'gzip', 'lz4' or 'lzma') + + Returns: + Compressed data + """ + if algo == 'none': + return indata + fname = tempfile.NamedTemporaryFile(prefix='%s.comp.tmp' % algo, + dir=tools.GetOutputDir()).name + tools.WriteFile(fname, indata) + if algo == 'lz4': + data = tools.Run('lz4', '--no-frame-crc', '-B4', '-5', '-c', fname, + binary=True) + # cbfstool uses a very old version of lzma + elif algo == 'lzma': + outfname = tempfile.NamedTemporaryFile(prefix='%s.comp.otmp' % algo, + dir=tools.GetOutputDir()).name + tools.Run('lzma_alone', 'e', fname, outfname, '-lc1', '-lp0', '-pb0', + '-d8') + data = tools.ReadFile(outfname) + elif algo == 'gzip': + data = tools.Run('gzip', '-c', fname, binary=True) + else: + raise ValueError("Unknown algorithm '%s'" % algo) + if with_header: + hdr = struct.pack('<I', len(data)) + data = hdr + data + return data + +def Decompress(indata, algo, with_header=True): + """Decompress some data using a given algorithm + + Note that for lzma this uses an old version of the algorithm, not that + provided by xz. + + This requires 'lz4' and 'lzma_alone' tools. It also requires an output + directory to be previously set up, by calling PrepareOutputDir(). + + Args: + indata: Input data to decompress + algo: Algorithm to use ('none', 'gzip', 'lz4' or 'lzma') + + Returns: + Compressed data + """ + if algo == 'none': + return indata + if with_header: + data_len = struct.unpack('<I', indata[:4])[0] + indata = indata[4:4 + data_len] + fname = tools.GetOutputFilename('%s.decomp.tmp' % algo) + with open(fname, 'wb') as fd: + fd.write(indata) + if algo == 'lz4': + data = tools.Run('lz4', '-dc', fname, binary=True) + elif algo == 'lzma': + outfname = tools.GetOutputFilename('%s.decomp.otmp' % algo) + tools.Run('lzma_alone', 'd', fname, outfname) + data = tools.ReadFile(outfname, binary=True) + elif algo == 'gzip': + data = tools.Run('gzip', '-cd', fname, binary=True) + else: + raise ValueError("Unknown algorithm '%s'" % algo) + return data diff --git a/tools/binman/entry.py b/tools/binman/entry.py index 9cd900670e1..5281dcdab6a 100644 --- a/tools/binman/entry.py +++ b/tools/binman/entry.py @@ -11,6 +11,7 @@ import pathlib import sys from binman import bintool +from binman import comp_util from dtoc import fdt_util from patman import tools from patman.tools import ToHex, ToHexSize @@ -1034,7 +1035,7 @@ features to produce new behaviours. self.uncomp_data = indata if self.compress != 'none': self.uncomp_size = len(indata) - data = tools.Compress(indata, self.compress) + data = comp_util.Compress(indata, self.compress) return data @classmethod diff --git a/tools/binman/etype/section.py b/tools/binman/etype/section.py index 221a64cd032..66121cb29a2 100644 --- a/tools/binman/etype/section.py +++ b/tools/binman/etype/section.py @@ -13,6 +13,7 @@ import concurrent.futures import re import sys +from binman import comp_util from binman.entry import Entry from binman import state from dtoc import fdt_util @@ -775,7 +776,7 @@ class Entry_section(Entry): data = parent_data[offset:offset + child.size] if decomp: indata = data - data = tools.Decompress(indata, child.compress) + data = comp_util.Decompress(indata, child.compress) if child.uncomp_size: tout.Info("%s: Decompressing data size %#x with algo '%s' to data size %#x" % (child.GetPath(), len(indata), child.compress, diff --git a/tools/binman/ftest.py b/tools/binman/ftest.py index f543d173997..90d7c3cf593 100644 --- a/tools/binman/ftest.py +++ b/tools/binman/ftest.py @@ -23,6 +23,7 @@ import urllib.error from binman import bintool from binman import cbfs_util from binman import cmdline +from binman import comp_util from binman import control from binman import elf from binman import elf_test @@ -1926,7 +1927,7 @@ class TestFunctional(unittest.TestCase): self._ResetDtbs() def _decompress(self, data): - return tools.Decompress(data, 'lz4') + return comp_util.Decompress(data, 'lz4') def testCompress(self): """Test compression of blobs""" @@ -2805,7 +2806,7 @@ class TestFunctional(unittest.TestCase): def testExtractCbfsRaw(self): """Test extracting CBFS compressed data without decompressing it""" data = self._RunExtractCmd('section/cbfs/u-boot-dtb', decomp=False) - dtb = tools.Decompress(data, 'lzma', with_header=False) + dtb = comp_util.Decompress(data, 'lzma', with_header=False) self.assertEqual(EXTRACT_DTB_SIZE, len(dtb)) def testExtractBadEntry(self): @@ -4232,13 +4233,13 @@ class TestFunctional(unittest.TestCase): # Check compressed data section1 = self._decompress(rest) - expect1 = tools.Compress(COMPRESS_DATA + U_BOOT_DATA, 'lz4') + expect1 = comp_util.Compress(COMPRESS_DATA + U_BOOT_DATA, 'lz4') self.assertEquals(expect1, rest[:len(expect1)]) self.assertEquals(COMPRESS_DATA + U_BOOT_DATA, section1) rest1 = rest[len(expect1):] section2 = self._decompress(rest1) - expect2 = tools.Compress(COMPRESS_DATA + COMPRESS_DATA, 'lz4') + expect2 = comp_util.Compress(COMPRESS_DATA + COMPRESS_DATA, 'lz4') self.assertEquals(expect2, rest1[:len(expect2)]) self.assertEquals(COMPRESS_DATA + COMPRESS_DATA, section2) rest2 = rest1[len(expect2):] diff --git a/tools/patman/tools.py b/tools/patman/tools.py index 072b024646d..5dfecaf917b 100644 --- a/tools/patman/tools.py +++ b/tools/patman/tools.py @@ -7,7 +7,6 @@ import glob import os import shlex import shutil -import struct import sys import tempfile import urllib.request @@ -518,84 +517,6 @@ def ToString(bval): """ return bval.decode('utf-8') -def Compress(indata, algo, with_header=True): - """Compress some data using a given algorithm - - Note that for lzma this uses an old version of the algorithm, not that - provided by xz. - - This requires 'lz4' and 'lzma_alone' tools. It also requires an output - directory to be previously set up, by calling PrepareOutputDir(). - - Care is taken to use unique temporary files so that this function can be - called from multiple threads. - - Args: - indata: Input data to compress - algo: Algorithm to use ('none', 'gzip', 'lz4' or 'lzma') - - Returns: - Compressed data - """ - if algo == 'none': - return indata - fname = tempfile.NamedTemporaryFile(prefix='%s.comp.tmp' % algo, - dir=outdir).name - WriteFile(fname, indata) - if algo == 'lz4': - data = Run('lz4', '--no-frame-crc', '-B4', '-5', '-c', fname, - binary=True) - # cbfstool uses a very old version of lzma - elif algo == 'lzma': - outfname = tempfile.NamedTemporaryFile(prefix='%s.comp.otmp' % algo, - dir=outdir).name - Run('lzma_alone', 'e', fname, outfname, '-lc1', '-lp0', '-pb0', '-d8') - data = ReadFile(outfname) - elif algo == 'gzip': - data = Run('gzip', '-c', fname, binary=True) - else: - raise ValueError("Unknown algorithm '%s'" % algo) - if with_header: - hdr = struct.pack('<I', len(data)) - data = hdr + data - return data - -def Decompress(indata, algo, with_header=True): - """Decompress some data using a given algorithm - - Note that for lzma this uses an old version of the algorithm, not that - provided by xz. - - This requires 'lz4' and 'lzma_alone' tools. It also requires an output - directory to be previously set up, by calling PrepareOutputDir(). - - Args: - indata: Input data to decompress - algo: Algorithm to use ('none', 'gzip', 'lz4' or 'lzma') - - Returns: - Compressed data - """ - if algo == 'none': - return indata - if with_header: - data_len = struct.unpack('<I', indata[:4])[0] - indata = indata[4:4 + data_len] - fname = GetOutputFilename('%s.decomp.tmp' % algo) - with open(fname, 'wb') as fd: - fd.write(indata) - if algo == 'lz4': - data = Run('lz4', '-dc', fname, binary=True) - elif algo == 'lzma': - outfname = GetOutputFilename('%s.decomp.otmp' % algo) - Run('lzma_alone', 'd', fname, outfname) - data = ReadFile(outfname, binary=True) - elif algo == 'gzip': - data = Run('gzip', '-cd', fname, binary=True) - else: - raise ValueError("Unknown algorithm '%s'" % algo) - return data - def ToHex(val): """Convert an integer value (or None) to a string |