# SPDX-License-Identifier: GPL-2.0+
# Copyright 2025 Simon Glass
#
"""Functional tests for checking that patman behaves correctly"""

import asyncio
from datetime import datetime
import os
import re
import unittest
from unittest import mock

import pygit2

from u_boot_pylib import cros_subprocess
from u_boot_pylib import gitutil
from u_boot_pylib import terminal
from u_boot_pylib import tools
from patman import cmdline
from patman import control
from patman import cser_helper
from patman import cseries
from patman.database import Pcommit
from patman import database
from patman import patchstream
from patman.patchwork import Patchwork
from patman.test_common import TestCommon

HASH_RE = r'[0-9a-f]+'
#pylint: disable=protected-access


class Namespace:
    """Simple namespace for use instead of argparse in tests"""
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)


class TestCseries(unittest.TestCase, TestCommon):
    """Test cases for the Cseries class

    In some cases there are tests for both direct Cseries calls and for
    accessing the feature via the cmdline. It is possible to do this with mocks
    but it is a bit painful to catch all cases that way. The approach here is
    to create a check_...() function which yields back to the test routines to
    make the call or run the command. The check_...() function typically yields
    a Cseries while it is working and False when it is done, allowing the test
    to check that everything is finished.

    Some subcommands don't have command tests, if it would be duplicative. Some
    tests avoid using the check_...() function and just write the test out
    twice, if it would be too confusing to use a coroutine.

    Note the -N flag which sort-of disables capturing of output, although in
    fact it is still captured, just output at the end. When debugging the code
    you may need to temporarily comment out the 'with terminal.capture()'
    parts.
    """
    def setUp(self):
        """Set up the common test state plus the fields used by this class"""
        TestCommon.setUp(self)
        self.autolink_extra = None
        # NOTE(review): asyncio.get_event_loop() is deprecated when no loop is
        # running on newer Python versions - confirm this is still wanted
        self.loop = asyncio.get_event_loop()
        self.cser = None

    def tearDown(self):
        """Tear down the common test state"""
        TestCommon.tearDown(self)

    class _Stage:
        """Context manager which prints a marker on entry to / exit from a
        named stage, but only when terminal capture is disabled
        """
        def __init__(self, name):
            self.name = name

        def __enter__(self):
            if not terminal.USE_CAPTURE:
                print(f"--- starting '{self.name}'")

        def __exit__(self, exc_type, exc_val, exc_tb):
            if not terminal.USE_CAPTURE:
                print(f"--- finished '{self.name}'\n")

    def stage(self, name):
        """Context manager to mark a named stage of a test

        A 'starting' line is printed when the stage is entered and a
        'finished' line when it is left

        Args:
            name (str): Stage name

        Return:
            _Stage: context object

        Usage:
            with self.stage('name'):
                ...do things

            Note that the output only appears if the -N flag is used
        """
        return self._Stage(name)

    def assert_finished(self, itr):
        """Assert that an iterator is finished

        Args:
            itr (iter): Iterator to check
        """
        self.assertFalse(list(itr))

    def test_database_setup(self):
        """Check setting up of the series database"""
        cser = cseries.Cseries(self.tmpdir)
        with terminal.capture() as (_, err):
            cser.open_database()
        self.assertEqual(f'Creating new database {self.tmpdir}/.patman.db',
                         err.getvalue().strip())
        res = cser.db.execute("SELECT name FROM series")
        self.assertTrue(res)
        cser.close_database()

    def get_database(self):
        """Open the database and silence the warning output

        Return:
            Cseries: Resulting Cseries object
        """
        cser = cseries.Cseries(self.tmpdir, terminal.COLOR_NEVER)
        with terminal.capture() as _:
            cser.open_database()
        self.cser = cser
        return cser

    def get_cser(self):
        """Set up a git tree and database

        Return:
            Cseries: object
        """
        self.make_git_tree()
        return self.get_database()

    def db_close(self):
        """Close the database if open

        Return:
            bool: True if the database was open (and is now closed), False if
                there was nothing to close
        """
        if self.cser and self.cser.db.cur:
            self.cser.close_database()
            return True
        return False

    def db_open(self):
        """Open the database if closed"""
        if self.cser and not self.cser.db.cur:
            self.cser.open_database()

    def run_args(self, *argv, expect_ret=0, pwork=None, cser=None):
        """Run patman with the given arguments

        The database is closed while the command runs and re-opened
        afterwards, if it was open on entry

        Args:
            argv (list of str): List of arguments, excluding 'patman'
            expect_ret (int): Expected return code, used to check errors
            pwork (Patchwork): Patchwork object to use when executing the
                command, or None to create one
            cser (Cseries): Cseries object to use when executing the command,
                or None to create one
        """
        was_open = self.db_close()
        args = cmdline.parse_args(['-D'] + list(argv), config_fname=False)
        exit_code = control.do_patman(args, self.tmpdir, pwork, cser)
        self.assertEqual(expect_ret, exit_code)
        if was_open:
            self.db_open()

    def test_series_add(self):
        """Test adding a new cseries"""
        cser = self.get_cser()
        self.assertFalse(cser.db.series_get_dict())

        with terminal.capture() as (out, _):
            cser.add('first', 'my description', allow_unmarked=True)
        lines = out.getvalue().strip().splitlines()
        self.assertEqual(
            "Adding series 'first' v1: mark False allow_unmarked True",
            lines[0])
        self.assertEqual("Added series 'first' v1 (2 commits)", lines[1])
        self.assertEqual(2, len(lines))

        slist = cser.db.series_get_dict()
        self.assertEqual(1, len(slist))
        self.assertEqual('first', slist['first'].name)
        self.assertEqual('my description', slist['first'].desc)

        svlist = cser.get_ser_ver_list()
        self.assertEqual(1, len(svlist))
        self.assertEqual(1, svlist[0].idnum)
        self.assertEqual(1, svlist[0].series_id)
        self.assertEqual(1, svlist[0].version)

        pclist = cser.get_pcommit_dict()
        self.assertEqual(2, len(pclist))
        self.assertIn(1, pclist)
        self.assertEqual(
            Pcommit(1, 0, 'i2c: I2C things', 1, None, None, None, None),
            pclist[1])
        self.assertEqual(
            Pcommit(2, 1, 'spi: SPI fixes', 1, None, None, None, None),
            pclist[2])

    def test_series_not_checked_out(self):
        """Test adding a new cseries when a different one is checked out"""
        cser = self.get_cser()
        self.assertFalse(cser.db.series_get_dict())

        with terminal.capture() as (out, _):
            cser.add('second', allow_unmarked=True)
        lines = out.getvalue().strip().splitlines()
        self.assertEqual(
            "Adding series 'second' v1: mark False allow_unmarked True",
            lines[0])
        self.assertEqual("Added series 'second' v1 (3 commits)", lines[1])
        self.assertEqual(2, len(lines))

    def test_series_add_manual(self):
        """Test adding a new cseries with a version number"""
        cser = self.get_cser()
        self.assertFalse(cser.db.series_get_dict())

        repo = pygit2.init_repository(self.gitdir)
        first_target = repo.revparse_single('first')
        repo.branches.local.create('first2', first_target)
        repo.config.set_multivar('branch.first2.remote', '', '.')
        repo.config.set_multivar('branch.first2.merge', '', 'refs/heads/base')

        with terminal.capture() as (out, _):
            cser.add('first2', 'description', allow_unmarked=True)
        lines = out.getvalue().splitlines()
        self.assertEqual(
            "Adding series 'first' v2: mark False allow_unmarked True",
            lines[0])
        self.assertEqual("Added series 'first' v2 (2 commits)", lines[1])
        self.assertEqual(2, len(lines))

        slist = cser.db.series_get_dict()
        self.assertEqual(1, len(slist))
        self.assertEqual('first', slist['first'].name)

        # We should have just one entry, with version 2
        svlist = cser.get_ser_ver_list()
        self.assertEqual(1, len(svlist))
        self.assertEqual(1, svlist[0].idnum)
        self.assertEqual(1, svlist[0].series_id)
        self.assertEqual(2, svlist[0].version)

    def add_first2(self, checkout):
        """Add a new first2 branch, a copy of first

        Args:
            checkout (bool): True to check out the new branch
        """
        repo = pygit2.init_repository(self.gitdir)
        first_target = repo.revparse_single('first')
        repo.branches.local.create('first2', first_target)
        repo.config.set_multivar('branch.first2.remote', '', '.')
        repo.config.set_multivar('branch.first2.merge', '', 'refs/heads/base')

        if checkout:
            target = repo.lookup_reference('refs/heads/first2')
            repo.checkout(target, strategy=pygit2.enums.CheckoutStrategy.FORCE)

    def test_series_add_different(self):
        """Test adding a different version of a series from that checked out"""
        cser = self.get_cser()

        self.add_first2(True)

        # Add first2 initially
        with terminal.capture() as (out, _):
            cser.add(None, 'description', allow_unmarked=True)
        lines = out.getvalue().splitlines()
        self.assertEqual(
            "Adding series 'first' v2: mark False allow_unmarked True",
            lines[0])
        self.assertEqual("Added series 'first' v2 (2 commits)", lines[1])
        self.assertEqual(2, len(lines))

        # Now add first: it should be added as a new version
        with terminal.capture() as (out, _):
            cser.add('first', 'description', allow_unmarked=True)
        lines = out.getvalue().splitlines()
        self.assertEqual(
            "Adding series 'first' v1: mark False allow_unmarked True",
            lines[0])
        self.assertEqual(
            "Added v1 to existing series 'first' (2 commits)", lines[1])
        self.assertEqual(2, len(lines))

        slist = cser.db.series_get_dict()
        self.assertEqual(1, len(slist))
        self.assertEqual('first', slist['first'].name)

        # We should have two entries, one of each version
        svlist = cser.get_ser_ver_list()
        self.assertEqual(2, len(svlist))
        self.assertEqual(1, svlist[0].idnum)
        self.assertEqual(1, svlist[0].series_id)
        self.assertEqual(2, svlist[0].version)

        self.assertEqual(2, svlist[1].idnum)
        self.assertEqual(1, svlist[1].series_id)
        self.assertEqual(1, svlist[1].version)

    def test_series_add_dup(self):
        """Test adding a series twice"""
        cser = self.get_cser()
        with terminal.capture() as (out, _):
            cser.add(None, 'description', allow_unmarked=True)

        with terminal.capture() as (out, _):
            cser.add(None, 'description', allow_unmarked=True)
        self.assertIn("Series 'first' v1 already exists",
                      out.getvalue().strip())

        self.add_first2(False)

        with terminal.capture() as (out, _):
            cser.add('first2', 'description', allow_unmarked=True)
        lines = out.getvalue().splitlines()
        self.assertEqual(
            "Added v2 to existing series 'first' (2 commits)", lines[1])

    def test_series_add_dup_reverse(self):
        """Test adding a series twice, v2 then v1"""
        cser = self.get_cser()
        self.add_first2(True)
        with terminal.capture() as (out, _):
            cser.add(None, 'description', allow_unmarked=True)
        self.assertIn("Added series 'first' v2", out.getvalue().strip())

        with terminal.capture() as (out, _):
            cser.add('first', 'description', allow_unmarked=True)
        self.assertIn("Added v1 to existing series 'first'",
                      out.getvalue().strip())

    def test_series_add_dup_reverse_cmdline(self):
        """Test adding a series twice, v2 then v1, using the cmdline"""
        cser = self.get_cser()
        self.add_first2(True)
        with terminal.capture() as (out, _):
            self.run_args('series', 'add', '-M', '-D', 'description',
                          pwork=True)
        self.assertIn("Added series 'first' v2 (2 commits)",
                      out.getvalue().strip())

        with terminal.capture() as (out, _):
            self.run_args('series', '-s', 'first', 'add', '-M',
                          '-D', 'description', pwork=True)
            cser.add('first', 'description', allow_unmarked=True)
        self.assertIn("Added v1 to existing series 'first'",
                      out.getvalue().strip())

    def test_series_add_skip_version(self):
        """Test adding a series which is v4 but has no earlier version"""
        cser = self.get_cser()
        with terminal.capture() as (out, _):
            cser.add('third4', 'The glorious third series', mark=False,
                     allow_unmarked=True)
        lines = out.getvalue().splitlines()
        self.assertEqual(
            "Adding series 'third' v4: mark False allow_unmarked True",
            lines[0])
        self.assertEqual("Added series 'third' v4 (4 commits)", lines[1])
        self.assertEqual(2, len(lines))

        sdict = cser.db.series_get_dict()
        self.assertIn('third', sdict)
        chk = sdict['third']
        self.assertEqual('third', chk['name'])
        self.assertEqual('The glorious third series', chk['desc'])

        svid = cser.get_series_svid(chk['idnum'], 4)
        self.assertEqual(4, len(cser.get_pcommit_dict(svid)))

        # Remove the series and add it again with just two commits
        with terminal.capture():
            cser.remove('third4')

        with terminal.capture() as (out, _):
            cser.add('third4', 'The glorious third series', mark=False,
                     allow_unmarked=True, end='third4~2')
        lines = out.getvalue().splitlines()
        self.assertEqual(
            "Adding series 'third' v4: mark False allow_unmarked True",
            lines[0])
        self.assertRegex(
            lines[1],
            'Ending before .* main: Change to the main program')
        self.assertEqual("Added series 'third' v4 (2 commits)", lines[2])

        sdict = cser.db.series_get_dict()
        self.assertIn('third', sdict)
        chk = sdict['third']
        self.assertEqual('third', chk['name'])
        self.assertEqual('The glorious third series', chk['desc'])

        svid = cser.get_series_svid(chk['idnum'], 4)
        self.assertEqual(2, len(cser.get_pcommit_dict(svid)))

    def test_series_add_wrong_version(self):
        """Test adding a series with an incorrect branch name or version

        This updates branch 'first' to have version 2, then tries to add it.
        """
        cser = self.get_cser()
        self.assertFalse(cser.db.series_get_dict())

        with terminal.capture():
            _, ser, max_vers, _ = cser.prep_series('first')
            cser.update_series('first', ser, max_vers, None, False,
                               add_vers=2)

        with self.assertRaises(ValueError) as exc:
            with terminal.capture():
                cser.add('first', 'my description', allow_unmarked=True)
        self.assertEqual(
            "Series name 'first' suggests version 1 but Series-version tag "
            'indicates 2 (see --force-version)', str(exc.exception))

        # Now try again with --force-version which should force version 1
        with terminal.capture() as (out, _):
            cser.add('first', 'my description', allow_unmarked=True,
                     force_version=True)
        itr = iter(out.getvalue().splitlines())
        self.assertEqual(
            "Adding series 'first' v1: mark False allow_unmarked True",
            next(itr))
        self.assertRegex(
            next(itr), 'Checking out upstream commit refs/heads/base: .*')
        self.assertEqual(
            "Processing 2 commits from branch 'first'", next(itr))
        self.assertRegex(next(itr),
                         f'- {HASH_RE} as {HASH_RE} i2c: I2C things')
        self.assertRegex(next(itr),
                         f'- rm v1: {HASH_RE} as {HASH_RE} spi: SPI fixes')
        self.assertRegex(next(itr),
                         f'Updating branch first from {HASH_RE} to {HASH_RE}')
        self.assertEqual("Added series 'first' v1 (2 commits)", next(itr))

        # There must be no further output
        self.assert_finished(itr)

        # Since this is v1 the Series-version tag should have been removed
        series = patchstream.get_metadata('first', 0, 2, git_dir=self.gitdir)
        self.assertNotIn('version', series)

    def _fake_patchwork_cser(self, subpath):
        """Fake Patchwork server for the function below

        This handles accessing various things used by the tests below. It has
        hard-coded data, apart from self.autolink_extra which can be adjusted
        by the test.

        Args:
            subpath (str): URL subpath to use

        Return:
            list or dict: Fake response for the requested subpath

        Raises:
            ValueError: the subpath (or the ID within it) is not recognised
        """
        # NOTE(review): the multi-line 'content' strings below appear to have
        # lost their line breaks (and any <email> parts) when this file's
        # whitespace was collapsed - confirm against the original before
        # relying on their exact values

        # Get a list of projects
        if subpath == 'projects/':
            return [
                {'id': self.PROJ_ID, 'name': 'U-Boot',
                 'link_name': self.PROJ_LINK_NAME},
                {'id': 9, 'name': 'other', 'link_name': 'other'}
            ]

        # Search for series by their cover-letter name
        re_search = re.match(r'series/\?project=(\d+)&q=.*$', subpath)
        if re_search:
            result = [
                {'id': 56, 'name': 'contains first name', 'version': 1},
                {'id': 43, 'name': 'has first in it', 'version': 1},
                {'id': 1234, 'name': 'first series', 'version': 1},
                {'id': self.SERIES_ID_SECOND_V1, 'name': self.TITLE_SECOND,
                 'version': 1},
                {'id': self.SERIES_ID_SECOND_V2, 'name': self.TITLE_SECOND,
                 'version': 2},
                {'id': 12345, 'name': 'i2c: I2C things', 'version': 1},
            ]
            if self.autolink_extra:
                result += [self.autolink_extra]
            return result

        # Read information about a series, given its link (patchwork series ID)
        m_series = re.match(r'series/(\d+)/$', subpath)
        series_id = int(m_series.group(1)) if m_series else ''
        if series_id:
            if series_id == self.SERIES_ID_SECOND_V1:
                # series 'second'
                return {
                    'patches': [
                        {'id': '10',
                         'name': '[PATCH,1/3] video: Some video improvements',
                         'content': ''},
                        {'id': '11',
                         'name': '[PATCH,2/3] serial: Add a serial driver',
                         'content': ''},
                        {'id': '12', 'name': '[PATCH,3/3] bootm: Make it boot',
                         'content': ''},
                    ],
                    'cover_letter': {
                        'id': 39,
                        'name': 'The name of the cover letter',
                    }
                }
            if series_id == self.SERIES_ID_SECOND_V2:
                # series 'second2'
                return {
                    'patches': [
                        {'id': '110',
                         'name':
                             '[PATCH,v2,1/3] video: Some video improvements',
                         'content': ''},
                        {'id': '111',
                         'name': '[PATCH,v2,2/3] serial: Add a serial driver',
                         'content': ''},
                        {'id': '112',
                         'name': '[PATCH,v2,3/3] bootm: Make it boot',
                         'content': ''},
                    ],
                    'cover_letter': {
                        'id': 139,
                        'name': 'The name of the cover letter',
                    }
                }
            if series_id == self.SERIES_ID_FIRST_V3:
                # series 'first3'
                return {
                    'patches': [
                        {'id': 20, 'name': '[PATCH,v3,1/2] i2c: I2C things',
                         'content': ''},
                        {'id': 21, 'name': '[PATCH,v3,2/2] spi: SPI fixes',
                         'content': ''},
                    ],
                    'cover_letter': {
                        'id': 29,
                        'name': 'Cover letter for first',
                    }
                }
            if series_id == 123:
                return {
                    'patches': [
                        {'id': 20, 'name': '[PATCH,1/2] i2c: I2C things',
                         'content': ''},
                        {'id': 21, 'name': '[PATCH,2/2] spi: SPI fixes',
                         'content': ''},
                    ],
                }
            if series_id == 1234:
                return {
                    'patches': [
                        {'id': 20, 'name': '[PATCH,v2,1/2] i2c: I2C things',
                         'content': ''},
                        {'id': 21, 'name': '[PATCH,v2,2/2] spi: SPI fixes',
                         'content': ''},
                    ],
                }
            raise ValueError(f'Fake Patchwork unknown series_id: {series_id}')

        # Read patch status
        m_pat = re.search(r'patches/(\d*)/$', subpath)
        patch_id = int(m_pat.group(1)) if m_pat else ''
        if patch_id:
            if patch_id in [10, 110]:
                return {'state': 'accepted',
                        'content': 'Reviewed-by: Fred Bloggs '}
            if patch_id in [11, 111]:
                return {'state': 'changes-requested', 'content': ''}
            if patch_id in [12, 112]:
                return {'state': 'rejected',
                        'content': "I don't like this at all, sorry"}
            if patch_id == 20:
                return {'state': 'awaiting-upstream', 'content': ''}
            if patch_id == 21:
                return {'state': 'not-applicable', 'content': ''}
            raise ValueError(f'Fake Patchwork unknown patch_id: {patch_id}')

        # Read comments from a patch
        m_comm = re.search(r'patches/(\d*)/comments/', subpath)
        patch_id = int(m_comm.group(1)) if m_comm else ''
        if patch_id:
            if patch_id in [10, 110]:
                return [
                    {'id': 1, 'content': ''},
                    {'id': 2,
                     'content': '''On some date Mary Smith wrote: > This was my original patch > which is being quoted I like the approach here and I would love to see more of it. Reviewed-by: Fred Bloggs ''',
                     'submitter': {
                         'name': 'Fred Bloggs',
                         'email': 'fred@bloggs.com',
                     }
                    },
                ]
            if patch_id in [11, 111]:
                return []
            if patch_id in [12, 112]:
                return [
                    {'id': 4, 'content': ''},
                    {'id': 5, 'content': ''},
                    {'id': 6, 'content': ''},
                ]
            if patch_id == 20:
                return [
                    {'id': 7,
                     'content': '''On some date Alex Miller wrote: > Sometimes we need to create a patch. > This is one of those times Tested-by: Mary Smith # yak '''},
                    {'id': 8, 'content': ''},
                ]
            if patch_id == 21:
                return []
            raise ValueError(
                f'Fake Patchwork does not understand patch_id {patch_id}: '
                f'{subpath}')

        # Read comments from a cover letter
        m_cover_id = re.search(r'covers/(\d*)/comments/', subpath)
        cover_id = int(m_cover_id.group(1)) if m_cover_id else ''
        if cover_id:
            if cover_id in [39, 139]:
                return [
                    {'content': 'some comment',
                     'submitter': {
                         'name': 'A user',
                         'email': 'user@user.com',
                     },
                     'date': 'Sun 13 Apr 14:06:02 MDT 2025',
                    },
                    {'content': 'another comment',
                     'submitter': {
                         'name': 'Ghenkis Khan',
                         'email': 'gk@eurasia.gov',
                     },
                     'date': 'Sun 13 Apr 13:06:02 MDT 2025',
                    },
                ]
            if cover_id == 29:
                return []
            raise ValueError(f'Fake Patchwork unknown cover_id: {cover_id}')

        raise ValueError(f'Fake Patchwork does not understand: {subpath}')

    def setup_second(self, do_sync=True):
        """Set up the 'second' series synced with the fake patchwork

        Args:
            do_sync (bool): True to sync the series

        Return: tuple:
            Cseries: New Cseries object
            pwork: Patchwork object
        """
        with self.stage('setup second'):
            cser = self.get_cser()
            pwork = Patchwork.for_testing(self._fake_patchwork_cser)
            pwork.project_set(self.PROJ_ID, self.PROJ_LINK_NAME)

            with terminal.capture() as (out, _):
                cser.add('first', '', allow_unmarked=True)
                cser.add('second', allow_unmarked=True)

            series = patchstream.get_metadata_for_list('second', self.gitdir,
                                                       3)
            self.assertEqual('456', series.links)

            with terminal.capture() as (out, _):
                cser.increment('second')

            series = patchstream.get_metadata_for_list('second', self.gitdir,
                                                       3)
            self.assertEqual('456', series.links)

            series = patchstream.get_metadata_for_list('second2', self.gitdir,
                                                       3)
            self.assertEqual('1:456', series.links)

            if do_sync:
                with terminal.capture() as (out, _):
                    cser.link_auto(pwork, 'second', 2, True)
                with terminal.capture() as (out, _):
                    cser.gather(pwork, 'second', 2, False, True, False)
                lines = out.getvalue().splitlines()
                self.assertEqual(
                    "Updating series 'second' version 2 from link '457'",
                    lines[0])
                self.assertEqual(
                    '3 patches and cover letter updated (8 requests)',
                    lines[1])
                self.assertEqual(2, len(lines))

        return cser, pwork

    def test_series_add_no_cover(self):
        """Test patchwork when adding a series which has no cover letter"""
        cser = self.get_cser()
        pwork = Patchwork.for_testing(self._fake_patchwork_cser)
        pwork.project_set(self.PROJ_ID, self.PROJ_LINK_NAME)

        with terminal.capture() as (out, _):
            cser.add('first', 'my name for this', mark=False,
                     allow_unmarked=True)
        self.assertIn("Added series 'first' v1 (2 commits)", out.getvalue())

        with terminal.capture() as (out, _):
            cser.link_auto(pwork, 'first', 1, True)
        self.assertIn("Setting link for series 'first' v1 to 12345",
                      out.getvalue())

    def test_series_list(self):
        """Test listing cseries"""
        self.setup_second()

        self.db_close()
        args = Namespace(subcmd='ls')
        with terminal.capture() as (out, _):
            control.do_series(args, test_db=self.tmpdir, pwork=True)
        lines = out.getvalue().splitlines()
        self.assertEqual(5, len(lines))
        # NOTE(review): column padding inside the expected strings below may
        # have been lost when this file's whitespace was collapsed - confirm
        # against the real 'series ls' output
        self.assertEqual(
            'Name Description '
            'Accepted Versions', lines[0])
        self.assertTrue(lines[1].startswith('--'))
        self.assertEqual(
            'first '
            ' -/2 1', lines[2])
        self.assertEqual(
            'second Series for my board '
            ' 1/3 1 2', lines[3])
        self.assertTrue(lines[4].startswith('--'))

    def test_do_series_add(self):
        """Add a new cseries"""
        self.make_git_tree()
        args = Namespace(subcmd='add', desc='my-description', series='first',
                         mark=False, allow_unmarked=True, upstream=None,
                         dry_run=False)
        with terminal.capture() as (out, _):
            control.do_series(args, test_db=self.tmpdir, pwork=True)

        cser = self.get_database()
        slist = cser.db.series_get_dict()
        self.assertEqual(1, len(slist))
        ser = slist.get('first')
        self.assertTrue(ser)
        self.assertEqual('first', ser.name)
        self.assertEqual('my-description', ser.desc)

        self.db_close()
        args.subcmd = 'ls'
        with terminal.capture() as (out, _):
            control.do_series(args, test_db=self.tmpdir, pwork=True)
        lines = out.getvalue().splitlines()
        self.assertEqual(4, len(lines))
        self.assertTrue(lines[1].startswith('--'))
        # NOTE(review): column padding in this expected string may have been
        # lost when this file's whitespace was collapsed - confirm
        self.assertEqual(
            'first my-description '
            '-/2 1', lines[2])

    def test_do_series_add_cmdline(self):
        """Add a new cseries using the cmdline"""
        self.make_git_tree()
        with terminal.capture():
            self.run_args('series', '-s', 'first', 'add', '-M',
                          '-D', 'my-description', pwork=True)

        cser = self.get_database()
        slist = cser.db.series_get_dict()
        self.assertEqual(1, len(slist))
        ser = slist.get('first')
        self.assertTrue(ser)
        self.assertEqual('first', ser.name)
        self.assertEqual('my-description', ser.desc)

    def test_do_series_add_auto(self):
        """Add a new cseries without any arguments"""
        self.make_git_tree()

        # Use the 'second' branch, which has a cover letter
        gitutil.checkout('second', self.gitdir, work_tree=self.tmpdir,
                         force=True)
        args = Namespace(subcmd='add', series=None, mark=False,
                         allow_unmarked=True, upstream=None, dry_run=False,
                         desc=None)
        with terminal.capture():
            control.do_series(args, test_db=self.tmpdir, pwork=True)

        cser = self.get_database()
        slist = cser.db.series_get_dict()
        self.assertEqual(1, len(slist))
        ser = slist.get('second')
        self.assertTrue(ser)
        self.assertEqual('second', ser.name)
        self.assertEqual('Series for my board', ser.desc)
        cser.close_database()

    def _check_inc(self, out):
        """Check output from an 'increment' operation

        Args:
            out (StringIO): Text to check

        Return:
            iter: Iterator over the output lines, positioned after the lines
                checked here
        """
        itr = iter(out.getvalue().splitlines())

        self.assertEqual("Increment 'first' v1: 2 patches", next(itr))
        self.assertRegex(next(itr), 'Checking out upstream commit .*')
        self.assertEqual("Processing 2 commits from branch 'first2'",
                         next(itr))
        self.assertRegex(next(itr),
                         f'- {HASH_RE} as {HASH_RE} i2c: I2C things')
        self.assertRegex(next(itr),
                         f'- add v2: {HASH_RE} as {HASH_RE} spi: SPI fixes')
        self.assertRegex(
            next(itr), f'Updating branch first2 from {HASH_RE} to {HASH_RE}')
        self.assertEqual('Added new branch first2', next(itr))
        return itr

    def test_series_link(self):
        """Test adding a patchwork link to a cseries"""
        cser = self.get_cser()

        repo = pygit2.init_repository(self.gitdir)
        first = repo.lookup_branch('first').peel(
            pygit2.enums.ObjectType.COMMIT).oid
        base = repo.lookup_branch('base').peel(
            pygit2.enums.ObjectType.COMMIT).oid

        gitutil.checkout('first', self.gitdir, work_tree=self.tmpdir,
                         force=True)

        with terminal.capture() as (out, _):
            cser.add('first', '', allow_unmarked=True)

        with self.assertRaises(ValueError) as exc:
            cser.link_set('first', 2, '1234', True)
        self.assertEqual("Series 'first' does not have a version 2",
                         str(exc.exception))

        self.assertEqual('first', gitutil.get_branch(self.gitdir))
        with terminal.capture() as (out, _):
            cser.increment('first')
        self.assertTrue(repo.lookup_branch('first2'))

        with terminal.capture() as (out, _):
            cser.link_set('first', 2, '2345', True)

        lines = out.getvalue().splitlines()
        self.assertEqual(6, len(lines))
        self.assertRegex(
            lines[0], 'Checking out upstream commit refs/heads/base: .*')
        self.assertEqual("Processing 2 commits from branch 'first2'",
                         lines[1])
        self.assertRegex(
            lines[2],
            f'- {HASH_RE} as {HASH_RE} i2c: I2C things')
        self.assertRegex(
            lines[3],
            f"- add v2 links '2:2345': {HASH_RE} as {HASH_RE} spi: SPI fixes")
        self.assertRegex(
            lines[4], f'Updating branch first2 from {HASH_RE} to {HASH_RE}')
        self.assertEqual("Setting link for series 'first' v2 to 2345",
                         lines[5])

        self.assertEqual('2345', cser.link_get('first', 2))

        series = patchstream.get_metadata_for_list('first2', self.gitdir, 2)
        self.assertEqual('2:2345', series.links)

        self.assertEqual('first2', gitutil.get_branch(self.gitdir))

        # Check the original series was left alone
        self.assertEqual(
            first, repo.lookup_branch('first').peel(
                pygit2.enums.ObjectType.COMMIT).oid)
        count = 2
        series1 = patchstream.get_metadata_for_list('first', self.gitdir,
                                                    count)
        self.assertNotIn('links', series1)
        self.assertNotIn('version', series1)

        # Check that base is left alone
        self.assertEqual(
            base, repo.lookup_branch('base').peel(
                pygit2.enums.ObjectType.COMMIT).oid)
        series1 = patchstream.get_metadata_for_list('base', self.gitdir,
                                                    count)
        self.assertNotIn('links', series1)
        self.assertNotIn('version', series1)

        # Check out second and try to update first
        gitutil.checkout('second', self.gitdir, work_tree=self.tmpdir,
                         force=True)
        with terminal.capture():
            cser.link_set('first', 1, '16', True)

        # Overwrite the link
        with terminal.capture():
            cser.link_set('first', 1, '17', True)

        series2 = patchstream.get_metadata_for_list('first', self.gitdir,
                                                    count)
        self.assertEqual('1:17', series2.links)

    def test_series_link_cmdline(self):
        """Test adding a patchwork link to a cseries using the cmdline"""
        cser = self.get_cser()

        gitutil.checkout('first', self.gitdir, work_tree=self.tmpdir,
                         force=True)

        with terminal.capture() as (out, _):
            cser.add('first', '', allow_unmarked=True)

        with terminal.capture() as (out, _):
            self.run_args('series', '-s', 'first', '-V', '4', 'set-link', '-u',
                          '1234', expect_ret=1, pwork=True)
        self.assertIn("Series 'first' does not have a version 4",
                      out.getvalue())

        with self.assertRaises(ValueError) as exc:
            cser.link_get('first', 4)
        self.assertEqual("Series 'first' does not have a version 4",
                         str(exc.exception))

        with terminal.capture() as (out, _):
            cser.increment('first')

        with self.assertRaises(ValueError) as exc:
            cser.link_get('first', 4)
        self.assertEqual("Series 'first' does not have a version 4",
                         str(exc.exception))

        with terminal.capture() as (out, _):
            cser.increment('first')
            cser.increment('first')

        with terminal.capture() as (out, _):
            self.run_args('series', '-s', 'first', '-V', '4', 'set-link', '-u',
                          '1234', pwork=True)
        lines = out.getvalue().splitlines()
        self.assertRegex(
            lines[-3],
            f"- add v4 links '4:1234': {HASH_RE} as {HASH_RE} spi: SPI fixes")
        self.assertEqual("Setting link for series 'first' v4 to 1234",
                         lines[-1])

        with terminal.capture() as (out, _):
            self.run_args('series', '-s', 'first', '-V', '4', 'get-link',
                          pwork=True)
        self.assertIn('1234', out.getvalue())

        series = patchstream.get_metadata_for_list('first4', self.gitdir, 1)
        self.assertEqual('4:1234', series.links)

        with terminal.capture() as (out, _):
            self.run_args('series', '-s', 'first', '-V', '5', 'get-link',
                          expect_ret=1, pwork=True)
        self.assertIn("Series 'first' does not have a version 5",
                      out.getvalue())

        # Checkout 'first' and try to get the link from 'first4'
        gitutil.checkout('first', self.gitdir, work_tree=self.tmpdir,
                         force=True)

        with terminal.capture() as (out, _):
            self.run_args('series', '-s', 'first4', 'get-link', pwork=True)
        self.assertIn('1234', out.getvalue())

        # This should get the link for 'first'
        with terminal.capture() as (out, _):
            self.run_args('series', 'get-link', pwork=True)
        self.assertIn('None', out.getvalue())

        # Checkout 'first4' again; this should get the link for 'first4'
        gitutil.checkout('first4', self.gitdir, work_tree=self.tmpdir,
                         force=True)

        with terminal.capture() as (out, _):
            self.run_args('series', 'get-link', pwork=True)
        self.assertIn('1234', out.getvalue())

    def test_series_link_auto_version(self):
        """Test finding the patchwork link for a cseries automatically"""
        cser = self.get_cser()

        with terminal.capture() as (out, _):
            cser.add('second', allow_unmarked=True)

        # Make sure that the link is there
        count = 3
        series = patchstream.get_metadata('second', 0, count,
                                          git_dir=self.gitdir)
        self.assertEqual(f'{self.SERIES_ID_SECOND_V1}', series.links)

        # Set link with detected version
        with terminal.capture() as (out, _):
            cser.link_set('second', None, f'{self.SERIES_ID_SECOND_V1}', True)
        self.assertEqual(
            "Setting link for series 'second' v1 to 456",
            out.getvalue().splitlines()[-1])

        # Make sure that the link was set
        series = patchstream.get_metadata('second', 0, count,
                                          git_dir=self.gitdir)
        self.assertEqual(f'1:{self.SERIES_ID_SECOND_V1}', series.links)

        with terminal.capture():
            cser.increment('second')

        # Make sure that the new series gets the same link
        series = patchstream.get_metadata('second2', 0, 3,
                                          git_dir=self.gitdir)

        pwork = Patchwork.for_testing(self._fake_patchwork_cser)
        pwork.project_set(self.PROJ_ID, self.PROJ_LINK_NAME)
        self.assertFalse(cser.project_get())
        cser.project_set(pwork, 'U-Boot', quiet=True)

        self.assertEqual(
            (self.SERIES_ID_SECOND_V1, None, 'second', 1,
             'Series for my board'),
            cser.link_search(pwork, 'second', 1))

        with terminal.capture():
            cser.increment('second')

        self.assertEqual((457, None, 'second', 2, 'Series for my board'),
                         cser.link_search(pwork, 'second', 2))

    def test_series_link_auto_name(self):
        """Test finding the patchwork link for a cseries with auto name"""
        cser = self.get_cser()

        with terminal.capture() as (out, _):
            cser.add('first', '', allow_unmarked=True)

        # Set link with detected name
        with self.assertRaises(ValueError) as exc:
            cser.link_set(None, 2, '2345', True)
        self.assertEqual(
            "Series 'first' does not have a version 2", str(exc.exception))

        with terminal.capture():
            cser.increment('first')

        with terminal.capture() as (out, _):
            cser.link_set(None, 2, '2345', True)
        self.assertEqual(
            "Setting link for series 'first' v2 to 2345",
            out.getvalue().splitlines()[-1])

        svlist = cser.get_ser_ver_list()
        self.assertEqual(2, len(svlist))
        self.assertEqual(1, svlist[0].idnum)
        self.assertEqual(1, svlist[0].series_id)
        self.assertEqual(1, svlist[0].version)
        self.assertIsNone(svlist[0].link)

        self.assertEqual(2, svlist[1].idnum)
        self.assertEqual(1, svlist[1].series_id)
        self.assertEqual(2, svlist[1].version)
        self.assertEqual('2345', svlist[1].link)

    def test_series_link_auto_name_version(self):
        """Find patchwork link for a cseries with auto name + version"""
        cser = self.get_cser()

        with terminal.capture() as (out, _):
            cser.add('first', '', allow_unmarked=True)

        # Set link with detected name and version
        with terminal.capture() as (out, _):
            cser.link_set(None, None, '1234', True)
        self.assertEqual(
            "Setting link for series 'first' v1 to 1234",
            out.getvalue().splitlines()[-1])

        with terminal.capture():
            cser.increment('first')

        with terminal.capture() as (out, _):
            cser.link_set(None, None, '2345', True)
        self.assertEqual(
            "Setting link for series 'first' v2 to 2345",
            out.getvalue().splitlines()[-1])

        svlist = cser.get_ser_ver_list()
        self.assertEqual(2, len(svlist))
        self.assertEqual(1, svlist[0].idnum)
        self.assertEqual(1, svlist[0].series_id)
        self.assertEqual(1, svlist[0].version)
        self.assertEqual('1234', svlist[0].link)

        self.assertEqual(2, svlist[1].idnum)
        self.assertEqual(1, svlist[1].series_id)
        self.assertEqual(2, svlist[1].version)
        self.assertEqual('2345', svlist[1].link)

    def test_series_link_missing(self):
        """Test finding patchwork link for a cseries but it is missing"""
        cser = self.get_cser()

        with terminal.capture():
            cser.add('second', allow_unmarked=True)

        with terminal.capture():
            cser.increment('second')
            cser.increment('second')

        pwork = Patchwork.for_testing(self._fake_patchwork_cser)
        pwork.project_set(self.PROJ_ID, self.PROJ_LINK_NAME)
        self.assertFalse(cser.project_get())
        cser.project_set(pwork, 'U-Boot', quiet=True)

        self.assertEqual(
            (self.SERIES_ID_SECOND_V1, None, 'second', 1,
             'Series for my board'),
            cser.link_search(pwork, 'second', 1))
        self.assertEqual((457, None, 'second', 2, 'Series for my board'),
                         cser.link_search(pwork, 'second', 2))
        res = cser.link_search(pwork, 'second', 3)
        self.assertEqual(
            (None,
             [{'id': self.SERIES_ID_SECOND_V1, 'name': 'Series for my board',
               'version': 1},
              {'id': 457, 'name': 'Series for my board', 'version': 2}],
             'second', 3, 'Series for my board'),
            res)

    def check_series_autolink(self):
        """Common code for autolink tests

        This is a generator: it first yields a (Cseries, Patchwork) tuple
        while output is being captured, so the caller can perform the
        autolink; it then checks the result and yields None to show that it
        is finished
        """
        cser = self.get_cser()

        with self.stage('setup'):
            pwork = Patchwork.for_testing(self._fake_patchwork_cser)
            pwork.project_set(self.PROJ_ID, self.PROJ_LINK_NAME)
            self.assertFalse(cser.project_get())
            cser.project_set(pwork, 'U-Boot', quiet=True)

            with terminal.capture():
                cser.add('first', '', allow_unmarked=True)
                cser.add('second', allow_unmarked=True)

        with self.stage('autolink unset'):
            with terminal.capture() as (out, _):
                yield cser, pwork
            self.assertEqual(
                "Setting link for series 'second' v1 to "
                f'{self.SERIES_ID_SECOND_V1}',
                out.getvalue().splitlines()[-1])

            svlist = cser.get_ser_ver_list()
            self.assertEqual(2, len(svlist))
            self.assertEqual(1, svlist[0].idnum)
            self.assertEqual(1, svlist[0].series_id)
            self.assertEqual(1, svlist[0].version)
            self.assertEqual(2, svlist[1].idnum)
            self.assertEqual(2, svlist[1].series_id)
            self.assertEqual(1, svlist[1].version)
            self.assertEqual(str(self.SERIES_ID_SECOND_V1), svlist[1].link)
        yield None

    def test_series_autolink(self):
        """Test linking a cseries to its patchwork series by description"""
        cor = self.check_series_autolink()
        cser, pwork = next(cor)

        with self.assertRaises(ValueError) as exc:
            cser.link_auto(pwork, 'first', None, True)
        self.assertIn("Series 'first' has an empty description",
                      str(exc.exception))

        # autolink unset
        cser.link_auto(pwork, 'second', None, True)

        self.assertFalse(next(cor))
        cor.close()

    def test_series_autolink_cmdline(self):
        """Test linking to patchwork series by description on cmdline"""
        cor = self.check_series_autolink()
        _, pwork = next(cor)

        with terminal.capture() as (out, _):
            self.run_args('series', '-s', 'first', 'autolink', expect_ret=1,
                          pwork=pwork)
        self.assertEqual(
            "patman: ValueError: Series 'first' has an empty description",
            out.getvalue().strip())

        # autolink unset
        self.run_args('series', '-s', 'second', 'autolink', '-u', pwork=pwork)

        self.assertFalse(next(cor))
        cor.close()

    def _autolink_setup(self):
        """Set things up for autolink tests

        Return: tuple:
            Cseries object
            Patchwork object
        """
        cser = self.get_cser()

        pwork = Patchwork.for_testing(self._fake_patchwork_cser)
        pwork.project_set(self.PROJ_ID, self.PROJ_LINK_NAME)
        self.assertFalse(cser.project_get())
        cser.project_set(pwork, 'U-Boot', quiet=True)

        with terminal.capture():
            cser.add('first', 'first series', allow_unmarked=True)
            cser.add('second', allow_unmarked=True)
            cser.increment('first')
        return cser, pwork

    def test_series_link_auto_all(self):
        """Test linking all cseries to their patchwork series by description"""
        cser, pwork = self._autolink_setup()
        with terminal.capture() as (out, _):
            summary = cser.link_auto_all(pwork, update_commit=True,
                                         link_all_versions=True,
                                         replace_existing=False, dry_run=True,
                                         show_summary=False)
        self.assertEqual(3, len(summary))
        items = iter(summary.values())
        linked = next(items)
        self.assertEqual(
            ('first', 1, None, 'first series', 'linked:1234'), linked)
        self.assertEqual(
            ('first', 2, None, 'first series', 'not found'), next(items))
        self.assertEqual(
            ('second', 1, f'{self.SERIES_ID_SECOND_V1}', 'Series for my board',
             f'already:{self.SERIES_ID_SECOND_V1}'),
            next(items))
        self.assertEqual('Dry run completed', out.getvalue().splitlines()[-1])

        # A second dry run should do exactly the same thing
        with terminal.capture() as (out2, _):
            summary2 = cser.link_auto_all(pwork, update_commit=True,
                                          link_all_versions=True,
                                          replace_existing=False, dry_run=True,
                                          show_summary=False)
        self.assertEqual(out.getvalue(), out2.getvalue())
        self.assertEqual(summary, summary2)

        # Now do it for real
        with terminal.capture():
            summary = cser.link_auto_all(pwork, update_commit=True,
                                         link_all_versions=True,
                                         replace_existing=False, dry_run=False,
                                         show_summary=False)

        # Check the link was updated
        pdict = cser.get_ser_ver_dict()
        svid = list(summary)[0]
        self.assertEqual('1234', pdict[svid].link)

        series = patchstream.get_metadata_for_list('first', self.gitdir, 2)
        self.assertEqual('1:1234', series.links)

    def test_series_autolink_latest(self):
        """Test linking the latest versions"""
        cser, pwork = self._autolink_setup()
        with terminal.capture():
            summary = cser.link_auto_all(pwork, update_commit=True,
                                         link_all_versions=False,
                                         replace_existing=False, dry_run=False,
                                         show_summary=False)
        self.assertEqual(2, len(summary))
        items = iter(summary.values())
        self.assertEqual(
            ('first', 2, None, 'first series', 'not found'), next(items))
        self.assertEqual(
            ('second', 1, f'{self.SERIES_ID_SECOND_V1}', 'Series for my board',
             f'already:{self.SERIES_ID_SECOND_V1}'),
            next(items))

    def test_series_autolink_no_update(self):
        """Test linking the latest versions without updating commits"""
        cser,
