[Piuparts-devel] [Git][debian/piuparts][develop] 7 commits: Switch tmpfiles.d to denylisting

Nicolas Dandrimont (@olasd) gitlab at salsa.debian.org
Mon Jun 22 14:56:58 BST 2026



Nicolas Dandrimont pushed to branch develop at Debian / piuparts


Commits:
82feece7 by Luca Boccassi at 2026-01-26T12:41:52+00:00
Switch tmpfiles.d to denylisting

Instead of having a list of flags that are allowed, which gets out
of date fast, have a list of flags that get raised.

For example, piuparts on src:systemd is currently failing because the Q
flag was mistakenly left out of the allowed list:

0m27.4s ERROR: FAIL: Package purging left files on system:
  /var/lib/machines/	 not owned
  /var/lib/portables/	 not owned

portables.conf:
Q /var/lib/portables 0700
systemd-nspawn.conf:
Q /var/lib/machines 0700

https://piuparts.debian.org/sid-strict/fail/systemd-container_257.5-2.log

Follow-up for 6c3497347c44ce509a7c87af9cceb9fafe7dabdf
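
The difference between the two approaches can be sketched in Python. This
is only an illustration, not the piuparts implementation: the helpers
parse_line, tracked_by_allowlist and tracked_by_denylist are hypothetical,
and the contents of ALLOWED_TYPES and DENIED_TYPES are assumptions made
for the example.

```python
# Hypothetical sketch: decide whether a tmpfiles.d line declares a path
# that a checker should treat as managed by systemd-tmpfiles.

# Allowlist style (assumed contents): only listed types are recognised,
# so a forgotten or newly added type such as Q silently falls through.
ALLOWED_TYPES = {"d", "D", "f", "L"}

# Denylist style (assumed contents): every type is recognised unless it
# is explicitly listed, here the types that remove or ignore paths.
DENIED_TYPES = {"r", "R", "x", "X"}


def parse_line(line):
    """Return (type_letter, path) for a tmpfiles.d line, or None."""
    fields = line.split()
    if len(fields) < 2 or fields[0].startswith("#"):
        return None
    # The first character is the type; modifiers such as "+" or "!"
    # may follow it and are ignored here.
    return fields[0][0], fields[1]


def tracked_by_allowlist(line):
    parsed = parse_line(line)
    return parsed[1] if parsed and parsed[0] in ALLOWED_TYPES else None


def tracked_by_denylist(line):
    parsed = parse_line(line)
    return parsed[1] if parsed and parsed[0] not in DENIED_TYPES else None
```

With these assumed sets, the line "Q /var/lib/portables 0700" is missed by
the allowlist variant but picked up by the denylist variant, which is the
failure mode described above.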

- - - - -
7893fceb by Helmut Grohne at 2026-03-02T12:53:38+00:00
add Python type hints to piuparts.py and its dependencies

In order to fix a few type errors, some semantic changes are included.
 * Methods of the Defaults class raise NotImplementedError rather than
   returning None.
 * The settings class assigns default values for apt_unauthenticated,
   distro_config and testobjects in its constructor.
 * Renamed a few variables that were used with different types.
 * Added a number of "assert something is not None" statements to guide
   mypy.
 * get_state_meta_data constructs its result at once.
 * TimeOffsetFormatter.formatTime signature updated to match super
   class: Added default value.
 * The type of the warn_only argument changed to bool in two methods.
 * Added missing return statements and None values to existing return
   statements.
 * find_default_debian_mirrors returns [] when it previously returned
   None.
 * Rewrote DistroConfig.get as get_field to avoid conflicting with the
   super class type.
 * DistroConfig.get_deb_lines caches a lookup to help mypy track state.
 * unqualify always returns a list now.
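
Three of the patterns listed above can be sketched in isolation. This is a
simplified illustration rather than the code from piuparts.py:
ExampleDefaults and objects_as_string are hypothetical names, the keyring
path is made up, and Settings is reduced to a single attribute.

```python
from typing import Optional


class Defaults:
    """Base class; subclasses are expected to override the getter."""

    def get_keyring(self) -> str:
        # Raising keeps the declared return type honest: falling off the
        # end would return None and contradict "-> str".
        raise NotImplementedError


class ExampleDefaults(Defaults):  # hypothetical subclass
    def get_keyring(self) -> str:
        return "/usr/share/keyrings/example-archive-keyring.gpg"


class Settings:
    def __init__(self) -> None:
        # Declared in the constructor so the attribute always exists;
        # it stays None until configuration has been processed.
        self.testobjects: Optional[list[str]] = None


settings = Settings()


def objects_as_string() -> str:
    # The assert narrows Optional[list[str]] to list[str] for the type
    # checker and fails loudly at run time if the value was never set.
    assert settings.testobjects is not None
    return " ".join(settings.testobjects)


def unqualify(packages: list[str]) -> list[str]:
    # Always return a list, so callers never have to handle None.
    if packages:
        return [p.split("=", 1)[0].strip() for p in packages]
    return []
```

The assert form trades a possible AssertionError at run time for not
having to thread Optional handling through every caller.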

- - - - -
070ec74c by Luca Boccassi at 2026-04-28T22:30:02+01:00
Install and use sysusers.d config files

sysusers.d config files allow a package to use declarative
configuration instead of manually written maintainer scripts.
This also allows image-based systems to be created with
/usr/ only, and also allows for factory resetting a system
and recreating /etc/ on boot.

https://www.freedesktop.org/software/systemd/man/latest/sysusers.d.html
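
To make the declarative format concrete, here is a small Python sketch
that splits a sysusers.d line into the six columns documented in the
manual page linked above (type, name, ID, GECOS, home directory, shell).
SysusersEntry and parse_sysusers_line are hypothetical helpers written
for this illustration; they are not part of piuparts or systemd.

```python
import shlex
from typing import NamedTuple


class SysusersEntry(NamedTuple):  # hypothetical helper type
    kind: str
    name: str
    ident: str
    gecos: str
    home: str
    shell: str


def parse_sysusers_line(line: str) -> SysusersEntry:
    """Split one sysusers.d line into its six columns.

    Missing trailing columns are filled with "-", the placeholder the
    format uses for an unspecified value.
    """
    fields = shlex.split(line)
    fields += ["-"] * (6 - len(fields))
    return SysusersEntry(*fields[:6])
```

Applied to the lines added by this commit, "g piuparts -" yields an entry
of kind "g" named "piuparts", and the "u!" line yields the name
"piupartsm", the ID "-:piuparts", the home directory
"/var/lib/piuparts/piupartsm" and the shell "/bin/sh".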

- - - - -
4f524818 by Nicolas Dandrimont at 2026-05-04T21:58:32+02:00
Merge branch 'mr/74' into develop

- - - - -
56cd734f by Nicolas Dandrimont at 2026-05-04T22:00:38+02:00
Merge branch 'mr/79' into develop

- - - - -
8599e426 by Nicolas Dandrimont at 2026-05-04T22:05:23+02:00
Merge branch 'mr/76' into develop

- - - - -
fb2e2c4e by Nicolas Dandrimont at 2026-05-04T22:05:37+02:00
Run black 26.3.1 on python code

- - - - -


17 changed files:

- debian/control
- debian/piuparts-master.postinst
- + debian/piuparts-master.sysusers
- debian/piuparts-slave.postinst
- + debian/piuparts-slave.sysusers
- master-bin/detect_well_known_errors.py
- piuparts-analyze.py
- piuparts-master-backend.py
- piuparts-report.py
- piuparts-slave.py
- piuparts.py
- piupartslib/__init__.py
- piupartslib/conf.py
- piupartslib/dependencyparser.py
- piupartslib/dwke.py
- piupartslib/packagesdb.py
- tests/unittests.py


Changes:

=====================================
debian/control
=====================================
@@ -10,6 +10,7 @@ Rules-Requires-Root: no
 Build-Depends:
  debhelper-compat (= 13),
  dh-python,
+ dh-sequence-installsysusers,
  python3-all,
  python3-apt,
  python3-debian,
@@ -64,7 +65,6 @@ Depends:
  piuparts-common (>= ${source:Version}),
  piuparts-common (<< ${source:Version}+),
 # keep this list in sync with piuparts-master-from-git-deps
- adduser,
  openssh-server,
  python3-debianbts,
  python3-setproctitle,
@@ -97,7 +97,6 @@ Architecture: all
 Depends:
  git,
 # this list is synced from piuparts-master
- adduser,
  openssh-server,
  python3-debianbts,
  python3-setproctitle,
@@ -127,7 +126,6 @@ Depends:
  piuparts (= ${binary:Version}),
  piuparts-common (= ${binary:Version}),
 # keep this list in sync with piuparts-slave-from-git-deps
- adduser,
  openssh-client,
  screen,
  sudo,
@@ -151,7 +149,6 @@ Depends:
  git,
  pkgconf,
 # this list is synced from piuparts-slave
- adduser,
  openssh-client,
  screen,
  sudo,


=====================================
debian/piuparts-master.postinst
=====================================
@@ -8,13 +8,10 @@ userhome=/var/lib/piuparts
 
 if [ "$1" = "configure" ] ; then
 
-    addgroup --system --quiet $pgroup
+    systemd-sysusers ${DPKG_ROOT:+--root="$DPKG_ROOT"} piuparts-master.conf
 
     for user in $muser
     do
-        adduser --system --quiet --home $userhome/$user --ingroup $pgroup \
-                --shell /bin/sh $user
-
 	if ! [ -d $userhome/$user ]; then
 		mkdir -m 0755 $userhome/$user
 		chown $user:$pgroup $userhome/$user


=====================================
debian/piuparts-master.sysusers
=====================================
@@ -0,0 +1,2 @@
+g piuparts -
+u! piupartsm -:piuparts - /var/lib/piuparts/piupartsm /bin/sh


=====================================
debian/piuparts-slave.postinst
=====================================
@@ -8,13 +8,10 @@ userhome=/var/lib/piuparts
 
 if [ "$1" = "configure" ] ; then
 
-    addgroup --system --quiet $pgroup
+    systemd-sysusers ${DPKG_ROOT:+--root="$DPKG_ROOT"} piuparts-slave.conf
 
     for user in $suser
     do
-        adduser --system --quiet --home $userhome/$user --ingroup $pgroup \
-                --shell /bin/sh $user
-
 	if ! [ -d $userhome/$user ]; then
 		mkdir -m 0755 $userhome/$user
 		chown $user:$pgroup $userhome/$user


=====================================
debian/piuparts-slave.sysusers
=====================================
@@ -0,0 +1,2 @@
+g piuparts -
+u! piupartss -:piuparts - /var/lib/piuparts/piupartss /bin/sh


=====================================
master-bin/detect_well_known_errors.py
=====================================
@@ -109,13 +109,13 @@ def detect_well_known_errors(sections, config, problem_list, recheck, recheck_fa
     total_add = 0
     todo = deque([(s, 0) for s in sections])
     while len(todo):
-        (section, next_try) = todo.popleft()
+        section, next_try = todo.popleft()
         now = time.time()
         if now < next_try:
             # sleeping, section has been tried recently
             time.sleep(max(30, next_try - now) + 30)
         try:
-            (del_cnt, add_cnt) = process_section(section, config, problem_list, recheck, recheck_failed)
+            del_cnt, add_cnt = process_section(section, config, problem_list, recheck, recheck_failed)
             total_del += del_cnt
             total_add += add_cnt
             current_time = time.strftime("%a %b %2d %H:%M:%S %Z %Y", time.localtime())


=====================================
piuparts-analyze.py
=====================================
@@ -30,6 +30,7 @@ headers of the log in ./fail to the one in ./bugged and vice versa. It will then
 move the failed log to ./bugged as well.
 
 """
+
 from __future__ import print_function
 
 import fcntl
@@ -335,7 +336,7 @@ def main():
 
         todo = deque([(s, 0) for s in sections])
         while len(todo):
-            (section_name, next_try) = todo.popleft()
+            section_name, next_try = todo.popleft()
             now = time.time()
             if now < next_try:
                 print("Sleeping while section is busy")


=====================================
piuparts-master-backend.py
=====================================
@@ -23,7 +23,6 @@
 Lars Wirzenius <liw at iki.fi>
 """
 
-
 import fcntl
 import logging
 import os


=====================================
piuparts-report.py
=====================================
@@ -25,7 +25,6 @@
 Lars Wirzenius <liw at iki.fi>
 """
 
-
 import fcntl
 import hashlib
 import logging
@@ -769,7 +768,7 @@ def update_file(source, target):
         try:
             shutil.copyfile(source, target)
         except IOError as xxx_todo_changeme:
-            (errno, strerror) = xxx_todo_changeme.args
+            errno, strerror = xxx_todo_changeme.args
             logging.error("failed to copy %s to %s: I/O error(%d): %s" % (source, target, errno, strerror))
 
 
@@ -1243,7 +1242,7 @@ class Section:
                 packages[state] = []
                 package_rows[state] = ""
             for source in sorted(sources):
-                (state, sourcerows, binaryrows) = source_data[source]
+                state, sourcerows, binaryrows = source_data[source]
                 packages[state].append(source)
                 package_rows[state] += sourcerows + binaryrows
 
@@ -1496,18 +1495,12 @@ class Section:
         r("v <- t[0:nrow(t),0:nstate]")
         # thanks to http://tango.freedesktop.org/Generic_Icon_Theme_Guidelines for those nice colors
         r("palsize = 14")
-        r(
-            'palette(c("#4e9a06", "#ef2929", "#d3d7cf", "#5c3566", "#c4a000", \
+        r('palette(c("#4e9a06", "#ef2929", "#d3d7cf", "#5c3566", "#c4a000", \
                      "#fce94f", "#a40000", "#888a85", "#2e3436", "#729fcf", \
-                     "#3465a4", "#204a87", "#555753", "#ce5c00"))'
-        )
-        r(
-            'barplot(t(v),col = 1:palsize, \
-          main="Binary packages per state in '
-            + self._config.section
-            + '", \
-          xlab="", ylab="Number of binary packages", space=0, border=NA)'
-        )
+                     "#3465a4", "#204a87", "#555753", "#ce5c00"))')
+        r('barplot(t(v),col = 1:palsize, \
+          main="Binary packages per state in ' + self._config.section + '", \
+          xlab="", ylab="Number of binary packages", space=0, border=NA)')
         r('legend(x="bottom",legend=snames, ncol=2,fill=1:palsize,xjust=0.5,yjust=0,bty="n")')
         grdevices.dev_off()
 
@@ -2013,11 +2006,9 @@ def make_bts_stats_graph(master_dir, out_dir):
     r("v <- t[c(4, 2, 3, 1)]")  # reorder columns
     # tango colors again:
     r('palette(c("#a40000", "#ef2929", "#4e9a06", "#8ae234"))')
-    r(
-        'barplot(t(v),col = 1:4, \
+    r('barplot(t(v),col = 1:4, \
         main="Bugs with usertag=piuparts and user=debian-qa@lists.debian.org", \
-        xlab="", ylab="Total number of RC and non-RC bugs submitted and closed", space=0, border=NA)'
-    )
+        xlab="", ylab="Total number of RC and non-RC bugs submitted and closed", space=0, border=NA)')
     r('legend("right", legend=rev(colnames(v)), fill=rev(1:4), inset=0.05, bty="n")')
     grdevices.dev_off()
 
@@ -2064,7 +2055,7 @@ def main():
         )
         todo = deque([(s, 0) for s in process_section_names])
         while len(todo):
-            (section_name, next_try) = todo.popleft()
+            section_name, next_try = todo.popleft()
             now = time.time()
             if now < next_try:
                 logging.info("Sleeping while section is busy")


=====================================
piuparts-slave.py
=====================================
@@ -22,6 +22,7 @@
 
 Lars Wirzenius <liw at iki.fi>
 """
+
 from __future__ import print_function
 
 import fcntl


=====================================
piuparts.py
=====================================
@@ -30,6 +30,7 @@ more usage information.
 
 Lars Wirzenius <liw at iki.fi>
 """
+
 from __future__ import print_function
 
 import errno
@@ -47,11 +48,11 @@ import sys
 import tempfile
 import time
 import traceback
+import typing
 import uuid
 from collections import defaultdict, namedtuple
 from contextlib import ExitStack
 from signal import SIGALRM, SIGKILL, SIGTERM, alarm, signal
-from typing import Dict, Tuple
 
 import apt_pkg
 import distro_info
@@ -77,56 +78,60 @@ class Defaults:
 
     """
 
-    def get_components(self):
+    def get_components(self) -> list[str]:
         """Return list of default components for a mirror."""
+        raise NotImplementedError
 
-    def get_mirror(self):
+    def get_mirror(self) -> list[tuple[str, list[str]]]:
         """Return default mirror."""
+        raise NotImplementedError
 
-    def get_distribution(self):
+    def get_distribution(self) -> list[str]:
         """Return default distribution."""
+        raise NotImplementedError
 
-    def get_keyring(self):
+    def get_keyring(self) -> str:
         """Return default keyring."""
+        raise NotImplementedError
 
 
 class DebianDefaults(Defaults):
-    def get_components(self):
+    def get_components(self) -> list[str]:
         return ["main", "contrib", "non-free", "non-free-firmware"]
 
-    def get_mirror(self):
+    def get_mirror(self) -> list[tuple[str, list[str]]]:
         return [("http://deb.debian.org/debian", self.get_components())]
 
-    def get_distribution(self):
+    def get_distribution(self) -> list[str]:
         return [distro_info.DebianDistroInfo().devel()]
 
-    def get_keyring(self):
+    def get_keyring(self) -> str:
         return "/usr/share/keyrings/debian-archive-keyring.gpg"
 
 
 class UbuntuDefaults(Defaults):
-    def get_components(self):
+    def get_components(self) -> list[str]:
         return ["main", "universe", "restricted", "multiverse"]
 
-    def get_mirror(self):
+    def get_mirror(self) -> list[tuple[str, list[str]]]:
         return [("http://archive.ubuntu.com/ubuntu", self.get_components())]
 
-    def get_distribution(self):
+    def get_distribution(self) -> list[str]:
         return [distro_info.UbuntuDistroInfo().devel()]
 
-    def get_keyring(self):
+    def get_keyring(self) -> str:
         return "/usr/share/keyrings/ubuntu-archive-keyring.gpg"
 
 
 class DefaultsFactory:
     """Instantiate the right defaults class."""
 
-    def guess_flavor(self):
+    def guess_flavor(self) -> str:
         p = subprocess.Popen(["lsb_release", "-i", "-s"], stdout=subprocess.PIPE, universal_newlines=True)
         stdout, stderr = p.communicate()
         return stdout.strip().lower()
 
-    def new_defaults(self):
+    def new_defaults(self) -> Defaults:
         if not settings.defaults:
             settings.defaults = self.guess_flavor()
         if settings.defaults.lower() == "debian":
@@ -140,9 +145,9 @@ class DefaultsFactory:
 class Settings:
     """Global settings for this program."""
 
-    def __init__(self):
-        self.defaults = None
-        self.tmpdir = None
+    def __init__(self) -> None:
+        self.defaults: str | None = None
+        self.tmpdir: str | None = None
         self.keep_env = False
         self.shell_on_error = False
         self.max_command_output_size = (
@@ -154,13 +159,13 @@ class Settings:
         self.args_are_package_files = True
         # distro setup
         self.proxy = None
-        self.debian_mirrors = []
-        self.extra_repos = []
+        self.debian_mirrors: list[tuple[str, list[str]]] = []
+        self.extra_repos: list[str] = []
         self.testdebs_repo = None
-        self.debian_distros = []
-        self.bootstrapcmd = []
+        self.debian_distros: list[str] = []
+        self.bootstrapcmd: list[str] = []
         self.keep_sources_list = False
-        self.keyring = None
+        self.keyring: str | None = None
         self.do_not_verify_signatures = False
         self.no_check_valid_until = False
         self.install_recommends = False
@@ -168,8 +173,8 @@ class Settings:
         self.eatmydata = True
         self.dpkg_force_unsafe_io = True
         self.dpkg_force_confdef = False
-        self.scriptsdirs = []
-        self.bindmounts = []
+        self.scriptsdirs: list[str] = []
+        self.bindmounts: list[str] = []
         self.allow_database = False
         self.update_retries = None
         # chroot setup
@@ -185,7 +190,7 @@ class Settings:
         self.save_end_meta = None
         self.skip_minimize = True
         self.minimize = False
-        self.debfoster_options = None
+        self.debfoster_options: list[str] | None = None
         self.docker_image = None
         self.merged_usr = True
         # tests and checks
@@ -196,8 +201,8 @@ class Settings:
         self.install_remove_install = False
         self.install_purge_install = False
         self.list_installed_files = False
-        self.fake_essential_packages = []
-        self.extra_old_packages = []
+        self.fake_essential_packages: list[str] = []
+        self.extra_old_packages: list[str] = []
         self.skip_cronfiles_test = False
         self.skip_logrotatefiles_test = False
         self.adequate = True
@@ -416,6 +421,9 @@ class Settings:
             ":/lib/modules/([^/]*/(modules.*)?)?",
         ]
         self.non_pedantic_ignore_patterns = ["/tmp/.*"]
+        self.apt_unauthenticated = "No"
+        self.distro_config: piupartslib.conf.DistroConfig | None = None
+        self.testobjects: list[str] | None = None
 
 
 settings = Settings()
@@ -425,7 +433,7 @@ on_panic_hooks = {}
 counter = 0
 
 
-def do_on_panic(hook):
+def do_on_panic(hook: typing.Callable[[], None]) -> int:
     global counter
     cid = counter
     counter += 1
@@ -433,16 +441,16 @@ def do_on_panic(hook):
     return cid
 
 
-def dont_do_on_panic(id):
+def dont_do_on_panic(id: int) -> None:
     del on_panic_hooks[id]
 
 
 class TimeOffsetFormatter(logging.Formatter):
-    def __init__(self, fmt=None, datefmt=None):
+    def __init__(self, fmt: str | None = None, datefmt: str | None = None):
         self.startup_time = time.time()
         logging.Formatter.__init__(self, fmt, datefmt)
 
-    def formatTime(self, record, datefmt):
+    def formatTime(self, record: logging.LogRecord, datefmt: str | None = None) -> str:
         t = time.time() - self.startup_time
         t_min = int(t / 60)
         t_sec = t % 60.0
@@ -450,36 +458,36 @@ class TimeOffsetFormatter(logging.Formatter):
 
 
 DUMP = logging.DEBUG - 1
-HANDLERS = []
+HANDLERS: list[logging.Handler] = []
 
 
-def setup_logging(log_level, log_file_name):
+def setup_logging(log_level: int, log_file_name: str) -> None:
     logging.addLevelName(DUMP, "DUMP")
 
     logger = logging.getLogger()
     logger.setLevel(log_level)
     formatter = TimeOffsetFormatter("%(asctime)s %(levelname)s: %(message)s")
 
-    handler = logging.StreamHandler(sys.stdout)
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-    HANDLERS.append(handler)
+    shandler = logging.StreamHandler(sys.stdout)
+    shandler.setFormatter(formatter)
+    logger.addHandler(shandler)
+    HANDLERS.append(shandler)
 
     if log_file_name:
-        handler = logging.FileHandler(log_file_name)
-        handler.setFormatter(formatter)
-        logger.addHandler(handler)
-        HANDLERS.append(handler)
+        fhandler = logging.FileHandler(log_file_name)
+        fhandler.setFormatter(formatter)
+        logger.addHandler(fhandler)
+        HANDLERS.append(fhandler)
 
 
-def dump(msg):
+def dump(msg: str) -> None:
     logger = logging.getLogger()
     logger.log(DUMP, msg)
     for handler in HANDLERS:
         handler.flush()
 
 
-def panic(exit=1):
+def panic(exit: int = 1) -> typing.NoReturn:
     for i in reversed(range(counter)):
         if i in on_panic_hooks:
             on_panic_hooks[i]()
@@ -487,27 +495,27 @@ def panic(exit=1):
     sys.exit(exit)
 
 
-def indent_string(str):
+def indent_string(str: str) -> str:
     """Indent all lines in a string with two spaces and return result."""
     return "\n".join(["  " + line for line in str.split("\n")])
 
 
-def command2string(command):
+def command2string(command: list[str]) -> str:
     """Quote s.t. copy+paste from the logfile gives a runnable command in the shell."""
     return " ".join([shlex.quote(arg) for arg in command])
 
 
-def unqualify(packages):
+def unqualify(packages: typing.Iterable[str]) -> list[str]:
     if packages:
         return [p.split("=", 1)[0].strip() for p in packages]
-    return packages
+    return []
 
 
 class Alarm(Exception):
     pass
 
 
-def alarm_handler(signum, frame):
+def alarm_handler(signum, frame) -> typing.NoReturn:
     raise Alarm
 
 
@@ -530,17 +538,18 @@ FILTERED_ENVVARS = {
 }
 
 
-def get_clean_environment() -> Dict[str, str]:
+def get_clean_environment() -> dict[str, str]:
     """Get a cleaned up environment"""
+    assert settings.testobjects is not None
     env = {k: v for k, v in os.environ.items() if k not in FILTERED_ENVVARS}
     env["PIUPARTS_OBJECTS"] = " ".join(str(vobject) for vobject in settings.testobjects)
     return env
 
 
-def run(command, ignore_errors=False, timeout=0):
+def run(command: list[str], ignore_errors: bool = False, timeout: int = 0) -> tuple[int, str]:
     """Run an external command and die with error message if it fails."""
 
-    def kill_subprocess(p, reason):
+    def kill_subprocess(p: subprocess.Popen[str], reason: str) -> None:
         logging.error("Terminating command due to %s" % reason)
         p.terminate()
         for i in range(10):
@@ -569,6 +578,7 @@ def run(command, ignore_errors=False, timeout=0):
                 errors="backslashreplace",
             )
         )
+        assert p.stdout is not None
         output = ""
         excessive_output = False
         if timeout > 0:
@@ -610,14 +620,14 @@ def run(command, ignore_errors=False, timeout=0):
         return p.returncode, output
 
 
-def create_temp_file():
+def create_temp_file() -> tuple[int, str]:
     """Create a temporary file and return its full path."""
-    (fd, path) = tempfile.mkstemp(dir=settings.tmpdir)
+    fd, path = tempfile.mkstemp(dir=settings.tmpdir)
     logging.debug("Created temporary file %s" % path)
     return (fd, path)
 
 
-def create_file(filename, contents):
+def create_file(filename: str, contents: str) -> None:
     """Create a new file with the desired name and contents."""
     try:
         with open(filename, "w") as f:
@@ -627,12 +637,12 @@ def create_file(filename, contents):
         panic()
 
 
-def readlines_file(filename):
+def readlines_file(filename: str) -> list[str]:
     with open(filename, "r") as f:
         return f.readlines()
 
 
-def remove_files(filenames):
+def remove_files(filenames: list[str]) -> None:
     """Remove some files."""
     for filename in filenames:
         logging.debug("Removing %s" % filename)
@@ -643,7 +653,7 @@ def remove_files(filenames):
             panic()
 
 
-def make_metapackage(name, depends, conflicts, arch="all"):
+def make_metapackage(name: str, depends: str, conflicts: str, arch: str = "all") -> str:
     """Return the path to a .deb created just for satisfying dependencies
 
     Caller is responsible for removing the temporary directory containing the
@@ -682,10 +692,10 @@ def make_metapackage(name, depends, conflicts, arch="all"):
     return os.path.join(tmpdir, name + ".deb")
 
 
-def split_path(pathname):
+def split_path(pathname: str) -> list[str]:
     parts = []
     while pathname:
-        (head, tail) = os.path.split(pathname)
+        head, tail = os.path.split(pathname)
         # print("split '%s' => '%s' + '%s'" % (pathname, head, tail))
         if tail:
             parts.append(tail)
@@ -698,7 +708,15 @@ def split_path(pathname):
     return parts
 
 
-def canonicalize_path(root, pathname, report_links=False):
+@typing.overload
+def canonicalize_path(root: str, pathname: str, report_links: typing.Literal[False] = False) -> str: ...
+
+
+@typing.overload
+def canonicalize_path(root: str, pathname: str, report_links: typing.Literal[True]) -> list[tuple[str, str]]: ...
+
+
+def canonicalize_path(root: str, pathname: str, report_links: bool = False) -> str | list[tuple[str, str]]:
     """Canonicalize a path name, simulating chroot at 'root'.
 
     When resolving the symlink, pretend (similar to chroot) that
@@ -713,7 +731,7 @@ def canonicalize_path(root, pathname, report_links=False):
     """
     # print("\nCANONICALIZE %s %s" % (root, pathname))
     links = []
-    seen = []
+    seen: list[str] = []
     parts = split_path(pathname)
     # print("PARTS ", list(reversed(parts)))
     path = "/"
@@ -751,7 +769,7 @@ def canonicalize_path(root, pathname, report_links=False):
     return path
 
 
-def is_broken_symlink(root, dirpath, filename):
+def is_broken_symlink(root: str, dirpath: str, filename: str) -> bool:
     """Is symlink dirpath+filename broken?"""
 
     if dirpath[: len(root)] == root:
@@ -768,25 +786,33 @@ def is_broken_symlink(root, dirpath, filename):
 FileInfo = namedtuple("FileInfo", ["st", "target", "user", "group"])
 
 
+class StateMetaData(typing.TypedDict):
+    initial_selections: dict[str, tuple[str, str | None]] | None
+    avail_md5: list[str]
+    tree: dict[str, FileInfo]
+    selections: dict[str, tuple[str, str | None]]
+    diversions: list[str] | None
+
+
 class Chroot:
     """A chroot for testing things in."""
 
-    def __init__(self):
-        self.name = None
+    def __init__(self) -> None:
+        self.name: str | None = None
         self.bootstrapped = False
-        self.mounts = []
-        self.initial_selections = None
-        self.avail_md5_history = []
-        self.systemd_tmpfiles: Dict[str, Tuple[str, str]] = {}
+        self.mounts: list[str] = []
+        self.initial_selections: dict[str, tuple[str, str | None]] | None = None
+        self.avail_md5_history: list[str] = []
+        self.systemd_tmpfiles: dict[str, tuple[str, str]] = {}
 
-    def create_temp_dir(self):
+    def create_temp_dir(self) -> None:
         """Create a temporary directory for the chroot."""
         self.name = tempfile.mkdtemp(dir=settings.tmpdir)
         create_file(os.path.join(self.name, ".piuparts.tmpdir"), "chroot")
         os.chmod(self.name, 0o755)
         logging.debug("Created temporary directory %s" % self.name)
 
-    def create(self, temp_tgz=None):
+    def create(self, temp_tgz: str | None = None) -> None:
         """Create a chroot according to user's wishes."""
         self.panic_handler_id = do_on_panic(self.remove)
         if not settings.schroot and not settings.docker_image:
@@ -845,7 +871,7 @@ class Chroot:
         if settings.savetgz and not temp_tgz:
             self.pack_into_tgz(settings.savetgz)
 
-    def remove(self):
+    def remove(self) -> None:
         """Remove a chroot and all its contents."""
         if not self.name:
             return
@@ -877,29 +903,30 @@ class Chroot:
                 logging.debug("Keeping directory tree at %s" % self.name)
         dont_do_on_panic(self.panic_handler_id)
 
-    def was_bootstrapped(self):
+    def was_bootstrapped(self) -> bool:
         return self.bootstrapped
 
-    def create_temp_tgz_file(self):
+    def create_temp_tgz_file(self) -> str:
         """Return the path to a file to be used as a temporary tgz file"""
         # Yes, create_temp_file() would work just as well, but putting it in
         # the interface for Chroot allows the VirtServ hack to work.
-        (fd, temp_tgz) = create_temp_file()
+        fd, temp_tgz = create_temp_file()
         os.close(fd)
         return temp_tgz
 
-    def remove_temp_tgz_file(self, temp_tgz):
+    def remove_temp_tgz_file(self, temp_tgz: str) -> None:
         """Remove the file that was used as a temporary tgz file"""
         # Yes, remove_files() would work just as well, but putting it in
         # the interface for Chroot allows the VirtServ hack to work.
         remove_files([temp_tgz])
 
-    def pack_into_tgz(self, result):
+    def pack_into_tgz(self, result: str) -> None:
         """Tar and compress all files in the chroot."""
+        assert self.name is not None
         self.run(["apt-get", "clean"])
         logging.debug("Saving %s to %s." % (self.name, result))
 
-        (fd, tmpfile) = tempfile.mkstemp(dir=os.path.dirname(result))
+        fd, tmpfile = tempfile.mkstemp(dir=os.path.dirname(result))
         os.close(fd)
         panic_handler_id = do_on_panic(lambda: os.remove(tmpfile))
 
@@ -922,15 +949,16 @@ class Chroot:
         os.rename(tmpfile, result)
         dont_do_on_panic(panic_handler_id)
 
-    def unpack_from_tgz(self, tarball):
+    def unpack_from_tgz(self, tarball: str) -> None:
         """Unpack a tarball to a chroot."""
+        assert self.name is not None
         logging.debug("Unpacking %s into %s" % (tarball, self.name))
         prefix = []
         if settings.eatmydata and os.path.isfile("/usr/bin/eatmydata"):
             prefix.append("eatmydata")
         run(prefix + ["tar", "-C", self.name, "--auto-compress", "-xf", tarball])
 
-    def setup_from_schroot(self, schroot):
+    def setup_from_schroot(self, schroot: str) -> None:
         self.schroot_session = schroot.split(":", 1)[-1] + "-" + str(uuid.uuid1()) + "-piuparts"
         run(["schroot", "--begin-session", "--chroot", schroot, "--session-name", self.schroot_session])
         ret_code, output = run(["schroot", "--chroot", "session:" + self.schroot_session, "--location"])
@@ -938,13 +966,13 @@ class Chroot:
         logging.info("New schroot session in '%s'" % self.name)
 
     @staticmethod
-    def check_if_docker_storage_driver_is_supported():
+    def check_if_docker_storage_driver_is_supported() -> None:
         ret_code, output = run(["docker", "info"])
         if "overlay2" not in output:
             logging.error("Only overlay2 storage driver is supported")
             panic()
 
-    def setup_from_docker(self, docker_image):
+    def setup_from_docker(self, docker_image: str) -> None:
         self.check_if_docker_storage_driver_is_supported()
         with tempfile.TemporaryDirectory() as tmpdir:
             cidfile = pathlib.Path(tmpdir) / "cidfile"
@@ -959,8 +987,9 @@ class Chroot:
         self.name = output.strip()
         logging.info("New container created %r at %r", self.docker_container, self.name)
 
-    def setup_from_lvm(self, lvm_volume):
+    def setup_from_lvm(self, lvm_volume: str) -> None:
         """Create a chroot by creating an LVM snapshot."""
+        assert self.name is not None
         self.lvm_base = os.path.dirname(lvm_volume)
         self.lvm_vol_name = os.path.basename(lvm_volume)
         self.lvm_snapshot_name = self.lvm_vol_name + "-" + str(uuid.uuid1())
@@ -971,8 +1000,9 @@ class Chroot:
         logging.info("Mounting LVM snapshot to %s" % self.name)
         run(["mount", self.lvm_snapshot, self.name])
 
-    def setup_from_dir(self, dirname):
+    def setup_from_dir(self, dirname: str) -> None:
         """Create chroot from an existing one."""
+        assert self.name is not None
         # if on same device, make hard link
         cmd = ["cp"]
         if settings.hard_link and os.stat(dirname).st_dev == os.stat(self.name).st_dev:
@@ -986,7 +1016,8 @@ class Chroot:
             dst = os.path.join(self.name, name)
             run(cmd + [src, dst])
 
-    def interactive_shell(self):
+    def interactive_shell(self) -> None:
+        assert self.name is not None
         logging.info("Entering interactive shell in %s" % self.name)
         env = os.environ.copy()
         env["debian_chroot"] = "piuparts:%s" % self.name
@@ -995,7 +1026,8 @@ class Chroot:
         except Exception:
             pass
 
-    def run(self, command, ignore_errors=False):
+    def run(self, command: list[str], ignore_errors: bool = False) -> tuple[int, str]:
+        assert self.name is not None
         prefix = []
         if settings.eatmydata and os.path.isfile(os.path.join(self.name, "usr/bin/eatmydata")):
             prefix.append("eatmydata")
@@ -1035,13 +1067,14 @@ class Chroot:
                 timeout=settings.max_command_runtime,
             )
 
-    def mkdir_p(self, path):
+    def mkdir_p(self, path: str) -> None:
         fullpath = self.relative(path)
         if not os.path.isdir(fullpath):
             os.makedirs(fullpath)
 
-    def create_apt_sources(self, distro):
+    def create_apt_sources(self, distro: str) -> None:
         """Create an /etc/apt/sources.list with a given distro."""
+        assert settings.distro_config is not None
         lines = []
         lines.extend(settings.distro_config.get_deb_lines(distro, settings.debian_mirrors[0][1]))
         for mirror, components in settings.debian_mirrors[1:]:
@@ -1051,7 +1084,7 @@ class Chroot:
         create_file(self.relative("etc/apt/sources.list"), "\n".join(lines) + "\n")
         logging.debug("sources.list:\n" + indent_string("\n".join(lines)))
 
-    def aptupdate_run(self):
+    def aptupdate_run(self) -> int | None:
         """Resynchronize the package index files.
         If executed under --update-retries <num>, retry
         its execution up to <num> times, useful e.g.
@@ -1059,11 +1092,11 @@ class Chroot:
         errors."""
         if not settings.update_retries:
             self.run(["apt-get", "update"])
-            return
+            return None
 
         count = 0
         for count, run in enumerate(range(settings.update_retries), 1):
-            (status, output) = self.run(["apt-get", "update"], ignore_errors=True)
+            status, output = self.run(["apt-get", "update"], ignore_errors=True)
             if status == 0:
                 break
             else:
@@ -1074,7 +1107,7 @@ class Chroot:
 
         return status
 
-    def enable_testdebs_repo(self, update=True):
+    def enable_testdebs_repo(self, update: bool = True) -> None:
         if settings.testdebs_repo:
             if settings.testdebs_repo.startswith("deb"):
                 debline = settings.testdebs_repo
@@ -1087,12 +1120,12 @@ class Chroot:
             if update:
                 self.aptupdate_run()
 
-    def disable_testdebs_repo(self):
+    def disable_testdebs_repo(self) -> None:
         if settings.testdebs_repo:
             logging.debug("disabling testdebs repository")
             remove_files([self.relative("etc/apt/sources.list.d/piuparts-testdebs-repo.list")])
 
-    def create_apt_conf(self):
+    def create_apt_conf(self) -> None:
         """Create /etc/apt/apt.conf.d/piuparts inside the chroot."""
         lines = ['APT::Get::Assume-Yes "yes";\n']
         lines.append('APT::Install-Recommends "%d";\n' % int(settings.install_recommends))
@@ -1124,7 +1157,7 @@ class Chroot:
 
         create_file(self.relative("etc/apt/apt.conf.d/piuparts"), "".join(lines))
 
-    def create_dpkg_conf(self):
+    def create_dpkg_conf(self) -> None:
         """Create /etc/dpkg/dpkg.cfg.d/piuparts inside the chroot."""
         lines = []
         if settings.dpkg_force_unsafe_io:
@@ -1139,7 +1172,7 @@ class Chroot:
             self.mkdir_p("etc/dpkg/dpkg.cfg.d")
             create_file(self.relative("etc/dpkg/dpkg.cfg.d/piuparts"), "".join(lines))
 
-    def create_policy_rc_d(self):
+    def create_policy_rc_d(self) -> None:
         """Create a policy-rc.d that prevents daemons from running."""
         full_name = self.relative("usr/sbin/policy-rc.d")
         policy = "#!/bin/sh\n"
@@ -1156,7 +1189,7 @@ class Chroot:
         os.chmod(full_name, 0o755)
         logging.debug("Created policy-rc.d and chmodded it.")
 
-    def create_resolv_conf(self):
+    def create_resolv_conf(self) -> None:
         """Update resolv.conf based on the current configuration in the host system. Strip comments and whitespace."""
         if settings.docker_image:
             # Docker takes care of this
@@ -1171,8 +1204,10 @@ class Chroot:
         os.chmod(full_name, 0o644)
         logging.debug("Created resolv.conf.")
 
-    def setup_minimal_chroot(self):
+    def setup_minimal_chroot(self) -> None:
         """Set up a minimal Debian system in a chroot."""
+        assert self.name is not None
+        assert settings.distro_config is not None
         logging.debug("Setting up minimal chroot for %s at %s." % (settings.debian_distros[0], self.name))
         prefix = []
         if settings.eatmydata and os.path.isfile("/usr/bin/eatmydata"):
@@ -1199,10 +1234,11 @@ class Chroot:
         )
         self.bootstrapped = True
 
-    def minimize(self):
+    def minimize(self) -> None:
         """Minimize a chroot by removing (almost all) unnecessary packages"""
         if settings.skip_minimize or not settings.minimize:
             return
+        assert settings.debfoster_options is not None
         self.run(["apt-get", "install", "debfoster"])
         debfoster_command = ["debfoster"] + settings.debfoster_options
         if settings.eatmydata:
@@ -1211,8 +1247,10 @@ class Chroot:
         remove_files([self.relative("var/lib/debfoster/keepers")])
         self.run(["dpkg", "--purge", "debfoster"])
 
-    def configure_chroot(self):
+    def configure_chroot(self) -> None:
         """Configure a chroot according to current settings"""
+        assert self.name is not None
+        assert settings.distro_config is not None
         os.environ["PIUPARTS_DISTRIBUTION"] = settings.distro_config.get_distribution(settings.debian_distros[0])
         if not settings.keep_sources_list:
             self.create_apt_sources(settings.debian_distros[0])
@@ -1282,33 +1320,37 @@ class Chroot:
         for bindmount in settings.bindmounts:
             self.mount(bindmount, bindmount, opts=["rbind"])
 
-    def remember_available_md5(self):
+    def remember_available_md5(self) -> None:
         """Keep a history of 'apt-cache dumpavail | md5sum' after initial
         setup and each dist-upgrade step to notice outdated reference
         chroot metadata"""
         errorcode, avail_md5 = self.run(["sh", "-c", "apt-cache dumpavail | md5sum"])
         self.avail_md5_history.append(avail_md5.split()[0])
 
-    def remember_initial_selections(self):
+    def remember_initial_selections(self) -> None:
         """Remember initial selections to easily recognize mismatching chroot metadata"""
         self.initial_selections = self.get_selections()
 
-    def remember_systemd_tmpfiles(self):
+    def remember_systemd_tmpfiles(self) -> None:
         """Remember files potentially affected by systemd-tmpfiles.
 
-        Only files and directories with some flags are ignored by piuparts."""
-        ignored_flags = (
-            "f",  # Create file
-            "w",  # Write file contents
-            "d",  # Create directory
-            "D",  # Create directory (and remove on --remove)
-            "p",  # Create named pipe
-            "L",  # Create symlink
-            "c",  # Create character device
-            "b",  # Create block device
-            "C",  # Copy file or directory
+        Only files and directories with some flags are flagged by piuparts."""
+        raised_flags = (
+            "e",  # Clean up directory
+            "x",  # Ignore path for cleanup (recursive)
+            "X",  # Ignore path for cleanup
+            "r",  # Remove file or directory
+            "R",  # Remove file or directory (recursive)
+            "z",  # Adjust mode
+            "Z",  # Adjust mode (recursive)
+            "t",  # Set xattrs
+            "T",  # Set xattrs (recursive)
+            "h",  # Set attrs
+            "H",  # Set attrs (recursive)
+            "a",  # Set acls
+            "A",  # Set acls (recursive)
         )
-        (retcode, output) = self.run(["systemd-tmpfiles", "--cat-config"], ignore_errors=True)
+        retcode, output = self.run(["systemd-tmpfiles", "--cat-config"], ignore_errors=True)
         if retcode != 0:
             return
 
@@ -1324,11 +1366,12 @@ class Chroot:
                         current_file = line[2:]
                 continue
             flag, filename, *_ = line.split()
-            if flag[0] in ignored_flags:
+            if flag[0] not in raised_flags:
                 self.systemd_tmpfiles[filename] = (current_file, flag)
 
-    def upgrade_to_distros(self, distros, packages, apt_get_upgrade=False):
+    def upgrade_to_distros(self, distros: list[str], packages: list[str], apt_get_upgrade: bool = False) -> None:
         """Upgrade a chroot installation to each successive distro."""
+        assert settings.distro_config is not None
         for distro in distros:
             logging.debug("Upgrading %s to %s" % (self.name, distro))
             os.environ["PIUPARTS_DISTRIBUTION_NEXT"] = settings.distro_config.get_distribution(distro)
@@ -1354,12 +1397,12 @@ class Chroot:
             self.run_scripts("post_distupgrade")
             self.check_for_no_processes()
 
-    def get_known_packages(self, packages):
+    def get_known_packages(self, packages: list[str]) -> list[str]:
         """Does apt-get (or apt-cache) know about a set of packages?"""
         known_packages = []
         new_packages = []
         for name in packages:
-            (status, output) = self.run(["apt-cache", "show", "--no-all-versions", name], ignore_errors=True)
+            status, output = self.run(["apt-cache", "show", "--no-all-versions", name], ignore_errors=True)
             # apt-cache reports status for some virtual packages and packages
             # in status config-files-remaining state without installation
             # candidate -- but only real packages have Filename/MD5sum/SHA*
@@ -1375,7 +1418,7 @@ class Chroot:
                 logging.info("the following packages are not in the archive: " + ", ".join(new_packages))
         return known_packages
 
-    def copy_files(self, source_names, target_name):
+    def copy_files(self, source_names: list[str], target_name: str) -> None:
         """Copy files in 'source_name' to file/dir 'target_name', relative
         to the root of the chroot."""
         target_name = self.relative(target_name)
@@ -1387,10 +1430,10 @@ class Chroot:
                 logging.error("Error copying %s to %s: %s" % (source_name, target_name, detail))
                 panic()
 
-    def list_installed_files(self, pre_info, post_info):
+    def list_installed_files(self, pre_info: dict[str, FileInfo], post_info: dict[str, FileInfo]) -> None:
         """List the new files installed, removed and modified between two dir trees.
         Actually, it is a nice output of the funcion diff_meta_dat."""
-        (new, removed, modified) = diff_meta_data(pre_info, post_info, self.is_ignored)
+        new, removed, modified = diff_meta_data(pre_info, post_info, self.is_ignored)
         file_owners = self.get_files_owned_by_packages()
 
         if new:
@@ -1406,7 +1449,7 @@ class Chroot:
         else:
             logging.debug("The package did not modify any file.\n")
 
-    def is_installed(self, packages):
+    def is_installed(self, packages: list[str]) -> bool:
         if not packages:
             return True
         retcode, output = self.run(["dpkg-query", "-f", "${Package} ${Status}\n", "-W"] + packages, ignore_errors=True)
@@ -1420,24 +1463,29 @@ class Chroot:
                 installed = False
         return installed
 
-    def install_packages(self, package_files, packages, with_scripts=True, reinstall=False):
+    def install_packages(
+        self, package_files: list[str], packages: list[str], with_scripts: bool = True, reinstall: bool = False
+    ) -> None:
         if package_files:
             self.install_package_files(package_files, packages, with_scripts=with_scripts)
         else:
             self.install_packages_by_name(packages, with_scripts=with_scripts, reinstall=reinstall)
 
-    def install_package_files(self, package_files, packages=None, with_scripts=False):
+    def install_package_files(
+        self, package_files: list[str], packages: list[str] | None = None, with_scripts: bool = False
+    ) -> None:
         if packages and settings.testdebs_repo:
             self.install_packages_by_name(packages, with_scripts=with_scripts)
             return
         if package_files:
+            assert settings.distro_config is not None
             # Check whether apt-get can install debs (supported since apt 1.1)
             #
             # If it can, this is preferable to the traditional
             #   `dpkg -i foo.deb && apt-get -yf install`
             # approach since 'apt-get -yf install' can 'resolve' dependency
             # problems by removing the package we are trying to install
-            (status, output) = self.run(["dpkg-query", "-f", "${Version}\n", "-W", "apt"], ignore_errors=True)
+            status, output = self.run(["dpkg-query", "-f", "${Version}\n", "-W", "apt"], ignore_errors=True)
             apt_can_install_debs = apt_pkg.version_compare(output.strip(), "1.1") >= 0
 
             # This must look like a local path so that apt-get can
@@ -1465,7 +1513,7 @@ class Chroot:
             if apt_can_install_debs:
                 self.run(apt_get_install + tmp_files)
             else:
-                (ret, out) = self.run(["dpkg", "-i"] + tmp_files, ignore_errors=True)
+                ret, out = self.run(["dpkg", "-i"] + tmp_files, ignore_errors=True)
                 if ret != 0:
                     if "dependency problems - leaving unconfigured" in out:
                         pass
@@ -1492,8 +1540,9 @@ class Chroot:
 
             remove_files([self.relative(name) for name in tmp_files])
 
-    def install_packages_by_name(self, packages, with_scripts=True, reinstall=False):
+    def install_packages_by_name(self, packages: list[str], with_scripts: bool = True, reinstall: bool = False) -> None:
         if packages:
+            assert settings.distro_config is not None
             if with_scripts:
                 self.run_scripts("pre_install")
 
@@ -1512,7 +1561,14 @@ class Chroot:
             if with_scripts:
                 self.run_scripts("post_install")
 
-    def apt_get_install(self, to_install=[], to_remove=[], to_purge=[], flags=[], reinstall=False):
+    def apt_get_install(
+        self,
+        to_install: list[str] = [],
+        to_remove: list[str] = [],
+        to_purge: list[str] = [],
+        flags: list[str] = [],
+        reinstall: bool = False,
+    ) -> None:
         command = ["apt-get", "-y"] + flags + ["install"]
         if reinstall:
             command.append("--reinstall")
@@ -1521,11 +1577,11 @@ class Chroot:
         command.extend(["%s_" % x for x in unqualify(to_purge)])
         self.run(command)
 
-    def get_selections(self):
+    def get_selections(self) -> dict[str, tuple[str, str | None]]:
         """Get current package selections in a chroot."""
         # "${Status}" emits three columns, e.g. "install ok installed"
         # "${binary:Package}" requires a multi-arch dpkg, so fall back to "${Package}" on older versions
-        (status, output) = self.run(
+        command_status, output = self.run(
             ["dpkg-query", "-W", "-f", "${Status}\\t${binary:Package}\\t${Package}\\t${Version}\\n"]
         )
         vdict = {}
@@ -1540,23 +1596,27 @@ class Chroot:
             vdict[name] = (status, version)
         return vdict
 
-    def get_diversions(self):
+    def get_diversions(self) -> list[str] | None:
         """Get current dpkg-divert --list in a chroot."""
         if not settings.check_broken_diversions:
-            return
-        (status, output) = self.run(["dpkg-divert", "--list"])
+            return None
+        status, output = self.run(["dpkg-divert", "--list"])
         return output.split("\n")
 
-    def get_modified_diversions(self, pre_install_diversions, post_install_diversions=None):
+    def get_modified_diversions(
+        self, pre_install_diversions: list[str], post_install_diversions: list[str] | None = None
+    ) -> tuple[list[str], list[str]]:
         """Check that diversions in chroot are identical (though potentially reordered)."""
         if post_install_diversions is None:
             post_install_diversions = self.get_diversions()
+            assert post_install_diversions is not None
         removed = [ln for ln in pre_install_diversions if ln not in post_install_diversions]
         added = [ln for ln in post_install_diversions if ln not in pre_install_diversions]
         return (removed, added)
 
-    def check_debsums(self):
-        (status, output) = run(["debsums", "--root", self.name, "-ac", "--ignore-obsolete"], ignore_errors=True)
+    def check_debsums(self) -> None:
+        assert self.name is not None
+        status, output = run(["debsums", "--root", self.name, "-ac", "--ignore-obsolete"], ignore_errors=True)
         if status != 0:
             logging.error(
                 "FAIL: debsums reports modifications inside the chroot:\n%s"
@@ -1565,11 +1625,12 @@ class Chroot:
             if not settings.warn_on_debsums_errors:
                 panic()
 
-    def check_adequate(self, packages):
+    def check_adequate(self, packages: list[str]) -> None:
         """Run adequate and categorize output according to our needs."""
+        assert self.name is not None
         packages = unqualify([p for p in packages if not p.endswith("=None")])
         if packages and settings.adequate and os.path.isfile("/usr/bin/adequate"):
-            (status, output) = run(["dpkg-query", "-f", "${Version}\n", "-W", "adequate"], ignore_errors=True)
+            status, output = run(["dpkg-query", "-f", "${Version}\n", "-W", "adequate"], ignore_errors=True)
             logging.info("Running adequate version %s now." % output.strip())
             adequate_tags = [
                 "bin-or-sbin-binary-requires-usr-lib-library",
@@ -1592,8 +1653,8 @@ class Chroot:
                 "obsolete-conffile",
                 "broken-symlink",
             ]
-            ignored_tags = []
-            (status, output) = run(["adequate", "--root", self.name] + packages, ignore_errors=True)
+            ignored_tags: list[str] = []
+            status, output = run(["adequate", "--root", self.name] + packages, ignore_errors=True)
             for tag in ignored_tags:
                 # ignore some tags
                 _regex = "^[^:]+: " + tag + " .*\n"
@@ -1631,7 +1692,8 @@ class Chroot:
                 if not settings.warn_if_inadequate:
                     panic()
 
-    def list_paths_with_symlinks(self):
+    def list_paths_with_symlinks(self) -> None:
+        assert self.name is not None
         file_owners = self.get_files_owned_by_packages()
         bad = []
         overwrites = False
@@ -1676,7 +1738,9 @@ class Chroot:
             else:
                 logging.info(msg)
 
-    def remove_packages(self, packages, allow_remove_essential=True, ignore_errors=False):
+    def remove_packages(
+        self, packages: list[str], allow_remove_essential: bool = True, ignore_errors: bool = False
+    ) -> None:
         """Remove packages in a chroot."""
         base_command = ["apt-get", "remove"]
         if allow_remove_essential:
@@ -1687,12 +1751,12 @@ class Chroot:
                 ignore_errors=ignore_errors,
             )
 
-    def purge_packages(self, packages, ignore_errors=False):
+    def purge_packages(self, packages: typing.Iterable[str], ignore_errors: bool = False) -> None:
         """Purge packages in a chroot."""
         if packages:
             self.run(["dpkg", "--purge"] + unqualify(packages), ignore_errors=ignore_errors)
 
-    def restore_selections(self, reference_chroot_state, packages_qualified):
+    def restore_selections(self, reference_chroot_state: StateMetaData, packages_qualified: list[str]) -> None:
         """Restore package selections in a chroot to the state in
         'reference_chroot_state'."""
 
@@ -1772,7 +1836,7 @@ class Chroot:
         self.run(["dpkg", "--purge", "--pending"])
         self.run(["dpkg", "--remove", "--pending"])
 
-    def get_tree_meta_data(self):
+    def get_tree_meta_data(self) -> dict[str, FileInfo]:
         """Return the filesystem meta data for all objects in the chroot."""
         self.run(["apt-get", "clean"])
         logging.debug("Recording chroot state")
@@ -1780,12 +1844,12 @@ class Chroot:
         uidmap = {}
         with open(self.relative("etc/passwd"), "r") as passwd:
             for line in passwd:
-                (usr, x, uid) = line.split(":")[0:3]
+                usr, x, uid = line.split(":")[0:3]
                 uidmap[int(uid)] = usr
         gidmap = {}
-        with open(self.relative("etc/group"), "r") as group:
-            for line in group:
-                (grp, x, gid) = line.split(":")[0:3]
+        with open(self.relative("etc/group"), "r") as groupfile:
+            for line in groupfile:
+                grp, x, gid = line.split(":")[0:3]
                 gidmap[int(gid)] = grp
         vdict = {}
         proc = os.path.join(root, "proc")
@@ -1815,24 +1879,25 @@ class Chroot:
                 vdict[name[len(root) :]] = FileInfo(st, target, user, group)
         return vdict
 
-    def get_state_meta_data(self):
-        chroot_state = {}
-        chroot_state["initial_selections"] = self.initial_selections
-        chroot_state["avail_md5"] = self.avail_md5_history
-        chroot_state["tree"] = self.get_tree_meta_data()
-        chroot_state["selections"] = self.get_selections()
-        chroot_state["diversions"] = self.get_diversions()
-        return chroot_state
+    def get_state_meta_data(self) -> StateMetaData:
+        return {
+            "initial_selections": self.initial_selections,
+            "avail_md5": self.avail_md5_history,
+            "tree": self.get_tree_meta_data(),
+            "selections": self.get_selections(),
+            "diversions": self.get_diversions(),
+        }
 
-    def relative(self, pathname):
+    def relative(self, pathname: str) -> str:
+        assert self.name is not None
         if pathname.startswith("/"):
             return os.path.join(self.name, pathname[1:])
         return os.path.join(self.name, pathname)
 
-    def get_files_owned_by_packages(self):
+    def get_files_owned_by_packages(self) -> dict[str, list[str]]:
         """Return dict[filename] = [packagenamelist]."""
         vdir = self.relative("var/lib/dpkg/info")
-        vdict = {}
+        vdict: dict[str, list[str]] = {}
         for basename in os.listdir(vdir):
             if basename.endswith(".list"):
                 pkg = basename[: -len(".list")]
@@ -1844,13 +1909,14 @@ class Chroot:
                         vdict[pathname] = [pkg]
         return vdict
 
-    def check_for_no_processes(self, fail=None):
+    def check_for_no_processes(self, fail: bool | None = None) -> None:
         """Check there are no processes running inside the chroot."""
         if settings.docker_image:
-            (status, output) = run(["docker", "top", self.docker_container])
+            status, output = run(["docker", "top", self.docker_container])
             count = len(output.strip().split("\n")) - 2  # header + bash launched on container creation
         else:
-            (status, output) = run(["lsof", "-w", "+D", self.name], ignore_errors=True)
+            assert self.name is not None
+            status, output = run(["lsof", "-w", "+D", self.name], ignore_errors=True)
             count = len(output.split("\n")) - 1
         if count > 0:
             if fail is None:
@@ -1862,11 +1928,12 @@ class Chroot:
                 self.terminate_running_processes()
                 panic()
 
-    def terminate_running_processes(self):
+    def terminate_running_processes(self) -> None:
         """Terminate all processes running in the chroot."""
         if settings.docker_image:
             # Docker takes care of this
             return
+        assert self.name is not None
         seen = []
         while True:
             p = subprocess.Popen(
@@ -1895,14 +1962,17 @@ class Chroot:
     # If /selinux is present, assume that this is the only supported
     # location by libselinux. Otherwise use the new location.
     # /selinux was shipped by the libselinux package until wheezy.
-    def selinuxfs_path(self):
+    def selinuxfs_path(self) -> str:
         if os.path.isdir(self.relative("/selinux")):
             return "/selinux"
         else:
             return "/sys/fs/selinux"
 
-    def mount(self, source, path, fstype=None, opts=None, no_mkdir=False):
+    def mount(
+        self, source: str, path: str, fstype: str | None = None, opts: list[str] | None = None, no_mkdir: bool = False
+    ) -> None:
         """Mount something into the chroot and remember it for unmount_all()."""
+        assert self.name is not None
         if opts is None:
             opts = []
         path = canonicalize_path(self.name, path)
@@ -1922,7 +1992,7 @@ class Chroot:
         run(command)
         self.mounts.append(fullpath)
 
-    def unmount_all(self):
+    def unmount_all(self) -> None:
         """Unmount everything we mount()ed into the chroot."""
 
         # Workaround to unmount /proc/sys/fs/binfmt_misc which is mounted by
@@ -1935,7 +2005,7 @@ class Chroot:
         for mountpoint in reversed(self.mounts):
             run(["umount", mountpoint], ignore_errors=True)
 
-    def mount_proc(self):
+    def mount_proc(self) -> None:
         """Mount /proc etc. inside chroot."""
         self.mount("proc", "/proc", fstype="proc")
         etcmtab = self.relative("etc/mtab")
@@ -1969,7 +2039,7 @@ class Chroot:
         if selinux_enabled():
             self.mount("/sys/fs/selinux", self.selinuxfs_path(), opts=["bind", "ro"])
 
-    def is_ignored(self, pathname, info="PATH", quiet=False):
+    def is_ignored(self, pathname: str, info: str = "PATH", quiet: bool = False) -> bool:
         """Is a file (or dir or whatever) to be ignored?"""
         if pathname in settings.ignored_files:
             return True
@@ -1995,7 +2065,13 @@ class Chroot:
                 return True
         return False
 
-    def check_files_moved_usr(self, packages=[], files_before={}, files_after={}, warn_only=None):
+    def check_files_moved_usr(
+        self,
+        packages: list[str] = [],
+        files_before: dict[str, list[str]] = {},
+        files_after: dict[str, list[str]] = {},
+        warn_only: bool = False,
+    ) -> None:
         """Check that no files were moved from /{bin|sbin|lib*} and /usr/{bin|sbin|lib*}"""
 
         if settings.warn_on_usr_move == "disabled":
@@ -2041,8 +2117,9 @@ class Chroot:
         else:
             logging.debug("No file moved between /{bin|sbin|lib*} and /usr/{bin|sbin|lib*}.")
 
-    def check_for_broken_symlinks(self, warn_only=None, file_owners={}):
+    def check_for_broken_symlinks(self, warn_only: bool = False, file_owners: dict[str, list[str]] = {}) -> None:
         """Check that all symlinks in chroot are non-broken."""
+        assert self.name is not None
         if not settings.check_broken_symlinks:
             return
         broken = []
@@ -2073,7 +2150,7 @@ class Chroot:
         else:
             logging.debug("No broken symlinks as far as we can find.")
 
-    def check_if_cronfiles(self, packages):
+    def check_if_cronfiles(self, packages: list[str]) -> list[str]:
         """Check if the packages have cron files under /etc/cron.d and in case positive,
         it returns the list of files."""
 
@@ -2099,7 +2176,7 @@ class Chroot:
 
         return vlist
 
-    def check_output_cronfiles(self, list):
+    def check_output_cronfiles(self, list: list[str]) -> None:
         """Check if a given list of cronfiles has any output. Executes
         cron file as cron would do (except for SHELL)"""
         failed = False
@@ -2107,7 +2184,7 @@ class Chroot:
             if not os.path.exists(self.relative(vfile.strip("/"))):
                 continue
 
-            (retval, output) = self.run([vfile])
+            retval, output = self.run([vfile])
             if output:
                 failed = True
                 logging.error("FAIL: Cron file %s has output with package removed" % vfile)
@@ -2115,7 +2192,7 @@ class Chroot:
         if failed:
             panic()
 
-    def check_if_logrotatefiles(self, packages):
+    def check_if_logrotatefiles(self, packages: list[str]) -> list[str]:
         """Check if the packages have logrotate files under /etc/logrotate.d and in case positive,
         it returns the list of files."""
 
@@ -2137,7 +2214,7 @@ class Chroot:
 
         return vlist
 
-    def install_logrotate(self):
+    def install_logrotate(self) -> typing.Iterable[str]:
         """Install logrotate for check_output_logrotatefiles, and return the
         list of packages that were installed"""
         old_selections = self.get_selections()
@@ -2148,7 +2225,7 @@ class Chroot:
         diff = diff_selections(self, old_selections)
         return diff.keys()
 
-    def check_output_logrotatefiles(self, list):
+    def check_output_logrotatefiles(self, list: list[str]) -> None:
         """Check if a given list of logrotatefiles has any output. Executes
         logrotate file as logrotate would do from cron (except for SHELL)"""
         failed = False
@@ -2156,7 +2233,7 @@ class Chroot:
             if not os.path.exists(self.relative(vfile.strip("/"))):
                 continue
 
-            (retval, output) = self.run(["/usr/sbin/logrotate", vfile])
+            retval, output = self.run(["/usr/sbin/logrotate", vfile])
             if output or retval != 0:
                 failed = True
                 logging.error("FAIL: Logrotate file %s exits with error or has output with package removed" % vfile)
@@ -2164,7 +2241,7 @@ class Chroot:
         if failed:
             panic()
 
-    def run_scripts(self, step, ignore_errors=False):
+    def run_scripts(self, step: str, ignore_errors: bool = False) -> int:
         """Run custom scripts to given step post-install|remove|purge"""
 
         errorcodes = 0
@@ -2184,16 +2261,17 @@ class Chroot:
         return errorcodes
 
 
-def selinux_enabled(enabled_test="/usr/sbin/selinuxenabled"):
+def selinux_enabled(enabled_test: str = "/usr/sbin/selinuxenabled") -> bool | None:
     if os.access(enabled_test, os.X_OK):
         retval, output = run([enabled_test], ignore_errors=True)
         if retval == 0:
             return True
         else:
             return False
+    return None
 
 
-def objects_are_different(obj1, obj2):
+def objects_are_different(obj1: FileInfo, obj2: FileInfo) -> bool:
     """Are filesystem objects different based on their meta data?"""
     if (
         obj1.st.st_mode != obj2.st.st_mode
@@ -2207,7 +2285,7 @@ def objects_are_different(obj1, obj2):
     return False
 
 
-def format_object_attributes(obj):
+def format_object_attributes(obj: FileInfo) -> str:
     st = obj.st
     ft = ""
     if stat.S_ISDIR(st.st_mode):
@@ -2228,7 +2306,16 @@ def format_object_attributes(obj):
     return res
 
 
-def diff_meta_data(tree1, tree2, is_ignored, quiet=False):
+class IsIgnored(typing.Protocol):
+    def __call__(self, pathname: str, info: str, quiet: bool) -> bool: ...
+
+
+def diff_meta_data(
+    tree1: dict[str, FileInfo],
+    tree2: dict[str, FileInfo],
+    is_ignored: IsIgnored,
+    quiet: bool = False,
+) -> tuple[list[tuple[str, FileInfo]], list[tuple[str, FileInfo]], list[tuple[str, FileInfo]]]:
     """Compare two dir trees and return list of new files (only in 'tree2'),
     removed files (only in 'tree1'), and modified files."""
 
@@ -2277,7 +2364,7 @@ def diff_meta_data(tree1, tree2, is_ignored, quiet=False):
     return new, removed, modified
 
 
-def file_list(meta_infos, file_owners):
+def file_list(meta_infos: list[tuple[str, FileInfo]], file_owners: dict[str, list[str]]) -> str:
     """Return list of indented filenames."""
     meta_infos = sorted(meta_infos[:])
     vlist = []
@@ -2297,7 +2384,7 @@ def file_list(meta_infos, file_owners):
     return "".join(vlist)
 
 
-def offending_packages(meta_infos, file_owners):
+def offending_packages(meta_infos: list[tuple[str, FileInfo]], file_owners: dict[str, list[str]]) -> set[str]:
     """Return a Set of offending packages."""
     pkgset = set()
     for name, data in meta_infos:
@@ -2307,7 +2394,9 @@ def offending_packages(meta_infos, file_owners):
     return pkgset
 
 
-def prune_files_list(files, depsfiles):
+def prune_files_list(
+    files: list[tuple[str, FileInfo]], depsfiles: list[tuple[str, FileInfo]]
+) -> list[tuple[str, FileInfo]]:
     """Remove elements from 'files' that are in 'depsfiles', and return the
     list of removed elements.
     """
@@ -2320,12 +2409,12 @@ def prune_files_list(files, depsfiles):
     return warn
 
 
-def diff_selections(chroot, selections):
+def diff_selections(chroot: Chroot, selections: dict[str, tuple[str, str | None]]) -> dict[str, tuple[str, str | None]]:
     """Compare original and current package selection.
     Return dict where dict[package_name] = original_status, that is,
     the value in the dict is the state that the package needs to be
     set to to restore original selections."""
-    changes = {}
+    changes: dict[str, tuple[str, str | None]] = {}
     current = chroot.get_selections()
     for name, (value, version) in current.items():
         if name not in selections:
@@ -2338,11 +2427,11 @@ def diff_selections(chroot, selections):
     return changes
 
 
-def get_package_names_from_package_files(package_files):
+def get_package_names_from_package_files(package_files: list[str]) -> list[str]:
     """Return list of package names given list of package file names."""
     vlist = []
     for filename in package_files:
-        (status, output) = run(["dpkg", "--info", filename])
+        status, output = run(["dpkg", "--info", filename])
         p = None
         v = None
         for line in [line.lstrip() for line in output.split("\n")]:
@@ -2362,7 +2451,7 @@ def get_package_names_from_package_files(package_files):
 # from the 'Files' stanza.
 
 
-def process_changes(changes):
+def process_changes(changes: str) -> list[str] | None:
     # Determine the path to the changes file, then check if it's readable.
     dir_path = ""
     changes_path = ""
@@ -2373,15 +2462,12 @@ def process_changes(changes):
         changes_path = os.path.abspath(changes)
     if not os.access(changes_path, os.R_OK):
         logging.warning(changes_path + " is not readable. Skipping.")
-        return
+        return None
 
     # Determine the packages in the changes file through the 'Files' stanza.
     field = "Files"
     pattern = re.compile(
-        r"^"
-        + field
-        + r":"
-        + r"""  # The field we want the contents from
+        r"^" + field + r":" + r"""  # The field we want the contents from
         (.*?)                   # The contents of the field
         \n([^ ]|$)              # Start of a new field or EOF
         """,
@@ -2404,7 +2490,12 @@ def process_changes(changes):
     return package_list
 
 
-def check_results(chroot, chroot_state, file_owners, deps_info=None):
+def check_results(
+    chroot: Chroot,
+    chroot_state: StateMetaData,
+    file_owners: dict[str, list[str]],
+    deps_info: dict[str, FileInfo] | None = None,
+) -> bool:
     """Check that current chroot state matches 'chroot_state'.
 
     If settings.warn_on_others is True and deps_info is not None, then only
@@ -2418,29 +2509,32 @@ def check_results(chroot, chroot_state, file_owners, deps_info=None):
     reference_info = chroot_state["tree"]
     ok = True
     if settings.check_broken_diversions:
-        (removed, added) = chroot.get_modified_diversions(chroot_state["diversions"])
-        if added:
+        assert chroot_state["diversions"] is not None
+        removed_diversions, added_diversions = chroot.get_modified_diversions(chroot_state["diversions"])
+        if added_diversions:
             logging.error(
-                "FAIL: Installed diversions (dpkg-divert) not removed by purge:\n%s" % indent_string("\n".join(added))
+                "FAIL: Installed diversions (dpkg-divert) not removed by purge:\n%s"
+                % indent_string("\n".join(added_diversions))
             )
             ok = False
-        if removed:
+        if removed_diversions:
             logging.error(
-                "FAIL: Existing diversions (dpkg-divert) removed/modified:\n%s" % indent_string("\n".join(removed))
+                "FAIL: Existing diversions (dpkg-divert) removed/modified:\n%s"
+                % indent_string("\n".join(removed_diversions))
             )
             ok = False
 
     current_info = chroot.get_tree_meta_data()
     if settings.warn_on_others and deps_info is not None:
-        (new, removed, modified) = diff_meta_data(reference_info, current_info, chroot.is_ignored)
-        (depsnew, depsremoved, depsmodified) = diff_meta_data(reference_info, deps_info, chroot.is_ignored, quiet=True)
+        new, removed, modified = diff_meta_data(reference_info, current_info, chroot.is_ignored)
+        depsnew, depsremoved, depsmodified = diff_meta_data(reference_info, deps_info, chroot.is_ignored, quiet=True)
 
         warnnew = prune_files_list(new, depsnew)
         warnremoved = prune_files_list(removed, depsremoved)
         warnmodified = prune_files_list(modified, depsmodified)
 
     else:
-        (new, removed, modified) = diff_meta_data(reference_info, current_info, chroot.is_ignored)
+        new, removed, modified = diff_meta_data(reference_info, current_info, chroot.is_ignored)
 
     if new:
         if settings.warn_on_leftovers_after_purge:
@@ -2484,7 +2578,13 @@ def check_results(chroot, chroot_state, file_owners, deps_info=None):
     return ok
 
 
-def install_purge_test(chroot, chroot_state, package_files, packages, extra_packages):
+def install_purge_test(
+    chroot: Chroot,
+    chroot_state: StateMetaData,
+    package_files: list[str],
+    packages: list[str],
+    extra_packages: list[str],
+) -> bool:
     """Do an install-purge test. Return True if successful, False if not.
     Assume 'root' is a directory already populated with a working
     chroot, with packages in states given by 'selections'."""
@@ -2508,6 +2608,7 @@ def install_purge_test(chroot, chroot_state, package_files, packages, extra_pack
 
     if settings.warn_on_others or settings.install_purge_install:
         # Create a metapackage with dependencies from the given packages
+        control_infos: typing.Iterable[deb822.Deb822]
         if package_files:
             control_infos = []
             # We were given package files, so let's get the Depends and
@@ -2561,7 +2662,7 @@ def install_purge_test(chroot, chroot_state, package_files, packages, extra_pack
             "piuparts-depends-dummy", depends=all_depends, conflicts=all_conflicts, arch=arch
         )
 
-        def cleanup_metapackage():
+        def cleanup_metapackage() -> None:
             shutil.rmtree(os.path.dirname(metapackage))
 
         panic_handler_id = do_on_panic(cleanup_metapackage)
@@ -2637,7 +2738,9 @@ def install_purge_test(chroot, chroot_state, package_files, packages, extra_pack
     return check_results(chroot, chroot_state, file_owners, deps_info=deps_info)
 
 
-def install_upgrade_test(chroot, chroot_state, package_files, packages, old_packages):
+def install_upgrade_test(
+    chroot: Chroot, chroot_state: StateMetaData, package_files: list[str], packages: list[str], old_packages: list[str]
+) -> bool:
     """Install old_packages via apt-get, then upgrade from package files.
     Return True if successful, False if not."""
 
@@ -2685,21 +2788,21 @@ def install_upgrade_test(chroot, chroot_state, package_files, packages, old_pack
     return check_results(chroot, chroot_state, file_owners_after)
 
 
-def save_meta_data(filename, chroot_state):
+def save_meta_data(filename: str, chroot_state: StateMetaData) -> None:
     """Save directory tree meta data into a file for fast access later."""
     logging.debug("Saving chroot meta data to %s" % filename)
     with open(filename, "wb") as f:
         pickle.dump(chroot_state, f)
 
 
-def load_meta_data(filename):
+def load_meta_data(filename: str) -> StateMetaData:
     """Load meta data saved by 'save_meta_data'."""
     logging.debug("Loading chroot meta data from %s" % filename)
     with open(filename, "rb") as f:
         return pickle.load(f)
 
 
-def install_and_upgrade_between_distros(package_files, packages_qualified):
+def install_and_upgrade_between_distros(package_files: list[str], packages_qualified: list[str]) -> bool:
     """Install package and upgrade it between distributions, then remove.
     Return True if successful, False if not."""
 
@@ -2863,14 +2966,14 @@ def install_and_upgrade_between_distros(package_files, packages_qualified):
     return result
 
 
-def parse_mirror_spec(str, defaultcomponents=[]):
+def parse_mirror_spec(str: str, defaultcomponents: list[str] = []) -> tuple[str, list[str]]:
     """Parse a mirror specification from the --mirror option argument.
     Return (mirror, componentslist)."""
     parts = str.split()
     return parts[0], parts[1:] or defaultcomponents[:]
 
 
-def find_default_debian_mirrors():
+def find_default_debian_mirrors() -> list[tuple[str, list[str]]]:
     """Find the default Debian mirrors."""
     mirrors = []
     try:
@@ -2881,11 +2984,19 @@ def find_default_debian_mirrors():
                 mirrors.append((parts[1], parts[3:]))
                 break  # Only use the first one, at least for now.
     except IOError:
-        return None
+        return []
     return mirrors
 
 
-def forget_ignores(option, opt, value, parser, *args, **kwargs):
+def forget_ignores(
+    option: optparse.Option,
+    opt: str,
+    value: typing.Any,
+    parser: optparse.OptionParser,
+    *args: typing.Any,
+    **kwargs: typing.Any,
+) -> None:
+    assert parser.values is not None
     settings.bindmounts = []
     parser.values.ignore = []
     parser.values.ignore_regex = []
@@ -2893,11 +3004,19 @@ def forget_ignores(option, opt, value, parser, *args, **kwargs):
     settings.ignored_patterns = []
 
 
-def set_basetgz_to_pbuilder(option, opt, value, parser, *args, **kwargs):
+def set_basetgz_to_pbuilder(
+    option: optparse.Option,
+    opt: str,
+    value: typing.Any,
+    parser: optparse.OptionParser,
+    *args: typing.Any,
+    **kwargs: typing.Any,
+) -> None:
+    assert parser.values is not None
     parser.values.basetgz = "/var/cache/pbuilder/base.tgz"
 
 
-def parse_command_line():
+def parse_command_line() -> list[str]:
     """Parse the command line, change global settings, return non-options."""
 
     parser = optparse.OptionParser(usage="%prog [options] package ...", version="piuparts %s" % VERSION)
@@ -3069,7 +3188,10 @@ def parse_command_line():
         "--install-suggests", action="store_true", default=False, help="Enable the installation of Suggests."
     )
 
-    def keep_env_parser(option, opt_str, value, parser):
+    def keep_env_parser(
+        option: optparse.Option, opt_str: str, value: typing.Any, parser: optparse.OptionParser
+    ) -> None:
+        assert option.dest is not None
         setattr(parser.values, option.dest, True)
         if "--keep-tmpdir" == opt_str:
             print("WARNING `--keep-tmpdir` is deprecated, use `--keep-env` " "instead")
@@ -3315,7 +3437,10 @@ def parse_command_line():
     )
 
     parser.add_option(
-        "--skip-cronfiles-test", action="store_true", default=False, help="Skip testing the output from the cron files."
+        "--skip-cronfiles-test",
+        action="store_true",
+        default=False,
+        help="Skip testing the output from the cron files.",
     )
 
     parser.add_option(
@@ -3437,7 +3562,7 @@ def parse_command_line():
         help="Set maximum permitted command output to SIZE (in MB).",
     )
 
-    (opts, args) = parser.parse_args()
+    opts, args = parser.parse_args()
 
     # expand combined options
     if opts.distupgrade_to_testdebs_from:
@@ -3511,11 +3636,10 @@ def parse_command_line():
     settings.install_purge_install = opts.install_purge_install
     settings.install_remove_install = opts.install_remove_install
     settings.list_installed_files = opts.list_installed_files
-    [
+    for csv in opts.fake_essential_packages:
         settings.fake_essential_packages.extend([i.strip() for i in csv.split(",")])
-        for csv in opts.fake_essential_packages
-    ]
-    [settings.extra_old_packages.extend([i.strip() for i in csv.split(",")]) for csv in opts.extra_old_packages]
+    for csv in opts.extra_old_packages:
+        settings.extra_old_packages.extend([i.strip() for i in csv.split(",")])
     settings.skip_cronfiles_test = opts.skip_cronfiles_test
     settings.skip_logrotatefiles_test = opts.skip_logrotatefiles_test
     settings.adequate = not opts.no_adequate
@@ -3552,6 +3676,7 @@ def parse_command_line():
             settings.tmpdir = os.environ["TMPDIR"]
         else:
             settings.tmpdir = "/tmp"
+    assert settings.tmpdir is not None
 
     if not os.path.isdir(settings.tmpdir):
         logging.error("Temporary directory is not a directory: %s" % settings.tmpdir)
@@ -3594,12 +3719,12 @@ def parse_command_line():
     return args
 
 
-def get_chroot():
+def get_chroot() -> Chroot:
     return Chroot()
 
 
 # Process the packages given in a list
-def process_packages(package_list):
+def process_packages(package_list: list[str]) -> None:
     # Find the names of packages.
     if settings.args_are_package_files:
         packages = get_package_names_from_package_files(package_list)
@@ -3661,7 +3786,7 @@ def process_packages(package_list):
             panic()
 
 
-def main():
+def main() -> None:
     """Main program. But you knew that."""
 
     args = parse_command_line()
@@ -3721,6 +3846,7 @@ def main():
     for arg in args:
         if changes_p.match(arg):
             package_list = process_changes(arg)
+            assert package_list is not None
             if settings.single_changes_list:
                 for package in package_list:
                     regular_packages_list.append(package)
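
Note on the IsIgnored protocol introduced in the hunk above: a typing.Protocol
with a __call__ method describes a callback by its full call signature,
including parameter names, which a plain typing.Callable cannot express. The
following is a minimal sketch and not code from piuparts. make_prefix_filter
and count_reported are hypothetical helpers, and calling the callback with
keyword arguments is an assumption made for illustration.

```python
import typing


class IsIgnored(typing.Protocol):
    # Same shape as the protocol added in the diff.
    def __call__(self, pathname: str, info: str, quiet: bool) -> bool: ...


def make_prefix_filter(prefixes: tuple[str, ...]) -> IsIgnored:
    """Hypothetical helper: build a callback that ignores paths under 'prefixes'."""

    def is_ignored(pathname: str, info: str, quiet: bool) -> bool:
        # 'info' and 'quiet' are accepted to satisfy the protocol; this
        # sketch does no logging, so they are unused.
        return pathname.startswith(prefixes)

    return is_ignored


def count_reported(paths: list[str], is_ignored: IsIgnored) -> int:
    """Hypothetical helper: count the paths that the callback does not ignore."""
    return sum(1 for p in paths if not is_ignored(p, info="PATH", quiet=True))
```

Because the protocol names its parameters, a type checker can accept the
keyword-style call in count_reported, which would not be possible with
Callable[[str, str, bool], bool].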


=====================================
piupartslib/__init__.py
=====================================
@@ -19,22 +19,27 @@
 
 import bz2
 import lzma
+import typing
 import urllib.error
 import urllib.request
 import zlib
 
 
+class Decompressor(typing.Protocol):
+    def decompress(self, chunk: bytes) -> bytes: ...
+
+
 class DecompressedStream:
-    def __init__(self, fileobj, decompressor=None):
+    def __init__(self, fileobj, decompressor: Decompressor | None = None):
         self._input = fileobj
         self._decompressor = decompressor
         self._buffer = ""
-        self._line_buffer = []
+        self._line_buffer: list[str] = []
         self._i = 0
         self._end = 0
         self._undecbuf = b""
 
-    def _split_decode(self, myb):
+    def _split_decode(self, myb: bytes) -> tuple[str, bytes]:
         lmyb = len(myb)
         for end in range(lmyb, max(lmyb - 6, -1), -1):
             try:
@@ -44,7 +49,7 @@ class DecompressedStream:
         # not returned yet? We have a problem
         raise UnicodeDecodeError
 
-    def _refill(self):
+    def _refill(self) -> bool:
         if self._input is None:
             return False
         while True:
@@ -62,10 +67,10 @@ class DecompressedStream:
             if chunk:
                 return True
 
-    def readline(self):
+    def readline(self) -> str:
         while not self._i < self._end:
             self._i = self._end = 0
-            self._line_buffer = None
+            self._line_buffer = []
             empty = not self._refill()
             if not self._buffer:
                 break
@@ -80,13 +85,13 @@ class DecompressedStream:
             return self._line_buffer[self._i - 1]
         return ""
 
-    def close(self):
+    def close(self) -> None:
         if self._input:
             self._input.close()
         self._input = self._decompressor = None
 
 
-def open_packages_url(url):
+def open_packages_url(url: str) -> tuple[str, DecompressedStream]:
     """Open a Packages.bz2 file pointed to by a URL"""
     socket = None
     error = None
@@ -98,17 +103,15 @@ def open_packages_url(url):
         else:
             break
     else:
+        assert error is not None
         raise error
     url = socket.geturl()
     if ext == ".bz2":
-        decompressor = bz2.BZ2Decompressor()
-        decompressed = DecompressedStream(socket, decompressor)
+        decompressed = DecompressedStream(socket, bz2.BZ2Decompressor())
     elif ext == ".gz":
-        decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
-        decompressed = DecompressedStream(socket, decompressor)
+        decompressed = DecompressedStream(socket, zlib.decompressobj(16 + zlib.MAX_WBITS))
     elif ext == ".xz":
-        decompressor = lzma.LZMADecompressor()
-        decompressed = DecompressedStream(socket, decompressor)
+        decompressed = DecompressedStream(socket, lzma.LZMADecompressor())
     elif ext == "":
         decompressed = socket
     else:
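
Note on the Decompressor protocol introduced above: the bz2, zlib and lzma
decompressor objects share no common base class, but each offers a
decompress() method taking and returning bytes, so a structural type covers
all three. The following is a minimal sketch under that assumption and not
code from piupartslib. decompress_in_chunks and roundtrip_all are hypothetical
helpers.

```python
import bz2
import gzip
import lzma
import typing
import zlib


class Decompressor(typing.Protocol):
    # Same shape as the protocol added in the diff.
    def decompress(self, chunk: bytes) -> bytes: ...


def decompress_in_chunks(data: bytes, decompressor: Decompressor, chunk_size: int = 64) -> bytes:
    """Hypothetical helper: feed 'data' to the decompressor piece by piece."""
    out = []
    for start in range(0, len(data), chunk_size):
        out.append(decompressor.decompress(data[start : start + chunk_size]))
    return b"".join(out)


def roundtrip_all(payload: bytes) -> list[bytes]:
    """Hypothetical helper: compress and decompress with all three formats."""
    return [
        decompress_in_chunks(bz2.compress(payload), bz2.BZ2Decompressor()),
        # 16 + MAX_WBITS selects gzip framing, as in open_packages_url.
        decompress_in_chunks(gzip.compress(payload), zlib.decompressobj(16 + zlib.MAX_WBITS)),
        decompress_in_chunks(lzma.compress(payload), lzma.LZMADecompressor()),
    ]
```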


=====================================
piupartslib/conf.py
=====================================
@@ -33,17 +33,19 @@ import distro_info
 
 
 class MissingSection(Exception):
-    def __init__(self, filename, section):
+    def __init__(self, filename: str, section: str):
         self.args = ("Section %s not defined in configuration file %s" % (section, filename),)
 
 
 class MissingMandatorySetting(Exception):
-    def __init__(self, filename, key):
+    def __init__(self, filename: str, key: str):
         self.args = ("Value for %s not set in configuration file %s" % (key, filename),)
 
 
-class Config(UserDict):
-    def __init__(self, section, defaults, mandatory=[], defaults_section=None):
+class Config(UserDict[str, str]):
+    def __init__(
+        self, section: str, defaults: dict[str, str], mandatory: list[str] = [], defaults_section: str | None = None
+    ):
         UserDict.__init__(self)
         self._section = section
         self._defaults_section = defaults_section
@@ -51,7 +53,7 @@ class Config(UserDict):
             self[key] = value
         self._mandatory = mandatory
 
-    def read(self, filename):
+    def read(self, filename: str) -> None:
         cp = configparser.ConfigParser()
         cp.read(filename)
         if not cp.has_section(self._section):
@@ -64,17 +66,17 @@ class Config(UserDict):
             elif key in self._mandatory:
                 raise MissingMandatorySetting(filename, key)
 
-    def get_mirror(self, distro=None):
+    def get_mirror(self, distro: str | None = None) -> str:
         if self["mirror"] is not None:
             return self["mirror"]
         return "http://deb.debian.org/debian"
 
-    def get_distros(self):
+    def get_distros(self) -> list[str]:
         if self["upgrade-test-distros"] is not None:
             return self["upgrade-test-distros"].split()
         return []
 
-    def get_distro(self):
+    def get_distro(self) -> str | None:
         if self["distro"]:
             return self["distro"]
         distros = self.get_distros()
@@ -82,19 +84,19 @@ class Config(UserDict):
             return distros[-1]
         return None
 
-    def get_start_distro(self):
+    def get_start_distro(self) -> str:
         distros = self.get_distros()
         if distros:
             return distros[0]
         return self["distro"]
 
-    def get_final_distro(self):
+    def get_final_distro(self) -> str:
         distros = self.get_distros()
         if distros:
             return distros[-1]
         return self["distro"]
 
-    def _get_distmap(self):
+    def _get_distmap(self) -> defaultdict[str, str]:
         debdist = distro_info.DebianDistroInfo()
 
         # start with e.g. "sid" -> "unstable"
@@ -120,32 +122,33 @@ class Config(UserDict):
 
         return distmap
 
-    def _map_distro(self, distro):
+    def _map_distro(self, distro: str) -> str:
         distro_root = re.split("[-.]", distro)[0]
         distmap = self._get_distmap()
         return distmap[distro_root]
 
-    def get_std_distro(self, distrolist=[]):
+    def get_std_distro(self, distrolist: list[str] = []) -> str:
         if not distrolist:
             distrolist = [self.get_distro()] + self.get_distros()
         mappedlist = [self._map_distro(x) for x in distrolist]
         return reduce(lambda x, y: y if y != "unknown" else x, mappedlist)
 
-    def get_area(self):
+    def get_area(self) -> str:
         if self["area"] is not None:
             return self["area"]
         return "main"
 
-    def get_arch(self):
+    def get_arch(self) -> str:
         if not self["arch"]:
             # Try to figure it out ourselves, using dpkg
             p = subprocess.Popen(["dpkg", "--print-architecture"], stdout=subprocess.PIPE)
+            assert p.stdout is not None
             self["arch"] = p.stdout.read().decode().rstrip()
         return self["arch"]
 
 
-class DistroConfig(UserDict):
-    def __init__(self, filename, mirror):
+class DistroConfig(UserDict[str, dict[str, str | None]]):
+    def __init__(self, filename: str, mirror: str):
         UserDict.__init__(self)
         self._mirror = mirror
         self._defaults = {
@@ -164,31 +167,32 @@ class DistroConfig(UserDict):
                 if cp.has_option(section, key):
                     self[section][key] = cp.get(section, key)
 
-    def get(self, section, key=None):
-        if section not in self.keys():
-            self[section] = dict(self._defaults, distribution=section)
-        if key is not None:
-            return self[section][key]
-        return self[section]
+    def get_field(self, section: str, key: str) -> str | None:
+        try:
+            sectdict = self[section]
+        except KeyError:
+            sectdict = dict(self._defaults, distribution=section)
+            self[section] = sectdict
+        return sectdict[key]
 
-    def _is_virtual(self, distro):
-        uri = self.get(distro, "uri")
+    def _is_virtual(self, distro: str) -> bool:
+        uri = self.get_field(distro, "uri")
         return uri is not None and uri == "None"
 
-    def get_mirror(self, distro):
+    def get_mirror(self, distro: str) -> str:
         if self._is_virtual(distro):
             distro = self._expand_depends(distro)[0]
-        return self.get(distro, "uri") or self._mirror
+        return self.get_field(distro, "uri") or self._mirror
 
-    def get_distribution(self, distro):
+    def get_distribution(self, distro: str) -> str:
         if self._is_virtual(distro):
             distro = self._expand_depends(distro)[0]
-        return self.get(distro, "distribution") or distro
+        return self.get_field(distro, "distribution") or distro
 
-    def get_candidates(self, distro):
-        return (self.get(distro, "candidates") or "").split() or [distro]
+    def get_candidates(self, distro: str) -> list[str]:
+        return (self.get_field(distro, "candidates") or "").split() or [distro]
 
-    def _get_packages_url(self, distro, area, arch):
+    def _get_packages_url(self, distro: str, area: str, arch: str) -> str:
         return "%s/dists/%s/%s/binary-%s/Packages" % (
             self.get_mirror(distro),
             self.get_distribution(distro),
@@ -196,26 +200,26 @@ class DistroConfig(UserDict):
             arch,
         )
 
-    def get_packages_urls(self, distro, area, arch):
+    def get_packages_urls(self, distro: str, area: str, arch: str) -> list[str]:
         return [self._get_packages_url(d, area, arch) for d in self.get_candidates(distro)]
 
-    def _get_sources_url(self, distro, area):
+    def _get_sources_url(self, distro: str, area: str) -> str:
         return "%s/dists/%s/%s/source/Sources" % (
             self.get_mirror(distro),
             self.get_distribution(distro),
             area,
         )
 
-    def get_sources_urls(self, distro, area):
+    def get_sources_urls(self, distro: str, area: str) -> list[str]:
         return [self._get_sources_url(d, area) for d in self.get_candidates(distro)]
 
-    def get_target_flags(self, distro):
-        tr = self.get(distro, "target-release")
+    def get_target_flags(self, distro: str) -> list[str]:
+        tr = self.get_field(distro, "target-release")
         if tr:
             return ["-t", tr]
         return []
 
-    def _expand_depends(self, distro, include_virtual=False):
+    def _expand_depends(self, distro: str, include_virtual: bool = False) -> list[str]:
         todo = [distro]
         done = []
         seen = []
@@ -224,25 +228,26 @@ class DistroConfig(UserDict):
             todo = todo[1:]
             if curr not in seen:
                 seen.append(curr)
-                todo = (self.get(curr, "depends") or "").split() + [curr] + todo
+                todo = (self.get_field(curr, "depends") or "").split() + [curr] + todo
             elif curr not in done:
                 if include_virtual or not self._is_virtual(curr):
                     done.append(curr)
         assert len(done) > 0
         return done
 
-    def get_deb_lines(self, distro, components):
+    def get_deb_lines(self, distro: str, components: list[str]) -> list[str]:
         lines = []
         for d in self._expand_depends(distro):
+            d_components = self[d]["components"]
             for c in components:
-                if self[d]["components"] is None or c in self[d]["components"].split():
+                if d_components is None or c in d_components.split():
                     lines.append("deb %s %s %s" % (self.get_mirror(d), self.get_distribution(d), c))
         return lines
 
-    def get_basetgz(self, distro, arch, merged_usr=True):
+    def get_basetgz(self, distro: str, arch: str, merged_usr: bool = True) -> str | None:
         # look for the first base distribution
         for d in self._expand_depends(distro):
-            if self.get(d, "depends"):
+            if self.get_field(d, "depends"):
                 next  # skip partial distro
             return "%s%s_%s.tar.gz" % (
                 self.get_distribution(d),


=====================================
piupartslib/dependencyparser.py
=====================================
@@ -27,7 +27,6 @@ The result uses SimpleDependency objects.
 Lars Wirzenius <liw at iki.fi>
 """
 
-
 import re
 
 
@@ -237,7 +236,7 @@ class DependencyParser:
         name = self._parse_package_name()
         if not name:
             return None
-        (op, version) = self._parse_version_dependency()
+        op, version = self._parse_version_dependency()
         arch = self._parse_arch_restriction()
         return SimpleDependency(name, op, version, arch)
 


=====================================
piupartslib/dwke.py
=====================================
@@ -73,7 +73,7 @@ class Problem:
         tagged = re.sub("^([A-Z_]+=)", r"<hdr>\g<0>", probbody, 0, re.MULTILINE)
 
         for chub in re.split("<hdr>", tagged)[1:]:
-            (name, value) = re.split("=", chub, 1, re.MULTILINE)
+            name, value = re.split("=", chub, 1, re.MULTILINE)
 
             while value[-1] == "\n":
                 value = value[:-1]
@@ -131,7 +131,7 @@ class FailureManager:
             try:
                 with open(get_kpr_path(logpath), "r") as kp:
                     for line in kp:
-                        (where, problem) = self.parse_kpr_line(line)
+                        where, problem = self.parse_kpr_line(line)
                         self.failures.append(make_failure(where, problem, pkgspec))
             except IOError:
                 logging.error("Error processing %s" % get_kpr_path(logpath))


=====================================
piupartslib/packagesdb.py
=====================================
@@ -25,7 +25,6 @@ been tested, the test results, and for determining what to test next.
 Lars Wirzenius <liw at iki.fi>
 """
 
-
 import logging
 import os
 import random
@@ -171,7 +170,7 @@ class PackagesFile(UserDict):
     def load_packages_urls(self, urls, restrict_packages=None):
         for url in urls:
             logging.debug("Opening %s.*" % url)
-            (url, stream) = piupartslib.open_packages_url(url)
+            url, stream = piupartslib.open_packages_url(url)
             logging.debug("Fetching %s" % url)
             self._read_file(stream, restrict_packages=restrict_packages)
             stream.close()
@@ -248,7 +247,7 @@ class LogDB:
         return self.log_exists2(package.name(), package.test_versions(), subdirs)
 
     def create(self, subdir, package, version, contents):
-        (fd, temp_name) = tempfile.mkstemp(dir=subdir)
+        fd, temp_name = tempfile.mkstemp(dir=subdir)
         contents = contents.encode()
         if os.write(fd, contents) != len(contents):
             raise Exception("Partial write?")


=====================================
tests/unittests.py
=====================================
@@ -57,12 +57,10 @@ class PackagesDbTests(unittest.TestCase):
         self.assertEqual(p, None)
 
     def testNoDeps(self):
-        p = self.reserve(
-            """\
+        p = self.reserve("""\
 Package: foo
 Version: 1.0-1
-"""
-        )
+""")
         self.assertNotEqual(p, None)
         self.assertEqual(p["Package"], "foo")
 



View it on GitLab: https://salsa.debian.org/debian/piuparts/-/compare/333ba11576e01055c1bfa31d5b3d2199bfba2547...fb2e2c4ec28d9c5872da17785f5de1dd25e9d151


