diff options
Diffstat (limited to 'tools/u_boot_pylib')
-rw-r--r-- | tools/u_boot_pylib/LICENSE | 339 | ||||
-rw-r--r-- | tools/u_boot_pylib/README.rst | 15 | ||||
-rw-r--r-- | tools/u_boot_pylib/__init__.py | 4 | ||||
-rwxr-xr-x | tools/u_boot_pylib/__main__.py | 22 | ||||
-rw-r--r-- | tools/u_boot_pylib/command.py | 221 | ||||
-rw-r--r-- | tools/u_boot_pylib/cros_subprocess.py | 401 | ||||
-rw-r--r-- | tools/u_boot_pylib/gitutil.py | 886 | ||||
-rw-r--r-- | tools/u_boot_pylib/pyproject.toml | 25 | ||||
-rw-r--r-- | tools/u_boot_pylib/requirements.txt | 1 | ||||
-rw-r--r-- | tools/u_boot_pylib/terminal.py | 346 | ||||
-rw-r--r-- | tools/u_boot_pylib/test_util.py | 229 | ||||
-rw-r--r-- | tools/u_boot_pylib/tools.py | 612 | ||||
-rw-r--r-- | tools/u_boot_pylib/tout.py | 190 | ||||
l--------- | tools/u_boot_pylib/u_boot_pylib | 1 |
14 files changed, 3292 insertions, 0 deletions
diff --git a/tools/u_boot_pylib/LICENSE b/tools/u_boot_pylib/LICENSE new file mode 100644 index 00000000000..d159169d105 --- /dev/null +++ b/tools/u_boot_pylib/LICENSE @@ -0,0 +1,339 @@ + GNU GENERAL PUBLIC LICENSE + Version 2, June 1991 + + Copyright (C) 1989, 1991 Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +License is intended to guarantee your freedom to share and change free +software--to make sure the software is free for all its users. This +General Public License applies to most of the Free Software +Foundation's software and to any other program whose authors commit to +using it. (Some other Free Software Foundation software is covered by +the GNU Lesser General Public License instead.) You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +this service if you wish), that you receive source code or can get it +if you want it, that you can change the software or use pieces of it +in new free programs; and that you know you can do these things. + + To protect your rights, we need to make restrictions that forbid +anyone to deny you these rights or to ask you to surrender the rights. +These restrictions translate to certain responsibilities for you if you +distribute copies of the software, or if you modify it. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must give the recipients all the rights that +you have. You must make sure that they, too, receive or can get the +source code. And you must show them these terms so they know their +rights. + + We protect your rights with two steps: (1) copyright the software, and +(2) offer you this license which gives you legal permission to copy, +distribute and/or modify the software. + + Also, for each author's protection and ours, we want to make certain +that everyone understands that there is no warranty for this free +software. If the software is modified by someone else and passed on, we +want its recipients to know that what they have is not the original, so +that any problems introduced by others will not reflect on the original +authors' reputations. + + Finally, any free program is threatened constantly by software +patents. We wish to avoid the danger that redistributors of a free +program will individually obtain patent licenses, in effect making the +program proprietary. To prevent this, we have made it clear that any +patent must be licensed for everyone's free use or not licensed at all. + + The precise terms and conditions for copying, distribution and +modification follow. + + GNU GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License applies to any program or other work which contains +a notice placed by the copyright holder saying it may be distributed +under the terms of this General Public License. The "Program", below, +refers to any such program or work, and a "work based on the Program" +means either the Program or any derivative work under copyright law: +that is to say, a work containing the Program or a portion of it, +either verbatim or with modifications and/or translated into another +language. (Hereinafter, translation is included without limitation in +the term "modification".) Each licensee is addressed as "you". + +Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running the Program is not restricted, and the output from the Program +is covered only if its contents constitute a work based on the +Program (independent of having been made by running the Program). +Whether that is true depends on what the Program does. + + 1. You may copy and distribute verbatim copies of the Program's +source code as you receive it, in any medium, provided that you +conspicuously and appropriately publish on each copy an appropriate +copyright notice and disclaimer of warranty; keep intact all the +notices that refer to this License and to the absence of any warranty; +and give any other recipients of the Program a copy of this License +along with the Program. + +You may charge a fee for the physical act of transferring a copy, and +you may at your option offer warranty protection in exchange for a fee. + + 2. You may modify your copy or copies of the Program or any portion +of it, thus forming a work based on the Program, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) You must cause the modified files to carry prominent notices + stating that you changed the files and the date of any change. + + b) You must cause any work that you distribute or publish, that in + whole or in part contains or is derived from the Program or any + part thereof, to be licensed as a whole at no charge to all third + parties under the terms of this License. + + c) If the modified program normally reads commands interactively + when run, you must cause it, when started running for such + interactive use in the most ordinary way, to print or display an + announcement including an appropriate copyright notice and a + notice that there is no warranty (or else, saying that you provide + a warranty) and that users may redistribute the program under + these conditions, and telling the user how to view a copy of this + License. (Exception: if the Program itself is interactive but + does not normally print such an announcement, your work based on + the Program is not required to print an announcement.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Program, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Program, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Program. + +In addition, mere aggregation of another work not based on the Program +with the Program (or with a work based on the Program) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may copy and distribute the Program (or a work based on it, +under Section 2) in object code or executable form under the terms of +Sections 1 and 2 above provided that you also do one of the following: + + a) Accompany it with the complete corresponding machine-readable + source code, which must be distributed under the terms of Sections + 1 and 2 above on a medium customarily used for software interchange; or, + + b) Accompany it with a written offer, valid for at least three + years, to give any third party, for a charge no more than your + cost of physically performing source distribution, a complete + machine-readable copy of the corresponding source code, to be + distributed under the terms of Sections 1 and 2 above on a medium + customarily used for software interchange; or, + + c) Accompany it with the information you received as to the offer + to distribute corresponding source code. (This alternative is + allowed only for noncommercial distribution and only if you + received the program in object code or executable form with such + an offer, in accord with Subsection b above.) + +The source code for a work means the preferred form of the work for +making modifications to it. For an executable work, complete source +code means all the source code for all modules it contains, plus any +associated interface definition files, plus the scripts used to +control compilation and installation of the executable. However, as a +special exception, the source code distributed need not include +anything that is normally distributed (in either source or binary +form) with the major components (compiler, kernel, and so on) of the +operating system on which the executable runs, unless that component +itself accompanies the executable. + +If distribution of executable or object code is made by offering +access to copy from a designated place, then offering equivalent +access to copy the source code from the same place counts as +distribution of the source code, even though third parties are not +compelled to copy the source along with the object code. + + 4. You may not copy, modify, sublicense, or distribute the Program +except as expressly provided under this License. Any attempt +otherwise to copy, modify, sublicense or distribute the Program is +void, and will automatically terminate your rights under this License. +However, parties who have received copies, or rights, from you under +this License will not have their licenses terminated so long as such +parties remain in full compliance. + + 5. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Program or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Program (or any work based on the +Program), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Program or works based on it. + + 6. Each time you redistribute the Program (or any work based on the +Program), the recipient automatically receives a license from the +original licensor to copy, distribute or modify the Program subject to +these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties to +this License. + + 7. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Program at all. For example, if a patent +license would not permit royalty-free redistribution of the Program by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Program. + +If any portion of this section is held invalid or unenforceable under +any particular circumstance, the balance of the section is intended to +apply and the section as a whole is intended to apply in other +circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system, which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 8. If the distribution and/or use of the Program is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Program under this License +may add an explicit geographical distribution limitation excluding +those countries, so that distribution is permitted only in or among +countries not thus excluded. In such case, this License incorporates +the limitation as if written in the body of this License. + + 9. The Free Software Foundation may publish revised and/or new versions +of the General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + +Each version is given a distinguishing version number. If the Program +specifies a version number of this License which applies to it and "any +later version", you have the option of following the terms and conditions +either of that version or of any later version published by the Free +Software Foundation. If the Program does not specify a version number of +this License, you may choose any version ever published by the Free Software +Foundation. + + 10. If you wish to incorporate parts of the Program into other free +programs whose distribution conditions are different, write to the author +to ask for permission. For software which is copyrighted by the Free +Software Foundation, write to the Free Software Foundation; we sometimes +make exceptions for this. Our decision will be guided by the two goals +of preserving the free status of all derivatives of our free software and +of promoting the sharing and reuse of software generally. + + NO WARRANTY + + 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY +FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN +OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES +PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED +OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS +TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE +PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, +REPAIR OR CORRECTION. + + 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR +REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, +INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING +OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED +TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY +YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER +PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + <one line to give the program's name and a brief idea of what it does.> + Copyright (C) <year> <name of author> + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +Also add information on how to contact you by electronic and paper mail. + +If the program is interactive, make it output a short notice like this +when it starts in an interactive mode: + + Gnomovision version 69, Copyright (C) year name of author + Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, the commands you use may +be called something other than `show w' and `show c'; they could even be +mouse-clicks or menu items--whatever suits your program. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the program, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the program + `Gnomovision' (which makes passes at compilers) written by James Hacker. + + <signature of Ty Coon>, 1 April 1989 + Ty Coon, President of Vice + +This General Public License does not permit incorporating your program into +proprietary programs. If your program is a subroutine library, you may +consider it more useful to permit linking proprietary applications with the +library. If this is what you want to do, use the GNU Lesser General +Public License instead of this License. diff --git a/tools/u_boot_pylib/README.rst b/tools/u_boot_pylib/README.rst new file mode 100644 index 00000000000..36a18256d8b --- /dev/null +++ b/tools/u_boot_pylib/README.rst @@ -0,0 +1,15 @@ +.. SPDX-License-Identifier: GPL-2.0+ + +# U-Boot Python Library +======================= + +This is a Python library used by various U-Boot tools, including patman, +buildman and binman. + +The module can be installed with pip:: + + pip install u_boot_pylib + +or via setup.py:: + + ./setup.py install [--user] diff --git a/tools/u_boot_pylib/__init__.py b/tools/u_boot_pylib/__init__.py new file mode 100644 index 00000000000..807a62e0743 --- /dev/null +++ b/tools/u_boot_pylib/__init__.py @@ -0,0 +1,4 @@ +# SPDX-License-Identifier: GPL-2.0+ + +__all__ = ['command', 'cros_subprocess', 'gitutil', 'terminal', 'test_util', + 'tools', 'tout'] diff --git a/tools/u_boot_pylib/__main__.py b/tools/u_boot_pylib/__main__.py new file mode 100755 index 00000000000..d86b9d7dce0 --- /dev/null +++ b/tools/u_boot_pylib/__main__.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: GPL-2.0+ +# +# Copyright 2023 Google LLC +# + +import os +import sys + +if __name__ == "__main__": + # Allow 'from u_boot_pylib import xxx to work' + our_path = os.path.dirname(os.path.realpath(__file__)) + sys.path.append(os.path.join(our_path, '..')) + + # Run tests + from u_boot_pylib import test_util + + result = test_util.run_test_suites( + 'u_boot_pylib', False, False, False, False, None, None, None, + ['terminal']) + + sys.exit(0 if result.wasSuccessful() else 1) diff --git a/tools/u_boot_pylib/command.py b/tools/u_boot_pylib/command.py new file mode 100644 index 00000000000..cb7ebf49ce5 --- /dev/null +++ b/tools/u_boot_pylib/command.py @@ -0,0 +1,221 @@ +# SPDX-License-Identifier: GPL-2.0+ +""" +Shell command ease-ups for Python + +Copyright (c) 2011 The Chromium OS Authors. +""" + +import subprocess + +from u_boot_pylib import cros_subprocess + +# This permits interception of RunPipe for test purposes. If it is set to +# a function, then that function is called with the pipe list being +# executed. Otherwise, it is assumed to be a CommandResult object, and is +# returned as the result for every run_pipe() call. +# When this value is None, commands are executed as normal. +TEST_RESULT = None + + +class CommandExc(Exception): + """Reports an exception to the caller""" + def __init__(self, msg, result): + """Set up a new exception object + + Args: + result (CommandResult): Execution result so far + """ + super().__init__(msg) + self.result = result + + +class CommandResult: + """A class which captures the result of executing a command. + + Members: + stdout (bytes): stdout obtained from command, as a string + stderr (bytes): stderr obtained from command, as a string + combined (bytes): stdout and stderr interleaved + return_code (int): Return code from command + exception (Exception): Exception received, or None if all ok + output (str or None): Returns output as a single line if requested + """ + def __init__(self, stdout='', stderr='', combined='', return_code=0, + exception=None): + self.stdout = stdout + self.stderr = stderr + self.combined = combined + self.return_code = return_code + self.exception = exception + self.output = None + + def to_output(self, binary): + """Converts binary output to its final form + + Args: + binary (bool): True to report binary output, False to use strings + Returns: + self + """ + if not binary: + self.stdout = self.stdout.decode('utf-8') + self.stderr = self.stderr.decode('utf-8') + self.combined = self.combined.decode('utf-8') + return self + + +def run_pipe(pipe_list, infile=None, outfile=None, capture=False, + capture_stderr=False, oneline=False, raise_on_error=True, cwd=None, + binary=False, output_func=None, **kwargs): + """ + Perform a command pipeline, with optional input/output filenames. + + Args: + pipe_list (list of list): List of command lines to execute. Each command + line is piped into the next, and is itself a list of strings. For + example [ ['ls', '.git'] ['wc'] ] will pipe the output of + 'ls .git' into 'wc'. + infile (str): File to provide stdin to the pipeline + outfile (str): File to store stdout + capture (bool): True to capture output + capture_stderr (bool): True to capture stderr + oneline (bool): True to strip newline chars from output + raise_on_error (bool): True to raise on an error, False to return it in + the CommandResult + cwd (str or None): Directory to run the command in + binary (bool): True to report binary output, False to use strings + output_func (function): Output function to call with each output + fragment (if it returns True the function terminates) + **kwargs: Additional keyword arguments to cros_subprocess.Popen() + Returns: + CommandResult object + Raises: + CommandExc if an exception happens + """ + if TEST_RESULT: + if hasattr(TEST_RESULT, '__call__'): + # pylint: disable=E1102 + result = TEST_RESULT(pipe_list=pipe_list) + if result: + return result + else: + return TEST_RESULT + # No result: fall through to normal processing + result = CommandResult(b'', b'', b'') + last_pipe = None + pipeline = list(pipe_list) + user_pipestr = '|'.join([' '.join(pipe) for pipe in pipe_list]) + kwargs['stdout'] = None + kwargs['stderr'] = None + while pipeline: + cmd = pipeline.pop(0) + if last_pipe is not None: + kwargs['stdin'] = last_pipe.stdout + elif infile: + kwargs['stdin'] = open(infile, 'rb') + if pipeline or capture: + kwargs['stdout'] = cros_subprocess.PIPE + elif outfile: + kwargs['stdout'] = open(outfile, 'wb') + if capture_stderr: + kwargs['stderr'] = cros_subprocess.PIPE + + try: + last_pipe = cros_subprocess.Popen(cmd, cwd=cwd, **kwargs) + except Exception as err: + result.exception = err + if raise_on_error: + raise CommandExc(f"Error running '{user_pipestr}': {err}", + result) from err + result.return_code = 255 + return result.to_output(binary) + + if capture: + result.stdout, result.stderr, result.combined = ( + last_pipe.communicate_filter(output_func)) + if result.stdout and oneline: + result.output = result.stdout.rstrip(b'\r\n') + result.return_code = last_pipe.wait() + if raise_on_error and result.return_code: + raise CommandExc(f"Error running '{user_pipestr}'", result) + return result.to_output(binary) + + +def output(*cmd, **kwargs): + """Run a command and return its output + + Args: + *cmd (list of str): Command to run + **kwargs (dict of args): Extra arguments to pass in + + Returns: + str: command output + """ + kwargs['raise_on_error'] = kwargs.get('raise_on_error', True) + return run_pipe([cmd], capture=True, **kwargs).stdout + + +def output_one_line(*cmd, **kwargs): + """Run a command and output it as a single-line string + + The command is expected to produce a single line of output + + Args: + *cmd (list of str): Command to run + **kwargs (dict of args): Extra arguments to pass in + + Returns: + str: output of command with all newlines removed + """ + raise_on_error = kwargs.pop('raise_on_error', True) + result = run_pipe([cmd], capture=True, oneline=True, + raise_on_error=raise_on_error, **kwargs).stdout.strip() + return result + + +def run(*cmd, **kwargs): + """Run a command + + Note that you must add 'capture' to kwargs to obtain non-empty output + + Args: + *cmd (list of str): Command to run + **kwargs (dict of args): Extra arguments to pass in + + Returns: + str: output of command + """ + return run_pipe([cmd], **kwargs).stdout + + +def run_one(*cmd, **kwargs): + """Run a single command + + Note that you must add 'capture' to kwargs to obtain non-empty output + + Args: + *cmd (list of str): Command to run + **kwargs (dict of args): Extra arguments to pass in + + Returns: + CommandResult: output of command + """ + return run_pipe([cmd], **kwargs) + + +def run_list(cmd, **kwargs): + """Run a command and return its output + + Args: + cmd (list of str): Command to run + + Returns: + str: output of command + **kwargs (dict of args): Extra arguments to pass in + """ + return run_pipe([cmd], capture=True, **kwargs).stdout + + +def stop_all(): + """Stop all subprocesses initiated with cros_subprocess""" + cros_subprocess.stay_alive = False diff --git a/tools/u_boot_pylib/cros_subprocess.py b/tools/u_boot_pylib/cros_subprocess.py new file mode 100644 index 00000000000..cd614f38a64 --- /dev/null +++ b/tools/u_boot_pylib/cros_subprocess.py @@ -0,0 +1,401 @@ +# Copyright (c) 2012 The Chromium OS Authors. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. +# +# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se> +# Licensed to PSF under a Contributor Agreement. +# See http://www.python.org/2.4/license for licensing details. + +"""Subprocess execution + +This module holds a subclass of subprocess.Popen with our own required +features, mainly that we get access to the subprocess output while it +is running rather than just at the end. This makes it easier to show +progress information and filter output in real time. +""" + +import errno +import os +import pty +import select +import subprocess +import sys +import unittest + + +# Import these here so the caller does not need to import subprocess also. +PIPE = subprocess.PIPE +STDOUT = subprocess.STDOUT +PIPE_PTY = -3 # Pipe output through a pty +stay_alive = True + + +class Popen(subprocess.Popen): + """Like subprocess.Popen with ptys and incremental output + + This class deals with running a child process and filtering its output on + both stdout and stderr while it is running. We do this so we can monitor + progress, and possibly relay the output to the user if requested. + + The class is similar to subprocess.Popen, the equivalent is something like: + + Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + But this class has many fewer features, and two enhancement: + + 1. Rather than getting the output data only at the end, this class sends it + to a provided operation as it arrives. + 2. We use pseudo terminals so that the child will hopefully flush its output + to us as soon as it is produced, rather than waiting for the end of a + line. + + Use communicate_filter() to handle output from the subprocess. + + """ + + def __init__(self, args, stdin=None, stdout=PIPE_PTY, stderr=PIPE_PTY, + shell=False, cwd=None, env=None, **kwargs): + """Cut-down constructor + + Args: + args: Program and arguments for subprocess to execute. + stdin: See subprocess.Popen() + stdout: See subprocess.Popen(), except that we support the sentinel + value of cros_subprocess.PIPE_PTY. + stderr: See subprocess.Popen(), except that we support the sentinel + value of cros_subprocess.PIPE_PTY. + shell: See subprocess.Popen() + cwd: Working directory to change to for subprocess, or None if none. + env: Environment to use for this subprocess, or None to inherit parent. + kwargs: No other arguments are supported at the moment. Passing other + arguments will cause a ValueError to be raised. + """ + stdout_pty = None + stderr_pty = None + + if stdout == PIPE_PTY: + stdout_pty = pty.openpty() + stdout = os.fdopen(stdout_pty[1]) + if stderr == PIPE_PTY: + stderr_pty = pty.openpty() + stderr = os.fdopen(stderr_pty[1]) + + super(Popen, self).__init__(args, stdin=stdin, + stdout=stdout, stderr=stderr, shell=shell, cwd=cwd, env=env, + **kwargs) + + # If we're on a PTY, we passed the slave half of the PTY to the subprocess. + # We want to use the master half on our end from now on. Setting this here + # does make some assumptions about the implementation of subprocess, but + # those assumptions are pretty minor. + + # Note that if stderr is STDOUT, then self.stderr will be set to None by + # this constructor. + if stdout_pty is not None: + self.stdout = os.fdopen(stdout_pty[0]) + if stderr_pty is not None: + self.stderr = os.fdopen(stderr_pty[0]) + + # Insist that unit tests exist for other arguments we don't support. + if kwargs: + raise ValueError("Unit tests do not test extra args - please add tests") + + def convert_data(self, data): + """Convert stdout/stderr data to the correct format for output + + Args: + data: Data to convert, or None for '' + + Returns: + Converted data, as bytes + """ + if data is None: + return b'' + return data + + def communicate_filter(self, output, input_buf=''): + """Interact with process: Read data from stdout and stderr. + + This method runs until end-of-file is reached, then waits for the + subprocess to terminate. + + The output function is sent all output from the subprocess and must be + defined like this: + + def output([self,] stream, data) + Args: + stream: the stream the output was received on, which will be + sys.stdout or sys.stderr. + data: a string containing the data + + Returns: + True to terminate the process + + Note: The data read is buffered in memory, so do not use this + method if the data size is large or unlimited. + + Args: + output: Function to call with each fragment of output. + + Returns: + A tuple (stdout, stderr, combined) which is the data received on + stdout, stderr and the combined data (interleaved stdout and stderr). + + Note that the interleaved output will only be sensible if you have + set both stdout and stderr to PIPE or PIPE_PTY. Even then it depends on + the timing of the output in the subprocess. If a subprocess flips + between stdout and stderr quickly in succession, by the time we come to + read the output from each we may see several lines in each, and will read + all the stdout lines, then all the stderr lines. So the interleaving + may not be correct. In this case you might want to pass + stderr=cros_subprocess.STDOUT to the constructor. + + This feature is still useful for subprocesses where stderr is + rarely used and indicates an error. + + Note also that if you set stderr to STDOUT, then stderr will be empty + and the combined output will just be the same as stdout. + """ + + read_set = [] + write_set = [] + stdout = None # Return + stderr = None # Return + + if self.stdin: + # Flush stdio buffer. This might block, if the user has + # been writing to .stdin in an uncontrolled fashion. + self.stdin.flush() + if input_buf: + write_set.append(self.stdin) + else: + self.stdin.close() + if self.stdout: + read_set.append(self.stdout) + stdout = bytearray() + if self.stderr and self.stderr != self.stdout: + read_set.append(self.stderr) + stderr = bytearray() + combined = bytearray() + + stop_now = False + input_offset = 0 + while read_set or write_set: + try: + rlist, wlist, _ = select.select(read_set, write_set, [], 0.2) + except select.error as e: + if e.args[0] == errno.EINTR: + continue + raise + + if not stay_alive: + self.terminate() + + if self.stdin in wlist: + # When select has indicated that the file is writable, + # we can write up to PIPE_BUF bytes without risk + # blocking. POSIX defines PIPE_BUF >= 512 + chunk = input_buf[input_offset : input_offset + 512] + bytes_written = os.write(self.stdin.fileno(), chunk) + input_offset += bytes_written + if input_offset >= len(input_buf): + self.stdin.close() + write_set.remove(self.stdin) + + if self.stdout in rlist: + data = b'' + # We will get an error on read if the pty is closed + try: + data = os.read(self.stdout.fileno(), 1024) + except OSError: + pass + if not len(data): + self.stdout.close() + read_set.remove(self.stdout) + else: + stdout += data + combined += data + if output: + stop_now = output(sys.stdout, data) + if self.stderr in rlist: + data = b'' + # We will get an error on read if the pty is closed + try: + data = os.read(self.stderr.fileno(), 1024) + except OSError: + pass + if not len(data): + self.stderr.close() + read_set.remove(self.stderr) + else: + stderr += data + combined += data + if output: + stop_now = output(sys.stderr, data) + if stop_now: + self.terminate() + + # All data exchanged. Translate lists into strings. + stdout = self.convert_data(stdout) + stderr = self.convert_data(stderr) + combined = self.convert_data(combined) + + self.wait() + return (stdout, stderr, combined) + + +# Just being a unittest.TestCase gives us 14 public methods. Unless we +# disable this, we can only have 6 tests in a TestCase. That's not enough. +# +# pylint: disable=R0904 + +class TestSubprocess(unittest.TestCase): + """Our simple unit test for this module""" + + class MyOperation: + """Provides a operation that we can pass to Popen""" + def __init__(self, input_to_send=None): + """Constructor to set up the operation and possible input. + + Args: + input_to_send: a text string to send when we first get input. We will + add \r\n to the string. + """ + self.stdout_data = '' + self.stderr_data = '' + self.combined_data = '' + self.stdin_pipe = None + self._input_to_send = input_to_send + if input_to_send: + pipe = os.pipe() + self.stdin_read_pipe = pipe[0] + self._stdin_write_pipe = os.fdopen(pipe[1], 'w') + + def output(self, stream, data): + """Output handler for Popen. Stores the data for later comparison""" + if stream == sys.stdout: + self.stdout_data += data + if stream == sys.stderr: + self.stderr_data += data + self.combined_data += data + + # Output the input string if we have one. + if self._input_to_send: + self._stdin_write_pipe.write(self._input_to_send + '\r\n') + self._stdin_write_pipe.flush() + + def _basic_check(self, plist, oper): + """Basic checks that the output looks sane.""" + self.assertEqual(plist[0], oper.stdout_data) + self.assertEqual(plist[1], oper.stderr_data) + self.assertEqual(plist[2], oper.combined_data) + + # The total length of stdout and stderr should equal the combined length + self.assertEqual(len(plist[0]) + len(plist[1]), len(plist[2])) + + def test_simple(self): + """Simple redirection: Get process list""" + oper = TestSubprocess.MyOperation() + plist = Popen(['ps']).communicate_filter(oper.output) + self._basic_check(plist, oper) + + def test_stderr(self): + """Check stdout and stderr""" + oper = TestSubprocess.MyOperation() + cmd = 'echo fred >/dev/stderr && false || echo bad' + plist = Popen([cmd], shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], 'bad\r\n') + self.assertEqual(plist [1], 'fred\r\n') + + def test_shell(self): + """Check with and without shell works""" + oper = TestSubprocess.MyOperation() + cmd = 'echo test >/dev/stderr' + self.assertRaises(OSError, Popen, [cmd], shell=False) + plist = Popen([cmd], shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(len(plist [0]), 0) + self.assertEqual(plist [1], 'test\r\n') + + def test_list_args(self): + """Check with and without shell works using list arguments""" + oper = TestSubprocess.MyOperation() + cmd = ['echo', 'test', '>/dev/stderr'] + plist = Popen(cmd, shell=False).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], ' '.join(cmd[1:]) + '\r\n') + self.assertEqual(len(plist [1]), 0) + + oper = TestSubprocess.MyOperation() + + # this should be interpreted as 'echo' with the other args dropped + cmd = ['echo', 'test', '>/dev/stderr'] + plist = Popen(cmd, shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], '\r\n') + + def test_cwd(self): + """Check we can change directory""" + for shell in (False, True): + oper = TestSubprocess.MyOperation() + plist = Popen('pwd', shell=shell, cwd='/tmp').communicate_filter( + oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], '/tmp\r\n') + + def test_env(self): + """Check we can change environment""" + for add in (False, True): + oper = TestSubprocess.MyOperation() + env = os.environ + if add: + env ['FRED'] = 'fred' + cmd = 'echo $FRED' + plist = Popen(cmd, shell=True, env=env).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], add and 'fred\r\n' or '\r\n') + + def test_extra_args(self): + """Check we can't add extra arguments""" + self.assertRaises(ValueError, Popen, 'true', close_fds=False) + + def test_basic_input(self): + """Check that incremental input works + + We set up a subprocess which will prompt for name. When we see this prompt + we send the name as input to the process. It should then print the name + properly to stdout. + """ + oper = TestSubprocess.MyOperation('Flash') + prompt = 'What is your name?: ' + cmd = 'echo -n "%s"; read name; echo Hello $name' % prompt + plist = Popen([cmd], stdin=oper.stdin_read_pipe, + shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(len(plist [1]), 0) + self.assertEqual(plist [0], prompt + 'Hello Flash\r\r\n') + + def test_isatty(self): + """Check that ptys appear as terminals to the subprocess""" + oper = TestSubprocess.MyOperation() + cmd = ('if [ -t %d ]; then echo "terminal %d" >&%d; ' + 'else echo "not %d" >&%d; fi;') + both_cmds = '' + for fd in (1, 2): + both_cmds += cmd % (fd, fd, fd, fd, fd) + plist = Popen(both_cmds, shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], 'terminal 1\r\n') + self.assertEqual(plist [1], 'terminal 2\r\n') + + # Now try with PIPE and make sure it is not a terminal + oper = TestSubprocess.MyOperation() + plist = Popen(both_cmds, stdout=subprocess.PIPE, stderr=subprocess.PIPE, + shell=True).communicate_filter(oper.output) + self._basic_check(plist, oper) + self.assertEqual(plist [0], 'not 1\n') + self.assertEqual(plist [1], 'not 2\n') + +if __name__ == '__main__': + unittest.main() diff --git a/tools/u_boot_pylib/gitutil.py b/tools/u_boot_pylib/gitutil.py new file mode 100644 index 00000000000..34b4dbb4839 --- /dev/null +++ b/tools/u_boot_pylib/gitutil.py @@ -0,0 +1,886 @@ +# SPDX-License-Identifier: GPL-2.0+ +# Copyright (c) 2011 The Chromium OS Authors. +# + +"""Basic utilities for running the git command-line tool from Python""" + +import os +import sys + +from u_boot_pylib import command +from u_boot_pylib import terminal + +# True to use --no-decorate - we check this in setup() +USE_NO_DECORATE = True + + +def log_cmd(commit_range, git_dir=None, oneline=False, reverse=False, + count=None, decorate=False): + """Create a command to perform a 'git log' + + Args: + commit_range (str): Range expression to use for log, None for none + git_dir (str): Path to git repository (None to use default) + oneline (bool): True to use --oneline, else False + reverse (bool): True to reverse the log (--reverse) + count (int or None): Number of commits to list, or None for no limit + decorate (bool): True to use --decorate + + Return: + List containing command and arguments to run + """ + cmd = ['git'] + if git_dir: + cmd += ['--git-dir', git_dir] + cmd += ['--no-pager', 'log', '--no-color'] + if oneline: + cmd.append('--oneline') + if USE_NO_DECORATE and not decorate: + cmd.append('--no-decorate') + if decorate: + cmd.append('--decorate') + if reverse: + cmd.append('--reverse') + if count is not None: + cmd.append(f'-n{count}') + if commit_range: + cmd.append(commit_range) + + # Add this in case we have a branch with the same name as a directory. + # This avoids messages like this, for example: + # fatal: ambiguous argument 'test': both revision and filename + cmd.append('--') + return cmd + + +def count_commits_to_branch(branch, git_dir=None, end=None): + """Returns number of commits between HEAD and the tracking branch. + + This looks back to the tracking branch and works out the number of commits + since then. + + Args: + branch (str or None): Branch to count from (None for current branch) + git_dir (str): Path to git repository (None to use default) + end (str): End commit to stop before + + Return: + Number of patches that exist on top of the branch + """ + if end: + rev_range = f'{end}..{branch}' + elif branch: + us, msg = get_upstream(git_dir or '.git', branch) + if not us: + raise ValueError(msg) + rev_range = f'{us}..{branch}' + else: + rev_range = '@{upstream}..' + cmd = log_cmd(rev_range, git_dir=git_dir, oneline=True) + result = command.run_one(*cmd, capture=True, capture_stderr=True, + oneline=True, raise_on_error=False) + if result.return_code: + raise ValueError( + f'Failed to determine upstream: {result.stderr.strip()}') + patch_count = len(result.stdout.splitlines()) + return patch_count + + +def name_revision(commit_hash): + """Gets the revision name for a commit + + Args: + commit_hash (str): Commit hash to look up + + Return: + Name of revision, if any, else None + """ + stdout = command.output_one_line('git', 'name-rev', commit_hash) + if not stdout: + return None + + # We expect a commit, a space, then a revision name + name = stdout.split()[1].strip() + return name + + +def guess_upstream(git_dir, branch): + """Tries to guess the upstream for a branch + + This lists out top commits on a branch and tries to find a suitable + upstream. It does this by looking for the first commit where + 'git name-rev' returns a plain branch name, with no ! or ^ modifiers. + + Args: + git_dir (str): Git directory containing repo + branch (str): Name of branch + + Returns: + Tuple: + Name of upstream branch (e.g. 'upstream/master') or None if none + Warning/error message, or None if none + """ + cmd = log_cmd(branch, git_dir=git_dir, oneline=True, count=100, + decorate=True) + result = command.run_one(*cmd, capture=True, capture_stderr=True, + raise_on_error=False) + if result.return_code: + return None, f"Branch '{branch}' not found" + for line in result.stdout.splitlines()[1:]: + parts = line.split(maxsplit=1) + if len(parts) >= 2 and parts[1].startswith('('): + commit_hash = parts[0] + name = name_revision(commit_hash) + if '~' not in name and '^' not in name: + if name.startswith('remotes/'): + name = name[8:] + return name, f"Guessing upstream as '{name}'" + return None, f"Cannot find a suitable upstream for branch '{branch}'" + + +def get_upstream(git_dir, branch): + """Returns the name of the upstream for a branch + + Args: + git_dir (str): Git directory containing repo + branch (str): Name of branch + + Returns: + Tuple: + Name of upstream branch (e.g. 'upstream/master') or None if none + Warning/error message, or None if none + """ + try: + remote = command.output_one_line('git', '--git-dir', git_dir, 'config', + f'branch.{branch}.remote') + merge = command.output_one_line('git', '--git-dir', git_dir, 'config', + f'branch.{branch}.merge') + except command.CommandExc: + upstream, msg = guess_upstream(git_dir, branch) + return upstream, msg + + if remote == '.': + return merge, None + if remote and merge: + # Drop the initial refs/heads from merge + leaf = merge.split('/', maxsplit=2)[2:] + return f'{remote}/{"/".join(leaf)}', None + raise ValueError("Cannot determine upstream branch for branch " + f"'{branch}' remote='{remote}', merge='{merge}'") + + +def get_range_in_branch(git_dir, branch, include_upstream=False): + """Returns an expression for the commits in the given branch. + + Args: + git_dir (str): Directory containing git repo + branch (str): Name of branch + include_upstream (bool): Include the upstream commit as well + Return: + Expression in the form 'upstream..branch' which can be used to + access the commits. If the branch does not exist, returns None. + """ + upstream, msg = get_upstream(git_dir, branch) + if not upstream: + return None, msg + rstr = f"{upstream}{'~' if include_upstream else ''}..{branch}" + return rstr, msg + + +def count_commits_in_range(git_dir, range_expr): + """Returns the number of commits in the given range. + + Args: + git_dir (str): Directory containing git repo + range_expr (str): Range to check + Return: + Number of patches that exist in the supplied range or None if none + were found + """ + cmd = log_cmd(range_expr, git_dir=git_dir, oneline=True) + result = command.run_one(*cmd, capture=True, capture_stderr=True, + raise_on_error=False) + if result.return_code: + return None, f"Range '{range_expr}' not found or is invalid" + patch_count = len(result.stdout.splitlines()) + return patch_count, None + + +def count_commits_in_branch(git_dir, branch, include_upstream=False): + """Returns the number of commits in the given branch. + + Args: + git_dir (str): Directory containing git repo + branch (str): Name of branch + include_upstream (bool): Include the upstream commit as well + Return: + Number of patches that exist on top of the branch, or None if the + branch does not exist. + """ + range_expr, msg = get_range_in_branch(git_dir, branch, include_upstream) + if not range_expr: + return None, msg + return count_commits_in_range(git_dir, range_expr) + + +def count_commits(commit_range): + """Returns the number of commits in the given range. + + Args: + commit_range (str): Range of commits to count (e.g. 'HEAD..base') + Return: + Number of patches that exist on top of the branch + """ + pipe = [log_cmd(commit_range, oneline=True), + ['wc', '-l']] + stdout = command.run_pipe(pipe, capture=True, oneline=True).stdout + patch_count = int(stdout) + return patch_count + + +def checkout(commit_hash, git_dir=None, work_tree=None, force=False): + """Checkout the selected commit for this build + + Args: + commit_hash (str): Commit hash to check out + git_dir (str): Directory containing git repo, or None for current dir + work_tree (str): Git worktree to use, or None if none + force (bool): True to force the checkout (git checkout -f) + """ + pipe = ['git'] + if git_dir: + pipe.extend(['--git-dir', git_dir]) + if work_tree: + pipe.extend(['--work-tree', work_tree]) + pipe.append('checkout') + if force: + pipe.append('-f') + pipe.append(commit_hash) + result = command.run_pipe([pipe], capture=True, raise_on_error=False, + capture_stderr=True) + if result.return_code != 0: + raise OSError(f'git checkout ({pipe}): {result.stderr}') + + +def clone(repo, output_dir): + """Clone a repo + + Args: + repo (str): Repo to clone (e.g. web address) + output_dir (str): Directory to close into + """ + result = command.run_one('git', 'clone', repo, '.', capture=True, + cwd=output_dir, capture_stderr=True) + if result.return_code != 0: + raise OSError(f'git clone: {result.stderr}') + + +def fetch(git_dir=None, work_tree=None): + """Fetch from the origin repo + + Args: + git_dir (str): Directory containing git repo, or None for current dir + work_tree (str or None): Git worktree to use, or None if none + """ + cmd = ['git'] + if git_dir: + cmd.extend(['--git-dir', git_dir]) + if work_tree: + cmd.extend(['--work-tree', work_tree]) + cmd.append('fetch') + result = command.run_one(*cmd, capture=True, capture_stderr=True) + if result.return_code != 0: + raise OSError(f'git fetch: {result.stderr}') + + +def check_worktree_is_available(git_dir): + """Check if git-worktree functionality is available + + Args: + git_dir (str): The repository to test in + + Returns: + True if git-worktree commands will work, False otherwise. + """ + result = command.run_one('git', '--git-dir', git_dir, 'worktree', 'list', + capture=True, capture_stderr=True, + raise_on_error=False) + return result.return_code == 0 + + +def add_worktree(git_dir, output_dir, commit_hash=None): + """Create and checkout a new git worktree for this build + + Args: + git_dir (str): The repository to checkout the worktree from + output_dir (str): Path for the new worktree + commit_hash (str): Commit hash to checkout + """ + # We need to pass --detach to avoid creating a new branch + cmd = ['git', '--git-dir', git_dir, 'worktree', 'add', '.', '--detach'] + if commit_hash: + cmd.append(commit_hash) + result = command.run_one(*cmd, capture=True, cwd=output_dir, + capture_stderr=True) + if result.return_code != 0: + raise OSError(f'git worktree add: {result.stderr}') + + +def prune_worktrees(git_dir): + """Remove administrative files for deleted worktrees + + Args: + git_dir (str): The repository whose deleted worktrees should be pruned + """ + result = command.run_one('git', '--git-dir', git_dir, 'worktree', 'prune', + capture=True, capture_stderr=True) + if result.return_code != 0: + raise OSError(f'git worktree prune: {result.stderr}') + + +def create_patches(branch, start, count, ignore_binary, series, signoff=True, + git_dir=None, cwd=None): + """Create a series of patches from the top of the current branch. + + The patch files are written to the current directory using + git format-patch. + + Args: + branch (str): Branch to create patches from (None for current branch) + start (int): Commit to start from: 0=HEAD, 1=next one, etc. + count (int): number of commits to include + ignore_binary (bool): Don't generate patches for binary files + series (Series): Series object for this series (set of patches) + signoff (bool): True to add signoff lines automatically + git_dir (str): Path to git repository (None to use default) + cwd (str): Path to use for git operations + Return: + Filename of cover letter (None if none) + List of filenames of patch files + """ + cmd = ['git'] + if git_dir: + cmd += ['--git-dir', git_dir] + cmd += ['format-patch', '-M'] + if signoff: + cmd.append('--signoff') + if ignore_binary: + cmd.append('--no-binary') + if series.get('cover'): + cmd.append('--cover-letter') + prefix = series.GetPatchPrefix() + if prefix: + cmd += [f'--subject-prefix={prefix}'] + brname = branch or 'HEAD' + cmd += [f'{brname}~{start + count}..{brname}~{start}'] + + stdout = command.run_list(cmd, cwd=cwd) + files = stdout.splitlines() + + # We have an extra file if there is a cover letter + if series.get('cover'): + return files[0], files[1:] + return None, files + + +def build_email_list(in_list, alias, tag=None, warn_on_error=True): + """Build a list of email addresses based on an input list. + + Takes a list of email addresses and aliases, and turns this into a list + of only email address, by resolving any aliases that are present. + + If the tag is given, then each email address is prepended with this + tag and a space. If the tag starts with a minus sign (indicating a + command line parameter) then the email address is quoted. + + Args: + in_list (list of str): List of aliases/email addresses + alias (dict): Alias dictionary: + key: alias + value: list of aliases or email addresses + tag (str): Text to put before each address + warn_on_error (bool): True to raise an error when an alias fails to + match, False to just print a message. + + Returns: + List of email addresses + + >>> alias = {} + >>> alias['fred'] = ['f.bloggs@napier.co.nz'] + >>> alias['john'] = ['j.bloggs@napier.co.nz'] + >>> alias['mary'] = ['Mary Poppins <m.poppins@cloud.net>'] + >>> alias['boys'] = ['fred', ' john'] + >>> alias['all'] = ['fred ', 'john', ' mary '] + >>> build_email_list(['john', 'mary'], alias, None) + ['j.bloggs@napier.co.nz', 'Mary Poppins <m.poppins@cloud.net>'] + >>> build_email_list(['john', 'mary'], alias, '--to') + ['--to "j.bloggs@napier.co.nz"', \ +'--to "Mary Poppins <m.poppins@cloud.net>"'] + >>> build_email_list(['john', 'mary'], alias, 'Cc') + ['Cc j.bloggs@napier.co.nz', 'Cc Mary Poppins <m.poppins@cloud.net>'] + """ + raw = [] + for item in in_list: + raw += lookup_email(item, alias, warn_on_error=warn_on_error) + result = [] + for item in raw: + if item not in result: + result.append(item) + if tag: + return [x for email in result for x in (tag, email)] + return result + + +def check_suppress_cc_config(): + """Check if sendemail.suppresscc is configured correctly. + + Returns: + bool: True if the option is configured correctly, False otherwise. + """ + suppresscc = command.output_one_line( + 'git', 'config', 'sendemail.suppresscc', raise_on_error=False) + + # Other settings should be fine. + if suppresscc in ('all', 'cccmd'): + col = terminal.Color() + + print(col.build(col.RED, 'error') + + f': git config sendemail.suppresscc set to {suppresscc}\n' + + ' patman needs --cc-cmd to be run to set the cc list.\n' + + ' Please run:\n' + + ' git config --unset sendemail.suppresscc\n' + + ' Or read the man page:\n' + + ' git send-email --help\n' + + ' and set an option that runs --cc-cmd\n') + return False + + return True + + +def email_patches(series, cover_fname, args, dry_run, warn_on_error, cc_fname, + alias, self_only=False, in_reply_to=None, thread=False, + smtp_server=None, cwd=None): + """Email a patch series. + + Args: + series (Series): Series object containing destination info + cover_fname (str or None): filename of cover letter + args (list of str): list of filenames of patch files + dry_run (bool): Just return the command that would be run + warn_on_error (bool): True to print a warning when an alias fails to + match, False to ignore it. + cc_fname (str): Filename of Cc file for per-commit Cc + alias (dict): Alias dictionary: + key: alias + value: list of aliases or email addresses + self_only (bool): True to just email to yourself as a test + in_reply_to (str or None): If set we'll pass this to git as + --in-reply-to - should be a message ID that this is in reply to. + thread (bool): True to add --thread to git send-email (make + all patches reply to cover-letter or first patch in series) + smtp_server (str or None): SMTP server to use to send patches + cwd (str): Path to use for patch files (None to use current dir) + + Returns: + Git command that was/would be run + + # For the duration of this doctest pretend that we ran patman with ./patman + >>> _old_argv0 = sys.argv[0] + >>> sys.argv[0] = './patman' + + >>> alias = {} + >>> alias['fred'] = ['f.bloggs@napier.co.nz'] + >>> alias['john'] = ['j.bloggs@napier.co.nz'] + >>> alias['mary'] = ['m.poppins@cloud.net'] + >>> alias['boys'] = ['fred', ' john'] + >>> alias['all'] = ['fred ', 'john', ' mary '] + >>> alias[os.getenv('USER')] = ['this-is-me@me.com'] + >>> series = {} + >>> series['to'] = ['fred'] + >>> series['cc'] = ['mary'] + >>> email_patches(series, 'cover', ['p1', 'p2'], True, True, 'cc-fname', \ + False, alias) + 'git send-email --annotate --to "f.bloggs@napier.co.nz" --cc \ +"m.poppins@cloud.net" --cc-cmd "./patman send --cc-cmd cc-fname" cover p1 p2' + >>> email_patches(series, None, ['p1'], True, True, 'cc-fname', False, \ + alias) + 'git send-email --annotate --to "f.bloggs@napier.co.nz" --cc \ +"m.poppins@cloud.net" --cc-cmd "./patman send --cc-cmd cc-fname" p1' + >>> series['cc'] = ['all'] + >>> email_patches(series, 'cover', ['p1', 'p2'], True, True, 'cc-fname', \ + True, alias) + 'git send-email --annotate --to "this-is-me@me.com" --cc-cmd "./patman \ +send --cc-cmd cc-fname" cover p1 p2' + >>> email_patches(series, 'cover', ['p1', 'p2'], True, True, 'cc-fname', \ + False, alias) + 'git send-email --annotate --to "f.bloggs@napier.co.nz" --cc \ +"f.bloggs@napier.co.nz" --cc "j.bloggs@napier.co.nz" --cc \ +"m.poppins@cloud.net" --cc-cmd "./patman send --cc-cmd cc-fname" cover p1 p2' + + # Restore argv[0] since we clobbered it. + >>> sys.argv[0] = _old_argv0 + """ + to = build_email_list(series.get('to'), alias, '--to', warn_on_error) + if not to: + if not command.output('git', 'config', 'sendemail.to', + raise_on_error=False): + print("No recipient.\n" + "Please add something like this to a commit\n" + "Series-to: Fred Bloggs <f.blogs@napier.co.nz>\n" + "Or do something like this\n" + "git config sendemail.to u-boot@lists.denx.de") + return None + cc = build_email_list(list(set(series.get('cc')) - set(series.get('to'))), + alias, '--cc', warn_on_error) + if self_only: + to = build_email_list([os.getenv('USER')], '--to', alias, + warn_on_error) + cc = [] + cmd = ['git', 'send-email', '--annotate'] + if smtp_server: + cmd.append(f'--smtp-server={smtp_server}') + if in_reply_to: + cmd.append(f'--in-reply-to="{in_reply_to}"') + if thread: + cmd.append('--thread') + + cmd += to + cmd += cc + cmd += ['--cc-cmd', f'{sys.argv[0]} send --cc-cmd {cc_fname}'] + if cover_fname: + cmd.append(cover_fname) + cmd += args + if not dry_run: + command.run(*cmd, capture=False, capture_stderr=False, cwd=cwd) + return' '.join([f'"{x}"' if ' ' in x and '"' not in x else x + for x in cmd]) + + +def lookup_email(lookup_name, alias, warn_on_error=True, level=0): + """If an email address is an alias, look it up and return the full name + + TODO: Why not just use git's own alias feature? + + Args: + lookup_name (str): Alias or email address to look up + alias (dict): Alias dictionary + key: alias + value: list of aliases or email addresses + warn_on_error (bool): True to print a warning when an alias fails to + match, False to ignore it. + level (int): Depth of alias stack, used to detect recusion/loops + + Returns: + tuple: + list containing a list of email addresses + + Raises: + OSError if a recursive alias reference was found + ValueError if an alias was not found + + >>> alias = {} + >>> alias['fred'] = ['f.bloggs@napier.co.nz'] + >>> alias['john'] = ['j.bloggs@napier.co.nz'] + >>> alias['mary'] = ['m.poppins@cloud.net'] + >>> alias['boys'] = ['fred', ' john', 'f.bloggs@napier.co.nz'] + >>> alias['all'] = ['fred ', 'john', ' mary '] + >>> alias['loop'] = ['other', 'john', ' mary '] + >>> alias['other'] = ['loop', 'john', ' mary '] + >>> lookup_email('mary', alias) + ['m.poppins@cloud.net'] + >>> lookup_email('arthur.wellesley@howe.ro.uk', alias) + ['arthur.wellesley@howe.ro.uk'] + >>> lookup_email('boys', alias) + ['f.bloggs@napier.co.nz', 'j.bloggs@napier.co.nz'] + >>> lookup_email('all', alias) + ['f.bloggs@napier.co.nz', 'j.bloggs@napier.co.nz', 'm.poppins@cloud.net'] + >>> lookup_email('odd', alias) + Alias 'odd' not found + [] + >>> lookup_email('loop', alias) + Traceback (most recent call last): + ... + OSError: Recursive email alias at 'other' + >>> lookup_email('odd', alias, warn_on_error=False) + [] + >>> # In this case the loop part will effectively be ignored. + >>> lookup_email('loop', alias, warn_on_error=False) + Recursive email alias at 'other' + Recursive email alias at 'john' + Recursive email alias at 'mary' + ['j.bloggs@napier.co.nz', 'm.poppins@cloud.net'] + """ + lookup_name = lookup_name.strip() + if '@' in lookup_name: # Perhaps a real email address + return [lookup_name] + + lookup_name = lookup_name.lower() + col = terminal.Color() + + out_list = [] + if level > 10: + msg = f"Recursive email alias at '{lookup_name}'" + if warn_on_error: + raise OSError(msg) + print(col.build(col.RED, msg)) + return out_list + + if lookup_name: + if lookup_name not in alias: + msg = f"Alias '{lookup_name}' not found" + if warn_on_error: + print(col.build(col.RED, msg)) + return out_list + for item in alias[lookup_name]: + todo = lookup_email(item, alias, warn_on_error, level + 1) + for new_item in todo: + if new_item not in out_list: + out_list.append(new_item) + + return out_list + + +def get_top_level(): + """Return name of top-level directory for this git repo. + + Returns: + str: Full path to git top-level directory, or None if not found + + This test makes sure that we are running tests in the right subdir + + >>> os.path.realpath(os.path.dirname(__file__)) == \ + os.path.join(get_top_level(), 'tools', 'patman') + True + """ + result = command.run_one( + 'git', 'rev-parse', '--show-toplevel', oneline=True, capture=True, + capture_stderr=True, raise_on_error=False) + if result.return_code: + return None + return result.stdout.strip() + + +def get_alias_file(): + """Gets the name of the git alias file. + + Returns: + str: Filename of git alias file, or None if none + """ + fname = command.output_one_line('git', 'config', 'sendemail.aliasesfile', + raise_on_error=False) + if not fname: + return None + + fname = os.path.expanduser(fname.strip()) + if os.path.isabs(fname): + return fname + + return os.path.join(get_top_level() or '', fname) + + +def get_default_user_name(): + """Gets the user.name from .gitconfig file. + + Returns: + User name found in .gitconfig file, or None if none + """ + uname = command.output_one_line('git', 'config', '--global', '--includes', + 'user.name') + return uname + + +def get_default_user_email(): + """Gets the user.email from the global .gitconfig file. + + Returns: + User's email found in .gitconfig file, or None if none + """ + uemail = command.output_one_line('git', 'config', '--global', '--includes', + 'user.email') + return uemail + + +def get_default_subject_prefix(): + """Gets the format.subjectprefix from local .git/config file. + + Returns: + Subject prefix found in local .git/config file, or None if none + """ + sub_prefix = command.output_one_line( + 'git', 'config', 'format.subjectprefix', raise_on_error=False) + + return sub_prefix + + +def setup(): + """setup() - Set up git utils, by reading the alias files.""" + # Check for a git alias file also + global USE_NO_DECORATE + + cmd = log_cmd(None, count=0) + USE_NO_DECORATE = (command.run_one(*cmd, raise_on_error=False) + .return_code == 0) + + +def get_hash(spec, git_dir=None): + """Get the hash of a commit + + Args: + spec (str): Git commit to show, e.g. 'my-branch~12' + git_dir (str): Path to git repository (None to use default) + + Returns: + str: Hash of commit + """ + cmd = ['git'] + if git_dir: + cmd += ['--git-dir', git_dir] + cmd += ['show', '-s', '--pretty=format:%H', spec] + return command.output_one_line(*cmd) + + +def get_head(): + """Get the hash of the current HEAD + + Returns: + Hash of HEAD + """ + return get_hash('HEAD') + + +def get_branch(git_dir=None): + """Get the branch we are currently on + + Return: + str: branch name, or None if none + git_dir (str): Path to git repository (None to use default) + """ + cmd = ['git'] + if git_dir: + cmd += ['--git-dir', git_dir] + cmd += ['rev-parse', '--abbrev-ref', 'HEAD'] + out = command.output_one_line(*cmd, raise_on_error=False) + if out == 'HEAD': + return None + return out + + +def check_dirty(git_dir=None, work_tree=None): + """Check if the tree is dirty + + Args: + git_dir (str): Path to git repository (None to use default) + work_tree (str): Git worktree to use, or None if none + + Return: + str: List of dirty filenames and state + """ + cmd = ['git'] + if git_dir: + cmd += ['--git-dir', git_dir] + if work_tree: + cmd += ['--work-tree', work_tree] + cmd += ['status', '--porcelain', '--untracked-files=no'] + return command.output(*cmd).splitlines() + + +def check_branch(name, git_dir=None): + """Check if a branch exists + + Args: + name (str): Name of the branch to check + git_dir (str): Path to git repository (None to use default) + """ + cmd = ['git'] + if git_dir: + cmd += ['--git-dir', git_dir] + cmd += ['branch', '--list', name] + + # This produces ' <name>' or '* <name>' + out = command.output(*cmd).rstrip() + return out[2:] == name + + +def rename_branch(old_name, name, git_dir=None): + """Check if a branch exists + + Args: + old_name (str): Name of the branch to rename + name (str): New name for the branch + git_dir (str): Path to git repository (None to use default) + + Return: + str: Output from command + """ + cmd = ['git'] + if git_dir: + cmd += ['--git-dir', git_dir] + cmd += ['branch', '--move', old_name, name] + + # This produces ' <name>' or '* <name>' + return command.output(*cmd).rstrip() + + +def get_commit_message(commit, git_dir=None): + """Gets the commit message for a commit + + Args: + commit (str): commit to check + git_dir (str): Path to git repository (None to use default) + + Return: + list of str: Lines from the commit message + """ + cmd = ['git'] + if git_dir: + cmd += ['--git-dir', git_dir] + cmd += ['show', '--quiet', commit] + + out = command.output(*cmd) + # the header is followed by a blank line + lines = out.splitlines() + empty = lines.index('') + msg = lines[empty + 1:] + unindented = [line[4:] for line in msg] + + return unindented + + +def show_commit(commit, msg=True, diffstat=False, patch=False, colour=True, + git_dir=None): + """Runs 'git show' and returns the output + + Args: + commit (str): commit to check + msg (bool): Show the commit message + diffstat (bool): True to include the diffstat + patch (bool): True to include the patch + colour (bool): True to force use of colour + git_dir (str): Path to git repository (None to use default) + + Return: + list of str: Lines from the commit message + """ + cmd = ['git'] + if git_dir: + cmd += ['--git-dir', git_dir] + cmd += ['show'] + if colour: + cmd.append('--color') + if not msg: + cmd.append('--oneline') + if diffstat: + cmd.append('--stat') + else: + cmd.append('--quiet') + if patch: + cmd.append('--patch') + cmd.append(commit) + + return command.output(*cmd) + + +if __name__ == "__main__": + import doctest + + doctest.testmod() diff --git a/tools/u_boot_pylib/pyproject.toml b/tools/u_boot_pylib/pyproject.toml new file mode 100644 index 00000000000..ce2355084ac --- /dev/null +++ b/tools/u_boot_pylib/pyproject.toml @@ -0,0 +1,25 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "u_boot_pylib" +version = "0.0.6" +authors = [ + { name="Simon Glass", email="sjg@chromium.org" }, +] +description = "U-Boot python library" +readme = "README.rst" +requires-python = ">=3.7" +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)", + "Operating System :: OS Independent", +] + +[project.urls] +"Homepage" = "https://docs.u-boot.org" +"Bug Tracker" = "https://source.denx.de/groups/u-boot/-/issues" + +[tool.setuptools.package-data] +u_boot_pylib = ["*.rst"] diff --git a/tools/u_boot_pylib/requirements.txt b/tools/u_boot_pylib/requirements.txt new file mode 100644 index 00000000000..1087e6f2857 --- /dev/null +++ b/tools/u_boot_pylib/requirements.txt @@ -0,0 +1 @@ +concurrencytest==0.1.2 diff --git a/tools/u_boot_pylib/terminal.py b/tools/u_boot_pylib/terminal.py new file mode 100644 index 00000000000..69c183e85e5 --- /dev/null +++ b/tools/u_boot_pylib/terminal.py @@ -0,0 +1,346 @@ +# SPDX-License-Identifier: GPL-2.0+ +# Copyright (c) 2011 The Chromium OS Authors. +# + +"""Terminal utilities + +This module handles terminal interaction including ANSI color codes. +""" + +from contextlib import contextmanager +from io import StringIO +import os +import re +import shutil +import subprocess +import sys + +# Selection of when we want our output to be colored +COLOR_IF_TERMINAL, COLOR_ALWAYS, COLOR_NEVER = range(3) + +# Initially, we are set up to print to the terminal +print_test_mode = False +print_test_list = [] + +# The length of the last line printed without a newline +last_print_len = None + +# credit: +# stackoverflow.com/questions/14693701/how-can-i-remove-the-ansi-escape-sequences-from-a-string-in-python +ansi_escape = re.compile(r'\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') + +# True if we are capturing console output +CAPTURING = False + +# Set this to False to disable output-capturing globally +USE_CAPTURE = True + + +class PrintLine: + """A line of text output + + Members: + text: Text line that was printed + newline: True to output a newline after the text + colour: Text colour to use + """ + def __init__(self, text, colour, newline=True, bright=True): + self.text = text + self.newline = newline + self.colour = colour + self.bright = bright + + def __eq__(self, other): + return (self.text == other.text and + self.newline == other.newline and + self.colour == other.colour and + self.bright == other.bright) + + def __str__(self): + return ("newline=%s, colour=%s, bright=%d, text='%s'" % + (self.newline, self.colour, self.bright, self.text)) + + +def calc_ascii_len(text): + """Calculate the length of a string, ignoring any ANSI sequences + + When displayed on a terminal, ANSI sequences don't take any space, so we + need to ignore them when calculating the length of a string. + + Args: + text: Text to check + + Returns: + Length of text, after skipping ANSI sequences + + >>> col = Color(COLOR_ALWAYS) + >>> text = col.build(Color.RED, 'abc') + >>> len(text) + 14 + >>> calc_ascii_len(text) + 3 + >>> + >>> text += 'def' + >>> calc_ascii_len(text) + 6 + >>> text += col.build(Color.RED, 'abc') + >>> calc_ascii_len(text) + 9 + """ + result = ansi_escape.sub('', text) + return len(result) + +def trim_ascii_len(text, size): + """Trim a string containing ANSI sequences to the given ASCII length + + The string is trimmed with ANSI sequences being ignored for the length + calculation. + + >>> col = Color(COLOR_ALWAYS) + >>> text = col.build(Color.RED, 'abc') + >>> len(text) + 14 + >>> calc_ascii_len(trim_ascii_len(text, 4)) + 3 + >>> calc_ascii_len(trim_ascii_len(text, 2)) + 2 + >>> text += 'def' + >>> calc_ascii_len(trim_ascii_len(text, 4)) + 4 + >>> text += col.build(Color.RED, 'ghi') + >>> calc_ascii_len(trim_ascii_len(text, 7)) + 7 + """ + if calc_ascii_len(text) < size: + return text + pos = 0 + out = '' + left = size + + # Work through each ANSI sequence in turn + for m in ansi_escape.finditer(text): + # Find the text before the sequence and add it to our string, making + # sure it doesn't overflow + before = text[pos:m.start()] + toadd = before[:left] + out += toadd + + # Figure out how much non-ANSI space we have left + left -= len(toadd) + + # Add the ANSI sequence and move to the position immediately after it + out += m.group() + pos = m.start() + len(m.group()) + + # Deal with text after the last ANSI sequence + after = text[pos:] + toadd = after[:left] + out += toadd + + return out + + +def tprint(text='', newline=True, colour=None, limit_to_line=False, + bright=True, back=None, col=None): + """Handle a line of output to the terminal. + + In test mode this is recorded in a list. Otherwise it is output to the + terminal. + + Args: + text: Text to print + newline: True to add a new line at the end of the text + colour: Colour to use for the text + """ + global last_print_len + + if print_test_mode: + print_test_list.append(PrintLine(text, colour, newline, bright)) + else: + if colour is not None: + if not col: + col = Color() + text = col.build(colour, text, bright=bright, back=back) + if newline: + print(text) + last_print_len = None + else: + if limit_to_line: + cols = shutil.get_terminal_size().columns + text = trim_ascii_len(text, cols) + print(text, end='', flush=True) + last_print_len = calc_ascii_len(text) + +def print_clear(): + """Clear a previously line that was printed with no newline""" + global last_print_len + + if last_print_len: + if print_test_mode: + print_test_list.append(PrintLine(None, None, None, None)) + else: + print('\r%s\r' % (' '* last_print_len), end='', flush=True) + last_print_len = None + +def set_print_test_mode(enable=True): + """Go into test mode, where all printing is recorded""" + global print_test_mode + + print_test_mode = enable + get_print_test_lines() + +def get_print_test_lines(): + """Get a list of all lines output through tprint() + + Returns: + A list of PrintLine objects + """ + global print_test_list + + ret = print_test_list + print_test_list = [] + return ret + +def echo_print_test_lines(): + """Print out the text lines collected""" + for line in print_test_list: + if line.colour: + col = Color() + print(col.build(line.colour, line.text), end='') + else: + print(line.text, end='') + if line.newline: + print() + +def have_terminal(): + """Check if we have an interactive terminal or not + + Returns: + bool: true if an interactive terminal is attached + """ + return os.isatty(sys.stdout.fileno()) + + +class Color(): + """Conditionally wraps text in ANSI color escape sequences.""" + BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) + BOLD = -1 + BRIGHT_START = '\033[1;%d%sm' + NORMAL_START = '\033[22;%d%sm' + BOLD_START = '\033[1m' + BACK_EXTRA = ';%d' + RESET = '\033[0m' + + def __init__(self, colored=COLOR_IF_TERMINAL): + """Create a new Color object, optionally disabling color output. + + Args: + enabled: True if color output should be enabled. If False then this + class will not add color codes at all. + """ + try: + self._enabled = (colored == COLOR_ALWAYS or + (colored == COLOR_IF_TERMINAL and + os.isatty(sys.stdout.fileno()))) + except: + self._enabled = False + + def enabled(self): + """Check if colour is enabled + + Return: True if enabled, else False + """ + return self._enabled + + def start(self, color, bright=True, back=None): + """Returns a start color code. + + Args: + color: Color to use, .e.g BLACK, RED, etc. + + Returns: + If color is enabled, returns an ANSI sequence to start the given + color, otherwise returns empty string + """ + if self._enabled: + if color == self.BOLD: + return self.BOLD_START + base = self.BRIGHT_START if bright else self.NORMAL_START + extra = self.BACK_EXTRA % (back + 40) if back else '' + return base % (color + 30, extra) + return '' + + def stop(self): + """Returns a stop color code. + + Returns: + If color is enabled, returns an ANSI color reset sequence, + otherwise returns empty string + """ + if self._enabled: + return self.RESET + return '' + + def build(self, color, text, bright=True, back=None): + """Returns text with conditionally added color escape sequences. + + Keyword arguments: + color: Text color -- one of the color constants defined in this + class. + text: The text to color. + + Returns: + If self._enabled is False, returns the original text. If it's True, + returns text with color escape sequences based on the value of + color. + """ + if not self._enabled: + return text + return self.start(color, bright, back) + text + self.RESET + + +# Use this to suppress stdout/stderr output: +# with terminal.capture() as (stdout, stderr) +# ...do something... +@contextmanager +def capture(): + global CAPTURING + + capture_out, capture_err = StringIO(), StringIO() + old_out, old_err = sys.stdout, sys.stderr + try: + CAPTURING = True + sys.stdout, sys.stderr = capture_out, capture_err + yield capture_out, capture_err + finally: + sys.stdout, sys.stderr = old_out, old_err + CAPTURING = False + if not USE_CAPTURE: + sys.stdout.write(capture_out.getvalue()) + sys.stderr.write(capture_err.getvalue()) + + +@contextmanager +def pager(): + """Simple pager for outputting lots of text + + Usage: + with terminal.pager(): + print(...) + """ + proc = None + old_stdout = None + try: + less = os.getenv('PAGER') + if not CAPTURING and less != 'none' and have_terminal(): + if not less: + less = 'less -R --quit-if-one-screen' + proc = subprocess.Popen(less, stdin=subprocess.PIPE, text=True, + shell=True) + old_stdout = sys.stdout + sys.stdout = proc.stdin + yield + finally: + if proc: + sys.stdout = old_stdout + proc.communicate() diff --git a/tools/u_boot_pylib/test_util.py b/tools/u_boot_pylib/test_util.py new file mode 100644 index 00000000000..d258a1935c9 --- /dev/null +++ b/tools/u_boot_pylib/test_util.py @@ -0,0 +1,229 @@ +# SPDX-License-Identifier: GPL-2.0+ +# +# Copyright (c) 2016 Google, Inc +# + +import doctest +import glob +import multiprocessing +import os +import re +import sys +import unittest + +from u_boot_pylib import command +from u_boot_pylib import terminal + +use_concurrent = True +try: + from concurrencytest import ConcurrentTestSuite + from concurrencytest import fork_for_tests +except: + use_concurrent = False + + +def run_test_coverage(prog, filter_fname, exclude_list, build_dir, + required=None, extra_args=None, single_thread='-P1', + args=None, allow_failures=None): + """Run tests and check that we get 100% coverage + + Args: + prog: Program to run (with be passed a '-t' argument to run tests + filter_fname: Normally all *.py files in the program's directory will + be included. If this is not None, then it is used to filter the + list so that only filenames that don't contain filter_fname are + included. + exclude_list: List of file patterns to exclude from the coverage + calculation + build_dir: Build directory, used to locate libfdt.py + required: List of modules which must be in the coverage report + extra_args (str): Extra arguments to pass to the tool before the -t/test + arg + single_thread (str): Argument string to make the tests run + single-threaded. This is necessary to get proper coverage results. + The default is '-P0' + args (list of str): List of tests to run, or None to run all + + Raises: + ValueError if the code coverage is not 100% + """ + # This uses the build output from sandbox_spl to get _libfdt.so + path = os.path.dirname(prog) + if filter_fname: + glob_list = glob.glob(os.path.join(path, '*.py')) + glob_list = [fname for fname in glob_list if filter_fname in fname] + else: + glob_list = [] + glob_list += exclude_list + glob_list += ['*libfdt.py', '*/site-packages/*', '*/dist-packages/*'] + glob_list += ['*concurrencytest*'] + test_cmd = 'test' if 'binman' in prog or 'patman' in prog else '-t' + prefix = '' + if build_dir: + prefix = 'PYTHONPATH=$PYTHONPATH:%s/sandbox_spl/tools ' % build_dir + + # Detect a Python sandbox and use 'coverage' instead + covtool = ('python3-coverage' if sys.prefix == sys.base_prefix else + 'coverage') + + cmd = ('%s%s run ' + '--omit "%s" %s %s %s %s %s' % (prefix, covtool, ','.join(glob_list), + prog, extra_args or '', test_cmd, + single_thread or '-P1', + ' '.join(args) if args else '')) + os.system(cmd) + stdout = command.output(covtool, 'report') + lines = stdout.splitlines() + if required: + # Convert '/path/to/name.py' just the module name 'name' + test_set = set([os.path.splitext(os.path.basename(line.split()[0]))[0] + for line in lines if '/etype/' in line]) + missing_list = required + missing_list.discard('__init__') + missing_list.difference_update(test_set) + if missing_list: + print('Missing tests for %s' % (', '.join(missing_list))) + print(stdout) + ok = False + + coverage = lines[-1].split(' ')[-1] + ok = True + print(coverage) + if coverage != '100%': + print(stdout) + print("To get a report in 'htmlcov/index.html', type: python3-coverage html") + print('Coverage error: %s, but should be 100%%' % coverage) + ok = False + if not ok: + if allow_failures: + # for line in lines: + # print('.', line, re.match(r'^(tools/.*py) *\d+ *(\d+) *(\d+)%$', line)) + lines = [re.match(r'^(tools/.*py) *\d+ *(\d+) *\d+%$', line) + for line in stdout.splitlines()] + bad = [] + for mat in lines: + if mat and mat.group(2) != '0': + fname = mat.group(1) + if fname not in allow_failures: + bad.append(fname) + if not bad: + return + raise ValueError('Test coverage failure') + + +class FullTextTestResult(unittest.TextTestResult): + """A test result class that can print extended text results to a stream + + This is meant to be used by a TestRunner as a result class. Like + TextTestResult, this prints out the names of tests as they are run, + errors as they occur, and a summary of the results at the end of the + test run. Beyond those, this prints information about skipped tests, + expected failures and unexpected successes. + + Args: + stream: A file-like object to write results to + descriptions (bool): True to print descriptions with test names + verbosity (int): Detail of printed output per test as they run + Test stdout and stderr always get printed when buffering + them is disabled by the test runner. In addition to that, + 0: Print nothing + 1: Print a dot per test + 2: Print test names + """ + def __init__(self, stream, descriptions, verbosity): + self.verbosity = verbosity + super().__init__(stream, descriptions, verbosity) + + def printErrors(self): + "Called by TestRunner after test run to summarize the tests" + # The parent class doesn't keep unexpected successes in the same + # format as the rest. Adapt it to what printErrorList expects. + unexpected_successes = [ + (test, 'Test was expected to fail, but succeeded.\n') + for test in self.unexpectedSuccesses + ] + + super().printErrors() # FAIL and ERROR + self.printErrorList('SKIP', self.skipped) + self.printErrorList('XFAIL', self.expectedFailures) + self.printErrorList('XPASS', unexpected_successes) + + def addSkip(self, test, reason): + """Called when a test is skipped.""" + # Add empty line to keep spacing consistent with other results + if not reason.endswith('\n'): + reason += '\n' + super().addSkip(test, reason) + + +def run_test_suites(toolname, debug, verbosity, no_capture, test_preserve_dirs, + processes, test_name, toolpath, class_and_module_list): + """Run a series of test suites and collect the results + + Args: + toolname: Name of the tool that ran the tests + debug: True to enable debugging, which shows a full stack trace on error + verbosity: Verbosity level to use (0-4) + test_preserve_dirs: True to preserve the input directory used by tests + so that it can be examined afterwards (only useful for debugging + tests). If a single test is selected (in args[0]) it also preserves + the output directory for this test. Both directories are displayed + on the command line. + processes: Number of processes to use to run tests (None=same as #CPUs) + test_name: Name of test to run, or None for all + toolpath: List of paths to use for tools + class_and_module_list: List of test classes (type class) and module + names (type str) to run + """ + sys.argv = [sys.argv[0]] + if debug: + sys.argv.append('-D') + if verbosity: + sys.argv.append('-v%d' % verbosity) + if no_capture: + sys.argv.append('-N') + terminal.USE_CAPTURE = False + if toolpath: + for path in toolpath: + sys.argv += ['--toolpath', path] + + suite = unittest.TestSuite() + loader = unittest.TestLoader() + runner = unittest.TextTestRunner( + stream=sys.stdout, + verbosity=(1 if verbosity is None else verbosity), + resultclass=FullTextTestResult, + ) + + if use_concurrent and processes != 1 and not test_name: + suite = ConcurrentTestSuite(suite, + fork_for_tests(processes or multiprocessing.cpu_count())) + + for module in class_and_module_list: + if isinstance(module, str) and (not test_name or test_name == module): + suite.addTests(doctest.DocTestSuite(module)) + + for module in class_and_module_list: + if isinstance(module, str): + continue + # Test the test module about our arguments, if it is interested + if hasattr(module, 'setup_test_args'): + setup_test_args = getattr(module, 'setup_test_args') + setup_test_args(preserve_indir=test_preserve_dirs, + preserve_outdirs=test_preserve_dirs and test_name is not None, + toolpath=toolpath, verbosity=verbosity, no_capture=no_capture) + if test_name: + # Since Python v3.5 If an ImportError or AttributeError occurs + # while traversing a name then a synthetic test that raises that + # error when run will be returned. Check that the requested test + # exists, otherwise these errors are included in the results. + if test_name in loader.getTestCaseNames(module): + suite.addTests(loader.loadTestsFromName(test_name, module)) + else: + suite.addTests(loader.loadTestsFromTestCase(module)) + + print(f" Running {toolname} tests ".center(70, "=")) + result = runner.run(suite) + print() + + return result diff --git a/tools/u_boot_pylib/tools.py b/tools/u_boot_pylib/tools.py new file mode 100644 index 00000000000..1afd289eadd --- /dev/null +++ b/tools/u_boot_pylib/tools.py @@ -0,0 +1,612 @@ +# SPDX-License-Identifier: GPL-2.0+ +# +# Copyright (c) 2016 Google, Inc +# + +import glob +import os +import shlex +import shutil +import sys +import tempfile +import urllib.request + +from u_boot_pylib import command +from u_boot_pylib import tout + +# Output directly (generally this is temporary) +outdir = None + +# True to keep the output directory around after exiting +preserve_outdir = False + +# Path to the Chrome OS chroot, if we know it +chroot_path = None + +# Search paths to use for filename(), used to find files +search_paths = [] + +tool_search_paths = [] + +# Tools and the packages that contain them, on debian +packages = { + 'lz4': 'liblz4-tool', + } + +# List of paths to use when looking for an input file +indir = [] + +def prepare_output_dir(dirname, preserve=False): + """Select an output directory, ensuring it exists. + + This either creates a temporary directory or checks that the one supplied + by the user is valid. For a temporary directory, it makes a note to + remove it later if required. + + Args: + dirname: a string, name of the output directory to use to store + intermediate and output files. If is None - create a temporary + directory. + preserve: a Boolean. If outdir above is None and preserve is False, the + created temporary directory will be destroyed on exit. + + Raises: + OSError: If it cannot create the output directory. + """ + global outdir, preserve_outdir + + preserve_outdir = dirname or preserve + if dirname: + outdir = dirname + if not os.path.isdir(outdir): + try: + os.makedirs(outdir) + except OSError as err: + raise ValueError( + f"Cannot make output directory 'outdir': 'err.strerror'") + tout.debug("Using output directory '%s'" % outdir) + else: + outdir = tempfile.mkdtemp(prefix='binman.') + tout.debug("Using temporary directory '%s'" % outdir) + +def _remove_output_dir(): + global outdir + + shutil.rmtree(outdir) + tout.debug("Deleted temporary directory '%s'" % outdir) + outdir = None + +def finalise_output_dir(): + global outdir, preserve_outdir + + """Tidy up: delete output directory if temporary and not preserved.""" + if outdir and not preserve_outdir: + _remove_output_dir() + outdir = None + +def get_output_filename(fname): + """Return a filename within the output directory. + + Args: + fname: Filename to use for new file + + Returns: + The full path of the filename, within the output directory + """ + return os.path.join(outdir, fname) + +def get_output_dir(): + """Return the current output directory + + Returns: + str: The output directory + """ + return outdir + +def _finalise_for_test(): + """Remove the output directory (for use by tests)""" + global outdir + + if outdir: + _remove_output_dir() + outdir = None + +def set_input_dirs(dirname): + """Add a list of input directories, where input files are kept. + + Args: + dirname: a list of paths to input directories to use for obtaining + files needed by binman to place in the image. + """ + global indir + + indir = dirname + tout.debug("Using input directories %s" % indir) + +def append_input_dirs(dirname): + """Append a list of input directories to the current list of input + directories + + Args: + dirname: a list of paths to input directories to use for obtaining + files needed by binman to place in the image. + """ + global indir + + for dir in dirname: + if dirname not in indir: + indir.append(dirname) + + tout.debug("Updated input directories %s" % indir) + +def get_input_filename(fname, allow_missing=False): + """Return a filename for use as input. + + Args: + fname: Filename to use for new file + allow_missing: True if the filename can be missing + + Returns: + fname, if indir is None; + full path of the filename, within the input directory; + None, if file is missing and allow_missing is True + + Raises: + ValueError if file is missing and allow_missing is False + """ + if not indir or fname[:1] == '/': + return fname + for dirname in indir: + pathname = os.path.join(dirname, fname) + if os.path.exists(pathname): + return pathname + + if allow_missing: + return None + raise ValueError("Filename '%s' not found in input path (%s) (cwd='%s')" % + (fname, ','.join(indir), os.getcwd())) + +def get_input_filename_glob(pattern): + """Return a list of filenames for use as input. + + Args: + pattern: Filename pattern to search for + + Returns: + A list of matching files in all input directories + """ + if not indir: + return glob.glob(pattern) + files = [] + for dirname in indir: + pathname = os.path.join(dirname, pattern) + files += glob.glob(pathname) + return sorted(files) + +def align(pos, align): + if align: + mask = align - 1 + pos = (pos + mask) & ~mask + return pos + +def not_power_of_two(num): + return num and (num & (num - 1)) + +def set_tool_paths(toolpaths): + """Set the path to search for tools + + Args: + toolpaths: List of paths to search for tools executed by run() + """ + global tool_search_paths + + tool_search_paths = toolpaths + +def path_has_file(path_spec, fname): + """Check if a given filename is in the PATH + + Args: + path_spec: Value of PATH variable to check + fname: Filename to check + + Returns: + True if found, False if not + """ + for dir in path_spec.split(':'): + if os.path.exists(os.path.join(dir, fname)): + return True + return False + +def get_host_compile_tool(env, name): + """Get the host-specific version for a compile tool + + This checks the environment variables that specify which version of + the tool should be used (e.g. ${HOSTCC}). + + The following table lists the host-specific versions of the tools + this function resolves to: + + Compile Tool | Host version + --------------+---------------- + as | ${HOSTAS} + ld | ${HOSTLD} + cc | ${HOSTCC} + cpp | ${HOSTCPP} + c++ | ${HOSTCXX} + ar | ${HOSTAR} + nm | ${HOSTNM} + ldr | ${HOSTLDR} + strip | ${HOSTSTRIP} + objcopy | ${HOSTOBJCOPY} + objdump | ${HOSTOBJDUMP} + dtc | ${HOSTDTC} + + Args: + name: Command name to run + + Returns: + host_name: Exact command name to run instead + extra_args: List of extra arguments to pass + """ + host_name = None + extra_args = [] + if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip', + 'objcopy', 'objdump', 'dtc'): + host_name, *host_args = env.get('HOST' + name.upper(), '').split(' ') + elif name == 'c++': + host_name, *host_args = env.get('HOSTCXX', '').split(' ') + + if host_name: + return host_name, extra_args + return name, [] + +def get_target_compile_tool(name, cross_compile=None): + """Get the target-specific version for a compile tool + + This first checks the environment variables that specify which + version of the tool should be used (e.g. ${CC}). If those aren't + specified, it checks the CROSS_COMPILE variable as a prefix for the + tool with some substitutions (e.g. "${CROSS_COMPILE}gcc" for cc). + + The following table lists the target-specific versions of the tools + this function resolves to: + + Compile Tool | First choice | Second choice + --------------+----------------+---------------------------- + as | ${AS} | ${CROSS_COMPILE}as + ld | ${LD} | ${CROSS_COMPILE}ld.bfd + | | or ${CROSS_COMPILE}ld + cc | ${CC} | ${CROSS_COMPILE}gcc + cpp | ${CPP} | ${CROSS_COMPILE}gcc -E + c++ | ${CXX} | ${CROSS_COMPILE}g++ + ar | ${AR} | ${CROSS_COMPILE}ar + nm | ${NM} | ${CROSS_COMPILE}nm + ldr | ${LDR} | ${CROSS_COMPILE}ldr + strip | ${STRIP} | ${CROSS_COMPILE}strip + objcopy | ${OBJCOPY} | ${CROSS_COMPILE}objcopy + objdump | ${OBJDUMP} | ${CROSS_COMPILE}objdump + dtc | ${DTC} | (no CROSS_COMPILE version) + + Args: + name: Command name to run + + Returns: + target_name: Exact command name to run instead + extra_args: List of extra arguments to pass + """ + env = dict(os.environ) + + target_name = None + extra_args = [] + if name in ('as', 'ld', 'cc', 'cpp', 'ar', 'nm', 'ldr', 'strip', + 'objcopy', 'objdump', 'dtc'): + target_name, *extra_args = env.get(name.upper(), '').split(' ') + elif name == 'c++': + target_name, *extra_args = env.get('CXX', '').split(' ') + + if target_name: + return target_name, extra_args + + if cross_compile is None: + cross_compile = env.get('CROSS_COMPILE', '') + + if name in ('as', 'ar', 'nm', 'ldr', 'strip', 'objcopy', 'objdump'): + target_name = cross_compile + name + elif name == 'ld': + try: + if run(cross_compile + 'ld.bfd', '-v'): + target_name = cross_compile + 'ld.bfd' + except: + target_name = cross_compile + 'ld' + elif name == 'cc': + target_name = cross_compile + 'gcc' + elif name == 'cpp': + target_name = cross_compile + 'gcc' + extra_args = ['-E'] + elif name == 'c++': + target_name = cross_compile + 'g++' + else: + target_name = name + return target_name, extra_args + +def get_env_with_path(): + """Get an updated environment with the PATH variable set correctly + + If there are any search paths set, these need to come first in the PATH so + that these override any other version of the tools. + + Returns: + dict: New environment with PATH updated, or None if there are not search + paths + """ + if tool_search_paths: + env = dict(os.environ) + env['PATH'] = ':'.join(tool_search_paths) + ':' + env['PATH'] + return env + +def run_result(name, *args, **kwargs): + """Run a tool with some arguments + + This runs a 'tool', which is a program used by binman to process files and + perhaps produce some output. Tools can be located on the PATH or in a + search path. + + Args: + name: Command name to run + args: Arguments to the tool + for_host: True to resolve the command to the version for the host + for_target: False to run the command as-is, without resolving it + to the version for the compile target + raise_on_error: Raise an error if the command fails (True by default) + + Returns: + CommandResult object + """ + try: + binary = kwargs.get('binary') + for_host = kwargs.get('for_host', False) + for_target = kwargs.get('for_target', not for_host) + raise_on_error = kwargs.get('raise_on_error', True) + env = get_env_with_path() + if for_target: + name, extra_args = get_target_compile_tool(name) + args = tuple(extra_args) + args + elif for_host: + name, extra_args = get_host_compile_tool(env, name) + args = tuple(extra_args) + args + name = os.path.expanduser(name) # Expand paths containing ~ + all_args = (name,) + args + result = command.run_one(*all_args, capture=True, capture_stderr=True, + env=env, raise_on_error=False, binary=binary) + if result.return_code: + if raise_on_error: + raise ValueError("Error %d running '%s': %s" % + (result.return_code,' '.join(all_args), + result.stderr or result.stdout)) + return result + except ValueError: + if env and not path_has_file(env['PATH'], name): + msg = "Please install tool '%s'" % name + package = packages.get(name) + if package: + msg += " (e.g. from package '%s')" % package + raise ValueError(msg) + raise + +def tool_find(name): + """Search the current path for a tool + + This uses both PATH and any value from set_tool_paths() to search for a tool + + Args: + name (str): Name of tool to locate + + Returns: + str: Full path to tool if found, else None + """ + name = os.path.expanduser(name) # Expand paths containing ~ + paths = [] + pathvar = os.environ.get('PATH') + if pathvar: + paths = pathvar.split(':') + if tool_search_paths: + paths += tool_search_paths + for path in paths: + fname = os.path.join(path, name) + if os.path.isfile(fname) and os.access(fname, os.X_OK): + return fname + +def run(name, *args, **kwargs): + """Run a tool with some arguments + + This runs a 'tool', which is a program used by binman to process files and + perhaps produce some output. Tools can be located on the PATH or in a + search path. + + Args: + name: Command name to run + args: Arguments to the tool + for_host: True to resolve the command to the version for the host + for_target: False to run the command as-is, without resolving it + to the version for the compile target + + Returns: + CommandResult object + """ + result = run_result(name, *args, **kwargs) + if result is not None: + return result.stdout + +def filename(fname): + """Resolve a file path to an absolute path. + + If fname starts with ##/ and chroot is available, ##/ gets replaced with + the chroot path. If chroot is not available, this file name can not be + resolved, `None' is returned. + + If fname is not prepended with the above prefix, and is not an existing + file, the actual file name is retrieved from the passed in string and the + search_paths directories (if any) are searched to for the file. If found - + the path to the found file is returned, `None' is returned otherwise. + + Args: + fname: a string, the path to resolve. + + Returns: + Absolute path to the file or None if not found. + """ + if fname.startswith('##/'): + if chroot_path: + fname = os.path.join(chroot_path, fname[3:]) + else: + return None + + # Search for a pathname that exists, and return it if found + if fname and not os.path.exists(fname): + for path in search_paths: + pathname = os.path.join(path, os.path.basename(fname)) + if os.path.exists(pathname): + return pathname + + # If not found, just return the standard, unchanged path + return fname + +def read_file(fname, binary=True): + """Read and return the contents of a file. + + Args: + fname: path to filename to read, where ## signifiies the chroot. + + Returns: + data read from file, as a string. + """ + with open(filename(fname), binary and 'rb' or 'r') as fd: + data = fd.read() + #self._out.Info("Read file '%s' size %d (%#0x)" % + #(fname, len(data), len(data))) + return data + +def write_file(fname, data, binary=True): + """Write data into a file. + + Args: + fname: path to filename to write + data: data to write to file, as a string + """ + #self._out.Info("Write file '%s' size %d (%#0x)" % + #(fname, len(data), len(data))) + with open(filename(fname), binary and 'wb' or 'w') as fd: + fd.write(data) + +def get_bytes(byte, size): + """Get a string of bytes of a given size + + Args: + byte: Numeric byte value to use + size: Size of bytes/string to return + + Returns: + A bytes type with 'byte' repeated 'size' times + """ + return bytes([byte]) * size + +def to_bytes(string): + """Convert a str type into a bytes type + + Args: + string: string to convert + + Returns: + A bytes type + """ + return string.encode('utf-8') + +def to_string(bval): + """Convert a bytes type into a str type + + Args: + bval: bytes value to convert + + Returns: + Python 3: A bytes type + Python 2: A string type + """ + return bval.decode('utf-8') + +def to_hex(val): + """Convert an integer value (or None) to a string + + Returns: + hex value, or 'None' if the value is None + """ + return 'None' if val is None else '%#x' % val + +def to_hex_size(val): + """Return the size of an object in hex + + Returns: + hex value of size, or 'None' if the value is None + """ + return 'None' if val is None else '%#x' % len(val) + +def print_full_help(fname): + """Print the full help message for a tool using an appropriate pager. + + Args: + fname: Path to a file containing the full help message + """ + pager = shlex.split(os.getenv('PAGER', '')) + if not pager: + lesspath = shutil.which('less') + pager = [lesspath] if lesspath else None + if not pager: + pager = ['more'] + command.run(*pager, fname) + +def download(url, tmpdir_pattern='.patman'): + """Download a file to a temporary directory + + Args: + url (str): URL to download + tmpdir_pattern (str): pattern to use for the temporary directory + + Returns: + Tuple: + Full path to the downloaded archive file in that directory, + or None if there was an error while downloading + Temporary directory name + """ + print('- downloading: %s' % url) + leaf = url.split('/')[-1] + tmpdir = tempfile.mkdtemp(tmpdir_pattern) + response = urllib.request.urlopen(url) + fname = os.path.join(tmpdir, leaf) + fd = open(fname, 'wb') + meta = response.info() + size = int(meta.get('Content-Length')) + done = 0 + block_size = 1 << 16 + status = '' + + # Read the file in chunks and show progress as we go + while True: + buffer = response.read(block_size) + if not buffer: + print(chr(8) * (len(status) + 1), '\r', end=' ') + break + + done += len(buffer) + fd.write(buffer) + status = r'%10d MiB [%3d%%]' % (done // 1024 // 1024, + done * 100 // size) + status = status + chr(8) * (len(status) + 1) + print(status, end=' ') + sys.stdout.flush() + print('\r', end='') + sys.stdout.flush() + fd.close() + if done != size: + print('Error, failed to download') + os.remove(fname) + fname = None + return fname, tmpdir diff --git a/tools/u_boot_pylib/tout.py b/tools/u_boot_pylib/tout.py new file mode 100644 index 00000000000..ca72108d6bc --- /dev/null +++ b/tools/u_boot_pylib/tout.py @@ -0,0 +1,190 @@ +# SPDX-License-Identifier: GPL-2.0+ +# Copyright (c) 2016 Google, Inc +# +# Terminal output logging. +# + +import sys + +from u_boot_pylib import terminal + +# Output verbosity levels that we support +FATAL, ERROR, WARNING, NOTICE, INFO, DETAIL, DEBUG = range(7) + +in_progress = False + +""" +This class handles output of progress and other useful information +to the user. It provides for simple verbosity level control and can +output nothing but errors at verbosity zero. + +The idea is that modules set up an Output object early in their years and pass +it around to other modules that need it. This keeps the output under control +of a single class. + +Public properties: + verbose: Verbosity level: 0=silent, 1=progress, 3=full, 4=debug +""" +def __enter__(): + return + +def __exit__(unused1, unused2, unused3): + """Clean up and remove any progress message.""" + clear_progress() + return False + +def user_is_present(): + """This returns True if it is likely that a user is present. + + Sometimes we want to prompt the user, but if no one is there then this + is a waste of time, and may lock a script which should otherwise fail. + + Returns: + True if it thinks the user is there, and False otherwise + """ + return stdout_is_tty and verbose > ERROR + +def clear_progress(): + """Clear any active progress message on the terminal.""" + global in_progress + if verbose > ERROR and stdout_is_tty and in_progress: + _stdout.write('\r%s\r' % (" " * len (_progress))) + _stdout.flush() + in_progress = False + +def progress(msg, warning=False, trailer='...'): + """Display progress information. + + Args: + msg: Message to display. + warning: True if this is a warning.""" + global in_progress + clear_progress() + if verbose > ERROR: + _progress = msg + trailer + if stdout_is_tty: + col = _color.YELLOW if warning else _color.GREEN + _stdout.write('\r' + _color.build(col, _progress)) + _stdout.flush() + in_progress = True + else: + _stdout.write(_progress + '\n') + +def _output(level, msg, color=None): + """Output a message to the terminal. + + Args: + level: Verbosity level for this message. It will only be displayed if + this as high as the currently selected level. + msg; Message to display. + error: True if this is an error message, else False. + """ + if verbose >= level: + clear_progress() + if color: + msg = _color.build(color, msg) + if level < NOTICE: + print(msg, file=sys.stderr) + else: + print(msg) + if level == FATAL: + sys.exit(1) + +def do_output(level, msg): + """Output a message to the terminal. + + Args: + level: Verbosity level for this message. It will only be displayed if + this as high as the currently selected level. + msg; Message to display. + """ + _output(level, msg) + +def fatal(msg): + """Display an error message and exit + + Args: + msg; Message to display. + """ + _output(FATAL, msg, _color.RED) + +def error(msg): + """Display an error message + + Args: + msg; Message to display. + """ + _output(ERROR, msg, _color.RED) + +def warning(msg): + """Display a warning message + + Args: + msg; Message to display. + """ + _output(WARNING, msg, _color.YELLOW) + +def notice(msg): + """Display an important infomation message + + Args: + msg; Message to display. + """ + _output(NOTICE, msg) + +def info(msg): + """Display an infomation message + + Args: + msg; Message to display. + """ + _output(INFO, msg) + +def detail(msg): + """Display a detailed message + + Args: + msg; Message to display. + """ + _output(DETAIL, msg) + +def debug(msg): + """Display a debug message + + Args: + msg; Message to display. + """ + _output(DEBUG, msg) + +def user_output(msg): + """Display a message regardless of the current output level. + + This is used when the output was specifically requested by the user. + Args: + msg; Message to display. + """ + _output(ERROR, msg) + +def init(_verbose=WARNING, stdout=sys.stdout, allow_colour=True): + """Initialize a new output object. + + Args: + verbose: Verbosity level (0-6). + stdout: File to use for stdout. + """ + global verbose, _progress, _color, _stdout, stdout_is_tty + + verbose = _verbose + _progress = '' # Our last progress message + _color = terminal.Color(terminal.COLOR_IF_TERMINAL if allow_colour + else terminal.COLOR_NEVER) + _stdout = stdout + + # TODO(sjg): Move this into Chromite libraries when we have them + stdout_is_tty = hasattr(sys.stdout, 'isatty') and sys.stdout.isatty() + stderr_is_tty = hasattr(sys.stderr, 'isatty') and sys.stderr.isatty() + +def uninit(): + clear_progress() + +init() diff --git a/tools/u_boot_pylib/u_boot_pylib b/tools/u_boot_pylib/u_boot_pylib new file mode 120000 index 00000000000..5a427d19424 --- /dev/null +++ b/tools/u_boot_pylib/u_boot_pylib @@ -0,0 +1 @@ +__main__.py
\ No newline at end of file |