pwork = self._autolink_setup() with terminal.capture(): cser.link_auto_all(pwork, update_commit=False, link_all_versions=True, replace_existing=False, dry_run=False, show_summary=False) series = patchstream.get_metadata_for_list('first', self.gitdir, 2) self.assertNotIn('links', series) def test_series_autolink_replace(self): """Test linking the lastest versions without updating commits""" cser, pwork = self._autolink_setup() with terminal.capture(): summary = cser.link_auto_all(pwork, update_commit=True, link_all_versions=True, replace_existing=True, dry_run=False, show_summary=False) self.assertEqual(3, len(summary)) items = iter(summary.values()) linked = next(items) self.assertEqual( ('first', 1, None, 'first series', 'linked:1234'), linked) self.assertEqual( ('first', 2, None, 'first series', 'not found'), next(items)) self.assertEqual( ('second', 1, f'{self.SERIES_ID_SECOND_V1}', 'Series for my board', f'linked:{self.SERIES_ID_SECOND_V1}'), next(items)) def test_series_autolink_extra(self): """Test command-line operation This just uses mocks for now since we can rely on the direct tests for the actual operation. 
""" _, pwork = self._autolink_setup() with (mock.patch.object(cseries.Cseries, 'link_auto_all', return_value=None) as method): self.run_args('series', 'autolink-all', pwork=True) method.assert_called_once_with(True, update_commit=False, link_all_versions=False, replace_existing=False, dry_run=False, show_summary=True) with (mock.patch.object(cseries.Cseries, 'link_auto_all', return_value=None) as method): self.run_args('series', 'autolink-all', '-a', pwork=True) method.assert_called_once_with(True, update_commit=False, link_all_versions=True, replace_existing=False, dry_run=False, show_summary=True) with (mock.patch.object(cseries.Cseries, 'link_auto_all', return_value=None) as method): self.run_args('series', 'autolink-all', '-a', '-r', pwork=True) method.assert_called_once_with(True, update_commit=False, link_all_versions=True, replace_existing=True, dry_run=False, show_summary=True) with (mock.patch.object(cseries.Cseries, 'link_auto_all', return_value=None) as method): self.run_args('series', '-n', 'autolink-all', '-r', pwork=True) method.assert_called_once_with(True, update_commit=False, link_all_versions=False, replace_existing=True, dry_run=True, show_summary=True) with (mock.patch.object(cseries.Cseries, 'link_auto_all', return_value=None) as method): self.run_args('series', 'autolink-all', '-u', pwork=True) method.assert_called_once_with(True, update_commit=True, link_all_versions=False, replace_existing=False, dry_run=False, show_summary=True) # Now do a real one to check the patchwork handling and output with terminal.capture() as (out, _): self.run_args('series', 'autolink-all', '-a', pwork=pwork) itr = iter(out.getvalue().splitlines()) self.assertEqual( '1 series linked, 1 already linked, 1 not found (3 requests)', next(itr)) self.assertEqual('', next(itr)) self.assertEqual( 'Name Version Description ' ' Result', next(itr)) self.assertTrue(next(itr).startswith('--')) self.assertEqual( 'first 1 first series ' ' linked:1234', next(itr)) self.assertEqual( 
'first 2 first series ' ' not found', next(itr)) self.assertEqual( 'second 1 Series for my board ' f' already:{self.SERIES_ID_SECOND_V1}', next(itr)) self.assertTrue(next(itr).startswith('--')) self.assert_finished(itr) def check_series_archive(self): """Coroutine to run the archive test""" cser = self.get_cser() with self.stage('setup'): with terminal.capture(): cser.add('first', '', allow_unmarked=True) # Check the series is visible in the list slist = cser.db.series_get_dict() self.assertEqual(1, len(slist)) self.assertEqual('first', slist['first'].name) # Add a second branch with terminal.capture(): cser.increment('first') cser.fake_now = datetime(24, 9, 14) repo = pygit2.init_repository(self.gitdir) with self.stage('archive'): expected_commit1 = repo.revparse_single('first') expected_commit2 = repo.revparse_single('first2') expected_tag1 = 'first-14sep24' expected_tag2 = 'first2-14sep24' # Archive it and make sure it is invisible yield cser slist = cser.db.series_get_dict() self.assertFalse(slist) # ...unless we include archived items slist = cser.db.series_get_dict(include_archived=True) self.assertEqual(1, len(slist)) first = slist['first'] self.assertEqual('first', first.name) # Make sure the branches have been tagged svlist = cser.db.ser_ver_get_for_series(first.idnum) self.assertEqual(expected_tag1, svlist[0].archive_tag) self.assertEqual(expected_tag2, svlist[1].archive_tag) # Check that the tags were created and point to old branch commits target1 = repo.revparse_single(expected_tag1) self.assertEqual(expected_commit1, target1.get_object()) target2 = repo.revparse_single(expected_tag2) self.assertEqual(expected_commit2, target2.get_object()) # The branches should be deleted self.assertFalse('first' in repo.branches) self.assertFalse('first2' in repo.branches) with self.stage('unarchive'): # or we unarchive it yield cser slist = cser.db.series_get_dict() self.assertEqual(1, len(slist)) # Make sure the branches have been restored branch1 = 
repo.branches['first'] branch2 = repo.branches['first2'] self.assertEqual(expected_commit1.oid, branch1.target) self.assertEqual(expected_commit2.oid, branch2.target) # Make sure the tags were deleted try: target1 = repo.revparse_single(expected_tag1) self.fail('target1 is still present') except KeyError: pass try: target1 = repo.revparse_single(expected_tag2) self.fail('target2 is still present') except KeyError: pass # Make sure the tag information has been removed svlist = cser.db.ser_ver_get_for_series(first.idnum) self.assertFalse(svlist[0].archive_tag) self.assertFalse(svlist[1].archive_tag) yield False

    def test_series_archive(self):
        """Test marking a series as archived, then unarchiving it

        The checks themselves live in check_series_archive(); this test
        performs the archive and unarchive steps using direct Cseries calls.
        """
        cor = self.check_series_archive()
        cser = next(cor)

        # Archive it and make sure it is invisible
        cser.archive('first')

        # Resume the checker; it yields again when it is time to unarchive
        cser = next(cor)
        cser.unarchive('first')

        # The checker yields a false value once all its checks are done
        self.assertFalse(next(cor))
        cor.close()

    def test_series_archive_cmdline(self):
        """Test marking a series as archived with cmdline

        Same flow as test_series_archive() but using the 'archive' and
        'unarchive' subcommands.
        """
        cor = self.check_series_archive()
        cser = next(cor)

        # Archive it and make sure it is invisible
        # NOTE(review): cser is passed through, presumably so the command
        # uses the Cseries object prepared by the checker - confirm
        self.run_args('series', '-s', 'first', 'archive', pwork=True,
                      cser=cser)
        next(cor)
        self.run_args('series', '-s', 'first', 'unarchive', pwork=True,
                      cser=cser)
        self.assertFalse(next(cor))
        cor.close()

    def check_series_inc(self):
        """Coroutine to run the increment test

        Yields:
            Cseries: once, when the caller should increment series 'first'
            None: when all checks have passed
        """
        cser = self.get_cser()

        with self.stage('setup'):
            gitutil.checkout('first', self.gitdir, work_tree=self.tmpdir,
                             force=True)
            with terminal.capture() as (out, _):
                cser.add('first', '', allow_unmarked=True)

        with self.stage('increment'):
            # The caller's increment runs here, with its output captured
            with terminal.capture() as (out, _):
                yield cser
            self._check_inc(out)

            # Still a single series...
            slist = cser.db.series_get_dict()
            self.assertEqual(1, len(slist))

            # ...but now with two versions
            svlist = cser.get_ser_ver_list()
            self.assertEqual(2, len(svlist))
            self.assertEqual(1, svlist[0].idnum)
            self.assertEqual(1, svlist[0].series_id)
            self.assertEqual(1, svlist[0].version)

            self.assertEqual(2, svlist[1].idnum)
            self.assertEqual(1, svlist[1].series_id)
            self.assertEqual(2, svlist[1].version)

            # The new branch records version 2; the old branch has no version
            series = patchstream.get_metadata_for_list('first2', self.gitdir,
                                                       1)
            self.assertEqual('2', series.version)

            series = patchstream.get_metadata_for_list('first', self.gitdir, 1)
            self.assertNotIn('version', series)

            # The new branch should be checked out
            self.assertEqual('first2', gitutil.get_branch(self.gitdir))
        yield None

    def test_series_inc(self):
        """Test incrementing the version"""
        cor = self.check_series_inc()
        cser = next(cor)

        cser.increment('first')
        self.assertFalse(next(cor))

        cor.close()

    def test_series_inc_cmdline(self):
        """Test incrementing the version with cmdline"""
        cor = self.check_series_inc()
        next(cor)

        self.run_args('series', '-s', 'first', 'inc', pwork=True)
        self.assertFalse(next(cor))
        cor.close()

    def test_series_inc_no_upstream(self):
        """Increment a series which has no upstream branch"""
        cser = self.get_cser()

        gitutil.checkout('first', self.gitdir, work_tree=self.tmpdir,
                         force=True)
        with terminal.capture():
            cser.add('first', '', allow_unmarked=True)

        # Delete the 'base' branch before incrementing
        repo = pygit2.init_repository(self.gitdir)
        upstream = repo.lookup_branch('base')
        upstream.delete()
        with terminal.capture():
            cser.increment('first')

        slist = cser.db.series_get_dict()
        self.assertEqual(1, len(slist))

    def test_series_inc_dryrun(self):
        """Test incrementing the version as a dry run

        The usual increment output is expected, followed by 'Dry run
        completed', but no version is added and the branch is unchanged.
        """
        cser = self.get_cser()

        gitutil.checkout('first', self.gitdir, work_tree=self.tmpdir,
                         force=True)
        with terminal.capture() as (out, _):
            cser.add('first', '', allow_unmarked=True)

        with terminal.capture() as (out, _):
            cser.increment('first', dry_run=True)
        # _check_inc() returns an iterator over the remaining output lines
        itr = self._check_inc(out)
        self.assertEqual('Dry run completed', next(itr))

        # Make sure that nothing was added
        svlist = cser.get_ser_ver_list()
        self.assertEqual(1, len(svlist))
        self.assertEqual(1, svlist[0].idnum)
        self.assertEqual(1, svlist[0].series_id)
        self.assertEqual(1, svlist[0].version)

        # We should still be on the same branch
        self.assertEqual('first', gitutil.get_branch(self.gitdir))

    def test_series_dec(self): """Test decrementing the version""" cser = self.get_cser()
