summaryrefslogtreecommitdiff
path: root/tools/u_boot_pylib
diff options
context:
space:
mode:
Diffstat (limited to 'tools/u_boot_pylib')
-rw-r--r--tools/u_boot_pylib/LICENSE339
-rw-r--r--tools/u_boot_pylib/README.rst15
-rw-r--r--tools/u_boot_pylib/__init__.py4
-rwxr-xr-xtools/u_boot_pylib/__main__.py22
-rw-r--r--tools/u_boot_pylib/command.py221
-rw-r--r--tools/u_boot_pylib/cros_subprocess.py401
-rw-r--r--tools/u_boot_pylib/gitutil.py886
-rw-r--r--tools/u_boot_pylib/pyproject.toml25
-rw-r--r--tools/u_boot_pylib/requirements.txt1
-rw-r--r--tools/u_boot_pylib/terminal.py346
-rw-r--r--tools/u_boot_pylib/test_util.py229
-rw-r--r--tools/u_boot_pylib/tools.py612
-rw-r--r--tools/u_boot_pylib/tout.py190
l---------tools/u_boot_pylib/u_boot_pylib1
14 files changed, 3292 insertions, 0 deletions
diff --git a/tools/u_boot_pylib/LICENSE b/tools/u_boot_pylib/LICENSE
new file mode 100644
index 00000000000..d159169d105
--- /dev/null
+++ b/tools/u_boot_pylib/LICENSE
@@ -0,0 +1,339 @@
+ GNU GENERAL PUBLIC LICENSE
+ Version 2, June 1991
+
+ Copyright (C) 1989, 1991 Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+ Preamble
+
+ The licenses for most software are designed to take away your
+freedom to share and change it. By contrast, the GNU General Public
+License is intended to guarantee your freedom to share and change free
+software--to make sure the software is free for all its users. This
+General Public License applies to most of the Free Software
+Foundation's software and to any other program whose authors commit to
+using it. (Some other Free Software Foundation software is covered by
+the GNU Lesser General Public License instead.) You can apply it to
+your programs, too.
+
+ When we speak of free software, we are referring to freedom, not
+price. Our General Public Licenses are designed to make sure that you
+have the freedom to distribute copies of free software (and charge for
+this service if you wish), that you receive source code or can get it
+if you want it, that you can change the software or use pieces of it
+in new free programs; and that you know you can do these things.
+
+ To protect your rights, we need to make restrictions that forbid
+anyone to deny you these rights or to ask you to surrender the rights.
+These restrictions translate to certain responsibilities for you if you
+distribute copies of the software, or if you modify it.
+
+ For example, if you distribute copies of such a program, whether
+gratis or for a fee, you must give the recipients all the rights that
+you have. You must make sure that they, too, receive or can get the
+source code. And you must show them these terms so they know their
+rights.
+
+ We protect your rights with two steps: (1) copyright the software, and
+(2) offer you this license which gives you legal permission to copy,
+distribute and/or modify the software.
+
+ Also, for each author's protection and ours, we want to make certain
+that everyone understands that there is no warranty for this free
+software. If the software is modified by someone else and passed on, we
+want its recipients to know that what they have is not the original, so
+that any problems introduced by others will not reflect on the original
+authors' reputations.
+
+ Finally, any free program is threatened constantly by software
+patents. We wish to avoid the danger that redistributors of a free
+program will individually obtain patent licenses, in effect making the
+program proprietary. To prevent this, we have made it clear that any
+patent must be licensed for everyone's free use or not licensed at all.
+
+ The precise terms and conditions for copying, distribution and
+modification follow.
+
+ GNU GENERAL PUBLIC LICENSE
+ TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
+
+ 0. This License applies to any program or other work which contains
+a notice placed by the copyright holder saying it may be distributed
+under the terms of this General Public License. The "Program", below,
+refers to any such program or work, and a "work based on the Program"
+means either the Program or any derivative work under copyright law:
+that is to say, a work containing the Program or a portion of it,
+either verbatim or with modifications and/or translated into another
+language. (Hereinafter, translation is included without limitation in
+the term "modification".) Each licensee is addressed as "you".
+
+Activities other than copying, distribution and modification are not
+covered by this License; they are outside its scope. The act of
+running the Program is not restricted, and the output from the Program
+is covered only if its contents constitute a work based on the
+Program (independent of having been made by running the Program).
+Whether that is true depends on what the Program does.
+
+ 1. You may copy and distribute verbatim copies of the Program's
+source code as you receive it, in any medium, provided that you
+conspicuously and appropriately publish on each copy an appropriate
+copyright notice and disclaimer of warranty; keep intact all the
+notices that refer to this License and to the absence of any warranty;
+and give any other recipients of the Program a copy of this License
+along with the Program.
+
+You may charge a fee for the physical act of transferring a copy, and
+you may at your option offer warranty protection in exchange for a fee.
+
+ 2. You may modify your copy or copies of the Program or any portion
+of it, thus forming a work based on the Program, and copy and
+distribute such modifications or work under the terms of Section 1
+above, provided that you also meet all of these conditions:
+
+ a) You must cause the modified files to carry prominent notices
+ stating that you changed the files and the date of any change.
+
+ b) You must cause any work that you distribute or publish, that in
+ whole or in part contains or is derived from the Program or any
+ part thereof, to be licensed as a whole at no charge to all third
+ parties under the terms of this License.
+
+ c) If the modified program normally reads commands interactively
+ when run, you must cause it, when started running for such
+ interactive use in the most ordinary way, to print or display an
+ announcement including an appropriate copyright notice and a
+ notice that there is no warranty (or else, saying that you provide
+ a warranty) and that users may redistribute the program under
+ these conditions, and telling the user how to view a copy of this
+ License. (Exception: if the Program itself is interactive but
+ does not normally print such an announcement, your work based on
+ the Program is not required to print an announcement.)
+
+These requirements apply to the modified work as a whole. If
+identifiable sections of that work are not derived from the Program,
+and can be reasonably considered independent and separate works in
+themselves, then this License, and its terms, do not apply to those
+sections when you distribute them as separate works. But when you
+distribute the same sections as part of a whole which is a work based
+on the Program, the distribution of the whole must be on the terms of
+this License, whose permissions for other licensees extend to the
+entire whole, and thus to each and every part regardless of who wrote it.
+
+Thus, it is not the intent of this section to claim rights or contest
+your rights to work written entirely by you; rather, the intent is to
+exercise the right to control the distribution of derivative or
+collective works based on the Program.
+
+In addition, mere aggregation of another work not based on the Program
+with the Program (or with a work based on the Program) on a volume of
+a storage or distribution medium does not bring the other work under
+the scope of this License.
+
+ 3. You may copy and distribute the Program (or a work based on it,
+under Section 2) in object code or executable form under the terms of
+Sections 1 and 2 above provided that you also do one of the following:
+
+ a) Accompany it with the complete corresponding machine-readable
+ source code, which must be distributed under the terms of Sections
+ 1 and 2 above on a medium customarily used for software interchange; or,
+
+ b) Accompany it with a written offer, valid for at least three
+ years, to give any third party, for a charge no more than your
+ cost of physically performing source distribution, a complete
+ machine-readable copy of the corresponding source code, to be
+ distributed under the terms of Sections 1 and 2 above on a medium
+ customarily used for software interchange; or,
+
+ c) Accompany it with the information you received as to the offer
+ to distribute corresponding source code. (This alternative is
+ allowed only for noncommercial distribution and only if you
+ received the program in object code or executable form with such
+ an offer, in accord with Subsection b above.)
+
+The source code for a work means the preferred form of the work for
+making modifications to it. For an executable work, complete source
+code means all the source code for all modules it contains, plus any
+associated interface definition files, plus the scripts used to
+control compilation and installation of the executable. However, as a
+special exception, the source code distributed need not include
+anything that is normally distributed (in either source or binary
+form) with the major components (compiler, kernel, and so on) of the
+operating system on which the executable runs, unless that component
+itself accompanies the executable.
+
+If distribution of executable or object code is made by offering
+access to copy from a designated place, then offering equivalent
+access to copy the source code from the same place counts as
+distribution of the source code, even though third parties are not
+compelled to copy the source along with the object code.
+
+ 4. You may not copy, modify, sublicense, or distribute the Program
+except as expressly provided under this License. Any attempt
+otherwise to copy, modify, sublicense or distribute the Program is
+void, and will automatically terminate your rights under this License.
+However, parties who have received copies, or rights, from you under
+this License will not have their licenses terminated so long as such
+parties remain in full compliance.
+
+ 5. You are not required to accept this License, since you have not
+signed it. However, nothing else grants you permission to modify or
+distribute the Program or its derivative works. These actions are
+prohibited by law if you do not accept this License. Therefore, by
+modifying or distributing the Program (or any work based on the
+Program), you indicate your acceptance of this License to do so, and
+all its terms and conditions for copying, distributing or modifying
+the Program or works based on it.
+
+ 6. Each time you redistribute the Program (or any work based on the
+Program), the recipient automatically receives a license from the
+original licensor to copy, distribute or modify the Program subject to
+these terms and conditions. You may not impose any further
+restrictions on the recipients' exercise of the rights granted herein.
+You are not responsible for enforcing compliance by third parties to
+this License.
+
+ 7. If, as a consequence of a court judgment or allegation of patent
+infringement or for any other reason (not limited to patent issues),
+conditions are imposed on you (whether by court order, agreement or
+otherwise) that contradict the conditions of this License, they do not
+excuse you from the conditions of this License. If you cannot
+distribute so as to satisfy simultaneously your obligations under this
+License and any other pertinent obligations, then as a consequence you
+may not distribute the Program at all. For example, if a patent
+license would not permit royalty-free redistribution of the Program by
+all those who receive copies directly or indirectly through you, then
+the only way you could satisfy both it and this License would be to
+refrain entirely from distribution of the Program.
+
+If any portion of this section is held invalid or unenforceable under
+any particular circumstance, the balance of the section is intended to
+apply and the section as a whole is intended to apply in other
+circumstances.
+
+It is not the purpose of this section to induce you to infringe any
+patents or other property right claims or to contest validity of any
+such claims; this section has the sole purpose of protecting the
+integrity of the free software distribution system, which is
+implemented by public license practices. Many people have made
+generous contributions to the wide range of software distributed
+through that system in reliance on consistent application of that
+system; it is up to the author/donor to decide if he or she is willing
+to distribute software through any other system and a licensee cannot
+impose that choice.
+
+This section is intended to make thoroughly clear what is believed to
+be a consequence of the rest of this License.
+
+ 8. If the distribution and/or use of the Program is restricted in
+certain countries either by patents or by copyrighted interfaces, the
+original copyright holder who places the Program under this License
+may add an explicit geographical distribution limitation excluding
+those countries, so that distribution is permitted only in or among
+countries not thus excluded. In such case, this License incorporates
+the limitation as if written in the body of this License.
+
+ 9. The Free Software Foundation may publish revised and/or new versions
+of the General Public License from time to time. Such new versions will
+be similar in spirit to the present version, but may differ in detail to
+address new problems or concerns.
+
+Each version is given a distinguishing version number. If the Program
+specifies a version number of this License which applies to it and "any
+later version", you have the option of following the terms and conditions
+either of that version or of any later version published by the Free
+Software Foundation. If the Program does not specify a version number of
+this License, you may choose any version ever published by the Free Software
+Foundation.
+
+ 10. If you wish to incorporate parts of the Program into other free
+programs whose distribution conditions are different, write to the author
+to ask for permission. For software which is copyrighted by the Free
+Software Foundation, write to the Free Software Foundation; we sometimes
+make exceptions for this. Our decision will be guided by the two goals
+of preserving the free status of all derivatives of our free software and
+of promoting the sharing and reuse of software generally.
+
+ NO WARRANTY
+
+ 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
+FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN
+OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
+PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED
+OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS
+TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE
+PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING,
+REPAIR OR CORRECTION.
+
+ 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
+WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
+REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES,
+INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING
+OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED
+TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY
+YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER
+PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGES.
+
+ END OF TERMS AND CONDITIONS
+
+ How to Apply These Terms to Your New Programs
+
+ If you develop a new program, and you want it to be of the greatest
+possible use to the public, the best way to achieve this is to make it
+free software which everyone can redistribute and change under these terms.
+
+ To do so, attach the following notices to the program. It is safest
+to attach them to the start of each source file to most effectively
+convey the exclusion of warranty; and each file should have at least
+the "copyright" line and a pointer to where the full notice is found.
+
+ <one line to give the program's name and a brief idea of what it does.>
+ Copyright (C) <year> <name of author>
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along
+ with this program; if not, write to the Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+Also add information on how to contact you by electronic and paper mail.
+
+If the program is interactive, make it output a short notice like this
+when it starts in an interactive mode:
+
+ Gnomovision version 69, Copyright (C) year name of author
+ Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
+ This is free software, and you are welcome to redistribute it
+ under certain conditions; type `show c' for details.
+
+The hypothetical commands `show w' and `show c' should show the appropriate
+parts of the General Public License. Of course, the commands you use may
+be called something other than `show w' and `show c'; they could even be
+mouse-clicks or menu items--whatever suits your program.
+
+You should also get your employer (if you work as a programmer) or your
+school, if any, to sign a "copyright disclaimer" for the program, if
+necessary. Here is a sample; alter the names:
+
+ Yoyodyne, Inc., hereby disclaims all copyright interest in the program
+ `Gnomovision' (which makes passes at compilers) written by James Hacker.
+
+ <signature of Ty Coon>, 1 April 1989
+ Ty Coon, President of Vice
+
+This General Public License does not permit incorporating your program into
+proprietary programs. If your program is a subroutine library, you may
+consider it more useful to permit linking proprietary applications with the
+library. If this is what you want to do, use the GNU Lesser General
+Public License instead of this License.
diff --git a/tools/u_boot_pylib/README.rst b/tools/u_boot_pylib/README.rst
new file mode 100644
index 00000000000..36a18256d8b
--- /dev/null
+++ b/tools/u_boot_pylib/README.rst
@@ -0,0 +1,15 @@
+.. SPDX-License-Identifier: GPL-2.0+
+
+# U-Boot Python Library
+=======================
+
+This is a Python library used by various U-Boot tools, including patman,
+buildman and binman.
+
+The module can be installed with pip::
+
+ pip install u_boot_pylib
+
+or via setup.py::
+
+ ./setup.py install [--user]
diff --git a/tools/u_boot_pylib/__init__.py b/tools/u_boot_pylib/__init__.py
new file mode 100644
index 00000000000..807a62e0743
--- /dev/null
+++ b/tools/u_boot_pylib/__init__.py
@@ -0,0 +1,4 @@
+# SPDX-License-Identifier: GPL-2.0+
+
+__all__ = ['command', 'cros_subprocess', 'gitutil', 'terminal', 'test_util',
+ 'tools', 'tout']
diff --git a/tools/u_boot_pylib/__main__.py b/tools/u_boot_pylib/__main__.py
new file mode 100755
index 00000000000..d86b9d7dce0
--- /dev/null
+++ b/tools/u_boot_pylib/__main__.py
@@ -0,0 +1,22 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Copyright 2023 Google LLC
+#
+
+import os
+import sys
+
+if __name__ == "__main__":
+ # Allow 'from u_boot_pylib import xxx to work'
+ our_path = os.path.dirname(os.path.realpath(__file__))
+ sys.path.append(os.path.join(our_path, '..'))
+
+ # Run tests
+ from u_boot_pylib import test_util
+
+ result = test_util.run_test_suites(
+ 'u_boot_pylib', False, False, False, False, None, None, None,
+ ['terminal'])
+
+ sys.exit(0 if result.wasSuccessful() else 1)
diff --git a/tools/u_boot_pylib/command.py b/tools/u_boot_pylib/command.py
new file mode 100644
index 00000000000..cb7ebf49ce5
--- /dev/null
+++ b/tools/u_boot_pylib/command.py
@@ -0,0 +1,221 @@
+# SPDX-License-Identifier: GPL-2.0+
+"""
+Shell command ease-ups for Python
+
+Copyright (c) 2011 The Chromium OS Authors.
+"""
+
+import subprocess
+
+from u_boot_pylib import cros_subprocess
+
+# This permits interception of RunPipe for test purposes. If it is set to
+# a function, then that function is called with the pipe list being
+# executed. Otherwise, it is assumed to be a CommandResult object, and is
+# returned as the result for every run_pipe() call.
+# When this value is None, commands are executed as normal.
+TEST_RESULT = None
+
+
+class CommandExc(Exception):
+ """Reports an exception to the caller"""
+ def __init__(self, msg, result):
+ """Set up a new exception object
+
+ Args:
+ result (CommandResult): Execution result so far
+ """
+ super().__init__(msg)
+ self.result = result
+
+
+class CommandResult:
+ """A class which captures the result of executing a command.
+
+ Members:
+ stdout (bytes): stdout obtained from command, as a string
+ stderr (bytes): stderr obtained from command, as a string
+ combined (bytes): stdout and stderr interleaved
+ return_code (int): Return code from command
+ exception (Exception): Exception received, or None if all ok
+ output (str or None): Returns output as a single line if requested
+ """
+ def __init__(self, stdout='', stderr='', combined='', return_code=0,
+ exception=None):
+ self.stdout = stdout
+ self.stderr = stderr
+ self.combined = combined
+ self.return_code = return_code
+ self.exception = exception
+ self.output = None
+
+ def to_output(self, binary):
+ """Converts binary output to its final form
+
+ Args:
+ binary (bool): True to report binary output, False to use strings
+ Returns:
+ self
+ """
+ if not binary:
+ self.stdout = self.stdout.decode('utf-8')
+ self.stderr = self.stderr.decode('utf-8')
+ self.combined = self.combined.decode('utf-8')
+ return self
+
+
+def run_pipe(pipe_list, infile=None, outfile=None, capture=False,
+ capture_stderr=False, oneline=False, raise_on_error=True, cwd=None,
+ binary=False, output_func=None, **kwargs):
+ """
+ Perform a command pipeline, with optional input/output filenames.
+
+ Args:
+ pipe_list (list of list): List of command lines to execute. Each command
+ line is piped into the next, and is itself a list of strings. For
+ example [ ['ls', '.git'] ['wc'] ] will pipe the output of
+ 'ls .git' into 'wc'.
+ infile (str): File to provide stdin to the pipeline
+ outfile (str): File to store stdout
+ capture (bool): True to capture output
+ capture_stderr (bool): True to capture stderr
+ oneline (bool): True to strip newline chars from output
+ raise_on_error (bool): True to raise on an error, False to return it in
+ the CommandResult
+ cwd (str or None): Directory to run the command in
+ binary (bool): True to report binary output, False to use strings
+ output_func (function): Output function to call with each output
+ fragment (if it returns True the function terminates)
+ **kwargs: Additional keyword arguments to cros_subprocess.Popen()
+ Returns:
+ CommandResult object
+ Raises:
+ CommandExc if an exception happens
+ """
+ if TEST_RESULT:
+ if hasattr(TEST_RESULT, '__call__'):
+ # pylint: disable=E1102
+ result = TEST_RESULT(pipe_list=pipe_list)
+ if result:
+ return result
+ else:
+ return TEST_RESULT
+ # No result: fall through to normal processing
+ result = CommandResult(b'', b'', b'')
+ last_pipe = None
+ pipeline = list(pipe_list)
+ user_pipestr = '|'.join([' '.join(pipe) for pipe in pipe_list])
+ kwargs['stdout'] = None
+ kwargs['stderr'] = None
+ while pipeline:
+ cmd = pipeline.pop(0)
+ if last_pipe is not None:
+ kwargs['stdin'] = last_pipe.stdout
+ elif infile:
+ kwargs['stdin'] = open(infile, 'rb')
+ if pipeline or capture:
+ kwargs['stdout'] = cros_subprocess.PIPE
+ elif outfile:
+ kwargs['stdout'] = open(outfile, 'wb')
+ if capture_stderr:
+ kwargs['stderr'] = cros_subprocess.PIPE
+
+ try:
+ last_pipe = cros_subprocess.Popen(cmd, cwd=cwd, **kwargs)
+ except Exception as err:
+ result.exception = err
+ if raise_on_error:
+ raise CommandExc(f"Error running '{user_pipestr}': {err}",
+ result) from err
+ result.return_code = 255
+ return result.to_output(binary)
+
+ if capture:
+ result.stdout, result.stderr, result.combined = (
+ last_pipe.communicate_filter(output_func))
+ if result.stdout and oneline:
+ result.output = result.stdout.rstrip(b'\r\n')
+ result.return_code = last_pipe.wait()
+ if raise_on_error and result.return_code:
+ raise CommandExc(f"Error running '{user_pipestr}'", result)
+ return result.to_output(binary)
+
+
+def output(*cmd, **kwargs):
+ """Run a command and return its output
+
+ Args:
+ *cmd (list of str): Command to run
+ **kwargs (dict of args): Extra arguments to pass in
+
+ Returns:
+ str: command output
+ """
+ kwargs['raise_on_error'] = kwargs.get('raise_on_error', True)
+ return run_pipe([cmd], capture=True, **kwargs).stdout
+
+
+def output_one_line(*cmd, **kwargs):
+ """Run a command and output it as a single-line string
+
+ The command is expected to produce a single line of output
+
+ Args:
+ *cmd (list of str): Command to run
+ **kwargs (dict of args): Extra arguments to pass in
+
+ Returns:
+ str: output of command with all newlines removed
+ """
+ raise_on_error = kwargs.pop('raise_on_error', True)
+ result = run_pipe([cmd], capture=True, oneline=True,
+ raise_on_error=raise_on_error, **kwargs).stdout.strip()
+ return result
+
+
+def run(*cmd, **kwargs):
+ """Run a command
+
+ Note that you must add 'capture' to kwargs to obtain non-empty output
+
+ Args:
+ *cmd (list of str): Command to run
+ **kwargs (dict of args): Extra arguments to pass in
+
+ Returns:
+ str: output of command
+ """
+ return run_pipe([cmd], **kwargs).stdout
+
+
+def run_one(*cmd, **kwargs):
+ """Run a single command
+
+ Note that you must add 'capture' to kwargs to obtain non-empty output
+
+ Args:
+ *cmd (list of str): Command to run
+ **kwargs (dict of args): Extra arguments to pass in
+
+ Returns:
+ CommandResult: output of command
+ """
+ return run_pipe([cmd], **kwargs)
+
+
+def run_list(cmd, **kwargs):
+ """Run a command and return its output
+
+ Args:
+ cmd (list of str): Command to run
+
+ Returns:
+ str: output of command
+ **kwargs (dict of args): Extra arguments to pass in
+ """
+ return run_pipe([cmd], capture=True, **kwargs).stdout
+
+
+def stop_all():
+ """Stop all subprocesses initiated with cros_subprocess"""
+ cros_subprocess.stay_alive = False
diff --git a/tools/u_boot_pylib/cros_subprocess.py b/tools/u_boot_pylib/cros_subprocess.py
new file mode 100644
index 00000000000..cd614f38a64
--- /dev/null
+++ b/tools/u_boot_pylib/cros_subprocess.py
@@ -0,0 +1,401 @@
+# Copyright (c) 2012 The Chromium OS Authors.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+#
+# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
+# Licensed to PSF under a Contributor Agreement.
+# See http://www.python.org/2.4/license for licensing details.
+
+"""Subprocess execution
+
+This module holds a subclass of subprocess.Popen with our own required
+features, mainly that we get access to the subprocess output while it
+is running rather than just at the end. This makes it easier to show
+progress information and filter output in real time.
+"""
+
+import errno
+import os
+import pty
+import select
+import subprocess
+import sys
+import unittest
+
+
+# Import these here so the caller does not need to import subprocess also.
+PIPE = subprocess.PIPE
+STDOUT = subprocess.STDOUT
+PIPE_PTY = -3 # Pipe output through a pty
+stay_alive = True
+
+
+class Popen(subprocess.Popen):
+ """Like subprocess.Popen with ptys and incremental output
+
+ This class deals with running a child process and filtering its output on
+ both stdout and stderr while it is running. We do this so we can monitor
+ progress, and possibly relay the output to the user if requested.
+
+ The class is similar to subprocess.Popen, the equivalent is something like:
+
+ Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ But this class has many fewer features, and two enhancement:
+
+ 1. Rather than getting the output data only at the end, this class sends it
+ to a provided operation as it arrives.
+ 2. We use pseudo terminals so that the child will hopefully flush its output
+ to us as soon as it is produced, rather than waiting for the end of a
+ line.
+
+ Use communicate_filter() to handle output from the subprocess.
+
+ """
+
+ def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY,
+ shell=False, cwd=None, env=None, **kwargs):
+ """Cut-down constructor
+
+ Args:
+ args: Program and arguments for subprocess to execute.
+ stdin: See subprocess.Popen()
+ stdout: See subprocess.Popen(), except that we support the sentinel
+ value of cros_subprocess.PIPE_PTY.
+ stderr: See subprocess.Popen(), except that we support the sentinel
+ value of cros_subprocess.PIPE_PTY.
+ shell: See subprocess.Popen()
+ cwd: Working directory to change to for subprocess, or None if none.
+ env: Environment to use for this subprocess, or None to inherit parent.
+ kwargs: No other arguments are supported at the moment. Passing other
+ arguments will cause a ValueError to be raised.
+ """
+ stdout_pty = None
+ stderr_pty = None
+
+ if stdout == PIPE_PTY:
+ stdout_pty = pty.openpty()
+ stdout = os.fdopen(stdout_pty[1])
+ if stderr == PIPE_PTY:
+ stderr_pty = pty.openpty()
+ stderr = os.fdopen(stderr_pty[1])
+
+ super(Popen, self).__init__(args, stdin=stdin,
+ stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env,
+ **kwargs)
+
+ # If we're on a PTY, we passed the slave half of the PTY to the subprocess.
+ # We want to use the master half on our end from now on. Setting this here
+ # does make some assumptions about the implementation of subprocess, but
+ # those assumptions are pretty minor.
+
+ # Note that if stderr is STDOUT, then self.stderr will be set to None by
+ # this constructor.
+ if stdout_pty is not None:
+ self.stdout = os.fdopen(stdout_pty[0])
+ if stderr_pty is not None:
+ self.stderr = os.fdopen(stderr_pty[0])
+
+ # Insist that unit tests exist for other arguments we don't support.
+ if kwargs:
+ raise ValueError("Unit tests do not test extra args - please add tests")
+
+ def convert_data(self, data):
+ """Convert stdout/stderr data to the correct format for output
+
+ Args:
+ data: Data to convert, or None for ''
+
+ Returns:
+ Converted data, as bytes
+ """
+ if data is None:
+ return b''
+ return data
+
+ def communicate_filter(self, output, input_buf=''):
+ """Interact with process: Read data from stdout and stderr.
+
+ This method runs until end-of-file is reached, then waits for the
+ subprocess to terminate.
+
+ The output function is sent all output from the subprocess and must be
+ defined like this:
+
+ def output([self,] stream, data)
+ Args:
+ stream: the stream the output was received on, which will be
+ sys.stdout or sys.stderr.
+ data: a string containing the data
+
+ Returns:
+ True to terminate the process
+
+ Note: The data read is buffered in memory, so do not use this
+ method if the data size is large or unlimited.
+
+ Args:
+ output: Function to call with each fragment of output.
+
+ Returns:
+ A tuple (stdout, stderr, combined) which is the data received on
+ stdout, stderr and the combined data (interleaved stdout and stderr).
+
+ Note that the interleaved output will only be sensible if you have
+ set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on
+ the timing of the output in the subprocess. If a subprocess flips
+ between stdout and stderr quickly in succession, by the time we come to
+ read the output from each we may see several lines in each, and will read
+ all the stdout lines, then all the stderr lines. So the interleaving
+ may not be correct. In this case you might want to pass
+ stderr=cros_subprocess.STDOUT to the constructor.
+
+ This feature is still useful for subprocesses where stderr is
+ rarely used and indicates an error.
+
+ Note also that if you set stderr to STDOUT, then stderr will be empty
+ and the combined output will just be the same as stdout.
+ """
+
+ read_set = []
+ write_set = []
+ stdout = None # Return
+ stderr = None # Return
+
+ if self.stdin:
+ # Flush stdio buffer. This might block, if the user has
+ # been writing to .stdin in an uncontrolled fashion.
+ self.stdin.flush()
+ if input_buf:
+ write_set.append(self.stdin)
+ else:
+ self.stdin.close()
+ if self.stdout:
+ read_set.append(self.stdout)
+ stdout = bytearray()
+ if self.stderr and self.stderr != self.stdout:
+ read_set.append(self.stderr)
+ stderr = bytearray()
+ combined = bytearray()
+
+ stop_now = False
+ input_offset = 0
+ while read_set or write_set:
+ try:
+ rlist, wlist, _ = select.select(read_set, write_set, [], 0.2)
+ except select.error as e:
+ if e.args[0] == errno.EINTR:
+ continue
+ raise
+
+ if not stay_alive:
+ self.terminate()
+
+ if self.stdin in wlist:
+ # When select has indicated that the file is writable,
+ # we can write up to PIPE_BUF bytes without risk
+ # blocking. POSIX defines PIPE_BUF >= 512
+ chunk = input_buf[input_offset : input_offset + 512]
+ bytes_written = os.write(self.stdin.fileno(), chunk)
+ input_offset += bytes_written
+ if input_offset >= len(input_buf):
+ self.stdin.close()
+ write_set.remove(self.stdin)
+
+ if self.stdout in rlist:
+ data = b''
+ # We will get an error on read if the pty is closed
+ try:
+ data = os.read(self.stdout.fileno(), 1024)
+ except OSError:
+ pass
+ if not len(data):
+ self.stdout.close()
+ read_set.remove(self.stdout)
+ else:
+ stdout += data
+ combined += data
+ if output:
+ stop_now = output(sys.stdout, data)
+ if self.stderr in rlist:
+ data = b''
+ # We will get an error on read if the pty is closed
+ try:
+ data = os.read(self.stderr.fileno(), 1024)
+ except OSError:
+ pass
+ if not len(data):
+ self.stderr.close()
+ read_set.remove(self.stderr)
+ else:
+ stderr += data
+ combined += data
+ if output:
+ stop_now = output(sys.stderr, data)
+ if stop_now:
+ self.terminate()
+
+ # All data exchanged. Translate lists into strings.
+ stdout = self.convert_data(stdout)
+ stderr = self.convert_data(stderr)
+ combined = self.convert_data(combined)
+
+ self.wait()
+ return (stdout, stderr, combined)
+
+
+# Just being a unittest.TestCase gives us 14 public methods. Unless we
+# disable this, we can only have 6 tests in a TestCase. That's not enough.
+#
+# pylint: disable=R0904
+
+class TestSubprocess(unittest.TestCase):
+ """Our simple unit test for this module"""
+
+ class MyOperation:
+ """Provides a operation that we can pass to Popen"""
+ def __init__(self, input_to_send=None):
+ """Constructor to set up the operation and possible input.
+
+ Args:
+ input_to_send: a text string to send when we first get input. We will
+ add \r\n to the string.
+ """
+ self.stdout_data = ''
+ self.stderr_data = ''
+ self.combined_data = ''
+ self.stdin_pipe = None
+ self._input_to_send = input_to_send
+ if input_to_send:
+ pipe = os.pipe()
+ self.stdin_read_pipe = pipe[0]
+ self._stdin_write_pipe = os.fdopen(pipe[1], 'w')
+
+ def output(self, stream, data):
+ """Output handler for Popen. Stores the data for later comparison"""
+ if stream == sys.stdout:
+ self.stdout_data += data
+ if stream == sys.stderr:
+ self.stderr_data += data
+ self.combined_data += data
+
+ # Output the input string if we have one.
+ if self._input_to_send:
+ self._stdin_write_pipe.write(self._input_to_send + '\r\n')
+ self._stdin_write_pipe.flush()
+
+ def _basic_check(self, plist, oper):
+ """Basic checks that the output looks sane."""
+ self.assertEqual(plist[0], oper.stdout_data)
+ self.assertEqual(plist[1], oper.stderr_data)
+ self.assertEqual(plist[2], oper.combined_data)
+
+ # The total length of stdout and stderr should equal the combined length
+ self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2]))
+
+ def test_simple(self):
+ """Simple redirection: Get process list"""
+ oper = TestSubprocess.MyOperation()
+ plist = Popen(['ps']).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+
+ def test_stderr(self):
+ """Check stdout and stderr"""
+ oper = TestSubprocess.MyOperation()
+ cmd = 'echo fred >/dev/stderr && false || echo bad'
+ plist = Popen([cmd], shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], 'bad\r\n')
+ self.assertEqual(plist [1], 'fred\r\n')
+
+ def test_shell(self):
+ """Check with and without shell works"""
+ oper = TestSubprocess.MyOperation()
+ cmd = 'echo test >/dev/stderr'
+ self.assertRaises(OSError, Popen, [cmd], shell=False)
+ plist = Popen([cmd], shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(len(plist [0]), 0)
+ self.assertEqual(plist [1], 'test\r\n')
+
+ def test_list_args(self):
+ """Check with and without shell works using list arguments"""
+ oper = TestSubprocess.MyOperation()
+ cmd = ['echo', 'test', '>/dev/stderr']
+ plist = Popen(cmd, shell=False).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n')
+ self.assertEqual(len(plist [1]), 0)
+
+ oper = TestSubprocess.MyOperation()
+
+ # this should be interpreted as 'echo' with the other args dropped
+ cmd = ['echo', 'test', '>/dev/stderr']
+ plist = Popen(cmd, shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], '\r\n')
+
+ def test_cwd(self):
+ """Check we can change directory"""
+ for shell in (False, True):
+ oper = TestSubprocess.MyOperation()
+ plist = Popen('pwd', shell=shell, cwd='/tmp').communicate_filter(
+ oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], '/tmp\r\n')
+
+ def test_env(self):
+ """Check we can change environment"""
+ for add in (False, True):
+ oper = TestSubprocess.MyOperation()
+ env = os.environ
+ if add:
+ env ['FRED'] = 'fred'
+ cmd = 'echo $FRED'
+ plist = Popen(cmd, shell=True, env=env).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n')
+
+ def test_extra_args(self):
+ """Check we can't add extra arguments"""
+ self.assertRaises(ValueError, Popen, 'true', close_fds=False)
+
+ def test_basic_input(self):
+ """Check that incremental input works
+
+ We set up a subprocess which will prompt for name. When we see this prompt
+ we send the name as input to the process. It should then print the name
+ properly to stdout.
+ """
+ oper = TestSubprocess.MyOperation('Flash')
+ prompt = 'What is your name?: '
+ cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt
+ plist = Popen([cmd], stdin=oper.stdin_read_pipe,
+ shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(len(plist [1]), 0)
+ self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n')
+
+ def test_isatty(self):
+ """Check that ptys appear as terminals to the subprocess"""
+ oper = TestSubprocess.MyOperation()
+ cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; '
+ 'else echo "not %d" >&%d; fi;')
+ both_cmds = ''
+ for fd in (1, 2):
+ both_cmds += cmd % (fd, fd, fd, fd, fd)
+ plist = Popen(both_cmds, shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], 'terminal 1\r\n')
+ self.assertEqual(plist [1], 'terminal 2\r\n')
+
+ # Now try with PIPE and make sure it is not a terminal
+ oper = TestSubprocess.MyOperation()
+ plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ shell=True).communicate_filter(oper.output)
+ self._basic_check(plist, oper)
+ self.assertEqual(plist [0], 'not 1\n')
+ self.assertEqual(plist [1], 'not 2\n')
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/tools/u_boot_pylib/gitutil.py b/tools/u_boot_pylib/gitutil.py
new file mode 100644
index 00000000000..34b4dbb4839
--- /dev/null
+++ b/tools/u_boot_pylib/gitutil.py
@@ -0,0 +1,886 @@
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright (c) 2011 The Chromium OS Authors.
+#
+
+"""Basic utilities for running the git command-line tool from Python"""
+
+import os
+import sys
+
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+
+# True to use --no-decorate - we check this in setup()
+USE_NO_DECORATE = True
+
+
+def log_cmd(commit_range, git_dir=None, oneline=False, reverse=False,
+ count=None, decorate=False):
+ """Create a command to perform a 'git log'
+
+ Args:
+ commit_range (str): Range expression to use for log, None for none
+ git_dir (str): Path to git repository (None to use default)
+ oneline (bool): True to use --oneline, else False
+ reverse (bool): True to reverse the log (--reverse)
+ count (int or None): Number of commits to list, or None for no limit
+ decorate (bool): True to use --decorate
+
+ Return:
+ List containing command and arguments to run
+ """
+ cmd = ['git']
+ if git_dir:
+ cmd += ['--git-dir', git_dir]
+ cmd += ['--no-pager', 'log', '--no-color']
+ if oneline:
+ cmd.append('--oneline')
+ if USE_NO_DECORATE and not decorate:
+ cmd.append('--no-decorate')
+ if decorate:
+ cmd.append('--decorate')
+ if reverse:
+ cmd.append('--reverse')
+ if count is not None:
+ cmd.append(f'-n{count}')
+ if commit_range:
+ cmd.append(commit_range)
+
+ # Add this in case we have a branch with the same name as a directory.
+ # This avoids messages like this, for example:
+ # fatal: ambiguous argument 'test': both revision and filename
+ cmd.append('--')
+ return cmd
+
+
+def count_commits_to_branch(branch, git_dir=None, end=None):
+ """Returns number of commits between HEAD and the tracking branch.
+
+ This looks back to the tracking branch and works out the number of commits
+ since then.
+
+ Args:
+ branch (str or None): Branch to count from (None for current branch)
+ git_dir (str): Path to git repository (None to use default)
+ end (str): End commit to stop before
+
+ Return:
+ Number of patches that exist on top of the branch
+ """
+ if end:
+ rev_range = f'{end}..{branch}'
+ elif branch:
+ us, msg = get_upstream(git_dir or '.git', branch)
+ if not us:
+ raise ValueError(msg)
+ rev_range = f'{us}..{branch}'
+ else:
+ rev_range = '@{upstream}..'
+ cmd = log_cmd(rev_range, git_dir=git_dir, oneline=True)
+ result = command.run_one(*cmd, capture=True, capture_stderr=True,
+ oneline=True, raise_on_error=False)
+ if result.return_code:
+ raise ValueError(
+ f'Failed to determine upstream: {result.stderr.strip()}')
+ patch_count = len(result.stdout.splitlines())
+ return patch_count
+
+
+def name_revision(commit_hash):
+ """Gets the revision name for a commit
+
+ Args:
+ commit_hash (str): Commit hash to look up
+
+ Return:
+ Name of revision, if any, else None
+ """
+ stdout = command.output_one_line('git', 'name-rev', commit_hash)
+ if not stdout:
+ return None
+
+ # We expect a commit, a space, then a revision name
+ name = stdout.split()[1].strip()
+ return name
+
+
+def guess_upstream(git_dir, branch):
+ """Tries to guess the upstream for a branch
+
+ This lists out top commits on a branch and tries to find a suitable
+ upstream. It does this by looking for the first commit where
+ 'git name-rev' returns a plain branch name, with no ! or ^ modifiers.
+
+ Args:
+ git_dir (str): Git directory containing repo
+ branch (str): Name of branch
+
+ Returns:
+ Tuple:
+ Name of upstream branch (e.g. 'upstream/master') or None if none
+ Warning/error message, or None if none
+ """
+ cmd = log_cmd(branch, git_dir=git_dir, oneline=True, count=100,
+ decorate=True)
+ result = command.run_one(*cmd, capture=True, capture_stderr=True,
+ raise_on_error=False)
+ if result.return_code:
+ return None, f"Branch '{branch}' not found"
+ for line in result.stdout.splitlines()[1:]:
+ parts = line.split(maxsplit=1)
+ if len(parts) >= 2 and parts[1].startswith('('):
+ commit_hash = parts[0]
+ name = name_revision(commit_hash)
+ if '~' not in name and '^' not in name:
+ if name.startswith('remotes/'):
+ name = name[8:]
+ return name, f"Guessing upstream as '{name}'"
+ return None, f"Cannot find a suitable upstream for branch '{branch}'"
+
+
+def get_upstream(git_dir, branch):
+ """Returns the name of the upstream for a branch
+
+ Args:
+ git_dir (str): Git directory containing repo
+ branch (str): Name of branch
+
+ Returns:
+ Tuple:
+ Name of upstream branch (e.g. 'upstream/master') or None if none
+ Warning/error message, or None if none
+ """
+ try:
+ remote = command.output_one_line('git', '--git-dir', git_dir, 'config',
+ f'branch.{branch}.remote')
+ merge = command.output_one_line('git', '--git-dir', git_dir, 'config',
+ f'branch.{branch}.merge')
+ except command.CommandExc:
+ upstream, msg = guess_upstream(git_dir, branch)
+ return upstream, msg
+
+ if remote == '.':
+ return merge, None
+ if remote and merge:
+ # Drop the initial refs/heads from merge
+ leaf = merge.split('/', maxsplit=2)[2:]
+ return f'{remote}/{"/".join(leaf)}', None
+ raise ValueError("Cannot determine upstream branch for branch "
+ f"'{branch}' remote='{remote}', merge='{merge}'")
+
+
+def get_range_in_branch(git_dir, branch, include_upstream=False):
+ """Returns an expression for the commits in the given branch.
+
+ Args:
+ git_dir (str): Directory containing git repo
+ branch (str): Name of branch
+ include_upstream (bool): Include the upstream commit as well
+ Return:
+ Expression in the form 'upstream..branch' which can be used to
+ access the commits. If the branch does not exist, returns None.
+ """
+ upstream, msg = get_upstream(git_dir, branch)
+ if not upstream:
+ return None, msg
+ rstr = f"{upstream}{'~' if include_upstream else ''}..{branch}"
+ return rstr, msg
+
+
+def count_commits_in_range(git_dir, range_expr):
+ """Returns the number of commits in the given range.
+
+ Args:
+ git_dir (str): Directory containing git repo
+ range_expr (str): Range to check
+ Return:
+ Number of patches that exist in the supplied range or None if none
+ were found
+ """
+ cmd = log_cmd(range_expr, git_dir=git_dir, oneline=True)
+ result = command.run_one(*cmd, capture=True, capture_stderr=True,
+ raise_on_error=False)
+ if result.return_code:
+ return None, f"Range '{range_expr}' not found or is invalid"
+ patch_count = len(result.stdout.splitlines())
+ return patch_count, None
+
+
+def count_commits_in_branch(git_dir, branch, include_upstream=False):
+ """Returns the number of commits in the given branch.
+
+ Args:
+ git_dir (str): Directory containing git repo
+ branch (str): Name of branch
+ include_upstream (bool): Include the upstream commit as well
+ Return:
+ Number of patches that exist on top of the branch, or None if the
+ branch does not exist.
+ """
+ range_expr, msg = get_range_in_branch(git_dir, branch, include_upstream)
+ if not range_expr:
+ return None, msg
+ return count_commits_in_range(git_dir, range_expr)
+
+
+def count_commits(commit_range):
+ """Returns the number of commits in the given range.
+
+ Args:
+ commit_range (str): Range of commits to count (e.g. 'HEAD..base')
+ Return:
+ Number of patches that exist on top of the branch
+ """
+ pipe = [log_cmd(commit_range, oneline=True),
+ ['wc', '-l']]
+ stdout = command.run_pipe(pipe, capture=True, oneline=True).stdout
+ patch_count = int(stdout)
+ return patch_count
+
+
+def checkout(commit_hash, git_dir=None, work_tree=None, force=False):
+ """Checkout the selected commit for this build
+
+ Args:
+ commit_hash (str): Commit hash to check out
+ git_dir (str): Directory containing git repo, or None for current dir
+ work_tree (str): Git worktree to use, or None if none
+ force (bool): True to force the checkout (git checkout -f)
+ """
+ pipe = ['git']
+ if git_dir:
+ pipe.extend(['--git-dir', git_dir])
+ if work_tree:
+ pipe.extend(['--work-tree', work_tree])
+ pipe.append('checkout')
+ if force:
+ pipe.append('-f')
+ pipe.append(commit_hash)
+ result = command.run_pipe([pipe], capture=True, raise_on_error=False,
+ capture_stderr=True)
+ if result.return_code != 0:
+ raise OSError(f'git checkout ({pipe}): {result.stderr}')
+
+
+def clone(repo, output_dir):
+ """Clone a repo
+
+ Args:
+ repo (str): Repo to clone (e.g. web address)
+ output_dir (str): Directory to close into
+ """
+ result = command.run_one('git', 'clone', repo, '.', capture=True,
+ cwd=output_dir, capture_stderr=True)
+ if result.return_code != 0:
+ raise OSError(f'git clone: {result.stderr}')
+
+
+def fetch(git_dir=None, work_tree=None):
+ """Fetch from the origin repo
+
+ Args:
+ git_dir (str): Directory containing git repo, or None for current dir
+ work_tree (str or None): Git worktree to use, or None if none
+ """
+ cmd = ['git']
+ if git_dir:
+ cmd.extend(['--git-dir', git_dir])
+ if work_tree:
+ cmd.extend(['--work-tree', work_tree])
+ cmd.append('fetch')
+ result = command.run_one(*cmd, capture=True, capture_stderr=True)
+ if result.return_code != 0:
+ raise OSError(f'git fetch: {result.stderr}')
+
+
+def check_worktree_is_available(git_dir):
+ """Check if git-worktree functionality is available
+
+ Args:
+ git_dir (str): The repository to test in
+
+ Returns:
+ True if git-worktree commands will work, False otherwise.
+ """
+ result = command.run_one('git', '--git-dir', git_dir, 'worktree', 'list',
+ capture=True, capture_stderr=True,
+ raise_on_error=False)
+ return result.return_code == 0
+
+
+def add_worktree(git_dir, output_dir, commit_hash=None):
+ """Create and checkout a new git worktree for this build
+
+ Args:
+ git_dir (str): The repository to checkout the worktree from
+ output_dir (str): Path for the new worktree
+ commit_hash (str): Commit hash to checkout
+ """
+ # We need to pass --detach to avoid creating a new branch
+ cmd = ['git', '--git-dir', git_dir, 'worktree', 'add', '.', '--detach']
+ if commit_hash:
+ cmd.append(commit_hash)
+ result = command.run_one(*cmd, capture=True, cwd=output_dir,
+ capture_stderr=True)
+ if result.return_code != 0:
+ raise OSError(f'git worktree add: {result.stderr}')
+
+
+def prune_worktrees(git_dir):
+ """Remove administrative files for deleted worktrees
+
+ Args:
+ git_dir (str): The repository whose deleted worktrees should be pruned
+ """
+ result = command.run_one('git', '--git-dir', git_dir, 'worktree', 'prune',
+ capture=True, capture_stderr=True)
+ if result.return_code != 0:
+ raise OSError(f'git worktree prune: {result.stderr}')
+
+
+def create_patches(branch, start, count, ignore_binary, series, signoff=True,
+ git_dir=None, cwd=None):
+ """Create a series of patches from the top of the current branch.
+
+ The patch files are written to the current directory using
+ git format-patch.
+
+ Args:
+ branch (str): Branch to create patches from (None for current branch)
+ start (int): Commit to start from: 0=HEAD, 1=next one, etc.
+ count (int): number of commits to include
+ ignore_binary (bool): Don't generate patches for binary files
+ series (Series): Series object for this series (set of patches)
+ signoff (bool): True to add signoff lines automatically
+ git_dir (str): Path to git repository (None to use default)
+ cwd (str): Path to use for git operations
+ Return:
+ Filename of cover letter (None if none)
+ List of filenames of patch files
+ """
+ cmd = ['git']
+ if git_dir:
+ cmd += ['--git-dir', git_dir]
+ cmd += ['format-patch', '-M']
+ if signoff:
+ cmd.append('--signoff')
+ if ignore_binary:
+ cmd.append('--no-binary')
+ if series.get('cover'):
+ cmd.append('--cover-letter')
+ prefix = series.GetPatchPrefix()
+ if prefix:
+ cmd += [f'--subject-prefix={prefix}']
+ brname = branch or 'HEAD'
+ cmd += [f'{brname}~{start + count}..{brname}~{start}']
+
+ stdout = command.run_list(cmd, cwd=cwd)
+ files = stdout.splitlines()
+
+ # We have an extra file if there is a cover letter
+ if series.get('cover'):
+ return files[0], files[1:]
+ return None, files
+
+
+def build_email_list(in_list, alias, tag=None, warn_on_error=True):
+ """Build a list of email addresses based on an input list.
+
+ Takes a list of email addresses and aliases, and turns this into a list
+ of only email address, by resolving any aliases that are present.
+
+ If the tag is given, then each email address is prepended with this
+ tag and a space. If the tag starts with a minus sign (indicating a
+ command line parameter) then the email address is quoted.
+
+ Args:
+ in_list (list of str): List of aliases/email addresses
+ alias (dict): Alias dictionary:
+ key: alias
+ value: list of aliases or email addresses
+ tag (str): Text to put before each address
+ warn_on_error (bool): True to raise an error when an alias fails to
+ match, False to just print a message.
+
+ Returns:
+ List of email addresses
+
+ >>> alias = {}
+ >>> alias['fred'] = ['f.bloggs@napier.co.nz']
+ >>> alias['john'] = ['j.bloggs@napier.co.nz']
+ >>> alias['mary'] = ['Mary Poppins <m.poppins@cloud.net>']
+ >>> alias['boys'] = ['fred', ' john']
+ >>> alias['all'] = ['fred ', 'john', ' mary ']
+ >>> build_email_list(['john', 'mary'], alias, None)
+ ['j.bloggs@napier.co.nz', 'Mary Poppins <m.poppins@cloud.net>']
+ >>> build_email_list(['john', 'mary'], alias, '--to')
+ ['--to "j.bloggs@napier.co.nz"', \
+'--to "Mary Poppins <m.poppins@cloud.net>"']
+ >>> build_email_list(['john', 'mary'], alias, 'Cc')
+ ['Cc j.bloggs@napier.co.nz', 'Cc Mary Poppins <m.poppins@cloud.net>']
+ """
+ raw = []
+ for item in in_list:
+ raw += lookup_email(item, alias, warn_on_error=warn_on_error)
+ result = []
+ for item in raw:
+ if item not in result:
+ result.append(item)
+ if tag:
+ return [x for email in result for x in (tag, email)]
+ return result
+
+
+def check_suppress_cc_config():
+ """Check if sendemail.suppresscc is configured correctly.
+
+ Returns:
+ bool: True if the option is configured correctly, False otherwise.
+ """
+ suppresscc = command.output_one_line(
+ 'git', 'config', 'sendemail.suppresscc', raise_on_error=False)
+
+ # Other settings should be fine.
+ if suppresscc in ('all', 'cccmd'):
+ col = terminal.Color()
+
+ print(col.build(col.RED, 'error') +
+ f': git config sendemail.suppresscc set to {suppresscc}\n' +
+ ' patman needs --cc-cmd to be run to set the cc list.\n' +
+ ' Please run:\n' +
+ ' git config --unset sendemail.suppresscc\n' +
+ ' Or read the man page:\n' +
+ ' git send-email --help\n' +
+ ' and set an option that runs --cc-cmd\n')
+ return False
+
+ return True
+
+
+def email_patches(series, cover_fname, args, dry_run, warn_on_error, cc_fname,
+ alias, self_only=False, in_reply_to=None, thread=False,
+ smtp_server=None, cwd=None):
+ """Email a patch series.
+
+ Args:
+ series (Series): Series object containing destination info
+ cover_fname (str or None): filename of cover letter
+ args (list of str): list of filenames of patch files
+ dry_run (bool): Just return the command that would be run
+ warn_on_error (bool): True to print a warning when an alias fails to
+ match, False to ignore it.
+ cc_fname (str): Filename of Cc file for per-commit Cc
+ alias (dict): Alias dictionary:
+ key: alias
+ value: list of aliases or email addresses
+ self_only (bool): True to just email to yourself as a test
+ in_reply_to (str or None): If set we'll pass this to git as
+ --in-reply-to - should be a message ID that this is in reply to.
+ thread (bool): True to add --thread to git send-email (make
+ all patches reply to cover-letter or first patch in series)
+ smtp_server (str or None): SMTP server to use to send patches
+ cwd (str): Path to use for patch files (None to use current dir)
+
+ Returns:
+ Git command that was/would be run
+
+ # For the duration of this doctest pretend that we ran patman with ./patman
+ >>> _old_argv0 = sys.argv[0]
+ >>> sys.argv[0] = './patman'
+
+ >>> alias = {}
+ >>> alias['fred'] = ['f.bloggs@napier.co.nz']
+ >>> alias['john'] = ['j.bloggs@napier.co.nz']
+ >>> alias['mary'] = ['m.poppins@cloud.net']
+ >>> alias['boys'] = ['fred', ' john']
+ >>> alias['all'] = ['fred ', 'john', ' mary ']
+ >>> alias[os.getenv('USER')] = ['this-is-me@me.com']
+ >>> series = {}
+ >>> series['to'] = ['fred']
+ >>> series['cc'] = ['mary']
+ >>> email_patches(series, 'cover', ['p1', 'p2'], True, True, 'cc-fname', \
+ False, alias)
+ 'git send-email --annotate --to "f.bloggs@napier.co.nz" --cc \
+"m.poppins@cloud.net" --cc-cmd "./patman send --cc-cmd cc-fname" cover p1 p2'
+ >>> email_patches(series, None, ['p1'], True, True, 'cc-fname', False, \
+ alias)
+ 'git send-email --annotate --to "f.bloggs@napier.co.nz" --cc \
+"m.poppins@cloud.net" --cc-cmd "./patman send --cc-cmd cc-fname" p1'
+ >>> series['cc'] = ['all']
+ >>> email_patches(series, 'cover', ['p1', 'p2'], True, True, 'cc-fname', \
+ True, alias)
+ 'git send-email --annotate --to "this-is-me@me.com" --cc-cmd "./patman \
+send --cc-cmd cc-fname" cover p1 p2'
+ >>> email_patches(series, 'cover', ['p1', 'p2'], True, True, 'cc-fname', \
+ False, alias)
+ 'git send-email --annotate --to "f.bloggs@napier.co.nz" --cc \
+"f.bloggs@napier.co.nz" --cc "j.bloggs@napier.co.nz" --cc \
+"m.poppins@cloud.net" --cc-cmd "./patman send --cc-cmd cc-fname" cover p1 p2'
+
+ # Restore argv[0] since we clobbered it.
+ >>> sys.argv[0] = _old_argv0
+ """
+ to = build_email_list(series.get('to'), alias, '--to', warn_on_error)
+ if not to:
+ if not command.output('git', 'config', 'sendemail.to',
+ raise_on_error=False):
+ print("No recipient.\n"
+ "Please add something like this to a commit\n"
+ "Series-to: Fred Bloggs <f.blogs@napier.co.nz>\n"
+ "Or do something like this\n"
+ "git config sendemail.to u-boot@lists.denx.de")
+ return None
+ cc = build_email_list(list(set(series.get('cc')) - set(series.get('to'))),
+ alias, '--cc', warn_on_error)
+ if self_only:
+ to = build_email_list([os.getenv('USER')], '--to', alias,
+ warn_on_error)
+ cc = []
+ cmd = ['git', 'send-email', '--annotate']
+ if smtp_server:
+ cmd.append(f'--smtp-server={smtp_server}')
+ if in_reply_to:
+ cmd.append(f'--in-reply-to="{in_reply_to}"')
+ if thread:
+ cmd.append('--thread')
+
+ cmd += to
+ cmd += cc
+ cmd += ['--cc-cmd', f'{sys.argv[0]} send --cc-cmd {cc_fname}']
+ if cover_fname:
+ cmd.append(cover_fname)
+ cmd += args
+ if not dry_run:
+ command.run(*cmd, capture=False, capture_stderr=False, cwd=cwd)
+ return' '.join([f'"{x}"' if ' ' in x and '"' not in x else x
+ for x in cmd])
+
+
+def lookup_email(lookup_name, alias, warn_on_error=True, level=0):
+ """If an email address is an alias, look it up and return the full name
+
+ TODO: Why not just use git's own alias feature?
+
+ Args:
+ lookup_name (str): Alias or email address to look up
+ alias (dict): Alias dictionary
+ key: alias
+ value: list of aliases or email addresses
+ warn_on_error (bool): True to print a warning when an alias fails to
+ match, False to ignore it.
+ level (int): Depth of alias stack, used to detect recusion/loops
+
+ Returns:
+ tuple:
+ list containing a list of email addresses
+
+ Raises:
+ OSError if a recursive alias reference was found
+ ValueError if an alias was not found
+
+ >>> alias = {}
+ >>> alias['fred'] = ['f.bloggs@napier.co.nz']
+ >>> alias['john'] = ['j.bloggs@napier.co.nz']
+ >>> alias['mary'] = ['m.poppins@cloud.net']
+ >>> alias['boys'] = ['fred', ' john', 'f.bloggs@napier.co.nz']
+ >>> alias['all'] = ['fred ', 'john', ' mary ']
+ >>> alias['loop'] = ['other', 'john', ' mary ']
+ >>> alias['other'] = ['loop', 'john', ' mary ']
+ >>> lookup_email('mary', alias)
+ ['m.poppins@cloud.net']
+ >>> lookup_email('arthur.wellesley@howe.ro.uk', alias)
+ ['arthur.wellesley@howe.ro.uk']
+ >>> lookup_email('boys', alias)
+ ['f.bloggs@napier.co.nz', 'j.bloggs@napier.co.nz']
+ >>> lookup_email('all', alias)
+ ['f.bloggs@napier.co.nz', 'j.bloggs@napier.co.nz', 'm.poppins@cloud.net']
+ >>> lookup_email('odd', alias)
+ Alias 'odd' not found
+ []
+ >>> lookup_email('loop', alias)
+ Traceback (most recent call last):
+ ...
+ OSError: Recursive email alias at 'other'
+ >>> lookup_email('odd', alias, warn_on_error=False)
+ []
+ >>> # In this case the loop part will effectively be ignored.
+ >>> lookup_email('loop', alias, warn_on_error=False)
+ Recursive email alias at 'other'
+ Recursive email alias at 'john'
+ Recursive email alias at 'mary'
+ ['j.bloggs@napier.co.nz', 'm.poppins@cloud.net']
+ """
+ lookup_name = lookup_name.strip()
+ if '@' in lookup_name: # Perhaps a real email address
+ return [lookup_name]
+
+ lookup_name = lookup_name.lower()
+ col = terminal.Color()
+
+ out_list = []
+ if level > 10:
+ msg = f"Recursive email alias at '{lookup_name}'"
+ if warn_on_error:
+ raise OSError(msg)
+ print(col.build(col.RED, msg))
+ return out_list
+
+ if lookup_name:
+ if lookup_name not in alias:
+ msg = f"Alias '{lookup_name}' not found"
+ if warn_on_error:
+ print(col.build(col.RED, msg))
+ return out_list
+ for item in alias[lookup_name]:
+ todo = lookup_email(item, alias, warn_on_error, level + 1)
+ for new_item in todo:
+ if new_item not in out_list:
+ out_list.append(new_item)
+
+ return out_list
+
+
+def get_top_level():
+ """Return name of top-level directory for this git repo.
+
+ Returns:
+ str: Full path to git top-level directory, or None if not found
+
+ This test makes sure that we are running tests in the right subdir
+
+ >>> os.path.realpath(os.path.dirname(__file__)) == \
+ os.path.join(get_top_level(), 'tools', 'patman')
+ True
+ """
+ result = command.run_one(
+ 'git', 'rev-parse', '--show-toplevel', oneline=True, capture=True,
+ capture_stderr=True, raise_on_error=False)
+ if result.return_code:
+ return None
+ return result.stdout.strip()
+
+
+def get_alias_file():
+ """Gets the name of the git alias file.
+
+ Returns:
+ str: Filename of git alias file, or None if none
+ """
+ fname = command.output_one_line('git', 'config', 'sendemail.aliasesfile',
+ raise_on_error=False)
+ if not fname:
+ return None
+
+ fname = os.path.expanduser(fname.strip())
+ if os.path.isabs(fname):
+ return fname
+
+ return os.path.join(get_top_level() or '', fname)
+
+
+def get_default_user_name():
+ """Gets the user.name from .gitconfig file.
+
+ Returns:
+ User name found in .gitconfig file, or None if none
+ """
+ uname = command.output_one_line('git', 'config', '--global', '--includes',
+ 'user.name')
+ return uname
+
+
+def get_default_user_email():
+ """Gets the user.email from the global .gitconfig file.
+
+ Returns:
+ User's email found in .gitconfig file, or None if none
+ """
+ uemail = command.output_one_line('git', 'config', '--global', '--includes',
+ 'user.email')
+ return uemail
+
+
+def get_default_subject_prefix():
+ """Gets the format.subjectprefix from local .git/config file.
+
+ Returns:
+ Subject prefix found in local .git/config file, or None if none
+ """
+ sub_prefix = command.output_one_line(
+ 'git', 'config', 'format.subjectprefix', raise_on_error=False)
+
+ return sub_prefix
+
+
+def setup():
+ """setup() - Set up git utils, by reading the alias files."""
+ # Check for a git alias file also
+ global USE_NO_DECORATE
+
+ cmd = log_cmd(None, count=0)
+ USE_NO_DECORATE = (command.run_one(*cmd, raise_on_error=False)
+ .return_code == 0)
+
+
+def get_hash(spec, git_dir=None):
+ """Get the hash of a commit
+
+ Args:
+ spec (str): Git commit to show, e.g. 'my-branch~12'
+ git_dir (str): Path to git repository (None to use default)
+
+ Returns:
+ str: Hash of commit
+ """
+ cmd = ['git']
+ if git_dir:
+ cmd += ['--git-dir', git_dir]
+ cmd += ['show', '-s', '--pretty=format:%H', spec]
+ return command.output_one_line(*cmd)
+
+
+def get_head():
+ """Get the hash of the current HEAD
+
+ Returns:
+ Hash of HEAD
+ """
+ return get_hash('HEAD')
+
+
+def get_branch(git_dir=None):
+ """Get the branch we are currently on
+
+ Return:
+ str: branch name, or None if none
+ git_dir (str): Path to git repository (None to use default)
+ """
+ cmd = ['git']
+ if git_dir:
+ cmd += ['--git-dir', git_dir]
+ cmd += ['rev-parse', '--abbrev-ref', 'HEAD']
+ out = command.output_one_line(*cmd, raise_on_error=False)
+ if out == 'HEAD':
+ return None
+ return out
+
+
+def check_dirty(git_dir=None, work_tree=None):
+ """Check if the tree is dirty
+
+ Args:
+ git_dir (str): Path to git repository (None to use default)
+ work_tree (str): Git worktree to use, or None if none
+
+ Return:
+ str: List of dirty filenames and state
+ """
+ cmd = ['git']
+ if git_dir:
+ cmd += ['--git-dir', git_dir]
+ if work_tree:
+ cmd += ['--work-tree', work_tree]
+ cmd += ['status', '--porcelain', '--untracked-files=no']
+ return command.output(*cmd).splitlines()
+
+
+def check_branch(name, git_dir=None):
+ """Check if a branch exists
+
+ Args:
+ name (str): Name of the branch to check
+ git_dir (str): Path to git repository (None to use default)
+ """
+ cmd = ['git']
+ if git_dir:
+ cmd += ['--git-dir', git_dir]
+ cmd += ['branch', '--list', name]
+
+ # This produces ' <name>' or '* <name>'
+ out = command.output(*cmd).rstrip()
+ return out[2:] == name
+
+
+def rename_branch(old_name, name, git_dir=None):
+ """Check if a branch exists
+
+ Args:
+ old_name (str): Name of the branch to rename
+ name (str): New name for the branch
+ git_dir (str): Path to git repository (None to use default)
+
+ Return:
+ str: Output from command
+ """
+ cmd = ['git']
+ if git_dir:
+ cmd += ['--git-dir', git_dir]
+ cmd += ['branch', '--move', old_name, name]
+
+ # This produces ' <name>' or '* <name>'
+ return command.output(*cmd).rstrip()
+
+
+def get_commit_message(commit, git_dir=None):
+ """Gets the commit message for a commit
+
+ Args:
+ commit (str): commit to check
+ git_dir (str): Path to git repository (None to use default)
+
+ Return:
+ list of str: Lines from the commit message
+ """
+ cmd = ['git']
+ if git_dir:
+ cmd += ['--git-dir', git_dir]
+ cmd += ['show', '--quiet', commit]
+
+ out = command.output(*cmd)
+ # the header is followed by a blank line
+ lines = out.splitlines()
+ empty = lines.index('')
+ msg = lines[empty + 1:]
+ unindented = [line[4:] for line in msg]
+
+ return unindented
+
+
+def show_commit(commit, msg=True, diffstat=False, patch=False, colour=True,
+ git_dir=None):
+ """Runs 'git show' and returns the output
+
+ Args:
+ commit (str): commit to check
+ msg (bool): Show the commit message
+ diffstat (bool): True to include the diffstat
+ patch (bool): True to include the patch
+ colour (bool): True to force use of colour
+ git_dir (str): Path to git repository (None to use default)
+
+ Return:
+ list of str: Lines from the commit message
+ """
+ cmd = ['git']
+ if git_dir:
+ cmd += ['--git-dir', git_dir]
+ cmd += ['show']
+ if colour:
+ cmd.append('--color')
+ if not msg:
+ cmd.append('--oneline')
+ if diffstat:
+ cmd.append('--stat')
+ else:
+ cmd.append('--quiet')
+ if patch:
+ cmd.append('--patch')
+ cmd.append(commit)
+
+ return command.output(*cmd)
+
+
+if __name__ == "__main__":
+ import doctest
+
+ doctest.testmod()
diff --git a/tools/u_boot_pylib/pyproject.toml b/tools/u_boot_pylib/pyproject.toml
new file mode 100644
index 00000000000..ce2355084ac
--- /dev/null
+++ b/tools/u_boot_pylib/pyproject.toml
@@ -0,0 +1,25 @@
+[build-system]
+requires = ["setuptools>=61.0"]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "u_boot_pylib"
+version = "0.0.6"
+authors = [
+ { name="Simon Glass", email="sjg@chromium.org" },
+]
+description = "U-Boot python library"
+readme = "README.rst"
+requires-python = ">=3.7"
+classifiers = [
+ "Programming Language :: Python :: 3",
+ "License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)",
+ "Operating System :: OS Independent",
+]
+
+[project.urls]
+"Homepage" = "https://docs.u-boot.org"
+"Bug Tracker" = "https://source.denx.de/groups/u-boot/-/issues"
+
+[tool.setuptools.package-data]
+u_boot_pylib = ["*.rst"]
diff --git a/tools/u_boot_pylib/requirements.txt b/tools/u_boot_pylib/requirements.txt
new file mode 100644
index 00000000000..1087e6f2857
--- /dev/null
+++ b/tools/u_boot_pylib/requirements.txt
@@ -0,0 +1 @@
+concurrencytest==0.1.2
diff --git a/tools/u_boot_pylib/terminal.py b/tools/u_boot_pylib/terminal.py
new file mode 100644
index 00000000000..69c183e85e5
--- /dev/null
+++ b/tools/u_boot_pylib/terminal.py
@@ -0,0 +1,346 @@
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright (c) 2011 The Chromium OS Authors.
+#
+
+"""Terminal utilities
+
+This module handles terminal interaction including ANSI color codes.
+"""
+
+from contextlib import contextmanager
+from io import StringIO
+import os
+import re
+import shutil
+import subprocess
+import sys
+
+# Selection of when we want our output to be colored
+COLOR_IF_TERMINAL, COLOR_ALWAYS, COLOR_NEVER = range(3)
+
+# Initially, we are set up to print to the terminal
+print_test_mode = False
+print_test_list = []
+
+# The length of the last line printed without a newline
+last_print_len = None
+
+# credit:
+# stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python
+ansi_escape = re.compile(r'\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+
+# True if we are capturing console output
+CAPTURING = False
+
+# Set this to False to disable output-capturing globally
+USE_CAPTURE = True
+
+
+class PrintLine:
+ """A line of text output
+
+ Members:
+ text: Text line that was printed
+ newline: True to output a newline after the text
+ colour: Text colour to use
+ """
+ def __init__(self, text, colour, newline=True, bright=True):
+ self.text = text
+ self.newline = newline
+ self.colour = colour
+ self.bright = bright
+
+ def __eq__(self, other):
+ return (self.text == other.text and
+ self.newline == other.newline and
+ self.colour == other.colour and
+ self.bright == other.bright)
+
+ def __str__(self):
+ return ("newline=%s, colour=%s, bright=%d, text='%s'" %
+ (self.newline, self.colour, self.bright, self.text))
+
+
+def calc_ascii_len(text):
+ """Calculate the length of a string, ignoring any ANSI sequences
+
+ When displayed on a terminal, ANSI sequences don't take any space, so we
+ need to ignore them when calculating the length of a string.
+
+ Args:
+ text: Text to check
+
+ Returns:
+ Length of text, after skipping ANSI sequences
+
+ >>> col = Color(COLOR_ALWAYS)
+ >>> text = col.build(Color.RED, 'abc')
+ >>> len(text)
+ 14
+ >>> calc_ascii_len(text)
+ 3
+ >>>
+ >>> text += 'def'
+ >>> calc_ascii_len(text)
+ 6
+ >>> text += col.build(Color.RED, 'abc')
+ >>> calc_ascii_len(text)
+ 9
+ """
+ result = ansi_escape.sub('', text)
+ return len(result)
+
+def trim_ascii_len(text, size):
+ """Trim a string containing ANSI sequences to the given ASCII length
+
+ The string is trimmed with ANSI sequences being ignored for the length
+ calculation.
+
+ >>> col = Color(COLOR_ALWAYS)
+ >>> text = col.build(Color.RED, 'abc')
+ >>> len(text)
+ 14
+ >>> calc_ascii_len(trim_ascii_len(text, 4))
+ 3
+ >>> calc_ascii_len(trim_ascii_len(text, 2))
+ 2
+ >>> text += 'def'
+ >>> calc_ascii_len(trim_ascii_len(text, 4))
+ 4
+ >>> text += col.build(Color.RED, 'ghi')
+ >>> calc_ascii_len(trim_ascii_len(text, 7))
+ 7
+ """
+ if calc_ascii_len(text) < size:
+ return text
+ pos = 0
+ out = ''
+ left = size
+
+ # Work through each ANSI sequence in turn
+ for m in ansi_escape.finditer(text):
+ # Find the text before the sequence and add it to our string, making
+ # sure it doesn't overflow
+ before = text[pos:m.start()]
+ toadd = before[:left]
+ out += toadd
+
+ # Figure out how much non-ANSI space we have left
+ left -= len(toadd)
+
+ # Add the ANSI sequence and move to the position immediately after it
+ out += m.group()
+ pos = m.start() + len(m.group())
+
+ # Deal with text after the last ANSI sequence
+ after = text[pos:]
+ toadd = after[:left]
+ out += toadd
+
+ return out
+
+
+def tprint(text='', newline=True, colour=None, limit_to_line=False,
+ bright=True, back=None, col=None):
+ """Handle a line of output to the terminal.
+
+ In test mode this is recorded in a list. Otherwise it is output to the
+ terminal.
+
+ Args:
+ text: Text to print
+ newline: True to add a new line at the end of the text
+ colour: Colour to use for the text
+ """
+ global last_print_len
+
+ if print_test_mode:
+ print_test_list.append(PrintLine(text, colour, newline, bright))
+ else:
+ if colour is not None:
+ if not col:
+ col = Color()
+ text = col.build(colour, text, bright=bright, back=back)
+ if newline:
+ print(text)
+ last_print_len = None
+ else:
+ if limit_to_line:
+ cols = shutil.get_terminal_size().columns
+ text = trim_ascii_len(text, cols)
+ print(text, end='', flush=True)
+ last_print_len = calc_ascii_len(text)
+
+def print_clear():
+ """Clear a previously line that was printed with no newline"""
+ global last_print_len
+
+ if last_print_len:
+ if print_test_mode:
+ print_test_list.append(PrintLine(None, None, None, None))
+ else:
+ print('\r%s\r' % (' '* last_print_len), end='', flush=True)
+ last_print_len = None
+
+def set_print_test_mode(enable=True):
+ """Go into test mode, where all printing is recorded"""
+ global print_test_mode
+
+ print_test_mode = enable
+ get_print_test_lines()
+
+def get_print_test_lines():
+ """Get a list of all lines output through tprint()
+
+ Returns:
+ A list of PrintLine objects
+ """
+ global print_test_list
+
+ ret = print_test_list
+ print_test_list = []
+ return ret
+
+def echo_print_test_lines():
+ """Print out the text lines collected"""
+ for line in print_test_list:
+ if line.colour:
+ col = Color()
+ print(col.build(line.colour, line.text), end='')
+ else:
+ print(line.text, end='')
+ if line.newline:
+ print()
+
+def have_terminal():
+ """Check if we have an interactive terminal or not
+
+ Returns:
+ bool: true if an interactive terminal is attached
+ """
+ return os.isatty(sys.stdout.fileno())
+
+
+class Color():
+ """Conditionally wraps text in ANSI color escape sequences."""
+ BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
+ BOLD = -1
+ BRIGHT_START = '\033[1;%d%sm'
+ NORMAL_START = '\033[22;%d%sm'
+ BOLD_START = '\033[1m'
+ BACK_EXTRA = ';%d'
+ RESET = '\033[0m'
+
+ def __init__(self, colored=COLOR_IF_TERMINAL):
+ """Create a new Color object, optionally disabling color output.
+
+ Args:
+ enabled: True if color output should be enabled. If False then this
+ class will not add color codes at all.
+ """
+ try:
+ self._enabled = (colored == COLOR_ALWAYS or
+ (colored == COLOR_IF_TERMINAL and
+ os.isatty(sys.stdout.fileno())))
+ except:
+ self._enabled = False
+
+ def enabled(self):
+ """Check if colour is enabled
+
+ Return: True if enabled, else False
+ """
+ return self._enabled
+
+ def start(self, color, bright=True, back=None):
+ """Returns a start color code.
+
+ Args:
+ color: Color to use, .e.g BLACK, RED, etc.
+
+ Returns:
+ If color is enabled, returns an ANSI sequence to start the given
+ color, otherwise returns empty string
+ """
+ if self._enabled:
+ if color == self.BOLD:
+ return self.BOLD_START
+ base = self.BRIGHT_START if bright else self.NORMAL_START
+ extra = self.BACK_EXTRA % (back + 40) if back else ''
+ return base % (color + 30, extra)
+ return ''
+
+ def stop(self):
+ """Returns a stop color code.
+
+ Returns:
+ If color is enabled, returns an ANSI color reset sequence,
+ otherwise returns empty string
+ """
+ if self._enabled:
+ return self.RESET
+ return ''
+
+ def build(self, color, text, bright=True, back=None):
+ """Returns text with conditionally added color escape sequences.
+
+ Keyword arguments:
+ color: Text color -- one of the color constants defined in this
+ class.
+ text: The text to color.
+
+ Returns:
+ If self._enabled is False, returns the original text. If it's True,
+ returns text with color escape sequences based on the value of
+ color.
+ """
+ if not self._enabled:
+ return text
+ return self.start(color, bright, back) + text + self.RESET
+
+
+# Use this to suppress stdout/stderr output:
+# with terminal.capture() as (stdout, stderr)
+# ...do something...
+@contextmanager
+def capture():
+ global CAPTURING
+
+ capture_out, capture_err = StringIO(), StringIO()
+ old_out, old_err = sys.stdout, sys.stderr
+ try:
+ CAPTURING = True
+ sys.stdout, sys.stderr = capture_out, capture_err
+ yield capture_out, capture_err
+ finally:
+ sys.stdout, sys.stderr = old_out, old_err
+ CAPTURING = False
+ if not USE_CAPTURE:
+ sys.stdout.write(capture_out.getvalue())
+ sys.stderr.write(capture_err.getvalue())
+
+
+@contextmanager
+def pager():
+ """Simple pager for outputting lots of text
+
+ Usage:
+ with terminal.pager():
+ print(...)
+ """
+ proc = None
+ old_stdout = None
+ try:
+ less = os.getenv('PAGER')
+ if not CAPTURING and less != 'none' and have_terminal():
+ if not less:
+ less = 'less -R --quit-if-one-screen'
+ proc = subprocess.Popen(less, stdin=subprocess.PIPE, text=True,
+ shell=True)
+ old_stdout = sys.stdout
+ sys.stdout = proc.stdin
+ yield
+ finally:
+ if proc:
+ sys.stdout = old_stdout
+ proc.communicate()
diff --git a/tools/u_boot_pylib/test_util.py b/tools/u_boot_pylib/test_util.py
new file mode 100644
index 00000000000..d258a1935c9
--- /dev/null
+++ b/tools/u_boot_pylib/test_util.py
@@ -0,0 +1,229 @@
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Copyright (c) 2016 Google, Inc
+#
+
+import doctest
+import glob
+import multiprocessing
+import os
+import re
+import sys
+import unittest
+
+from u_boot_pylib import command
+from u_boot_pylib import terminal
+
+use_concurrent = True
+try:
+ from concurrencytest import ConcurrentTestSuite
+ from concurrencytest import fork_for_tests
+except:
+ use_concurrent = False
+
+
+def run_test_coverage(prog, filter_fname, exclude_list, build_dir,
+ required=None, extra_args=None, single_thread='-P1',
+ args=None, allow_failures=None):
+ """Run tests and check that we get 100% coverage
+
+ Args:
+ prog: Program to run (with be passed a '-t' argument to run tests
+ filter_fname: Normally all *.py files in the program's directory will
+ be included. If this is not None, then it is used to filter the
+ list so that only filenames that don't contain filter_fname are
+ included.
+ exclude_list: List of file patterns to exclude from the coverage
+ calculation
+ build_dir: Build directory, used to locate libfdt.py
+ required: List of modules which must be in the coverage report
+ extra_args (str): Extra arguments to pass to the tool before the -t/test
+ arg
+ single_thread (str): Argument string to make the tests run
+ single-threaded. This is necessary to get proper coverage results.
+ The default is '-P0'
+ args (list of str): List of tests to run, or None to run all
+
+ Raises:
+ ValueError if the code coverage is not 100%
+ """
+ # This uses the build output from sandbox_spl to get _libfdt.so
+ path = os.path.dirname(prog)
+ if filter_fname:
+ glob_list = glob.glob(os.path.join(path, '*.py'))
+ glob_list = [fname for fname in glob_list if filter_fname in fname]
+ else:
+ glob_list = []
+ glob_list += exclude_list
+ glob_list += ['*libfdt.py', '*/site-packages/*', '*/dist-packages/*']
+ glob_list += ['*concurrencytest*']
+ test_cmd = 'test' if 'binman' in prog or 'patman' in prog else '-t'
+ prefix = ''
+ if build_dir:
+ prefix = 'PYTHONPATH=$PYTHONPATH:%s/sandbox_spl/tools ' % build_dir
+
+ # Detect a Python sandbox and use 'coverage' instead
+ covtool = ('python3-coverage' if sys.prefix == sys.base_prefix else
+ 'coverage')
+
+ cmd = ('%s%s run '
+ '--omit "%s" %s %s %s %s %s' % (prefix, covtool, ','.join(glob_list),
+ prog, extra_args or '', test_cmd,
+ single_thread or '-P1',
+ ' '.join(args) if args else ''))
+ os.system(cmd)
+ stdout = command.output(covtool, 'report')
+ lines = stdout.splitlines()
+ if required:
+ # Convert '/path/to/name.py' just the module name 'name'
+ test_set = set([os.path.splitext(os.path.basename(line.split()[0]))[0]
+ for line in lines if '/etype/' in line])
+ missing_list = required
+ missing_list.discard('__init__')
+ missing_list.difference_update(test_set)
+ if missing_list:
+ print('Missing tests for %s' % (', '.join(missing_list)))
+ print(stdout)
+ ok = False
+
+ coverage = lines[-1].split(' ')[-1]
+ ok = True
+ print(coverage)
+ if coverage != '100%':
+ print(stdout)
+ print("To get a report in 'htmlcov/index.html', type: python3-coverage html")
+ print('Coverage error: %s, but should be 100%%' % coverage)
+ ok = False
+ if not ok:
+ if allow_failures:
+ # for line in lines:
+ # print('.', line, re.match(r'^(tools/.*py) *\d+ *(\d+) *(\d+)%$', line))
+ lines = [re.match(r'^(tools/.*py) *\d+ *(\d+) *\d+%$', line)
+ for line in stdout.splitlines()]
+ bad = []
+ for mat in lines:
+ if mat and mat.group(2) != '0':
+ fname = mat.group(1)
+ if fname not in allow_failures:
+ bad.append(fname)
+ if not bad:
+ return
+ raise ValueError('Test coverage failure')
+
+
+class FullTextTestResult(unittest.TextTestResult):
+ """A test result class that can print extended text results to a stream
+
+ This is meant to be used by a TestRunner as a result class. Like
+ TextTestResult, this prints out the names of tests as they are run,
+ errors as they occur, and a summary of the results at the end of the
+ test run. Beyond those, this prints information about skipped tests,
+ expected failures and unexpected successes.
+
+ Args:
+ stream: A file-like object to write results to
+ descriptions (bool): True to print descriptions with test names
+ verbosity (int): Detail of printed output per test as they run
+ Test stdout and stderr always get printed when buffering
+ them is disabled by the test runner. In addition to that,
+ 0: Print nothing
+ 1: Print a dot per test
+ 2: Print test names
+ """
+ def __init__(self, stream, descriptions, verbosity):
+ self.verbosity = verbosity
+ super().__init__(stream, descriptions, verbosity)
+
+ def printErrors(self):
+ "Called by TestRunner after test run to summarize the tests"
+ # The parent class doesn't keep unexpected successes in the same
+ # format as the rest. Adapt it to what printErrorList expects.
+ unexpected_successes = [
+ (test, 'Test was expected to fail, but succeeded.\n')
+ for test in self.unexpectedSuccesses
+ ]
+
+ super().printErrors() # FAIL and ERROR
+ self.printErrorList('SKIP', self.skipped)
+ self.printErrorList('XFAIL', self.expectedFailures)
+ self.printErrorList('XPASS', unexpected_successes)
+
+ def addSkip(self, test, reason):
+ """Called when a test is skipped."""
+ # Add empty line to keep spacing consistent with other results
+ if not reason.endswith('\n'):
+ reason += '\n'
+ super().addSkip(test, reason)
+
+
+def run_test_suites(toolname, debug, verbosity, no_capture, test_preserve_dirs,
+ processes, test_name, toolpath, class_and_module_list):
+ """Run a series of test suites and collect the results
+
+ Args:
+ toolname: Name of the tool that ran the tests
+ debug: True to enable debugging, which shows a full stack trace on error
+ verbosity: Verbosity level to use (0-4)
+ test_preserve_dirs: True to preserve the input directory used by tests
+ so that it can be examined afterwards (only useful for debugging
+ tests). If a single test is selected (in args[0]) it also preserves
+ the output directory for this test. Both directories are displayed
+ on the command line.
+ processes: Number of processes to use to run tests (None=same as #CPUs)
+ test_name: Name of test to run, or None for all
+ toolpath: List of paths to use for tools
+ class_and_module_list: List of test classes (type class) and module
+ names (type str) to run
+ """
+ sys.argv = [sys.argv[0]]
+ if debug:
+ sys.argv.append('-D')
+ if verbosity:
+ sys.argv.append('-v%d' % verbosity)
+ if no_capture:
+ sys.argv.append('-N')
+ terminal.USE_CAPTURE = False
+ if toolpath:
+ for path in toolpath:
+ sys.argv += ['--toolpath', path]
+
+ suite = unittest.TestSuite()
+ loader = unittest.TestLoader()
+ runner = unittest.TextTestRunner(
+ stream=sys.stdout,
+ verbosity=(1 if verbosity is None else verbosity),
+ resultclass=FullTextTestResult,
+ )
+
+ if use_concurrent and processes != 1 and not test_name:
+ suite = ConcurrentTestSuite(suite,
+ fork_for_tests(processes or multiprocessing.cpu_count()))
+
+ for module in class_and_module_list:
+ if isinstance(module, str) and (not test_name or test_name == module):
+ suite.addTests(doctest.DocTestSuite(module))
+
+ for module in class_and_module_list:
+ if isinstance(module, str):
+ continue
+ # Test the test module about our arguments, if it is interested
+ if hasattr(module, 'setup_test_args'):
+ setup_test_args = getattr(module, 'setup_test_args')
+ setup_test_args(preserve_indir=test_preserve_dirs,
+ preserve_outdirs=test_preserve_dirs and test_name is not None,
+ toolpath=toolpath, verbosity=verbosity, no_capture=no_capture)
+ if test_name:
+ # Since Python v3.5 If an ImportError or AttributeError occurs
+ # while traversing a name then a synthetic test that raises that
+ # error when run will be returned. Check that the requested test
+ # exists, otherwise these errors are included in the results.
+ if test_name in loader.getTestCaseNames(module):
+ suite.addTests(loader.loadTestsFromName(test_name, module))
+ else:
+ suite.addTests(loader.loadTestsFromTestCase(module))
+
+ print(f" Running {toolname} tests ".center(70, "="))
+ result = runner.run(suite)
+ print()
+
+ return result
diff --git a/tools/u_boot_pylib/tools.py b/tools/u_boot_pylib/tools.py
new file mode 100644
index 00000000000..1afd289eadd
--- /dev/null
+++ b/tools/u_boot_pylib/tools.py
@@ -0,0 +1,612 @@
+# SPDX-License-Identifier: GPL-2.0+
+#
+# Copyright (c) 2016 Google, Inc
+#
+
+import glob
+import os
+import shlex
+import shutil
+import sys
+import tempfile
+import urllib.request
+
+from u_boot_pylib import command
+from u_boot_pylib import tout
+
+# Output directly (generally this is temporary)
+outdir = None
+
+# True to keep the output directory around after exiting
+preserve_outdir = False
+
+# Path to the Chrome OS chroot, if we know it
+chroot_path = None
+
+# Search paths to use for filename(), used to find files
+search_paths = []
+
+tool_search_paths = []
+
+# Tools and the packages that contain them, on debian
+packages = {
+ 'lz4': 'liblz4-tool',
+ }
+
+# List of paths to use when looking for an input file
+indir = []
+
+def prepare_output_dir(dirname, preserve=False):
+ """Select an output directory, ensuring it exists.
+
+ This either creates a temporary directory or checks that the one supplied
+ by the user is valid. For a temporary directory, it makes a note to
+ remove it later if required.
+
+ Args:
+ dirname: a string, name of the output directory to use to store
+ intermediate and output files. If is None - create a temporary
+ directory.
+ preserve: a Boolean. If outdir above is None and preserve is False, the
+ created temporary directory will be destroyed on exit.
+
+ Raises:
+ OSError: If it cannot create the output directory.
+ """
+ global outdir, preserve_outdir
+
+ preserve_outdir = dirname or preserve
+ if dirname:
+ outdir = dirname
+ if not os.path.isdir(outdir):
+ try:
+ os.makedirs(outdir)
+ except OSError as err:
+ raise ValueError(
+ f"Cannot make output directory 'outdir': 'err.strerror'")
+ tout.debug("Using output directory '%s'" % outdir)
+ else:
+ outdir = tempfile.mkdtemp(prefix='binman.')
+ tout.debug("Using temporary directory '%s'" % outdir)
+
+def _remove_output_dir():
+ global outdir
+
+ shutil.rmtree(outdir)
+ tout.debug("Deleted temporary directory '%s'" % outdir)
+ outdir = None
+
+def finalise_output_dir():
+ global outdir, preserve_outdir
+
+ """Tidy up: delete output directory if temporary and not preserved."""
+ if outdir and not preserve_outdir:
+ _remove_output_dir()
+ outdir = None
+
+def get_output_filename(fname):
+ """Return a filename within the output directory.
+
+ Args:
+ fname: Filename to use for new file
+
+ Returns:
+ The full path of the filename, within the output directory
+ """
+ return os.path.join(outdir, fname)
+
+def get_output_dir():
+ """Return the current output directory
+
+ Returns:
+ str: The output directory
+ """
+ return outdir
+
+def _finalise_for_test():
+ """Remove the output directory (for use by tests)"""
+ global outdir
+
+ if outdir:
+ _remove_output_dir()
+ outdir = None
+
+def set_input_dirs(dirname):
+ """Add a list of input directories, where input files are kept.
+
+ Args:
+ dirname: a list of paths to input directories to use for obtaining
+ files needed by binman to place in the image.
+ """
+ global indir
+
+ indir = dirname
+ tout.debug("Using input directories %s" % indir)
+
+def append_input_dirs(dirname):
+ """Append a list of input directories to the current list of input
+ directories
+
+ Args:
+ dirname: a list of paths to input directories to use for obtaining
+ files needed by binman to place in the image.
+ """
+ global indir
+
+ for dir in dirname:
+ if dirname not in indir:
+ indir.append(dirname)
+
+ tout.debug("Updated input directories %s" % indir)
+
+def get_input_filename(fname, allow_missing=False):
+ """Return a filename for use as input.
+
+ Args:
+ fname: Filename to use for new file
+ allow_missing: True if the filename can be missing
+
+ Returns:
+ fname, if indir is None;
+ full path of the filename, within the input directory;
+ None, if file is missing and allow_missing is True
+
+ Raises:
+ ValueError if file is missing and allow_missing is False
+ """
+ if not indir or fname[:1] == '/':
+ return fname
+ for dirname in indir:
+ pathname = os.path.join(dirname, fname)
+ if os.path.exists(pathname):
+ return pathname
+
+ if allow_missing:
+ return None
+ raise ValueError("Filename '%s' not found in input path (%s) (cwd='%s')" %
+ (fname, ','.join(indir), os.getcwd()))
+
+def get_input_filename_glob(pattern):
+ """Return a list of filenames for use as input.
+
+ Args:
+ pattern: Filename pattern to search for
+
+ Returns:
+ A list of matching files in all input directories
+ """
+ if not indir:
+ return glob.glob(pattern)
+ files = []
+ for dirname in indir:
+ pathname = os.path.join(dirname, pattern)
+ files += glob.glob(pathname)
+ return sorted(files)
+
+def align(pos, align):
+ if align:
+ mask = align - 1
+ pos = (pos + mask) & ~mask
+ return pos
+
+def not_power_of_two(num):
+ return num and (num & (num - 1))
+
+def set_tool_paths(toolpaths):
+ """Set the path to search for tools
+
+ Args:
+ toolpaths: List of paths to search for tools executed by run()
+ """
+ global tool_search_paths
+
+ tool_search_paths = toolpaths
+
+def path_has_file(path_spec, fname):
+ """Check if a given filename is in the PATH
+
+ Args:
+ path_spec: Value of PATH variable to check
+ fname: Filename to check
+
+ Returns:
+ True if found, False if not
+ """
+ for dir in path_spec.split(':'):
+ if os.path.exists(os.path.join(dir, fname)):
+ return True
+ return False
+
+def get_host_compile_tool(env, name):
+ """Get the host-specific version for a compile tool
+
+ This checks the environment variables that specify which version of
+ the tool should be used (e.g. ${HOSTCC}).
+
+ The following table lists the host-specific versions of the tools
+ this function resolves to:
+
+ Compile Tool | Host version
+ --------------+----------------
+ as | ${HOSTAS}
+ ld | ${HOSTLD}
+ cc | ${HOSTCC}
+ cpp | ${HOSTCPP}
+ c++ | ${HOSTCXX}
+ ar | ${HOSTAR}
+ nm | ${HOSTNM}
+ ldr | ${HOSTLDR}
+ strip | ${HOSTSTRIP}
+ objcopy | ${HOSTOBJCOPY}
+ objdump | ${HOSTOBJDUMP}
+ dtc | ${HOSTDTC}
+
+ Args:
+ name: Command name to run
+
+ Returns:
+ host_name: Exact command name to run instead
+ extra_args: List of extra arguments to pass
+ """
+ host_name = None
+ extra_args = []
+ if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip',
+ 'objcopy', 'objdump', 'dtc'):
+ host_name, *host_args = env.get('HOST' + name.upper(), '').split(' ')
+ elif name == 'c++':
+ host_name, *host_args = env.get('HOSTCXX', '').split(' ')
+
+ if host_name:
+ return host_name, extra_args
+ return name, []
+
+def get_target_compile_tool(name, cross_compile=None):
+ """Get the target-specific version for a compile tool
+
+ This first checks the environment variables that specify which
+ version of the tool should be used (e.g. ${CC}). If those aren't
+ specified, it checks the CROSS_COMPILE variable as a prefix for the
+ tool with some substitutions (e.g. "${CROSS_COMPILE}gcc" for cc).
+
+ The following table lists the target-specific versions of the tools
+ this function resolves to:
+
+ Compile Tool | First choice | Second choice
+ --------------+----------------+----------------------------
+ as | ${AS} | ${CROSS_COMPILE}as
+ ld | ${LD} | ${CROSS_COMPILE}ld.bfd
+ | | or ${CROSS_COMPILE}ld
+ cc | ${CC} | ${CROSS_COMPILE}gcc
+ cpp | ${CPP} | ${CROSS_COMPILE}gcc -E
+ c++ | ${CXX} | ${CROSS_COMPILE}g++
+ ar | ${AR} | ${CROSS_COMPILE}ar
+ nm | ${NM} | ${CROSS_COMPILE}nm
+ ldr | ${LDR} | ${CROSS_COMPILE}ldr
+ strip | ${STRIP} | ${CROSS_COMPILE}strip
+ objcopy | ${OBJCOPY} | ${CROSS_COMPILE}objcopy
+ objdump | ${OBJDUMP} | ${CROSS_COMPILE}objdump
+ dtc | ${DTC} | (no CROSS_COMPILE version)
+
+ Args:
+ name: Command name to run
+
+ Returns:
+ target_name: Exact command name to run instead
+ extra_args: List of extra arguments to pass
+ """
+ env = dict(os.environ)
+
+ target_name = None
+ extra_args = []
+ if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip',
+ 'objcopy', 'objdump', 'dtc'):
+ target_name, *extra_args = env.get(name.upper(), '').split(' ')
+ elif name == 'c++':
+ target_name, *extra_args = env.get('CXX', '').split(' ')
+
+ if target_name:
+ return target_name, extra_args
+
+ if cross_compile is None:
+ cross_compile = env.get('CROSS_COMPILE', '')
+
+ if name in ('as', 'ar', 'nm', 'ldr', 'strip', 'objcopy', 'objdump'):
+ target_name = cross_compile + name
+ elif name == 'ld':
+ try:
+ if run(cross_compile + 'ld.bfd', '-v'):
+ target_name = cross_compile + 'ld.bfd'
+ except:
+ target_name = cross_compile + 'ld'
+ elif name == 'cc':
+ target_name = cross_compile + 'gcc'
+ elif name == 'cpp':
+ target_name = cross_compile + 'gcc'
+ extra_args = ['-E']
+ elif name == 'c++':
+ target_name = cross_compile + 'g++'
+ else:
+ target_name = name
+ return target_name, extra_args
+
+def get_env_with_path():
+ """Get an updated environment with the PATH variable set correctly
+
+ If there are any search paths set, these need to come first in the PATH so
+ that these override any other version of the tools.
+
+ Returns:
+ dict: New environment with PATH updated, or None if there are not search
+ paths
+ """
+ if tool_search_paths:
+ env = dict(os.environ)
+ env['PATH'] = ':'.join(tool_search_paths) + ':' + env['PATH']
+ return env
+
+def run_result(name, *args, **kwargs):
+ """Run a tool with some arguments
+
+ This runs a 'tool', which is a program used by binman to process files and
+ perhaps produce some output. Tools can be located on the PATH or in a
+ search path.
+
+ Args:
+ name: Command name to run
+ args: Arguments to the tool
+ for_host: True to resolve the command to the version for the host
+ for_target: False to run the command as-is, without resolving it
+ to the version for the compile target
+ raise_on_error: Raise an error if the command fails (True by default)
+
+ Returns:
+ CommandResult object
+ """
+ try:
+ binary = kwargs.get('binary')
+ for_host = kwargs.get('for_host', False)
+ for_target = kwargs.get('for_target', not for_host)
+ raise_on_error = kwargs.get('raise_on_error', True)
+ env = get_env_with_path()
+ if for_target:
+ name, extra_args = get_target_compile_tool(name)
+ args = tuple(extra_args) + args
+ elif for_host:
+ name, extra_args = get_host_compile_tool(env, name)
+ args = tuple(extra_args) + args
+ name = os.path.expanduser(name) # Expand paths containing ~
+ all_args = (name,) + args
+ result = command.run_one(*all_args, capture=True, capture_stderr=True,
+ env=env, raise_on_error=False, binary=binary)
+ if result.return_code:
+ if raise_on_error:
+ raise ValueError("Error %d running '%s': %s" %
+ (result.return_code,' '.join(all_args),
+ result.stderr or result.stdout))
+ return result
+ except ValueError:
+ if env and not path_has_file(env['PATH'], name):
+ msg = "Please install tool '%s'" % name
+ package = packages.get(name)
+ if package:
+ msg += " (e.g. from package '%s')" % package
+ raise ValueError(msg)
+ raise
+
+def tool_find(name):
+ """Search the current path for a tool
+
+ This uses both PATH and any value from set_tool_paths() to search for a tool
+
+ Args:
+ name (str): Name of tool to locate
+
+ Returns:
+ str: Full path to tool if found, else None
+ """
+ name = os.path.expanduser(name) # Expand paths containing ~
+ paths = []
+ pathvar = os.environ.get('PATH')
+ if pathvar:
+ paths = pathvar.split(':')
+ if tool_search_paths:
+ paths += tool_search_paths
+ for path in paths:
+ fname = os.path.join(path, name)
+ if os.path.isfile(fname) and os.access(fname, os.X_OK):
+ return fname
+
+def run(name, *args, **kwargs):
+ """Run a tool with some arguments
+
+ This runs a 'tool', which is a program used by binman to process files and
+ perhaps produce some output. Tools can be located on the PATH or in a
+ search path.
+
+ Args:
+ name: Command name to run
+ args: Arguments to the tool
+ for_host: True to resolve the command to the version for the host
+ for_target: False to run the command as-is, without resolving it
+ to the version for the compile target
+
+ Returns:
+ CommandResult object
+ """
+ result = run_result(name, *args, **kwargs)
+ if result is not None:
+ return result.stdout
+
+def filename(fname):
+ """Resolve a file path to an absolute path.
+
+ If fname starts with ##/ and chroot is available, ##/ gets replaced with
+ the chroot path. If chroot is not available, this file name can not be
+ resolved, `None' is returned.
+
+ If fname is not prepended with the above prefix, and is not an existing
+ file, the actual file name is retrieved from the passed in string and the
+ search_paths directories (if any) are searched to for the file. If found -
+ the path to the found file is returned, `None' is returned otherwise.
+
+ Args:
+ fname: a string, the path to resolve.
+
+ Returns:
+ Absolute path to the file or None if not found.
+ """
+ if fname.startswith('##/'):
+ if chroot_path:
+ fname = os.path.join(chroot_path, fname[3:])
+ else:
+ return None
+
+ # Search for a pathname that exists, and return it if found
+ if fname and not os.path.exists(fname):
+ for path in search_paths:
+ pathname = os.path.join(path, os.path.basename(fname))
+ if os.path.exists(pathname):
+ return pathname
+
+ # If not found, just return the standard, unchanged path
+ return fname
+
+def read_file(fname, binary=True):
+ """Read and return the contents of a file.
+
+ Args:
+ fname: path to filename to read, where ## signifiies the chroot.
+
+ Returns:
+ data read from file, as a string.
+ """
+ with open(filename(fname), binary and 'rb' or 'r') as fd:
+ data = fd.read()
+ #self._out.Info("Read file '%s' size %d (%#0x)" %
+ #(fname, len(data), len(data)))
+ return data
+
+def write_file(fname, data, binary=True):
+ """Write data into a file.
+
+ Args:
+ fname: path to filename to write
+ data: data to write to file, as a string
+ """
+ #self._out.Info("Write file '%s' size %d (%#0x)" %
+ #(fname, len(data), len(data)))
+ with open(filename(fname), binary and 'wb' or 'w') as fd:
+ fd.write(data)
+
+def get_bytes(byte, size):
+ """Get a string of bytes of a given size
+
+ Args:
+ byte: Numeric byte value to use
+ size: Size of bytes/string to return
+
+ Returns:
+ A bytes type with 'byte' repeated 'size' times
+ """
+ return bytes([byte]) * size
+
+def to_bytes(string):
+ """Convert a str type into a bytes type
+
+ Args:
+ string: string to convert
+
+ Returns:
+ A bytes type
+ """
+ return string.encode('utf-8')
+
+def to_string(bval):
+ """Convert a bytes type into a str type
+
+ Args:
+ bval: bytes value to convert
+
+ Returns:
+ Python 3: A bytes type
+ Python 2: A string type
+ """
+ return bval.decode('utf-8')
+
+def to_hex(val):
+ """Convert an integer value (or None) to a string
+
+ Returns:
+ hex value, or 'None' if the value is None
+ """
+ return 'None' if val is None else '%#x' % val
+
+def to_hex_size(val):
+ """Return the size of an object in hex
+
+ Returns:
+ hex value of size, or 'None' if the value is None
+ """
+ return 'None' if val is None else '%#x' % len(val)
+
+def print_full_help(fname):
+ """Print the full help message for a tool using an appropriate pager.
+
+ Args:
+ fname: Path to a file containing the full help message
+ """
+ pager = shlex.split(os.getenv('PAGER', ''))
+ if not pager:
+ lesspath = shutil.which('less')
+ pager = [lesspath] if lesspath else None
+ if not pager:
+ pager = ['more']
+ command.run(*pager, fname)
+
+def download(url, tmpdir_pattern='.patman'):
+ """Download a file to a temporary directory
+
+ Args:
+ url (str): URL to download
+ tmpdir_pattern (str): pattern to use for the temporary directory
+
+ Returns:
+ Tuple:
+ Full path to the downloaded archive file in that directory,
+ or None if there was an error while downloading
+ Temporary directory name
+ """
+ print('- downloading: %s' % url)
+ leaf = url.split('/')[-1]
+ tmpdir = tempfile.mkdtemp(tmpdir_pattern)
+ response = urllib.request.urlopen(url)
+ fname = os.path.join(tmpdir, leaf)
+ fd = open(fname, 'wb')
+ meta = response.info()
+ size = int(meta.get('Content-Length'))
+ done = 0
+ block_size = 1 << 16
+ status = ''
+
+ # Read the file in chunks and show progress as we go
+ while True:
+ buffer = response.read(block_size)
+ if not buffer:
+ print(chr(8) * (len(status) + 1), '\r', end=' ')
+ break
+
+ done += len(buffer)
+ fd.write(buffer)
+ status = r'%10d MiB [%3d%%]' % (done // 1024 // 1024,
+ done * 100 // size)
+ status = status + chr(8) * (len(status) + 1)
+ print(status, end=' ')
+ sys.stdout.flush()
+ print('\r', end='')
+ sys.stdout.flush()
+ fd.close()
+ if done != size:
+ print('Error, failed to download')
+ os.remove(fname)
+ fname = None
+ return fname, tmpdir
diff --git a/tools/u_boot_pylib/tout.py b/tools/u_boot_pylib/tout.py
new file mode 100644
index 00000000000..ca72108d6bc
--- /dev/null
+++ b/tools/u_boot_pylib/tout.py
@@ -0,0 +1,190 @@
+# SPDX-License-Identifier: GPL-2.0+
+# Copyright (c) 2016 Google, Inc
+#
+# Terminal output logging.
+#
+
+import sys
+
+from u_boot_pylib import terminal
+
+# Output verbosity levels that we support
+FATAL, ERROR, WARNING, NOTICE, INFO, DETAIL, DEBUG = range(7)
+
+in_progress = False
+
+"""
+This class handles output of progress and other useful information
+to the user. It provides for simple verbosity level control and can
+output nothing but errors at verbosity zero.
+
+The idea is that modules set up an Output object early in their years and pass
+it around to other modules that need it. This keeps the output under control
+of a single class.
+
+Public properties:
+ verbose: Verbosity level: 0=silent, 1=progress, 3=full, 4=debug
+"""
+def __enter__():
+ return
+
+def __exit__(unused1, unused2, unused3):
+ """Clean up and remove any progress message."""
+ clear_progress()
+ return False
+
+def user_is_present():
+ """This returns True if it is likely that a user is present.
+
+ Sometimes we want to prompt the user, but if no one is there then this
+ is a waste of time, and may lock a script which should otherwise fail.
+
+ Returns:
+ True if it thinks the user is there, and False otherwise
+ """
+ return stdout_is_tty and verbose > ERROR
+
+def clear_progress():
+ """Clear any active progress message on the terminal."""
+ global in_progress
+ if verbose > ERROR and stdout_is_tty and in_progress:
+ _stdout.write('\r%s\r' % (" " * len (_progress)))
+ _stdout.flush()
+ in_progress = False
+
+def progress(msg, warning=False, trailer='...'):
+ """Display progress information.
+
+ Args:
+ msg: Message to display.
+ warning: True if this is a warning."""
+ global in_progress
+ clear_progress()
+ if verbose > ERROR:
+ _progress = msg + trailer
+ if stdout_is_tty:
+ col = _color.YELLOW if warning else _color.GREEN
+ _stdout.write('\r' + _color.build(col, _progress))
+ _stdout.flush()
+ in_progress = True
+ else:
+ _stdout.write(_progress + '\n')
+
+def _output(level, msg, color=None):
+ """Output a message to the terminal.
+
+ Args:
+ level: Verbosity level for this message. It will only be displayed if
+ this as high as the currently selected level.
+ msg; Message to display.
+ error: True if this is an error message, else False.
+ """
+ if verbose >= level:
+ clear_progress()
+ if color:
+ msg = _color.build(color, msg)
+ if level < NOTICE:
+ print(msg, file=sys.stderr)
+ else:
+ print(msg)
+ if level == FATAL:
+ sys.exit(1)
+
+def do_output(level, msg):
+ """Output a message to the terminal.
+
+ Args:
+ level: Verbosity level for this message. It will only be displayed if
+ this as high as the currently selected level.
+ msg; Message to display.
+ """
+ _output(level, msg)
+
+def fatal(msg):
+ """Display an error message and exit
+
+ Args:
+ msg; Message to display.
+ """
+ _output(FATAL, msg, _color.RED)
+
+def error(msg):
+ """Display an error message
+
+ Args:
+ msg; Message to display.
+ """
+ _output(ERROR, msg, _color.RED)
+
+def warning(msg):
+ """Display a warning message
+
+ Args:
+ msg; Message to display.
+ """
+ _output(WARNING, msg, _color.YELLOW)
+
+def notice(msg):
+ """Display an important infomation message
+
+ Args:
+ msg; Message to display.
+ """
+ _output(NOTICE, msg)
+
+def info(msg):
+ """Display an infomation message
+
+ Args:
+ msg; Message to display.
+ """
+ _output(INFO, msg)
+
+def detail(msg):
+ """Display a detailed message
+
+ Args:
+ msg; Message to display.
+ """
+ _output(DETAIL, msg)
+
+def debug(msg):
+ """Display a debug message
+
+ Args:
+ msg; Message to display.
+ """
+ _output(DEBUG, msg)
+
+def user_output(msg):
+ """Display a message regardless of the current output level.
+
+ This is used when the output was specifically requested by the user.
+ Args:
+ msg; Message to display.
+ """
+ _output(ERROR, msg)
+
+def init(_verbose=WARNING, stdout=sys.stdout, allow_colour=True):
+ """Initialize a new output object.
+
+ Args:
+ verbose: Verbosity level (0-6).
+ stdout: File to use for stdout.
+ """
+ global verbose, _progress, _color, _stdout, stdout_is_tty
+
+ verbose = _verbose
+ _progress = '' # Our last progress message
+ _color = terminal.Color(terminal.COLOR_IF_TERMINAL if allow_colour
+ else terminal.COLOR_NEVER)
+ _stdout = stdout
+
+ # TODO(sjg): Move this into Chromite libraries when we have them
+ stdout_is_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty()
+ stderr_is_tty = hasattr(sys.stderr, 'isatty') and sys.stderr.isatty()
+
+def uninit():
+ clear_progress()
+
+init()
diff --git a/tools/u_boot_pylib/u_boot_pylib b/tools/u_boot_pylib/u_boot_pylib
new file mode 120000
index 00000000000..5a427d19424
--- /dev/null
+++ b/tools/u_boot_pylib/u_boot_pylib
@@ -0,0 +1 @@
+__main__.py \ No newline at end of file