gitutil.checkout('first', self.gitdir, work_tree=self.tmpdir, force=True) with terminal.capture() as (out, _): cser.add('first', '', allow_unmarked=True) pclist = cser.get_pcommit_dict() self.assertEqual(2, len(pclist)) # Try decrementing when there is only one version with self.assertRaises(ValueError) as exc: cser.decrement('first') self.assertEqual("Series 'first' only has one version", str(exc.exception)) # Add a version; now there should be two with terminal.capture() as (out, _): cser.increment('first') svdict = cser.get_ser_ver_dict() self.assertEqual(2, len(svdict)) pclist = cser.get_pcommit_dict() self.assertEqual(4, len(pclist)) # Remove version two, using dry run (i.e. no effect) with terminal.capture() as (out, _): cser.decrement('first', dry_run=True) svdict = cser.get_ser_ver_dict() self.assertEqual(2, len(svdict)) repo = pygit2.init_repository(self.gitdir) branch = repo.lookup_branch('first2') self.assertTrue(branch) branch_oid = branch.peel(pygit2.enums.ObjectType.COMMIT).oid pclist = cser.get_pcommit_dict() self.assertEqual(4, len(pclist)) # Now remove version two for real with terminal.capture() as (out, _): cser.decrement('first') lines = out.getvalue().splitlines() self.assertEqual(2, len(lines)) self.assertEqual("Removing series 'first' v2", lines[0]) self.assertEqual( f"Deleted branch 'first2' {str(branch_oid)[:10]}", lines[1]) svdict = cser.get_ser_ver_dict() self.assertEqual(1, len(svdict)) pclist = cser.get_pcommit_dict() self.assertEqual(2, len(pclist)) branch = repo.lookup_branch('first2') self.assertFalse(branch) # Removing the only version should not be allowed with self.assertRaises(ValueError) as exc: cser.decrement('first', dry_run=True) self.assertEqual("Series 'first' only has one version", str(exc.exception)) def test_upstream_add(self): """Test adding an upsream""" cser = self.get_cser() cser.upstream_add('us', 'https://one') ulist = cser.get_upstream_dict() self.assertEqual(1, len(ulist)) self.assertEqual(('https://one', None), 
ulist['us']) cser.upstream_add('ci', 'git@two') ulist = cser.get_upstream_dict() self.assertEqual(2, len(ulist)) self.assertEqual(('https://one', None), ulist['us']) self.assertEqual(('git@two', None), ulist['ci']) # Try to add a duplicate with self.assertRaises(ValueError) as exc: cser.upstream_add('ci', 'git@three') self.assertEqual("Upstream 'ci' already exists", str(exc.exception)) with terminal.capture() as (out, _): cser.upstream_list() lines = out.getvalue().splitlines() self.assertEqual(2, len(lines)) self.assertEqual('us https://one', lines[0]) self.assertEqual('ci git@two', lines[1]) def test_upstream_add_cmdline(self): """Test adding an upsream with cmdline""" with terminal.capture(): self.run_args('upstream', 'add', 'us', 'https://one') with terminal.capture() as (out, _): self.run_args('upstream', 'list') lines = out.getvalue().splitlines() self.assertEqual(1, len(lines)) self.assertEqual('us https://one', lines[0]) def test_upstream_default(self): """Operation of the default upstream""" cser = self.get_cser() with self.assertRaises(ValueError) as exc: cser.upstream_set_default('us') self.assertEqual("No such upstream 'us'", str(exc.exception)) cser.upstream_add('us', 'https://one') cser.upstream_add('ci', 'git@two') self.assertIsNone(cser.upstream_get_default()) cser.upstream_set_default('us') self.assertEqual('us', cser.upstream_get_default()) cser.upstream_set_default('us') cser.upstream_set_default('ci') self.assertEqual('ci', cser.upstream_get_default()) with terminal.capture() as (out, _): cser.upstream_list() lines = out.getvalue().splitlines() self.assertEqual(2, len(lines)) self.assertEqual('us https://one', lines[0]) self.assertEqual('ci default git@two', lines[1]) cser.upstream_set_default(None) self.assertIsNone(cser.upstream_get_default()) def test_upstream_default_cmdline(self): """Operation of the default upstream on cmdline""" with terminal.capture() as (out, _): self.run_args('upstream', 'default', 'us', expect_ret=1) 
self.assertEqual("patman: ValueError: No such upstream 'us'", out.getvalue().strip().splitlines()[-1]) self.run_args('upstream', 'add', 'us', 'https://one') self.run_args('upstream', 'add', 'ci', 'git@two') with terminal.capture() as (out, _): self.run_args('upstream', 'default') self.assertEqual('unset', out.getvalue().strip()) self.run_args('upstream', 'default', 'us') with terminal.capture() as (out, _): self.run_args('upstream', 'default') self.assertEqual('us', out.getvalue().strip()) self.run_args('upstream', 'default', 'ci') with terminal.capture() as (out, _): self.run_args('upstream', 'default') self.assertEqual('ci', out.getvalue().strip()) with terminal.capture() as (out, _): self.run_args('upstream', 'default', '--unset') self.assertFalse(out.getvalue().strip()) with terminal.capture() as (out, _): self.run_args('upstream', 'default') self.assertEqual('unset', out.getvalue().strip()) def test_upstream_delete(self): """Test operation of the default upstream""" cser = self.get_cser() with self.assertRaises(ValueError) as exc: cser.upstream_delete('us') self.assertEqual("No such upstream 'us'", str(exc.exception)) cser.upstream_add('us', 'https://one') cser.upstream_add('ci', 'git@two') cser.upstream_set_default('us') cser.upstream_delete('us') self.assertIsNone(cser.upstream_get_default()) cser.upstream_delete('ci') ulist = cser.get_upstream_dict() self.assertFalse(ulist) def test_upstream_delete_cmdline(self): """Test deleting an upstream""" with terminal.capture() as (out, _): self.run_args('upstream', 'delete', 'us', expect_ret=1) self.assertEqual("patman: ValueError: No such upstream 'us'", out.getvalue().strip().splitlines()[-1]) self.run_args('us', 'add', 'us', 'https://one') self.run_args('us', 'add', 'ci', 'git@two') self.run_args('upstream', 'default', 'us') self.run_args('upstream', 'delete', 'us') with terminal.capture() as (out, _): self.run_args('upstream', 'default', 'us', expect_ret=1) self.assertEqual("patman: ValueError: No such upstream 
'us'", out.getvalue().strip()) self.run_args('upstream', 'delete', 'ci') with terminal.capture() as (out, _): self.run_args('upstream', 'list') self.assertFalse(out.getvalue().strip()) def test_series_add_mark(self): """Test marking a cseries with Change-Id fields""" cser = self.get_cser() with terminal.capture(): cser.add('first', '', mark=True) pcdict = cser.get_pcommit_dict() series = patchstream.get_metadata('first', 0, 2, git_dir=self.gitdir) self.assertEqual(2, len(series.commits)) self.assertIn(1, pcdict) self.assertEqual(1, pcdict[1].idnum) self.assertEqual('i2c: I2C things', pcdict[1].subject) self.assertEqual(1, pcdict[1].svid) self.assertEqual(series.commits[0].change_id, pcdict[1].change_id) self.assertIn(2, pcdict) self.assertEqual(2, pcdict[2].idnum) self.assertEqual('spi: SPI fixes', pcdict[2].subject) self.assertEqual(1, pcdict[2].svid) self.assertEqual(series.commits[1].change_id, pcdict[2].change_id) def test_series_add_mark_fail(self): """Test marking a cseries when the tree is dirty""" cser = self.get_cser() tools.write_file(os.path.join(self.tmpdir, 'fname'), b'123') with terminal.capture(): cser.add('first', '', mark=True) tools.write_file(os.path.join(self.tmpdir, 'i2c.c'), b'123') with self.assertRaises(ValueError) as exc: with terminal.capture(): cser.add('first', '', mark=True) self.assertEqual( "Modified files exist: use 'git status' to check: [' M i2c.c']", str(exc.exception)) def test_series_add_mark_dry_run(self): """Test marking a cseries with Change-Id fields""" cser = self.get_cser() with terminal.capture() as (out, _): cser.add('first', '', mark=True, dry_run=True) itr = iter(out.getvalue().splitlines()) self.assertEqual( "Adding series 'first' v1: mark True allow_unmarked False", next(itr)) self.assertRegex( next(itr), 'Checking out upstream commit refs/heads/base: .*') self.assertEqual("Processing 2 commits from branch 'first'", next(itr)) self.assertRegex( next(itr), f'- marked: {HASH_RE} as {HASH_RE} i2c: I2C things') 
self.assertRegex( next(itr), f'- marked: {HASH_RE} as {HASH_RE} spi: SPI fixes') self.assertRegex( next(itr), f'Updating branch first from {HASH_RE} to {HASH_RE}') self.assertEqual("Added series 'first' v1 (2 commits)", next(itr)) self.assertEqual('Dry run completed', next(itr)) # Doing another dry run should produce the same result with terminal.capture() as (out2, _): cser.add('first', '', mark=True, dry_run=True) self.assertEqual(out.getvalue(), out2.getvalue()) tools.write_file(os.path.join(self.tmpdir, 'i2c.c'), b'123') with terminal.capture() as (out, _): with self.assertRaises(ValueError) as exc: cser.add('first', '', mark=True, dry_run=True) self.assertEqual( "Modified files exist: use 'git status' to check: [' M i2c.c']", str(exc.exception)) pcdict = cser.get_pcommit_dict() self.assertFalse(pcdict) def test_series_add_mark_cmdline(self): """Test marking a cseries with Change-Id fields using the cmdline""" cser = self.get_cser() with terminal.capture(): self.run_args('series', '-s', 'first', 'add', '-m', '-D', 'my-description', pwork=True) pcdict = cser.get_pcommit_dict() self.assertTrue(pcdict[1].change_id) self.assertTrue(pcdict[2].change_id) def test_series_add_unmarked_cmdline(self): """Test adding an unmarked cseries using the command line""" cser = self.get_cser() with terminal.capture(): self.run_args('series', '-s', 'first', 'add', '-M', '-D', 'my-description', pwork=True) pcdict = cser.get_pcommit_dict() self.assertFalse(pcdict[1].change_id) self.assertFalse(pcdict[2].change_id) def test_series_add_unmarked_bad_cmdline(self): """Test failure to add an unmarked cseries using a bad command line""" self.get_cser() with terminal.capture() as (out, _): self.run_args('series', '-s', 'first', 'add', '-D', 'my-description', expect_ret=1, pwork=True) last_line = out.getvalue().splitlines()[-2] self.assertEqual( 'patman: ValueError: 2 commit(s) are unmarked; ' 'please use -m or -M', last_line) def check_series_unmark(self): """Checker for unmarking tests""" 
cser = self.get_cser() with self.stage('unmarked commits'): yield cser with self.stage('mark commits'): with terminal.capture() as (out, _): yield cser with self.stage('unmark: dry run'): with terminal.capture() as (out, _): yield cser itr = iter(out.getvalue().splitlines()) self.assertEqual( "Unmarking series 'first': allow_unmarked False", next(itr)) self.assertRegex( next(itr), 'Checking out upstream commit refs/heads/base: .*') self.assertEqual("Processing 2 commits from branch 'first'", next(itr)) self.assertRegex( next(itr), f'- unmarked: {HASH_RE} as {HASH_RE} i2c: I2C things') self.assertRegex( next(itr), f'- unmarked: {HASH_RE} as {HASH_RE} spi: SPI fixes') self.assertRegex( next(itr), f'Updating branch first from {HASH_RE} to {HASH_RE}') self.assertEqual('Dry run completed', next(itr)) with self.stage('unmark'): with terminal.capture() as (out, _): yield cser self.assertIn('- unmarked', out.getvalue()) with self.stage('unmark: allow unmarked'): with terminal.capture() as (out, _): yield cser self.assertIn('- no mark', out.getvalue()) yield None def test_series_unmark(self): """Test unmarking a cseries, i.e. 
removing Change-Id fields""" cor = self.check_series_unmark() cser = next(cor) # check the allow_unmarked flag with terminal.capture(): with self.assertRaises(ValueError) as exc: cser.unmark('first', dry_run=True) self.assertEqual('Unmarked commits 2/2', str(exc.exception)) # mark commits cser = next(cor) cser.add('first', '', mark=True) # unmark: dry run cser = next(cor) cser.unmark('first', dry_run=True) # unmark cser = next(cor) cser.unmark('first') # unmark: allow unmarked cser = next(cor) cser.unmark('first', allow_unmarked=True) self.assertFalse(next(cor)) def test_series_unmark_cmdline(self): """Test the unmark command""" cor = self.check_series_unmark() next(cor) # check the allow_unmarked flag with terminal.capture() as (out, _): self.run_args('series', 'unmark', expect_ret=1, pwork=True) self.assertIn('Unmarked commits 2/2', out.getvalue()) # mark commits next(cor) self.run_args('series', '-s', 'first', 'add', '-D', '', '--mark', pwork=True) # unmark: dry run next(cor) self.run_args('series', '-s', 'first', '-n', 'unmark', pwork=True) # unmark next(cor) self.run_args('series', '-s', 'first', 'unmark', pwork=True) # unmark: allow unmarked next(cor) self.run_args('series', '-s', 'first', 'unmark', '--allow-unmarked', pwork=True) self.assertFalse(next(cor)) def test_series_unmark_middle(self): """Test unmarking with Change-Id fields not last in the commit""" cser = self.get_cser() with terminal.capture(): cser.add('first', '', allow_unmarked=True) # Add some change IDs in the middle of the commit message with terminal.capture(): name, ser, _, _ = cser.prep_series('first') old_msgs = [] for vals in cser.process_series(name, ser): old_msgs.append(vals.msg) lines = vals.msg.splitlines() change_id = cser.make_change_id(vals.commit) extra = [f'{cser_helper.CHANGE_ID_TAG}: {change_id}'] vals.msg = '\n'.join(lines[:2] + extra + lines[2:]) + '\n' with terminal.capture(): cser.unmark('first') # We should get back the original commit message series = 
patchstream.get_metadata('first', 0, 2, git_dir=self.gitdir) self.assertEqual(old_msgs[0], series.commits[0].msg) self.assertEqual(old_msgs[1], series.commits[1].msg) def check_series_mark(self): """Checker for marking tests""" cser = self.get_cser() yield cser # Start with a dry run, which should do nothing with self.stage('dry run'): with terminal.capture(): yield cser series = patchstream.get_metadata_for_list('first', self.gitdir, 2) self.assertEqual(2, len(series.commits)) self.assertFalse(series.commits[0].change_id) self.assertFalse(series.commits[1].change_id) # Now do a real run with self.stage('real run'): with terminal.capture(): yield cser series = patchstream.get_metadata_for_list('first', self.gitdir, 2) self.assertEqual(2, len(series.commits)) self.assertTrue(series.commits[0].change_id) self.assertTrue(series.commits[1].change_id) # Try to mark again, which should fail with self.stage('mark twice'): with terminal.capture(): with self.assertRaises(ValueError) as exc: cser.mark('first', dry_run=False) self.assertEqual('Marked commits 2/2', str(exc.exception)) # Use the --marked flag to make it succeed with self.stage('mark twice with --marked'): with terminal.capture(): yield cser self.assertEqual('Marked commits 2/2', str(exc.exception)) series2 = patchstream.get_metadata_for_list('first', self.gitdir, 2) self.assertEqual(2, len(series2.commits)) self.assertEqual(series.commits[0].change_id, series2.commits[0].change_id) self.assertEqual(series.commits[1].change_id, series2.commits[1].change_id) yield None def test_series_mark(self): """Test marking a cseries, i.e. 
adding Change-Id fields""" cor = self.check_series_mark() cser = next(cor) # Start with a dry run, which should do nothing cser = next(cor) cser.mark('first', dry_run=True) # Now do a real run cser = next(cor) cser.mark('first', dry_run=False) # Try to mark again, which should fail with terminal.capture(): with self.assertRaises(ValueError) as exc: cser.mark('first', dry_run=False) self.assertEqual('Marked commits 2/2', str(exc.exception)) # Use the --allow-marked flag to make it succeed cser = next(cor) cser.mark('first', allow_marked=True, dry_run=False) self.assertFalse(next(cor)) def test_series_mark_cmdline(self): """Test marking a cseries, i.e. adding Change-Id fields""" cor = self.check_series_mark() next(cor) # Start with a dry run, which should do nothing next(cor) self.run_args('series', '-n', '-s', 'first', 'mark', pwork=True) # Now do a real run next(cor) self.run_args('series', '-s', 'first', 'mark', pwork=True) # Try to mark again, which should fail with terminal.capture() as (out, _): self.run_args('series', '-s', 'first', 'mark', expect_ret=1, pwork=True) self.assertIn('Marked commits 2/2', out.getvalue()) # Use the --allow-marked flag to make it succeed next(cor) self.run_args('series', '-s', 'first', 'mark', '--allow-marked', pwork=True) self.assertFalse(next(cor)) def test_series_remove(self): """Test removing a series""" cser = self.get_cser() with self.stage('remove non-existent series'): with self.assertRaises(ValueError) as exc: cser.remove('first') self.assertEqual("No such series 'first'", str(exc.exception)) with self.stage('add'): with terminal.capture() as (out, _): cser.add('first', '', mark=True) self.assertTrue(cser.db.series_get_dict()) pclist = cser.get_pcommit_dict() self.assertEqual(2, len(pclist)) with self.stage('remove'): with terminal.capture() as (out, _): cser.remove('first') self.assertEqual("Removed series 'first'", out.getvalue().strip()) self.assertFalse(cser.db.series_get_dict()) pclist = cser.get_pcommit_dict() 
self.assertFalse(len(pclist)) def test_series_remove_cmdline(self): """Test removing a series using the command line""" cser = self.get_cser() with self.stage('remove non-existent series'): with terminal.capture() as (out, _): self.run_args('series', '-s', 'first', 'rm', expect_ret=1, pwork=True) self.assertEqual("patman: ValueError: No such series 'first'", out.getvalue().strip()) with self.stage('add'): with terminal.capture() as (out, _): cser.add('first', '', mark=True) self.assertTrue(cser.db.series_get_dict()) with self.stage('remove'): with terminal.capture() as (out, _): cser.remove('first') self.assertEqual("Removed series 'first'", out.getvalue().strip()) self.assertFalse(cser.db.series_get_dict()) def check_series_remove_multiple(self): """Check for removing a series with more than one version""" cser = self.get_cser() with self.stage('setup'): self.add_first2(True) with terminal.capture() as (out, _): cser.add(None, '', mark=True) cser.add('first', '', mark=True) self.assertTrue(cser.db.series_get_dict()) pclist = cser.get_pcommit_dict() self.assertEqual(4, len(pclist)) # Do a dry-run removal with self.stage('dry run'): with terminal.capture() as (out, _): yield cser self.assertEqual("Removed version 1 from series 'first'\n" 'Dry run completed', out.getvalue().strip()) self.assertEqual({'first'}, cser.db.series_get_dict().keys()) svlist = cser.get_ser_ver_list() self.assertEqual(2, len(svlist)) self.assertEqual(1, svlist[0].idnum) self.assertEqual(1, svlist[0].series_id) self.assertEqual(2, svlist[0].version) self.assertEqual(2, svlist[1].idnum) self.assertEqual(1, svlist[1].series_id) self.assertEqual(1, svlist[1].version) # Now remove for real with self.stage('real'): with terminal.capture() as (out, _): yield cser self.assertEqual("Removed version 1 from series 'first'", out.getvalue().strip()) self.assertEqual({'first'}, cser.db.series_get_dict().keys()) plist = cser.get_ser_ver_list() self.assertEqual(1, len(plist)) pclist = cser.get_pcommit_dict() 
self.assertEqual(2, len(pclist)) with self.stage('remove only version'): yield cser self.assertEqual({'first'}, cser.db.series_get_dict().keys()) svlist = cser.get_ser_ver_list() self.assertEqual(1, len(svlist)) self.assertEqual(1, svlist[0].idnum) self.assertEqual(1, svlist[0].series_id) self.assertEqual(2, svlist[0].version) with self.stage('remove series (dry run'): with terminal.capture() as (out, _): yield cser self.assertEqual("Removed series 'first'\nDry run completed", out.getvalue().strip()) self.assertTrue(cser.db.series_get_dict()) self.assertTrue(cser.get_ser_ver_list()) with self.stage('remove series'): with terminal.capture() as (out, _): yield cser self.assertEqual("Removed series 'first'", out.getvalue().strip()) self.assertFalse(cser.db.series_get_dict()) self.assertFalse(cser.get_ser_ver_list()) yield False def test_series_remove_multiple(self): """Test removing a series with more than one version""" cor = self.check_series_remove_multiple() cser = next(cor) # Do a dry-run removal cser.version_remove('first', 1, dry_run=True) cser = next(cor) # Now remove for real cser.version_remove('first', 1) cser = next(cor) # Remove only version with self.assertRaises(ValueError) as exc: cser.version_remove('first', 2, dry_run=True) self.assertEqual( "Series 'first' only has one version: remove the series", str(exc.exception)) cser = next(cor) # Remove series (dry run) cser.remove('first', dry_run=True) cser = next(cor) # Remove series (real) cser.remove('first') self.assertFalse(next(cor)) cor.close() def test_series_remove_multiple_cmdline(self): """Test removing a series with more than one version on cmdline""" cor = self.check_series_remove_multiple() next(cor) # Do a dry-run removal self.run_args('series', '-n', '-s', 'first', '-V', '1', 'rm-version', pwork=True) next(cor) # Now remove for real self.run_args('series', '-s', 'first', '-V', '1', 'rm-version', pwork=True) next(cor) # Remove only version with terminal.capture() as (out, _): 
self.run_args('series', '-n', '-s', 'first', '-V', '2', 'rm-version', expect_ret=1, pwork=True) self.assertIn( "Series 'first' only has one version: remove the series", out.getvalue().strip()) next(cor) # Remove series (dry run) self.run_args('series', '-n', '-s', 'first', 'rm', pwork=True) next(cor) # Remove series (real) self.run_args('series', '-s', 'first', 'rm', pwork=True) self.assertFalse(next(cor)) cor.close() def test_patchwork_set_project(self): """Test setting the project ID""" cser = self.get_cser() pwork = Patchwork.for_testing(self._fake_patchwork_cser) with terminal.capture() as (out, _): cser.project_set(pwork, 'U-Boot') self.assertEqual( f"Project 'U-Boot' patchwork-ID {self.PROJ_ID} link-name uboot", out.getvalue().strip()) def test_patchwork_project_get(self): """Test setting the project ID""" cser = self.get_cser() pwork = Patchwork.for_testing(self._fake_patchwork_cser) self.assertFalse(cser.project_get()) with terminal.capture() as (out, _): cser.project_set(pwork, 'U-Boot') self.assertEqual( f"Project 'U-Boot' patchwork-ID {self.PROJ_ID} link-name uboot", out.getvalue().strip()) name, pwid, link_name = cser.project_get() self.assertEqual('U-Boot', name) self.assertEqual(self.PROJ_ID, pwid) self.assertEqual('uboot', link_name) def test_patchwork_project_get_cmdline(self): """Test setting the project ID""" cser = self.get_cser() self.assertFalse(cser.project_get()) pwork = Patchwork.for_testing(self._fake_patchwork_cser) with terminal.capture() as (out, _): self.run_args('-P', 'https://url', 'patchwork', 'set-project', 'U-Boot', pwork=pwork) self.assertEqual( f"Project 'U-Boot' patchwork-ID {self.PROJ_ID} link-name uboot", out.getvalue().strip()) name, pwid, link_name = cser.project_get() self.assertEqual('U-Boot', name) self.assertEqual(6, pwid) self.assertEqual('uboot', link_name) with terminal.capture() as (out, _): self.run_args('-P', 'https://url', 'patchwork', 'get-project') self.assertEqual( f"Project 'U-Boot' patchwork-ID {self.PROJ_ID} 
link-name uboot", out.getvalue().strip()) def check_series_list_patches(self): """Test listing the patches for a series""" cser = self.get_cser() with self.stage('setup'): with terminal.capture() as (out, _): cser.add(None, '', allow_unmarked=True) cser.add('second', allow_unmarked=True) target = self.repo.lookup_reference('refs/heads/second') self.repo.checkout( target, strategy=pygit2.enums.CheckoutStrategy.FORCE) cser.increment('second') with self.stage('list first'): with terminal.capture() as (out, _): yield cser itr = iter(out.getvalue().splitlines()) self.assertEqual("Branch 'first' (total 2): 2:unknown", next(itr)) self.assertIn('PatchId', next(itr)) self.assertRegex(next(itr), r' 0 .* i2c: I2C things') self.assertRegex(next(itr), r' 1 .* spi: SPI fixes') with self.stage('list second2'): with terminal.capture() as (out, _): yield cser itr = iter(out.getvalue().splitlines()) self.assertEqual( "Branch 'second2' (total 3): 3:unknown", next(itr)) self.assertIn('PatchId', next(itr)) self.assertRegex( next(itr), ' 0 .* video: Some video improvements') self.assertRegex(next(itr), ' 1 .* serial: Add a serial driver') self.assertRegex(next(itr), ' 2 .* bootm: Make it boot') yield None def test_series_list_patches(self): """Test listing the patches for a series""" cor = self.check_series_list_patches() cser = next(cor) # list first cser.list_patches('first', 1) cser = next(cor) # list second2 cser.list_patches('second2', 2) self.assertFalse(next(cor)) cor.close() def test_series_list_patches_cmdline(self): """Test listing the patches for a series using the cmdline""" cor = self.check_series_list_patches() next(cor) # list first self.run_args('series', '-s', 'first', 'patches', pwork=True) next(cor) # list second2 self.run_args('series', '-s', 'second', '-V', '2', 'patches', pwork=True) self.assertFalse(next(cor)) cor.close() def test_series_list_patches_detail(self): """Test listing the patches for a series""" cser = self.get_cser() with terminal.capture(): 
cser.add(None, '', allow_unmarked=True) cser.add('second', allow_unmarked=True) target = self.repo.lookup_reference('refs/heads/second') self.repo.checkout( target, strategy=pygit2.enums.CheckoutStrategy.FORCE) cser.increment('second') with terminal.capture() as (out, _): cser.list_patches('first', 1, show_commit=True) expect = r'''Branch 'first' (total 2): 2:unknown Seq State Com PatchId Commit Subject 0 unknown - .* i2c: I2C things commit .* Author: Test user Date: .* i2c: I2C things This has some stuff to do with I2C i2c.c | 2 ++ 1 file changed, 2 insertions(+) 1 unknown - .* spi: SPI fixes commit .* Author: Test user Date: .* spi: SPI fixes SPI needs some fixes and here they are Signed-off-by: Lord Edmund Blackaddër Series-to: u-boot Commit-notes: title of the series This is the cover letter for the series with various details END spi.c | 3 +++ 1 file changed, 3 insertions(+) ''' itr = iter(out.getvalue().splitlines()) for seq, eline in enumerate(expect.splitlines()): line = next(itr).rstrip() if '*' in eline: self.assertRegex(line, eline, f'line {seq + 1}') else: self.assertEqual(eline, line, f'line {seq + 1}') # Show just the patch; this should exclude the commit message with terminal.capture() as (out, _): cser.list_patches('first', 1, show_patch=True) chk = out.getvalue() self.assertIn('SPI fixes', chk) # subject self.assertNotIn('SPI needs some fixes', chk) # commit body self.assertIn('make SPI work', chk) # patch body # Show both with terminal.capture() as (out, _): cser.list_patches('first', 1, show_commit=True, show_patch=True) chk = out.getvalue() self.assertIn('SPI fixes', chk) # subject self.assertIn('SPI needs some fixes', chk) # commit body self.assertIn('make SPI work', chk) # patch body def check_series_gather(self): """Checker for gathering tags for a series""" cser = self.get_cser() with self.stage('setup'): pwork = Patchwork.for_testing(self._fake_patchwork_cser) self.assertFalse(cser.project_get()) cser.project_set(pwork, 'U-Boot', 
quiet=True) with terminal.capture() as (out, _): cser.add('second', 'description', allow_unmarked=True) ser = cser.get_series_by_name('second') pwid = cser.get_series_svid(ser.idnum, 1) # First do a dry run with self.stage('gather: dry run'): with terminal.capture() as (out, _): yield cser, pwork lines = out.getvalue().splitlines() self.assertEqual( f"Updating series 'second' version 1 from link " f"'{self.SERIES_ID_SECOND_V1}'", lines[0]) self.assertEqual('3 patches updated (7 requests)', lines[1]) self.assertEqual('Dry run completed', lines[2]) self.assertEqual(3, len(lines)) pwc = cser.get_pcommit_dict(pwid) self.assertIsNone(pwc[0].state) self.assertIsNone(pwc[1].state) self.assertIsNone(pwc[2].state) # Now try it again, gathering tags with self.stage('gather: dry run'): with terminal.capture() as (out, _): yield cser, pwork lines = out.getvalue().splitlines() itr = iter(lines) self.assertEqual( f"Updating series 'second' version 1 from link " f"'{self.SERIES_ID_SECOND_V1}'", next(itr)) self.assertEqual(' 1 video: Some video improvements', next(itr)) self.assertEqual(' + Reviewed-by: Fred Bloggs ', next(itr)) self.assertEqual(' 2 serial: Add a serial driver', next(itr)) self.assertEqual(' 3 bootm: Make it boot', next(itr)) self.assertRegex( next(itr), 'Checking out upstream commit refs/heads/base: .*') self.assertEqual("Processing 3 commits from branch 'second'", next(itr)) self.assertRegex( next(itr), f'- added 1 tag: {HASH_RE} as {HASH_RE} ' 'video: Some video improvements') self.assertRegex( next(itr), f"- upd links '1:456': {HASH_RE} as {HASH_RE} " 'serial: Add a serial driver') self.assertRegex( next(itr), f'- {HASH_RE} as {HASH_RE} ' 'bootm: Make it boot') self.assertRegex( next(itr), f'Updating branch second from {HASH_RE} to {HASH_RE}') self.assertEqual('3 patches updated (7 requests)', next(itr)) self.assertEqual('Dry run completed', next(itr)) self.assert_finished(itr) # Make sure that no tags were added to the branch series = 
patchstream.get_metadata_for_list('second', self.gitdir, 3) for cmt in series.commits: self.assertFalse(cmt.rtags, 'Commit {cmt.subject} rtags {cmt.rtags}') # Now do it for real with self.stage('gather: real'): with terminal.capture() as (out, _): yield cser, pwork lines2 = out.getvalue().splitlines() self.assertEqual(lines2, lines[:-1]) # Make sure that the tags were added to the branch series = patchstream.get_metadata_for_list('second', self.gitdir, 3) self.assertEqual( {'Reviewed-by': {'Fred Bloggs '}}, series.commits[0].rtags) self.assertFalse(series.commits[1].rtags) self.assertFalse(series.commits[2].rtags) # Make sure the status was updated pwc = cser.get_pcommit_dict(pwid) self.assertEqual('accepted', pwc[0].state) self.assertEqual('changes-requested', pwc[1].state) self.assertEqual('rejected', pwc[2].state) yield None def test_series_gather(self): """Test gathering tags for a series""" cor = self.check_series_gather() cser, pwork = next(cor) # sync (dry_run) cser.gather(pwork, 'second', None, False, False, False, dry_run=True) cser, pwork = next(cor) # gather (dry_run) cser.gather(pwork, 'second', None, False, False, True, dry_run=True) cser, pwork = next(cor) # gather (real) cser.gather(pwork, 'second', None, False, False, True) self.assertFalse(next(cor)) def test_series_gather_cmdline(self): """Test gathering tags for a series with cmdline""" cor = self.check_series_gather() _, pwork = next(cor) # sync (dry_run) self.run_args( 'series', '-n', '-s', 'second', 'gather', '-G', pwork=pwork) # gather (dry_run) _, pwork = next(cor) self.run_args('series', '-n', '-s', 'second', 'gather', pwork=pwork) # gather (real) _, pwork = next(cor) self.run_args('series', '-s', 'second', 'gather', pwork=pwork) self.assertFalse(next(cor)) def check_series_gather_all(self): """Gather all series at once""" with self.stage('setup'): cser, pwork = self.setup_second(False) with terminal.capture(): cser.add('first', 'description', allow_unmarked=True) cser.increment('first') 
cser.increment('first') cser.link_set('first', 1, '123', True) cser.link_set('first', 2, '1234', True) cser.link_set('first', 3, f'{self.SERIES_ID_FIRST_V3}', True) cser.link_auto(pwork, 'second', 2, True) with self.stage('no options'): with terminal.capture() as (out, _): yield cser, pwork self.assertEqual( "Syncing 'first' v3\n" "Syncing 'second' v2\n" '\n' '5 patches and 2 cover letters updated, 0 missing links ' '(14 requests)\n' 'Dry run completed', out.getvalue().strip()) with self.stage('gather'): with terminal.capture() as (out, _): yield cser, pwork lines = out.getvalue().splitlines() itr = iter(lines) self.assertEqual("Syncing 'first' v3", next(itr)) self.assertEqual(' 1 i2c: I2C things', next(itr)) self.assertEqual( ' + Tested-by: Mary Smith # yak', next(itr)) self.assertEqual(' 2 spi: SPI fixes', next(itr)) self.assertRegex( next(itr), 'Checking out upstream commit refs/heads/base: .*') self.assertEqual( "Processing 2 commits from branch 'first3'", next(itr)) self.assertRegex( next(itr), f'- added 1 tag: {HASH_RE} as {HASH_RE} i2c: I2C things') self.assertRegex( next(itr), f"- upd links '3:31': {HASH_RE} as {HASH_RE} spi: SPI fixes") self.assertRegex( next(itr), f'Updating branch first3 from {HASH_RE} to {HASH_RE}') self.assertEqual('', next(itr)) self.assertEqual("Syncing 'second' v2", next(itr)) self.assertEqual(' 1 video: Some video improvements', next(itr)) self.assertEqual( ' + Reviewed-by: Fred Bloggs ', next(itr)) self.assertEqual(' 2 serial: Add a serial driver', next(itr)) self.assertEqual(' 3 bootm: Make it boot', next(itr)) self.assertRegex( next(itr), 'Checking out upstream commit refs/heads/base: .*') self.assertEqual( "Processing 3 commits from branch 'second2'", next(itr)) self.assertRegex( next(itr), f'- added 1 tag: {HASH_RE} as {HASH_RE} ' 'video: Some video improvements') self.assertRegex( next(itr), f"- upd links '2:457 1:456': {HASH_RE} as {HASH_RE} " 'serial: Add a serial driver') self.assertRegex( next(itr), f'- {HASH_RE} as 
{HASH_RE} ' 'bootm: Make it boot') self.assertRegex( next(itr), f'Updating branch second2 from {HASH_RE} to {HASH_RE}') self.assertEqual('', next(itr)) self.assertEqual( '5 patches and 2 cover letters updated, 0 missing links ' '(14 requests)', next(itr)) self.assertEqual('Dry run completed', next(itr)) self.assert_finished(itr) with self.stage('gather, patch comments,!dry_run'): with terminal.capture() as (out, _): yield cser, pwork lines = out.getvalue().splitlines() itr = iter(lines) self.assertEqual("Syncing 'first' v1", next(itr)) self.assertEqual(' 1 i2c: I2C things', next(itr)) self.assertEqual( ' + Tested-by: Mary Smith # yak', next(itr)) self.assertEqual(' 2 spi: SPI fixes', next(itr)) self.assertRegex( next(itr), 'Checking out upstream commit refs/heads/base: .*') self.assertEqual( "Processing 2 commits from branch 'first'", next(itr)) self.assertRegex( next(itr), f'- added 1 tag: {HASH_RE} as {HASH_RE} i2c: I2C things') self.assertRegex( next(itr), f"- upd links '1:123': {HASH_RE} as {HASH_RE} spi: SPI fixes") self.assertRegex( next(itr), f'Updating branch first from {HASH_RE} to {HASH_RE}') self.assertEqual('', next(itr)) self.assertEqual("Syncing 'first' v2", next(itr)) self.assertEqual(' 1 i2c: I2C things', next(itr)) self.assertEqual( ' + Tested-by: Mary Smith # yak', next(itr)) self.assertEqual(' 2 spi: SPI fixes', next(itr)) self.assertRegex( next(itr), 'Checking out upstream commit refs/heads/base: .*') self.assertEqual( "Processing 2 commits from branch 'first2'", next(itr)) self.assertRegex( next(itr), f'- added 1 tag: {HASH_RE} as {HASH_RE} ' 'i2c: I2C things') self.assertRegex( next(itr), f"- upd links '2:1234': {HASH_RE} as {HASH_RE} spi: SPI fixes") self.assertRegex( next(itr), f'Updating branch first2 from {HASH_RE} to {HASH_RE}') self.assertEqual('', next(itr)) self.assertEqual("Syncing 'first' v3", next(itr)) self.assertEqual(' 1 i2c: I2C things', next(itr)) self.assertEqual( ' + Tested-by: Mary Smith # yak', next(itr)) self.assertEqual(' 
2 spi: SPI fixes', next(itr)) self.assertRegex( next(itr), 'Checking out upstream commit refs/heads/base: .*') self.assertEqual( "Processing 2 commits from branch 'first3'", next(itr)) self.assertRegex( next(itr), f'- added 1 tag: {HASH_RE} as {HASH_RE} i2c: I2C things') self.assertRegex( next(itr), f"- upd links '3:31': {HASH_RE} as {HASH_RE} spi: SPI fixes") self.assertRegex( next(itr), f'Updating branch first3 from {HASH_RE} to {HASH_RE}') self.assertEqual('', next(itr)) self.assertEqual("Syncing 'second' v1", next(itr)) self.assertEqual(' 1 video: Some video improvements', next(itr)) self.assertEqual( ' + Reviewed-by: Fred Bloggs ', next(itr)) self.assertEqual( 'Review: Fred Bloggs ', next(itr)) self.assertEqual(' > This was my original patch', next(itr)) self.assertEqual(' > which is being quoted', next(itr)) self.assertEqual( ' I like the approach here and I would love to see more ' 'of it.', next(itr)) self.assertEqual('', next(itr)) self.assertEqual(' 2 serial: Add a serial driver', next(itr)) self.assertEqual(' 3 bootm: Make it boot', next(itr)) self.assertRegex( next(itr), 'Checking out upstream commit refs/heads/base: .*') self.assertEqual( "Processing 3 commits from branch 'second'", next(itr)) self.assertRegex( next(itr), f'- added 1 tag: {HASH_RE} as {HASH_RE} ' 'video: Some video improvements') self.assertRegex( next(itr), f"- upd links '1:456': {HASH_RE} as {HASH_RE} " 'serial: Add a serial driver') self.assertRegex( next(itr), f'- {HASH_RE} as {HASH_RE} ' 'bootm: Make it boot') self.assertRegex( next(itr), f'Updating branch second from {HASH_RE} to {HASH_RE}') self.assertEqual('', next(itr)) self.assertEqual("Syncing 'second' v2", next(itr)) self.assertEqual(' 1 video: Some video improvements', next(itr)) self.assertEqual( ' + Reviewed-by: Fred Bloggs ', next(itr)) self.assertEqual( 'Review: Fred Bloggs ', next(itr)) self.assertEqual(' > This was my original patch', next(itr)) self.assertEqual(' > which is being quoted', next(itr)) 
self.assertEqual( ' I like the approach here and I would love to see more ' 'of it.', next(itr)) self.assertEqual('', next(itr)) self.assertEqual(' 2 serial: Add a serial driver', next(itr)) self.assertEqual(' 3 bootm: Make it boot', next(itr)) self.assertRegex( next(itr), 'Checking out upstream commit refs/heads/base: .*') self.assertEqual( "Processing 3 commits from branch 'second2'", next(itr)) self.assertRegex( next(itr), f'- added 1 tag: {HASH_RE} as {HASH_RE} ' 'video: Some video improvements') self.assertRegex( next(itr), f"- upd links '2:457 1:456': {HASH_RE} as {HASH_RE} " 'serial: Add a serial driver') self.assertRegex( next(itr), f'- {HASH_RE} as {HASH_RE} ' 'bootm: Make it boot') self.assertRegex( next(itr), f'Updating branch second2 from {HASH_RE} to {HASH_RE}') self.assertEqual('', next(itr)) self.assertEqual( '12 patches and 3 cover letters updated, 0 missing links ' '(32 requests)', next(itr)) self.assert_finished(itr) yield None def test_series_gather_all(self): """Gather all series at once""" cor = self.check_series_gather_all() cser, pwork = next(cor) # no options cser.gather_all(pwork, False, True, False, False, dry_run=True) cser, pwork = next(cor) # gather cser.gather_all(pwork, False, False, False, True, dry_run=True) cser, pwork = next(cor) # gather, patch comments, !dry_run cser.gather_all(pwork, True, False, True, True) self.assertFalse(next(cor)) def test_series_gather_all_cmdline(self): """Sync all series at once using cmdline""" cor = self.check_series_gather_all() _, pwork = next(cor) # no options self.run_args('series', '-n', '-s', 'second', 'gather-all', '-G', pwork=pwork) _, pwork = next(cor) # gather self.run_args('series', '-n', '-s', 'second', 'gather-all', pwork=pwork) _, pwork = next(cor) # gather, patch comments, !dry_run self.run_args('series', '-s', 'second', 'gather-all', '-a', '-c', pwork=pwork) self.assertFalse(next(cor)) def _check_second(self, itr, show_all): """Check output from a 'progress' command Args: itr 
(Iterator): Contains the output lines to check show_all (bool): True if all versions are being shown, not just latest """ self.assertEqual('second: Series for my board (versions: 1 2)', next(itr)) if show_all: self.assertEqual("Branch 'second' (total 3): 3:unknown", next(itr)) self.assertIn('PatchId', next(itr)) self.assertRegex( next(itr), ' 0 unknown - .* video: Some video improvements') self.assertRegex( next(itr), ' 1 unknown - .* serial: Add a serial driver') self.assertRegex( next(itr), ' 2 unknown - .* bootm: Make it boot') self.assertEqual('', next(itr)) self.assertEqual( "Branch 'second2' (total 3): 1:accepted 1:changes 1:rejected", next(itr)) self.assertIn('PatchId', next(itr)) self.assertEqual( 'Cov 2 139 ' 'The name of the cover letter', next(itr)) self.assertRegex( next(itr), ' 0 accepted 2 110 .* video: Some video improvements') self.assertRegex( next(itr), ' 1 changes 111 .* serial: Add a serial driver') self.assertRegex( next(itr), ' 2 rejected 3 112 .* bootm: Make it boot') def test_series_progress(self): """Test showing progress for a cseries""" self.setup_second() self.db_close() with self.stage('latest versions'): args = Namespace(subcmd='progress', series='second', show_all_versions=False, list_patches=True) with terminal.capture() as (out, _): control.do_series(args, test_db=self.tmpdir, pwork=True) lines = iter(out.getvalue().splitlines()) self._check_second(lines, False) with self.stage('all versions'): args.show_all_versions = True with terminal.capture() as (out, _): control.do_series(args, test_db=self.tmpdir, pwork=True) lines = iter(out.getvalue().splitlines()) self._check_second(lines, True) def _check_first(self, itr): """Check output from the progress command Args: itr (Iterator): Contains the output lines to check """ self.assertEqual('first: (versions: 1)', next(itr)) self.assertEqual("Branch 'first' (total 2): 2:unknown", next(itr)) self.assertIn('PatchId', next(itr)) self.assertRegex( next(itr), ' 0 unknown - .* i2c: I2C things') 
self.assertRegex( next(itr), ' 1 unknown - .* spi: SPI fixes') self.assertEqual('', next(itr)) def test_series_progress_all(self): """Test showing progress for all cseries""" self.setup_second() self.db_close() with self.stage('progress with patches'): args = Namespace(subcmd='progress', series=None, show_all_versions=False, list_patches=True) with terminal.capture() as (out, _): control.do_series(args, test_db=self.tmpdir, pwork=True) lines = iter(out.getvalue().splitlines()) self._check_first(lines) self._check_second(lines, False) with self.stage('all versions'): args.show_all_versions = True with terminal.capture() as (out, _): control.do_series(args, test_db=self.tmpdir, pwork=True) lines = iter(out.getvalue().splitlines()) self._check_first(lines) self._check_second(lines, True) def test_series_progress_no_patches(self): """Test showing progress for all cseries without patches""" self.setup_second() with terminal.capture() as (out, _): self.run_args('series', 'progress', pwork=True) itr = iter(out.getvalue().splitlines()) self.assertEqual( 'Name Description ' 'Count Status', next(itr)) self.assertTrue(next(itr).startswith('--')) self.assertEqual( 'first ' ' 2 2:unknown', next(itr)) self.assertEqual( 'second2 The name of the cover letter ' ' 3 1:accepted 1:changes 1:rejected', next(itr)) self.assertTrue(next(itr).startswith('--')) self.assertEqual( ['2', 'series', '5', '2:unknown', '1:accepted', '1:changes', '1:rejected'], next(itr).split()) self.assert_finished(itr) def test_series_progress_all_no_patches(self): """Test showing progress for all cseries versions without patches""" self.setup_second() with terminal.capture() as (out, _): self.run_args('series', 'progress', '--show-all-versions', pwork=True) itr = iter(out.getvalue().splitlines()) self.assertEqual( 'Name Description ' 'Count Status', next(itr)) self.assertTrue(next(itr).startswith('--')) self.assertEqual( 'first ' ' 2 2:unknown', next(itr)) self.assertEqual( 'second Series for my board ' ' 3 
3:unknown', next(itr)) self.assertEqual( 'second2 The name of the cover letter ' ' 3 1:accepted 1:changes 1:rejected', next(itr)) self.assertTrue(next(itr).startswith('--')) self.assertEqual( ['3', 'series', '8', '5:unknown', '1:accepted', '1:changes', '1:rejected'], next(itr).split()) self.assert_finished(itr) def test_series_summary(self): """Test showing a summary of series status""" self.setup_second() self.db_close() args = Namespace(subcmd='summary', series=None) with terminal.capture() as (out, _): control.do_series(args, test_db=self.tmpdir, pwork=True) lines = out.getvalue().splitlines() self.assertEqual( 'Name Status Description', lines[0]) self.assertEqual( '----------------- ------ ------------------------------', lines[1]) self.assertEqual('first -/2 ', lines[2]) self.assertEqual('second 1/3 Series for my board', lines[3]) def test_series_open(self): """Test opening a series in a web browser""" cser = self.get_cser() pwork = Patchwork.for_testing(self._fake_patchwork_cser) self.assertFalse(cser.project_get()) pwork.project_set(self.PROJ_ID, self.PROJ_LINK_NAME) with terminal.capture(): cser.add('second', allow_unmarked=True) cser.increment('second') cser.link_auto(pwork, 'second', 2, True) cser.gather(pwork, 'second', 2, False, False, False) with mock.patch.object(cros_subprocess.Popen, '__init__', return_value=None) as method: with terminal.capture() as (out, _): cser.open(pwork, 'second2', 2) url = ('https://patchwork.ozlabs.org/project/uboot/list/?series=457' '&state=*&archive=both') method.assert_called_once_with(['xdg-open', url]) self.assertEqual(f'Opening {url}', out.getvalue().strip()) def test_name_version(self): """Test handling of series names and versions""" cser = self.get_cser() repo = self.repo self.assertEqual(('fred', None), cser_helper.split_name_version('fred')) self.assertEqual(('mary', 2), cser_helper.split_name_version('mary2')) ser, version = cser._parse_series_and_version(None, None) self.assertEqual('first', ser.name) 
self.assertEqual(1, version) ser, version = cser._parse_series_and_version('first', None) self.assertEqual('first', ser.name) self.assertEqual(1, version) ser, version = cser._parse_series_and_version('first', 2) self.assertEqual('first', ser.name) self.assertEqual(2, version) with self.assertRaises(ValueError) as exc: cser._parse_series_and_version('123', 2) self.assertEqual( "Series name '123' cannot be a number, use ''", str(exc.exception)) with self.assertRaises(ValueError) as exc: cser._parse_series_and_version('first', 100) self.assertEqual("Version 100 exceeds 99", str(exc.exception)) with terminal.capture() as (_, err): cser._parse_series_and_version('mary3', 4) self.assertIn('Version mismatch: -V has 4 but branch name indicates 3', err.getvalue()) ser, version = cser._parse_series_and_version('mary', 4) self.assertEqual('mary', ser.name) self.assertEqual(4, version) # Move off the branch and check for a sensible error commit = repo.revparse_single('first~') repo.checkout_tree(commit) repo.set_head(commit.oid) with self.assertRaises(ValueError) as exc: cser._parse_series_and_version(None, None) self.assertEqual('No branch detected: please use -s ', str(exc.exception)) def test_name_version_extra(self): """More tests for some corner cases""" cser, _ = self.setup_second() target = self.repo.lookup_reference('refs/heads/second2') self.repo.checkout( target, strategy=pygit2.enums.CheckoutStrategy.FORCE) ser, version = cser._parse_series_and_version(None, None) self.assertEqual('second', ser.name) self.assertEqual(2, version) ser, version = cser._parse_series_and_version('second2', None) self.assertEqual('second', ser.name) self.assertEqual(2, version) def test_migrate(self): """Test migration to later schema versions""" db = database.Database(f'{self.tmpdir}/.patman.db') with terminal.capture() as (out, err): db.open_it() self.assertEqual( f'Creating new database {self.tmpdir}/.patman.db', err.getvalue().strip()) self.assertEqual(0, db.get_schema_version()) for 
version in range(1, database.LATEST + 1): with terminal.capture() as (out, _): db.migrate_to(version) self.assertTrue(os.path.exists( f'{self.tmpdir}/.patman.dbold.v{version - 1}')) self.assertEqual(f'Update database to v{version}', out.getvalue().strip()) self.assertEqual(version, db.get_schema_version()) self.assertEqual(4, database.LATEST) def test_series_scan(self): """Test scanning a series for updates""" cser, _ = self.setup_second() target = self.repo.lookup_reference('refs/heads/second2') self.repo.checkout( target, strategy=pygit2.enums.CheckoutStrategy.FORCE) # Add a new commit self.repo = pygit2.init_repository(self.gitdir) self.make_commit_with_file( 'wip: Try out a new thing', 'Just checking', 'wibble.c', '''changes to wibble''') target = self.repo.revparse_single('HEAD') self.repo.reset(target.oid, pygit2.enums.ResetMode.HARD) # name = gitutil.get_branch(self.gitdir) # upstream_name = gitutil.get_upstream(self.gitdir, name) name, ser, version, _ = cser.prep_series(None) # We now have 4 commits numbered 0 (second~3) to 3 (the one we just # added). 
Drop commit 1 (the 'serial' one) from the branch cser._filter_commits(name, ser, 1) svid = cser.get_ser_ver(ser.idnum, version).idnum old_pcdict = cser.get_pcommit_dict(svid).values() expect = '''Syncing series 'second2' v2: mark False allow_unmarked True 0 video: Some video improvements - 1 serial: Add a serial driver 1 bootm: Make it boot + 2 Just checking ''' with terminal.capture() as (out, _): self.run_args('series', '-n', 'scan', '-M', pwork=True) self.assertEqual(expect + 'Dry run completed\n', out.getvalue()) new_pcdict = cser.get_pcommit_dict(svid).values() self.assertEqual(list(old_pcdict), list(new_pcdict)) with terminal.capture() as (out, _): self.run_args('series', 'scan', '-M', pwork=True) self.assertEqual(expect, out.getvalue()) new_pcdict = cser.get_pcommit_dict(svid).values() self.assertEqual(len(old_pcdict), len(new_pcdict)) chk = list(new_pcdict) self.assertNotEqual(list(old_pcdict), list(new_pcdict)) self.assertEqual('video: Some video improvements', chk[0].subject) self.assertEqual('bootm: Make it boot', chk[1].subject) self.assertEqual('Just checking', chk[2].subject) def test_series_send(self): """Test sending a series""" cser, pwork = self.setup_second() # Create a third version with terminal.capture(): cser.increment('second') series = patchstream.get_metadata_for_list('second3', self.gitdir, 3) self.assertEqual('2:457 1:456', series.links) self.assertEqual('3', series.version) with terminal.capture() as (out, err): self.run_args('series', '-n', '-s', 'second3', 'send', '--no-autolink', pwork=pwork) self.assertIn('Send a total of 3 patches with a cover letter', out.getvalue()) self.assertIn( 'video.c:1: warning: Missing or malformed SPDX-License-Identifier ' 'tag in line 1', err.getvalue()) self.assertIn( ':19: warning: added, moved or deleted file(s), does ' 'MAINTAINERS need updating?', err.getvalue()) self.assertIn('bootm.c:1: check: Avoid CamelCase: ', err.getvalue()) self.assertIn( 'Cc: Anatolij Gustschin ', out.getvalue()) 
self.assertTrue(os.path.exists(os.path.join(
            self.tmpdir, '0001-video-Some-video-improvements.patch')))
        self.assertTrue(os.path.exists(os.path.join(
            self.tmpdir, '0002-serial-Add-a-serial-driver.patch')))
        self.assertTrue(os.path.exists(os.path.join(
            self.tmpdir, '0003-bootm-Make-it-boot.patch')))

    def test_series_send_and_link(self):
        """Test sending a series and then adding its link to the database

        A third version of 'second' is created and sent as a dry run. Then
        link_auto() is run against a fake clock: the expected output is seven
        rounds of candidate listings, each followed by a 5-second sleep,
        before the link to series 500 is found and written to the branch.
        """
        def h_sleep(time_s):
            # Fake sleep handler: once the fake time has passed 25, publish
            # details of series 500 (v3) in self.autolink_extra, then advance
            # the fake clock by the requested amount.
            # NOTE(review): autolink_extra is presumably picked up by the fake
            # patchwork handler in TestCommon - confirm there
            if cser.get_time() > 25:
                self.autolink_extra = {'id': 500,
                                       'name': 'Series for my board',
                                       'version': 3}
            cser.inc_fake_time(time_s)

        cser, pwork = self.setup_second()

        # Create a third version
        with terminal.capture():
            cser.increment('second')
        series = patchstream.get_metadata_for_list('second3', self.gitdir, 3)
        self.assertEqual('2:457 1:456', series.links)
        self.assertEqual('3', series.version)

        with terminal.capture():
            self.run_args('series', '-n', 'send', pwork=pwork)

        cser.set_fake_time(h_sleep)
        with terminal.capture() as (out, _):
            cser.link_auto(pwork, 'second3', 3, True, 50)
        itr = iter(out.getvalue().splitlines())

        # Seven unsuccessful attempts, each listing only the v1 and v2 links
        # and then sleeping for 5 seconds, i.e. 35 seconds in total
        for i in range(7):
            self.assertEqual(
                "Possible matches for 'second' v3 desc 'Series for my board':",
                next(itr), f'failed at i={i}')
            self.assertEqual(' Link Version Description', next(itr))
            self.assertEqual(' 456 1 Series for my board', next(itr))
            self.assertEqual(' 457 2 Series for my board', next(itr))
            self.assertEqual('Sleeping for 5 seconds', next(itr))
        self.assertEqual('Link completed after 35 seconds', next(itr))

        self.assertRegex(
            next(itr), 'Checking out upstream commit refs/heads/base: .*')
        self.assertEqual(
            "Processing 3 commits from branch 'second3'", next(itr))
        self.assertRegex(
            next(itr),
            f'- {HASH_RE} as {HASH_RE} '
            'video: Some video improvements')
        self.assertRegex(
            next(itr),
            f"- add links '3:500 2:457 1:456': {HASH_RE} as {HASH_RE} "
            'serial: Add a serial driver')
        self.assertRegex(
            next(itr),
            f'- add v3: {HASH_RE} as {HASH_RE} '
            'bootm: Make it boot')
        self.assertRegex(
            next(itr),
            f'Updating branch second3 from {HASH_RE} to {HASH_RE}')
        self.assertEqual(
            "Setting link for series 'second' v3 to 500", next(itr))

    def _check_status(self, out, has_comments, has_cover_comments):
        """Check output from the status command

        Args:
            out: Captured output to check; its getvalue() text is split into
                lines and compared with the expected status output
            has_comments (bool): True if the review comment on patch 1 is
                expected in the output
            has_cover_comments (bool): True if the cover-letter title and its
                two comments are expected at the start of the output
        """
        itr = iter(out.getvalue().splitlines())

        # Cover-letter comments, if present, come before the patch list
        if has_cover_comments:
            self.assertEqual('Cov The name of the cover letter', next(itr))
            self.assertEqual(
                'From: A user : Sun 13 Apr 14:06:02 MDT 2025',
                next(itr))
            self.assertEqual('some comment', next(itr))
            self.assertEqual('', next(itr))

            self.assertEqual(
                'From: Ghenkis Khan : Sun 13 Apr 13:06:02 '
                'MDT 2025',
                next(itr))
            self.assertEqual('another comment', next(itr))
            self.assertEqual('', next(itr))

        self.assertEqual(' 1 video: Some video improvements', next(itr))
        self.assertEqual(' + Reviewed-by: Fred Bloggs ', next(itr))

        # The review text is shown directly below the patch it belongs to
        if has_comments:
            self.assertEqual(
                'Review: Fred Bloggs ', next(itr))
            self.assertEqual(' > This was my original patch', next(itr))
            self.assertEqual(' > which is being quoted', next(itr))
            self.assertEqual(
                ' I like the approach here and I would love to see more '
                'of it.', next(itr))
            self.assertEqual('', next(itr))

        self.assertEqual(' 2 serial: Add a serial driver', next(itr))
        self.assertEqual(' 3 bootm: Make it boot', next(itr))
        self.assertEqual(
            '1 new response available in patchwork (use -d to write them to '
            'a new branch)', next(itr))

    def test_series_status(self):
        """Test getting the status of a series, including comments"""
        cser, pwork = self.setup_second()

        # Use single threading for easy debugging, but the multithreaded
        # version should produce the same output
        with self.stage('status second2: single-threaded'):
            with terminal.capture() as (out, _):
                cser.status(pwork, 'second', 2, False)
            self._check_status(out, False, False)
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)

        with self.stage('status second2 (normal)'):
            with terminal.capture() as (out2, _):
                cser.status(pwork, 'second', 2, False)
            self.assertEqual(out.getvalue(),
out2.getvalue()) self._check_status(out, False, False) with self.stage('with comments'): with terminal.capture() as (out, _): cser.status(pwork, 'second', 2, show_comments=True) self._check_status(out, True, False) with self.stage('with comments and cover comments'): with terminal.capture() as (out, _): cser.status(pwork, 'second', 2, show_comments=True, show_cover_comments=True) self._check_status(out, True, True) def test_series_status_cmdline(self): """Test getting the status of a series, including comments""" cser, pwork = self.setup_second() with self.stage('status second2'): with terminal.capture() as (out, _): self.run_args('series', '-s', 'second', '-V', '2', 'status', pwork=pwork) self._check_status(out, False, False) with self.stage('status second2 (normal)'): with terminal.capture() as (out, _): cser.status(pwork, 'second', 2, show_comments=True) self._check_status(out, True, False) with self.stage('with comments and cover comments'): with terminal.capture() as (out, _): cser.status(pwork, 'second', 2, show_comments=True, show_cover_comments=True) self._check_status(out, True, True) def test_series_no_subcmd(self): """Test handling of things without a subcommand""" parsers = cmdline.setup_parser() parsers['series'].catch_error = True with terminal.capture() as (out, _): cmdline.parse_args(['series'], parsers=parsers) self.assertIn('usage: patman series', out.getvalue()) parsers['patchwork'].catch_error = True with terminal.capture() as (out, _): cmdline.parse_args(['patchwork'], parsers=parsers) self.assertIn('usage: patman patchwork', out.getvalue()) parsers['upstream'].catch_error = True with terminal.capture() as (out, _): cmdline.parse_args(['upstream'], parsers=parsers) self.assertIn('usage: patman upstream', out.getvalue()) def check_series_rename(self): """Check renaming a series""" cser = self.get_cser() with self.stage('setup'): with terminal.capture() as (out, _): cser.add('first', 'my name', allow_unmarked=True) # Remember the old series old = 
cser.get_series_by_name('first') self.assertEqual('first', gitutil.get_branch(self.gitdir)) with terminal.capture() as (out, _): cser.increment('first') self.assertEqual('first2', gitutil.get_branch(self.gitdir)) with terminal.capture() as (out, _): cser.increment('first') self.assertEqual('first3', gitutil.get_branch(self.gitdir)) # Do the dry run with self.stage('rename - dry run'): with terminal.capture() as (out, _): yield cser lines = out.getvalue().splitlines() itr = iter(lines) self.assertEqual("Renaming branch 'first' to 'newname'", next(itr)) self.assertEqual( "Renaming branch 'first2' to 'newname2'", next(itr)) self.assertEqual( "Renaming branch 'first3' to 'newname3'", next(itr)) self.assertEqual("Renamed series 'first' to 'newname'", next(itr)) self.assertEqual("Dry run completed", next(itr)) self.assert_finished(itr) # Check nothing changed self.assertEqual('first3', gitutil.get_branch(self.gitdir)) sdict = cser.db.series_get_dict() self.assertIn('first', sdict) # Now do it for real with self.stage('rename - real'): with terminal.capture() as (out2, _): yield cser lines2 = out2.getvalue().splitlines() self.assertEqual(lines[:-1], lines2) self.assertEqual('newname3', gitutil.get_branch(self.gitdir)) # Check the series ID did not change ser = cser.get_series_by_name('newname') self.assertEqual(old.idnum, ser.idnum) yield None def test_series_rename(self): """Test renaming of a series""" cor = self.check_series_rename() cser = next(cor) # Rename (dry run) cser.rename('first', 'newname', dry_run=True) cser = next(cor) # Rename (real) cser.rename('first', 'newname') self.assertFalse(next(cor)) def test_series_rename_cmdline(self): """Test renaming of a series with the cmdline""" cor = self.check_series_rename() next(cor) # Rename (dry run) self.run_args('series', '-n', '-s', 'first', 'rename', '-N', 'newname', pwork=True) next(cor) # Rename (real) self.run_args('series', '-s', 'first', 'rename', '-N', 'newname', pwork=True) self.assertFalse(next(cor)) def 
test_series_rename_bad(self): """Test renaming when it is not allowed""" cser = self.get_cser() with terminal.capture(): cser.add('first', 'my name', allow_unmarked=True) cser.increment('first') cser.increment('first') with self.assertRaises(ValueError) as exc: cser.rename('first', 'first') self.assertEqual("Cannot rename series 'first' to itself", str(exc.exception)) with self.assertRaises(ValueError) as exc: cser.rename('first2', 'newname') self.assertEqual( "Invalid series name 'first2': did you use the branch name?", str(exc.exception)) with self.assertRaises(ValueError) as exc: cser.rename('first', 'newname2') self.assertEqual( "Invalid series name 'newname2': did you use the branch name?", str(exc.exception)) with self.assertRaises(ValueError) as exc: cser.rename('first', 'second') self.assertEqual("Cannot rename: branches exist: second", str(exc.exception)) with terminal.capture(): cser.add('second', 'another name', allow_unmarked=True) cser.increment('second') with self.assertRaises(ValueError) as exc: cser.rename('first', 'second') self.assertEqual("Cannot rename: series 'second' already exists", str(exc.exception)) # Rename second2 so that it gets in the way of the rename gitutil.rename_branch('second2', 'newname2', self.gitdir) with self.assertRaises(ValueError) as exc: cser.rename('first', 'newname') self.assertEqual("Cannot rename: branches exist: newname2", str(exc.exception)) # Rename first3 and make sure it stops the rename gitutil.rename_branch('first3', 'tempbranch', self.gitdir) with self.assertRaises(ValueError) as exc: cser.rename('first', 'newname') self.assertEqual( "Cannot rename: branches missing: first3: branches exist: " 'newname2', str(exc.exception)) def test_version_change(self): """Test changing a version of a series to a different version number""" cser = self.get_cser() with self.stage('setup'): with terminal.capture(): cser.add('first', 'my description', allow_unmarked=True) with self.stage('non-existent version'): # Check changing 
a non-existent version with self.assertRaises(ValueError) as exc: cser.version_change('first', 2, 3, dry_run=True) self.assertEqual("Series 'first' does not have a version 2", str(exc.exception)) with self.stage('new version missing'): with self.assertRaises(ValueError) as exc: cser.version_change('first', None, None, dry_run=True) self.assertEqual("Please provide a new version number", str(exc.exception)) # Change v1 to v2 (dry run) with self.stage('v1 -> 2 dry run'): with terminal.capture(): self.assertTrue(gitutil.check_branch('first', self.gitdir)) cser.version_change('first', 1, 3, dry_run=True) self.assertTrue(gitutil.check_branch('first', self.gitdir)) self.assertFalse(gitutil.check_branch('first3', self.gitdir)) # Check that nothing actually happened series = patchstream.get_metadata('first', 0, 2, git_dir=self.gitdir) self.assertNotIn('version', series) svlist = cser.get_ser_ver_list() self.assertEqual(1, len(svlist)) item = svlist[0] self.assertEqual(1, item.version) with self.stage('increment twice'): # Increment so that we get first3 with terminal.capture(): cser.increment('first') cser.increment('first') with self.stage('existing version'): # Check changing to an existing version with self.assertRaises(ValueError) as exc: cser.version_change('first', 1, 3, dry_run=True) self.assertEqual("Series 'first' already has a v3: 1 2 3", str(exc.exception)) # Change v1 to v4 (for real) with self.stage('v1 -> 4'): with terminal.capture(): self.assertTrue(gitutil.check_branch('first', self.gitdir)) cser.version_change('first', 1, 4) self.assertTrue(gitutil.check_branch('first', self.gitdir)) self.assertTrue(gitutil.check_branch('first4', self.gitdir)) series = patchstream.get_metadata('first4', 0, 2, git_dir=self.gitdir) self.assertIn('version', series) self.assertEqual('4', series.version) svdict = cser.get_ser_ver_dict() self.assertEqual(3, len(svdict)) item = svdict[item.idnum] self.assertEqual(4, item.version) with self.stage('increment'): # Now try to 
increment first again with terminal.capture(): cser.increment('first') ser = cser.get_series_by_name('first') self.assertIn(5, cser._get_version_list(ser.idnum)) def test_version_change_cmdline(self): """Check changing a version on the cmdline""" self.get_cser() with (mock.patch.object(cseries.Cseries, 'version_change', return_value=None) as method): self.run_args('series', '-s', 'first', 'version-change', pwork=True) method.assert_called_once_with('first', None, None, dry_run=False) with (mock.patch.object(cseries.Cseries, 'version_change', return_value=None) as method): self.run_args('series', '-s', 'first', 'version-change', '--new-version', '3', pwork=True) method.assert_called_once_with('first', None, 3, dry_run=False)