[Piuparts-devel] [Git][debian/piuparts][helmutg/typehints] add Python type hints to piuparts.py and its dependencies

Helmut Grohne (@helmutg) gitlab at salsa.debian.org
Mon Jul 21 07:58:52 BST 2025



Helmut Grohne pushed to branch helmutg/typehints at Debian / piuparts


Commits:
cd0c859f by Helmut Grohne at 2025-07-21T08:58:40+02:00
add Python type hints to piuparts.py and its dependencies

In order to fix a few type errors, some semantic changes are included.
 * Methods of the Defaults class raise NotImplementedError rather than
   returning None.
 * The settings class assigns default values for apt_unauthenticated,
   distro_config and testobjects in its constructor.
 * Renamed a few variables that were used with different types.
 * Added a number of assert something is not None to guide mypy.
 * get_state_meta_data constructs its result at once.
 * TimeOffsetFormatter.formatTime signature updated to match super
   class: Added default value.
 * The type of the warn_only argument changed to bool in two methods.
 * Added missing return statements and None values to existing return
   statements.
 * find_default_debian_mirrors returns [] when it previously returned
   None.
 * Rewrote DistroConfig.get as get_field to avoid conflicting with the
   super class type.
 * DistroConfig.get_deb_lines caches a lookup to help mypy track state.
 * unqualify always returns a list now.

- - - - -


3 changed files:

- piuparts.py
- piupartslib/__init__.py
- piupartslib/conf.py


Changes:

=====================================
piuparts.py
=====================================
@@ -47,11 +47,11 @@ import sys
 import tempfile
 import time
 import traceback
+import typing
 import uuid
 from collections import defaultdict, namedtuple
 from contextlib import ExitStack
 from signal import SIGALRM, SIGKILL, SIGTERM, alarm, signal
-from typing import Dict, Tuple
 
 import apt_pkg
 import distro_info
@@ -77,56 +77,60 @@ class Defaults:
 
     """
 
-    def get_components(self):
+    def get_components(self) -> list[str]:
         """Return list of default components for a mirror."""
+        raise NotImplementedError
 
-    def get_mirror(self):
+    def get_mirror(self) -> list[tuple[str, list[str]]]:
         """Return default mirror."""
+        raise NotImplementedError
 
-    def get_distribution(self):
+    def get_distribution(self) -> list[str]:
         """Return default distribution."""
+        raise NotImplementedError
 
-    def get_keyring(self):
+    def get_keyring(self) -> str:
         """Return default keyring."""
+        raise NotImplementedError
 
 
 class DebianDefaults(Defaults):
-    def get_components(self):
+    def get_components(self) -> list[str]:
         return ["main", "contrib", "non-free", "non-free-firmware"]
 
-    def get_mirror(self):
+    def get_mirror(self) -> list[tuple[str, list[str]]]:
         return [("http://deb.debian.org/debian", self.get_components())]
 
-    def get_distribution(self):
+    def get_distribution(self) -> list[str]:
         return [distro_info.DebianDistroInfo().devel()]
 
-    def get_keyring(self):
+    def get_keyring(self) -> str:
         return "/usr/share/keyrings/debian-archive-keyring.gpg"
 
 
 class UbuntuDefaults(Defaults):
-    def get_components(self):
+    def get_components(self) -> list[str]:
         return ["main", "universe", "restricted", "multiverse"]
 
-    def get_mirror(self):
+    def get_mirror(self) -> list[tuple[str, list[str]]]:
         return [("http://archive.ubuntu.com/ubuntu", self.get_components())]
 
-    def get_distribution(self):
+    def get_distribution(self) -> list[str]:
         return [distro_info.UbuntuDistroInfo().devel()]
 
-    def get_keyring(self):
+    def get_keyring(self) -> str:
         return "/usr/share/keyrings/ubuntu-archive-keyring.gpg"
 
 
 class DefaultsFactory:
     """Instantiate the right defaults class."""
 
-    def guess_flavor(self):
+    def guess_flavor(self) -> str:
         p = subprocess.Popen(["lsb_release", "-i", "-s"], stdout=subprocess.PIPE, universal_newlines=True)
         stdout, stderr = p.communicate()
         return stdout.strip().lower()
 
-    def new_defaults(self):
+    def new_defaults(self) -> Defaults:
         if not settings.defaults:
             settings.defaults = self.guess_flavor()
         if settings.defaults.lower() == "debian":
@@ -140,9 +144,9 @@ class DefaultsFactory:
 class Settings:
     """Global settings for this program."""
 
-    def __init__(self):
-        self.defaults = None
-        self.tmpdir = None
+    def __init__(self) -> None:
+        self.defaults: str | None = None
+        self.tmpdir: str | None = None
         self.keep_env = False
         self.shell_on_error = False
         self.max_command_output_size = (
@@ -154,13 +158,13 @@ class Settings:
         self.args_are_package_files = True
         # distro setup
         self.proxy = None
-        self.debian_mirrors = []
-        self.extra_repos = []
+        self.debian_mirrors: list[tuple[str, list[str]]] = []
+        self.extra_repos: list[str] = []
         self.testdebs_repo = None
-        self.debian_distros = []
-        self.bootstrapcmd = []
+        self.debian_distros: list[str] = []
+        self.bootstrapcmd: list[str] = []
         self.keep_sources_list = False
-        self.keyring = None
+        self.keyring: str | None = None
         self.do_not_verify_signatures = False
         self.no_check_valid_until = False
         self.install_recommends = False
@@ -168,8 +172,8 @@ class Settings:
         self.eatmydata = True
         self.dpkg_force_unsafe_io = True
         self.dpkg_force_confdef = False
-        self.scriptsdirs = []
-        self.bindmounts = []
+        self.scriptsdirs: list[str] = []
+        self.bindmounts: list[str] = []
         self.allow_database = False
         self.update_retries = None
         # chroot setup
@@ -185,7 +189,7 @@ class Settings:
         self.save_end_meta = None
         self.skip_minimize = True
         self.minimize = False
-        self.debfoster_options = None
+        self.debfoster_options: list[str] | None = None
         self.docker_image = None
         self.merged_usr = True
         # tests and checks
@@ -196,8 +200,8 @@ class Settings:
         self.install_remove_install = False
         self.install_purge_install = False
         self.list_installed_files = False
-        self.fake_essential_packages = []
-        self.extra_old_packages = []
+        self.fake_essential_packages: list[str] = []
+        self.extra_old_packages: list[str] = []
         self.skip_cronfiles_test = False
         self.skip_logrotatefiles_test = False
         self.adequate = True
@@ -416,6 +420,9 @@ class Settings:
             ":/lib/modules/([^/]*/(modules.*)?)?",
         ]
         self.non_pedantic_ignore_patterns = ["/tmp/.*"]
+        self.apt_unauthenticated = "No"
+        self.distro_config: piupartslib.conf.DistroConfig | None = None
+        self.testobjects: list[str] | None = None
 
 
 settings = Settings()
@@ -425,7 +432,7 @@ on_panic_hooks = {}
 counter = 0
 
 
-def do_on_panic(hook):
+def do_on_panic(hook: typing.Callable[[], None]) -> int:
     global counter
     cid = counter
     counter += 1
@@ -433,16 +440,16 @@ def do_on_panic(hook):
     return cid
 
 
-def dont_do_on_panic(id):
+def dont_do_on_panic(id: int) -> None:
     del on_panic_hooks[id]
 
 
 class TimeOffsetFormatter(logging.Formatter):
-    def __init__(self, fmt=None, datefmt=None):
+    def __init__(self, fmt: str | None = None, datefmt: str | None = None):
         self.startup_time = time.time()
         logging.Formatter.__init__(self, fmt, datefmt)
 
-    def formatTime(self, record, datefmt):
+    def formatTime(self, record: logging.LogRecord, datefmt: str | None = None) -> str:
         t = time.time() - self.startup_time
         t_min = int(t / 60)
         t_sec = t % 60.0
@@ -450,36 +457,36 @@ class TimeOffsetFormatter(logging.Formatter):
 
 
 DUMP = logging.DEBUG - 1
-HANDLERS = []
+HANDLERS: list[logging.Handler] = []
 
 
-def setup_logging(log_level, log_file_name):
+def setup_logging(log_level: int, log_file_name: str) -> None:
     logging.addLevelName(DUMP, "DUMP")
 
     logger = logging.getLogger()
     logger.setLevel(log_level)
     formatter = TimeOffsetFormatter("%(asctime)s %(levelname)s: %(message)s")
 
-    handler = logging.StreamHandler(sys.stdout)
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-    HANDLERS.append(handler)
+    shandler = logging.StreamHandler(sys.stdout)
+    shandler.setFormatter(formatter)
+    logger.addHandler(shandler)
+    HANDLERS.append(shandler)
 
     if log_file_name:
-        handler = logging.FileHandler(log_file_name)
-        handler.setFormatter(formatter)
-        logger.addHandler(handler)
-        HANDLERS.append(handler)
+        fhandler = logging.FileHandler(log_file_name)
+        fhandler.setFormatter(formatter)
+        logger.addHandler(fhandler)
+        HANDLERS.append(fhandler)
 
 
-def dump(msg):
+def dump(msg: str) -> None:
     logger = logging.getLogger()
     logger.log(DUMP, msg)
     for handler in HANDLERS:
         handler.flush()
 
 
-def panic(exit=1):
+def panic(exit: int = 1) -> typing.NoReturn:
     for i in reversed(range(counter)):
         if i in on_panic_hooks:
             on_panic_hooks[i]()
@@ -487,27 +494,27 @@ def panic(exit=1):
     sys.exit(exit)
 
 
-def indent_string(str):
+def indent_string(str: str) -> str:
     """Indent all lines in a string with two spaces and return result."""
     return "\n".join(["  " + line for line in str.split("\n")])
 
 
-def command2string(command):
+def command2string(command: list[str]) -> str:
     """Quote s.t. copy+paste from the logfile gives a runnable command in the shell."""
     return " ".join([shlex.quote(arg) for arg in command])
 
 
-def unqualify(packages):
+def unqualify(packages: typing.Iterable[str]) -> list[str]:
     if packages:
         return [p.split("=", 1)[0].strip() for p in packages]
-    return packages
+    return []
 
 
 class Alarm(Exception):
     pass
 
 
-def alarm_handler(signum, frame):
+def alarm_handler(signum, frame) -> typing.NoReturn:
     raise Alarm
 
 
@@ -530,17 +537,18 @@ FILTERED_ENVVARS = {
 }
 
 
-def get_clean_environment() -> Dict[str, str]:
+def get_clean_environment() -> dict[str, str]:
     """Get a cleaned up environment"""
+    assert settings.testobjects is not None
     env = {k: v for k, v in os.environ.items() if k not in FILTERED_ENVVARS}
     env["PIUPARTS_OBJECTS"] = " ".join(str(vobject) for vobject in settings.testobjects)
     return env
 
 
-def run(command, ignore_errors=False, timeout=0):
+def run(command: list[str], ignore_errors: bool = False, timeout: int = 0) -> tuple[int, str]:
     """Run an external command and die with error message if it fails."""
 
-    def kill_subprocess(p, reason):
+    def kill_subprocess(p: subprocess.Popen[str], reason: str) -> None:
         logging.error("Terminating command due to %s" % reason)
         p.terminate()
         for i in range(10):
@@ -569,6 +577,7 @@ def run(command, ignore_errors=False, timeout=0):
                 errors="backslashreplace",
             )
         )
+        assert p.stdout is not None
         output = ""
         excessive_output = False
         if timeout > 0:
@@ -610,14 +619,14 @@ def run(command, ignore_errors=False, timeout=0):
         return p.returncode, output
 
 
-def create_temp_file():
+def create_temp_file() -> tuple[int, str]:
     """Create a temporary file and return its full path."""
     (fd, path) = tempfile.mkstemp(dir=settings.tmpdir)
     logging.debug("Created temporary file %s" % path)
     return (fd, path)
 
 
-def create_file(filename, contents):
+def create_file(filename: str, contents: str) -> None:
     """Create a new file with the desired name and contents."""
     try:
         with open(filename, "w") as f:
@@ -627,12 +636,12 @@ def create_file(filename, contents):
         panic()
 
 
-def readlines_file(filename):
+def readlines_file(filename: str) -> list[str]:
     with open(filename, "r") as f:
         return f.readlines()
 
 
-def remove_files(filenames):
+def remove_files(filenames: list[str]) -> None:
     """Remove some files."""
     for filename in filenames:
         logging.debug("Removing %s" % filename)
@@ -643,7 +652,7 @@ def remove_files(filenames):
             panic()
 
 
-def make_metapackage(name, depends, conflicts, arch="all"):
+def make_metapackage(name: str, depends: str, conflicts: str, arch: str = "all") -> str:
     """Return the path to a .deb created just for satisfying dependencies
 
     Caller is responsible for removing the temporary directory containing the
@@ -682,7 +691,7 @@ def make_metapackage(name, depends, conflicts, arch="all"):
     return os.path.join(tmpdir, name + ".deb")
 
 
-def split_path(pathname):
+def split_path(pathname: str) -> list[str]:
     parts = []
     while pathname:
         (head, tail) = os.path.split(pathname)
@@ -698,7 +707,15 @@ def split_path(pathname):
     return parts
 
 
-def canonicalize_path(root, pathname, report_links=False):
+ at typing.overload
+def canonicalize_path(root: str, pathname: str, report_links: typing.Literal[False] = False) -> str: ...
+
+
+ at typing.overload
+def canonicalize_path(root: str, pathname: str, report_links: typing.Literal[True]) -> list[tuple[str, str]]: ...
+
+
+def canonicalize_path(root: str, pathname: str, report_links: bool = False) -> str | list[tuple[str, str]]:
     """Canonicalize a path name, simulating chroot at 'root'.
 
     When resolving the symlink, pretend (similar to chroot) that
@@ -713,7 +730,7 @@ def canonicalize_path(root, pathname, report_links=False):
     """
     # print("\nCANONICALIZE %s %s" % (root, pathname))
     links = []
-    seen = []
+    seen: list[str] = []
     parts = split_path(pathname)
     # print("PARTS ", list(reversed(parts)))
     path = "/"
@@ -751,7 +768,7 @@ def canonicalize_path(root, pathname, report_links=False):
     return path
 
 
-def is_broken_symlink(root, dirpath, filename):
+def is_broken_symlink(root: str, dirpath: str, filename: str) -> bool:
     """Is symlink dirpath+filename broken?"""
 
     if dirpath[: len(root)] == root:
@@ -768,25 +785,33 @@ def is_broken_symlink(root, dirpath, filename):
 FileInfo = namedtuple("FileInfo", ["st", "target", "user", "group"])
 
 
+class StateMetaData(typing.TypedDict):
+    initial_selections: dict[str, tuple[str, str | None]] | None
+    avail_md5: list[str]
+    tree: dict[str, FileInfo]
+    selections: dict[str, tuple[str, str | None]]
+    diversions: list[str] | None
+
+
 class Chroot:
     """A chroot for testing things in."""
 
-    def __init__(self):
-        self.name = None
+    def __init__(self) -> None:
+        self.name: str | None = None
         self.bootstrapped = False
-        self.mounts = []
-        self.initial_selections = None
-        self.avail_md5_history = []
-        self.systemd_tmpfiles: Dict[str, Tuple[str, str]] = {}
+        self.mounts: list[str] = []
+        self.initial_selections: dict[str, tuple[str, str | None]] | None = None
+        self.avail_md5_history: list[str] = []
+        self.systemd_tmpfiles: dict[str, tuple[str, str]] = {}
 
-    def create_temp_dir(self):
+    def create_temp_dir(self) -> None:
         """Create a temporary directory for the chroot."""
         self.name = tempfile.mkdtemp(dir=settings.tmpdir)
         create_file(os.path.join(self.name, ".piuparts.tmpdir"), "chroot")
         os.chmod(self.name, 0o755)
         logging.debug("Created temporary directory %s" % self.name)
 
-    def create(self, temp_tgz=None):
+    def create(self, temp_tgz: str | None = None) -> None:
         """Create a chroot according to user's wishes."""
         self.panic_handler_id = do_on_panic(self.remove)
         if not settings.schroot and not settings.docker_image:
@@ -845,7 +870,7 @@ class Chroot:
         if settings.savetgz and not temp_tgz:
             self.pack_into_tgz(settings.savetgz)
 
-    def remove(self):
+    def remove(self) -> None:
         """Remove a chroot and all its contents."""
         if not self.name:
             return
@@ -877,10 +902,10 @@ class Chroot:
                 logging.debug("Keeping directory tree at %s" % self.name)
         dont_do_on_panic(self.panic_handler_id)
 
-    def was_bootstrapped(self):
+    def was_bootstrapped(self) -> bool:
         return self.bootstrapped
 
-    def create_temp_tgz_file(self):
+    def create_temp_tgz_file(self) -> str:
         """Return the path to a file to be used as a temporary tgz file"""
         # Yes, create_temp_file() would work just as well, but putting it in
         # the interface for Chroot allows the VirtServ hack to work.
@@ -888,14 +913,15 @@ class Chroot:
         os.close(fd)
         return temp_tgz
 
-    def remove_temp_tgz_file(self, temp_tgz):
+    def remove_temp_tgz_file(self, temp_tgz: str) -> None:
         """Remove the file that was used as a temporary tgz file"""
         # Yes, remove_files() would work just as well, but putting it in
         # the interface for Chroot allows the VirtServ hack to work.
         remove_files([temp_tgz])
 
-    def pack_into_tgz(self, result):
+    def pack_into_tgz(self, result: str) -> None:
         """Tar and compress all files in the chroot."""
+        assert self.name is not None
         self.run(["apt-get", "clean"])
         logging.debug("Saving %s to %s." % (self.name, result))
 
@@ -922,15 +948,16 @@ class Chroot:
         os.rename(tmpfile, result)
         dont_do_on_panic(panic_handler_id)
 
-    def unpack_from_tgz(self, tarball):
+    def unpack_from_tgz(self, tarball: str) -> None:
         """Unpack a tarball to a chroot."""
+        assert self.name is not None
         logging.debug("Unpacking %s into %s" % (tarball, self.name))
         prefix = []
         if settings.eatmydata and os.path.isfile("/usr/bin/eatmydata"):
             prefix.append("eatmydata")
         run(prefix + ["tar", "-C", self.name, "--auto-compress", "-xf", tarball])
 
-    def setup_from_schroot(self, schroot):
+    def setup_from_schroot(self, schroot: str) -> None:
         self.schroot_session = schroot.split(":", 1)[-1] + "-" + str(uuid.uuid1()) + "-piuparts"
         run(["schroot", "--begin-session", "--chroot", schroot, "--session-name", self.schroot_session])
         ret_code, output = run(["schroot", "--chroot", "session:" + self.schroot_session, "--location"])
@@ -938,13 +965,13 @@ class Chroot:
         logging.info("New schroot session in '%s'" % self.name)
 
     @staticmethod
-    def check_if_docker_storage_driver_is_supported():
+    def check_if_docker_storage_driver_is_supported() -> None:
         ret_code, output = run(["docker", "info"])
         if "overlay2" not in output:
             logging.error("Only overlay2 storage driver is supported")
             panic()
 
-    def setup_from_docker(self, docker_image):
+    def setup_from_docker(self, docker_image: str) -> None:
         self.check_if_docker_storage_driver_is_supported()
         with tempfile.TemporaryDirectory() as tmpdir:
             cidfile = pathlib.Path(tmpdir) / "cidfile"
@@ -959,8 +986,9 @@ class Chroot:
         self.name = output.strip()
         logging.info("New container created %r at %r", self.docker_container, self.name)
 
-    def setup_from_lvm(self, lvm_volume):
+    def setup_from_lvm(self, lvm_volume: str) -> None:
         """Create a chroot by creating an LVM snapshot."""
+        assert self.name is not None
         self.lvm_base = os.path.dirname(lvm_volume)
         self.lvm_vol_name = os.path.basename(lvm_volume)
         self.lvm_snapshot_name = self.lvm_vol_name + "-" + str(uuid.uuid1())
@@ -971,8 +999,9 @@ class Chroot:
         logging.info("Mounting LVM snapshot to %s" % self.name)
         run(["mount", self.lvm_snapshot, self.name])
 
-    def setup_from_dir(self, dirname):
+    def setup_from_dir(self, dirname: str) -> None:
         """Create chroot from an existing one."""
+        assert self.name is not None
         # if on same device, make hard link
         cmd = ["cp"]
         if settings.hard_link and os.stat(dirname).st_dev == os.stat(self.name).st_dev:
@@ -986,7 +1015,8 @@ class Chroot:
             dst = os.path.join(self.name, name)
             run(cmd + [src, dst])
 
-    def interactive_shell(self):
+    def interactive_shell(self) -> None:
+        assert self.name is not None
         logging.info("Entering interactive shell in %s" % self.name)
         env = os.environ.copy()
         env["debian_chroot"] = "piuparts:%s" % self.name
@@ -995,7 +1025,8 @@ class Chroot:
         except Exception:
             pass
 
-    def run(self, command, ignore_errors=False):
+    def run(self, command: list[str], ignore_errors: bool = False) -> tuple[int, str]:
+        assert self.name is not None
         prefix = []
         if settings.eatmydata and os.path.isfile(os.path.join(self.name, "usr/bin/eatmydata")):
             prefix.append("eatmydata")
@@ -1035,13 +1066,14 @@ class Chroot:
                 timeout=settings.max_command_runtime,
             )
 
-    def mkdir_p(self, path):
+    def mkdir_p(self, path: str) -> None:
         fullpath = self.relative(path)
         if not os.path.isdir(fullpath):
             os.makedirs(fullpath)
 
-    def create_apt_sources(self, distro):
+    def create_apt_sources(self, distro: str) -> None:
         """Create an /etc/apt/sources.list with a given distro."""
+        assert settings.distro_config is not None
         lines = []
         lines.extend(settings.distro_config.get_deb_lines(distro, settings.debian_mirrors[0][1]))
         for mirror, components in settings.debian_mirrors[1:]:
@@ -1051,7 +1083,7 @@ class Chroot:
         create_file(self.relative("etc/apt/sources.list"), "\n".join(lines) + "\n")
         logging.debug("sources.list:\n" + indent_string("\n".join(lines)))
 
-    def aptupdate_run(self):
+    def aptupdate_run(self) -> int | None:
         """Resynchronize the package index files.
         If executed under --update-retries <num>, retry
         its execution up to <num> times, useful e.g.
@@ -1059,7 +1091,7 @@ class Chroot:
         errors."""
         if not settings.update_retries:
             self.run(["apt-get", "update"])
-            return
+            return None
 
         count = 0
         for count, run in enumerate(range(settings.update_retries), 1):
@@ -1074,7 +1106,7 @@ class Chroot:
 
         return status
 
-    def enable_testdebs_repo(self, update=True):
+    def enable_testdebs_repo(self, update: bool = True) -> None:
         if settings.testdebs_repo:
             if settings.testdebs_repo.startswith("deb"):
                 debline = settings.testdebs_repo
@@ -1087,12 +1119,12 @@ class Chroot:
             if update:
                 self.aptupdate_run()
 
-    def disable_testdebs_repo(self):
+    def disable_testdebs_repo(self) -> None:
         if settings.testdebs_repo:
             logging.debug("disabling testdebs repository")
             remove_files([self.relative("etc/apt/sources.list.d/piuparts-testdebs-repo.list")])
 
-    def create_apt_conf(self):
+    def create_apt_conf(self) -> None:
         """Create /etc/apt/apt.conf.d/piuparts inside the chroot."""
         lines = ['APT::Get::Assume-Yes "yes";\n']
         lines.append('APT::Install-Recommends "%d";\n' % int(settings.install_recommends))
@@ -1124,7 +1156,7 @@ class Chroot:
 
         create_file(self.relative("etc/apt/apt.conf.d/piuparts"), "".join(lines))
 
-    def create_dpkg_conf(self):
+    def create_dpkg_conf(self) -> None:
         """Create /etc/dpkg/dpkg.cfg.d/piuparts inside the chroot."""
         lines = []
         if settings.dpkg_force_unsafe_io:
@@ -1139,7 +1171,7 @@ class Chroot:
             self.mkdir_p("etc/dpkg/dpkg.cfg.d")
             create_file(self.relative("etc/dpkg/dpkg.cfg.d/piuparts"), "".join(lines))
 
-    def create_policy_rc_d(self):
+    def create_policy_rc_d(self) -> None:
         """Create a policy-rc.d that prevents daemons from running."""
         full_name = self.relative("usr/sbin/policy-rc.d")
         policy = "#!/bin/sh\n"
@@ -1156,7 +1188,7 @@ class Chroot:
         os.chmod(full_name, 0o755)
         logging.debug("Created policy-rc.d and chmodded it.")
 
-    def create_resolv_conf(self):
+    def create_resolv_conf(self) -> None:
         """Update resolv.conf based on the current configuration in the host system. Strip comments and whitespace."""
         if settings.docker_image:
             # Docker takes care of this
@@ -1171,8 +1203,10 @@ class Chroot:
         os.chmod(full_name, 0o644)
         logging.debug("Created resolv.conf.")
 
-    def setup_minimal_chroot(self):
+    def setup_minimal_chroot(self) -> None:
         """Set up a minimal Debian system in a chroot."""
+        assert self.name is not None
+        assert settings.distro_config is not None
         logging.debug("Setting up minimal chroot for %s at %s." % (settings.debian_distros[0], self.name))
         prefix = []
         if settings.eatmydata and os.path.isfile("/usr/bin/eatmydata"):
@@ -1199,10 +1233,11 @@ class Chroot:
         )
         self.bootstrapped = True
 
-    def minimize(self):
+    def minimize(self) -> None:
         """Minimize a chroot by removing (almost all) unnecessary packages"""
         if settings.skip_minimize or not settings.minimize:
             return
+        assert settings.debfoster_options is not None
         self.run(["apt-get", "install", "debfoster"])
         debfoster_command = ["debfoster"] + settings.debfoster_options
         if settings.eatmydata:
@@ -1211,8 +1246,10 @@ class Chroot:
         remove_files([self.relative("var/lib/debfoster/keepers")])
         self.run(["dpkg", "--purge", "debfoster"])
 
-    def configure_chroot(self):
+    def configure_chroot(self) -> None:
         """Configure a chroot according to current settings"""
+        assert self.name is not None
+        assert settings.distro_config is not None
         os.environ["PIUPARTS_DISTRIBUTION"] = settings.distro_config.get_distribution(settings.debian_distros[0])
         if not settings.keep_sources_list:
             self.create_apt_sources(settings.debian_distros[0])
@@ -1282,18 +1319,18 @@ class Chroot:
         for bindmount in settings.bindmounts:
             self.mount(bindmount, bindmount, opts=["rbind"])
 
-    def remember_available_md5(self):
+    def remember_available_md5(self) -> None:
         """Keep a history of 'apt-cache dumpavail | md5sum' after initial
         setup and each dist-upgrade step to notice outdated reference
         chroot metadata"""
         errorcode, avail_md5 = self.run(["sh", "-c", "apt-cache dumpavail | md5sum"])
         self.avail_md5_history.append(avail_md5.split()[0])
 
-    def remember_initial_selections(self):
+    def remember_initial_selections(self) -> None:
         """Remember initial selections to easily recognize mismatching chroot metadata"""
         self.initial_selections = self.get_selections()
 
-    def remember_systemd_tmpfiles(self):
+    def remember_systemd_tmpfiles(self) -> None:
         """Remember files potentially affected by systemd-tmpfiles.
 
         Only files and directories with some flags are ignored by piuparts."""
@@ -1327,8 +1364,9 @@ class Chroot:
             if flag[0] in ignored_flags:
                 self.systemd_tmpfiles[filename] = (current_file, flag)
 
-    def upgrade_to_distros(self, distros, packages, apt_get_upgrade=False):
+    def upgrade_to_distros(self, distros: list[str], packages: list[str], apt_get_upgrade: bool = False) -> None:
         """Upgrade a chroot installation to each successive distro."""
+        assert settings.distro_config is not None
         for distro in distros:
             logging.debug("Upgrading %s to %s" % (self.name, distro))
             os.environ["PIUPARTS_DISTRIBUTION_NEXT"] = settings.distro_config.get_distribution(distro)
@@ -1354,7 +1392,7 @@ class Chroot:
             self.run_scripts("post_distupgrade")
             self.check_for_no_processes()
 
-    def get_known_packages(self, packages):
+    def get_known_packages(self, packages: list[str]) -> list[str]:
         """Does apt-get (or apt-cache) know about a set of packages?"""
         known_packages = []
         new_packages = []
@@ -1375,7 +1413,7 @@ class Chroot:
                 logging.info("the following packages are not in the archive: " + ", ".join(new_packages))
         return known_packages
 
-    def copy_files(self, source_names, target_name):
+    def copy_files(self, source_names: list[str], target_name: str) -> None:
         """Copy files in 'source_name' to file/dir 'target_name', relative
         to the root of the chroot."""
         target_name = self.relative(target_name)
@@ -1387,7 +1425,7 @@ class Chroot:
                 logging.error("Error copying %s to %s: %s" % (source_name, target_name, detail))
                 panic()
 
-    def list_installed_files(self, pre_info, post_info):
+    def list_installed_files(self, pre_info: dict[str, FileInfo], post_info: dict[str, FileInfo]) -> None:
         """List the new files installed, removed and modified between two dir trees.
         Actually, it is a nice output of the funcion diff_meta_dat."""
         (new, removed, modified) = diff_meta_data(pre_info, post_info, self.is_ignored)
@@ -1406,7 +1444,7 @@ class Chroot:
         else:
             logging.debug("The package did not modify any file.\n")
 
-    def is_installed(self, packages):
+    def is_installed(self, packages: list[str]) -> bool:
         if not packages:
             return True
         retcode, output = self.run(["dpkg-query", "-f", "${Package} ${Status}\n", "-W"] + packages, ignore_errors=True)
@@ -1420,17 +1458,22 @@ class Chroot:
                 installed = False
         return installed
 
-    def install_packages(self, package_files, packages, with_scripts=True, reinstall=False):
+    def install_packages(
+        self, package_files: list[str], packages: list[str], with_scripts: bool = True, reinstall: bool = False
+    ) -> None:
         if package_files:
             self.install_package_files(package_files, packages, with_scripts=with_scripts)
         else:
             self.install_packages_by_name(packages, with_scripts=with_scripts, reinstall=reinstall)
 
-    def install_package_files(self, package_files, packages=None, with_scripts=False):
+    def install_package_files(
+        self, package_files: list[str], packages: list[str] | None = None, with_scripts: bool = False
+    ) -> None:
         if packages and settings.testdebs_repo:
             self.install_packages_by_name(packages, with_scripts=with_scripts)
             return
         if package_files:
+            assert settings.distro_config is not None
             # Check whether apt-get can install debs (supported since apt 1.1)
             #
             # If it can, this is preferable to the traditional
@@ -1492,8 +1535,9 @@ class Chroot:
 
             remove_files([self.relative(name) for name in tmp_files])
 
-    def install_packages_by_name(self, packages, with_scripts=True, reinstall=False):
+    def install_packages_by_name(self, packages: list[str], with_scripts: bool = True, reinstall: bool = False) -> None:
         if packages:
+            assert settings.distro_config is not None
             if with_scripts:
                 self.run_scripts("pre_install")
 
@@ -1512,7 +1556,14 @@ class Chroot:
             if with_scripts:
                 self.run_scripts("post_install")
 
-    def apt_get_install(self, to_install=[], to_remove=[], to_purge=[], flags=[], reinstall=False):
+    def apt_get_install(
+        self,
+        to_install: list[str] = [],
+        to_remove: list[str] = [],
+        to_purge: list[str] = [],
+        flags: list[str] = [],
+        reinstall: bool = False,
+    ) -> None:
         command = ["apt-get", "-y"] + flags + ["install"]
         if reinstall:
             command.append("--reinstall")
@@ -1521,11 +1572,11 @@ class Chroot:
         command.extend(["%s_" % x for x in unqualify(to_purge)])
         self.run(command)
 
-    def get_selections(self):
+    def get_selections(self) -> dict[str, tuple[str, str | None]]:
         """Get current package selections in a chroot."""
         # "${Status}" emits three columns, e.g. "install ok installed"
         # "${binary:Package}" requires a multi-arch dpkg, so fall back to "${Package}" on older versions
-        (status, output) = self.run(
+        (command_status, output) = self.run(
             ["dpkg-query", "-W", "-f", "${Status}\\t${binary:Package}\\t${Package}\\t${Version}\\n"]
         )
         vdict = {}
@@ -1540,22 +1591,26 @@ class Chroot:
             vdict[name] = (status, version)
         return vdict
 
-    def get_diversions(self):
+    def get_diversions(self) -> list[str] | None:
         """Get current dpkg-divert --list in a chroot."""
         if not settings.check_broken_diversions:
-            return
+            return None
         (status, output) = self.run(["dpkg-divert", "--list"])
         return output.split("\n")
 
-    def get_modified_diversions(self, pre_install_diversions, post_install_diversions=None):
+    def get_modified_diversions(
+        self, pre_install_diversions: list[str], post_install_diversions: list[str] | None = None
+    ) -> tuple[list[str], list[str]]:
         """Check that diversions in chroot are identical (though potentially reordered)."""
         if post_install_diversions is None:
             post_install_diversions = self.get_diversions()
+            assert post_install_diversions is not None
         removed = [ln for ln in pre_install_diversions if ln not in post_install_diversions]
         added = [ln for ln in post_install_diversions if ln not in pre_install_diversions]
         return (removed, added)
 
-    def check_debsums(self):
+    def check_debsums(self) -> None:
+        assert self.name is not None
         (status, output) = run(["debsums", "--root", self.name, "-ac", "--ignore-obsolete"], ignore_errors=True)
         if status != 0:
             logging.error(
@@ -1565,8 +1620,9 @@ class Chroot:
             if not settings.warn_on_debsums_errors:
                 panic()
 
-    def check_adequate(self, packages):
+    def check_adequate(self, packages: list[str]) -> None:
         """Run adequate and categorize output according to our needs."""
+        assert self.name is not None
         packages = unqualify([p for p in packages if not p.endswith("=None")])
         if packages and settings.adequate and os.path.isfile("/usr/bin/adequate"):
             (status, output) = run(["dpkg-query", "-f", "${Version}\n", "-W", "adequate"], ignore_errors=True)
@@ -1592,7 +1648,7 @@ class Chroot:
                 "obsolete-conffile",
                 "broken-symlink",
             ]
-            ignored_tags = []
+            ignored_tags: list[str] = []
             (status, output) = run(["adequate", "--root", self.name] + packages, ignore_errors=True)
             for tag in ignored_tags:
                 # ignore some tags
@@ -1631,7 +1687,8 @@ class Chroot:
                 if not settings.warn_if_inadequate:
                     panic()
 
-    def list_paths_with_symlinks(self):
+    def list_paths_with_symlinks(self) -> None:
+        assert self.name is not None
         file_owners = self.get_files_owned_by_packages()
         bad = []
         overwrites = False
@@ -1676,7 +1733,9 @@ class Chroot:
             else:
                 logging.info(msg)
 
-    def remove_packages(self, packages, allow_remove_essential=True, ignore_errors=False):
+    def remove_packages(
+        self, packages: list[str], allow_remove_essential: bool = True, ignore_errors: bool = False
+    ) -> None:
         """Remove packages in a chroot."""
         base_command = ["apt-get", "remove"]
         if allow_remove_essential:
@@ -1687,12 +1746,12 @@ class Chroot:
                 ignore_errors=ignore_errors,
             )
 
-    def purge_packages(self, packages, ignore_errors=False):
+    def purge_packages(self, packages: typing.Iterable[str], ignore_errors: bool = False) -> None:
         """Purge packages in a chroot."""
         if packages:
             self.run(["dpkg", "--purge"] + unqualify(packages), ignore_errors=ignore_errors)
 
-    def restore_selections(self, reference_chroot_state, packages_qualified):
+    def restore_selections(self, reference_chroot_state: StateMetaData, packages_qualified: list[str]) -> None:
         """Restore package selections in a chroot to the state in
         'reference_chroot_state'."""
 
@@ -1772,7 +1831,7 @@ class Chroot:
         self.run(["dpkg", "--purge", "--pending"])
         self.run(["dpkg", "--remove", "--pending"])
 
-    def get_tree_meta_data(self):
+    def get_tree_meta_data(self) -> dict[str, FileInfo]:
         """Return the filesystem meta data for all objects in the chroot."""
         self.run(["apt-get", "clean"])
         logging.debug("Recording chroot state")
@@ -1783,8 +1842,8 @@ class Chroot:
                 (usr, x, uid) = line.split(":")[0:3]
                 uidmap[int(uid)] = usr
         gidmap = {}
-        with open(self.relative("etc/group"), "r") as group:
-            for line in group:
+        with open(self.relative("etc/group"), "r") as groupfile:
+            for line in groupfile:
                 (grp, x, gid) = line.split(":")[0:3]
                 gidmap[int(gid)] = grp
         vdict = {}
@@ -1815,24 +1874,25 @@ class Chroot:
                 vdict[name[len(root) :]] = FileInfo(st, target, user, group)
         return vdict
 
-    def get_state_meta_data(self):
-        chroot_state = {}
-        chroot_state["initial_selections"] = self.initial_selections
-        chroot_state["avail_md5"] = self.avail_md5_history
-        chroot_state["tree"] = self.get_tree_meta_data()
-        chroot_state["selections"] = self.get_selections()
-        chroot_state["diversions"] = self.get_diversions()
-        return chroot_state
+    def get_state_meta_data(self) -> StateMetaData:
+        return {
+            "initial_selections": self.initial_selections,
+            "avail_md5": self.avail_md5_history,
+            "tree": self.get_tree_meta_data(),
+            "selections": self.get_selections(),
+            "diversions": self.get_diversions(),
+        }
 
-    def relative(self, pathname):
+    def relative(self, pathname: str) -> str:
+        assert self.name is not None
         if pathname.startswith("/"):
             return os.path.join(self.name, pathname[1:])
         return os.path.join(self.name, pathname)
 
-    def get_files_owned_by_packages(self):
+    def get_files_owned_by_packages(self) -> dict[str, list[str]]:
         """Return dict[filename] = [packagenamelist]."""
         vdir = self.relative("var/lib/dpkg/info")
-        vdict = {}
+        vdict: dict[str, list[str]] = {}
         for basename in os.listdir(vdir):
             if basename.endswith(".list"):
                 pkg = basename[: -len(".list")]
@@ -1844,12 +1904,13 @@ class Chroot:
                         vdict[pathname] = [pkg]
         return vdict
 
-    def check_for_no_processes(self, fail=None):
+    def check_for_no_processes(self, fail: bool | None = None) -> None:
         """Check there are no processes running inside the chroot."""
         if settings.docker_image:
             (status, output) = run(["docker", "top", self.docker_container])
             count = len(output.strip().split("\n")) - 2  # header + bash launched on container creation
         else:
+            assert self.name is not None
             (status, output) = run(["lsof", "-w", "+D", self.name], ignore_errors=True)
             count = len(output.split("\n")) - 1
         if count > 0:
@@ -1862,11 +1923,12 @@ class Chroot:
                 self.terminate_running_processes()
                 panic()
 
-    def terminate_running_processes(self):
+    def terminate_running_processes(self) -> None:
         """Terminate all processes running in the chroot."""
         if settings.docker_image:
             # Docker takes care of this
             return
+        assert self.name is not None
         seen = []
         while True:
             p = subprocess.Popen(
@@ -1895,14 +1957,17 @@ class Chroot:
     # If /selinux is present, assume that this is the only supported
     # location by libselinux. Otherwise use the new location.
     # /selinux was shipped by the libselinux package until wheezy.
-    def selinuxfs_path(self):
+    def selinuxfs_path(self) -> str:
         if os.path.isdir(self.relative("/selinux")):
             return "/selinux"
         else:
             return "/sys/fs/selinux"
 
-    def mount(self, source, path, fstype=None, opts=None, no_mkdir=False):
+    def mount(
+        self, source: str, path: str, fstype: str | None = None, opts: list[str] | None = None, no_mkdir: bool = False
+    ) -> None:
         """Mount something into the chroot and remember it for unmount_all()."""
+        assert self.name is not None
         if opts is None:
             opts = []
         path = canonicalize_path(self.name, path)
@@ -1922,7 +1987,7 @@ class Chroot:
         run(command)
         self.mounts.append(fullpath)
 
-    def unmount_all(self):
+    def unmount_all(self) -> None:
         """Unmount everything we mount()ed into the chroot."""
 
         # Workaround to unmount /proc/sys/fs/binfmt_misc which is mounted by
@@ -1935,7 +2000,7 @@ class Chroot:
         for mountpoint in reversed(self.mounts):
             run(["umount", mountpoint], ignore_errors=True)
 
-    def mount_proc(self):
+    def mount_proc(self) -> None:
         """Mount /proc etc. inside chroot."""
         self.mount("proc", "/proc", fstype="proc")
         etcmtab = self.relative("etc/mtab")
@@ -1969,7 +2034,7 @@ class Chroot:
         if selinux_enabled():
             self.mount("/sys/fs/selinux", self.selinuxfs_path(), opts=["bind", "ro"])
 
-    def is_ignored(self, pathname, info="PATH", quiet=False):
+    def is_ignored(self, pathname: str, info: str = "PATH", quiet: bool = False) -> bool:
         """Is a file (or dir or whatever) to be ignored?"""
         if pathname in settings.ignored_files:
             return True
@@ -1995,7 +2060,13 @@ class Chroot:
                 return True
         return False
 
-    def check_files_moved_usr(self, packages=[], files_before={}, files_after={}, warn_only=None):
+    def check_files_moved_usr(
+        self,
+        packages: list[str] = [],
+        files_before: dict[str, list[str]] = {},
+        files_after: dict[str, list[str]] = {},
+        warn_only: bool = False,
+    ) -> None:
         """Check that no files were moved from /{bin|sbin|lib*} and /usr/{bin|sbin|lib*}"""
 
         if settings.warn_on_usr_move == "disabled":
@@ -2041,8 +2112,9 @@ class Chroot:
         else:
             logging.debug("No file moved between /{bin|sbin|lib*} and /usr/{bin|sbin|lib*}.")
 
-    def check_for_broken_symlinks(self, warn_only=None, file_owners={}):
+    def check_for_broken_symlinks(self, warn_only: bool = False, file_owners: dict[str, list[str]] = {}) -> None:
         """Check that all symlinks in chroot are non-broken."""
+        assert self.name is not None
         if not settings.check_broken_symlinks:
             return
         broken = []
@@ -2073,7 +2145,7 @@ class Chroot:
         else:
             logging.debug("No broken symlinks as far as we can find.")
 
-    def check_if_cronfiles(self, packages):
+    def check_if_cronfiles(self, packages: list[str]) -> list[str]:
         """Check if the packages have cron files under /etc/cron.d and in case positive,
         it returns the list of files."""
 
@@ -2099,7 +2171,7 @@ class Chroot:
 
         return vlist
 
-    def check_output_cronfiles(self, list):
+    def check_output_cronfiles(self, list: list[str]) -> None:
         """Check if a given list of cronfiles has any output. Executes
         cron file as cron would do (except for SHELL)"""
         failed = False
@@ -2115,7 +2187,7 @@ class Chroot:
         if failed:
             panic()
 
-    def check_if_logrotatefiles(self, packages):
+    def check_if_logrotatefiles(self, packages: list[str]) -> list[str]:
         """Check if the packages have logrotate files under /etc/logrotate.d and in case positive,
         it returns the list of files."""
 
@@ -2137,7 +2209,7 @@ class Chroot:
 
         return vlist
 
-    def install_logrotate(self):
+    def install_logrotate(self) -> typing.Iterable[str]:
         """Install logrotate for check_output_logrotatefiles, and return the
         list of packages that were installed"""
         old_selections = self.get_selections()
@@ -2148,7 +2220,7 @@ class Chroot:
         diff = diff_selections(self, old_selections)
         return diff.keys()
 
-    def check_output_logrotatefiles(self, list):
+    def check_output_logrotatefiles(self, list: list[str]) -> None:
         """Check if a given list of logrotatefiles has any output. Executes
         logrotate file as logrotate would do from cron (except for SHELL)"""
         failed = False
@@ -2164,7 +2236,7 @@ class Chroot:
         if failed:
             panic()
 
-    def run_scripts(self, step, ignore_errors=False):
+    def run_scripts(self, step: str, ignore_errors: bool = False) -> int:
         """Run custom scripts to given step post-install|remove|purge"""
 
         errorcodes = 0
@@ -2184,16 +2256,17 @@ class Chroot:
         return errorcodes
 
 
-def selinux_enabled(enabled_test="/usr/sbin/selinuxenabled"):
+def selinux_enabled(enabled_test: str = "/usr/sbin/selinuxenabled") -> bool | None:
     if os.access(enabled_test, os.X_OK):
         retval, output = run([enabled_test], ignore_errors=True)
         if retval == 0:
             return True
         else:
             return False
+    return None
 
 
-def objects_are_different(obj1, obj2):
+def objects_are_different(obj1: FileInfo, obj2: FileInfo) -> bool:
     """Are filesystem objects different based on their meta data?"""
     if (
         obj1.st.st_mode != obj2.st.st_mode
@@ -2207,7 +2280,7 @@ def objects_are_different(obj1, obj2):
     return False
 
 
-def format_object_attributes(obj):
+def format_object_attributes(obj: FileInfo) -> str:
     st = obj.st
     ft = ""
     if stat.S_ISDIR(st.st_mode):
@@ -2228,7 +2301,16 @@ def format_object_attributes(obj):
     return res
 
 
-def diff_meta_data(tree1, tree2, is_ignored, quiet=False):
+class IsIgnored(typing.Protocol):
+    def __call__(self, pathname: str, info: str, quiet: bool) -> bool: ...
+
+
+def diff_meta_data(
+    tree1: dict[str, FileInfo],
+    tree2: dict[str, FileInfo],
+    is_ignored: IsIgnored,
+    quiet: bool = False,
+) -> tuple[list[tuple[str, FileInfo]], list[tuple[str, FileInfo]], list[tuple[str, FileInfo]]]:
     """Compare two dir trees and return list of new files (only in 'tree2'),
     removed files (only in 'tree1'), and modified files."""
 
@@ -2277,7 +2359,7 @@ def diff_meta_data(tree1, tree2, is_ignored, quiet=False):
     return new, removed, modified
 
 
-def file_list(meta_infos, file_owners):
+def file_list(meta_infos: list[tuple[str, FileInfo]], file_owners: dict[str, list[str]]) -> str:
     """Return list of indented filenames."""
     meta_infos = sorted(meta_infos[:])
     vlist = []
@@ -2297,7 +2379,7 @@ def file_list(meta_infos, file_owners):
     return "".join(vlist)
 
 
-def offending_packages(meta_infos, file_owners):
+def offending_packages(meta_infos: list[tuple[str, FileInfo]], file_owners: dict[str, list[str]]) -> set[str]:
     """Return a Set of offending packages."""
     pkgset = set()
     for name, data in meta_infos:
@@ -2307,7 +2389,9 @@ def offending_packages(meta_infos, file_owners):
     return pkgset
 
 
-def prune_files_list(files, depsfiles):
+def prune_files_list(
+    files: list[tuple[str, FileInfo]], depsfiles: list[tuple[str, FileInfo]]
+) -> list[tuple[str, FileInfo]]:
     """Remove elements from 'files' that are in 'depsfiles', and return the
     list of removed elements.
     """
@@ -2320,12 +2404,12 @@ def prune_files_list(files, depsfiles):
     return warn
 
 
-def diff_selections(chroot, selections):
+def diff_selections(chroot: Chroot, selections: dict[str, tuple[str, str | None]]) -> dict[str, tuple[str, str | None]]:
     """Compare original and current package selection.
     Return dict where dict[package_name] = original_status, that is,
     the value in the dict is the state that the package needs to be
     set to to restore original selections."""
-    changes = {}
+    changes: dict[str, tuple[str, str | None]] = {}
     current = chroot.get_selections()
     for name, (value, version) in current.items():
         if name not in selections:
@@ -2338,7 +2422,7 @@ def diff_selections(chroot, selections):
     return changes
 
 
-def get_package_names_from_package_files(package_files):
+def get_package_names_from_package_files(package_files: list[str]) -> list[str]:
     """Return list of package names given list of package file names."""
     vlist = []
     for filename in package_files:
@@ -2362,7 +2446,7 @@ def get_package_names_from_package_files(package_files):
 # from the 'Files' stanza.
 
 
-def process_changes(changes):
+def process_changes(changes: str) -> list[str] | None:
     # Determine the path to the changes file, then check if it's readable.
     dir_path = ""
     changes_path = ""
@@ -2373,7 +2457,7 @@ def process_changes(changes):
         changes_path = os.path.abspath(changes)
     if not os.access(changes_path, os.R_OK):
         logging.warning(changes_path + " is not readable. Skipping.")
-        return
+        return None
 
     # Determine the packages in the changes file through the 'Files' stanza.
     field = "Files"
@@ -2404,7 +2488,12 @@ def process_changes(changes):
     return package_list
 
 
-def check_results(chroot, chroot_state, file_owners, deps_info=None):
+def check_results(
+    chroot: Chroot,
+    chroot_state: StateMetaData,
+    file_owners: dict[str, list[str]],
+    deps_info: dict[str, FileInfo] | None = None,
+) -> bool:
     """Check that current chroot state matches 'chroot_state'.
 
     If settings.warn_on_others is True and deps_info is not None, then only
@@ -2418,15 +2507,18 @@ def check_results(chroot, chroot_state, file_owners, deps_info=None):
     reference_info = chroot_state["tree"]
     ok = True
     if settings.check_broken_diversions:
-        (removed, added) = chroot.get_modified_diversions(chroot_state["diversions"])
-        if added:
+        assert chroot_state["diversions"] is not None
+        (removed_diversions, added_diversions) = chroot.get_modified_diversions(chroot_state["diversions"])
+        if added_diversions:
             logging.error(
-                "FAIL: Installed diversions (dpkg-divert) not removed by purge:\n%s" % indent_string("\n".join(added))
+                "FAIL: Installed diversions (dpkg-divert) not removed by purge:\n%s"
+                % indent_string("\n".join(added_diversions))
             )
             ok = False
-        if removed:
+        if removed_diversions:
             logging.error(
-                "FAIL: Existing diversions (dpkg-divert) removed/modified:\n%s" % indent_string("\n".join(removed))
+                "FAIL: Existing diversions (dpkg-divert) removed/modified:\n%s"
+                % indent_string("\n".join(removed_diversions))
             )
             ok = False
 
@@ -2484,7 +2576,13 @@ def check_results(chroot, chroot_state, file_owners, deps_info=None):
     return ok
 
 
-def install_purge_test(chroot, chroot_state, package_files, packages, extra_packages):
+def install_purge_test(
+    chroot: Chroot,
+    chroot_state: StateMetaData,
+    package_files: list[str],
+    packages: list[str],
+    extra_packages: list[str],
+) -> bool:
     """Do an install-purge test. Return True if successful, False if not.
     Assume 'root' is a directory already populated with a working
     chroot, with packages in states given by 'selections'."""
@@ -2508,6 +2606,7 @@ def install_purge_test(chroot, chroot_state, package_files, packages, extra_pack
 
     if settings.warn_on_others or settings.install_purge_install:
         # Create a metapackage with dependencies from the given packages
+        control_infos: typing.Iterable[deb822.Deb822]
         if package_files:
             control_infos = []
             # We were given package files, so let's get the Depends and
@@ -2561,7 +2660,7 @@ def install_purge_test(chroot, chroot_state, package_files, packages, extra_pack
             "piuparts-depends-dummy", depends=all_depends, conflicts=all_conflicts, arch=arch
         )
 
-        def cleanup_metapackage():
+        def cleanup_metapackage() -> None:
             shutil.rmtree(os.path.dirname(metapackage))
 
         panic_handler_id = do_on_panic(cleanup_metapackage)
@@ -2637,7 +2736,9 @@ def install_purge_test(chroot, chroot_state, package_files, packages, extra_pack
     return check_results(chroot, chroot_state, file_owners, deps_info=deps_info)
 
 
-def install_upgrade_test(chroot, chroot_state, package_files, packages, old_packages):
+def install_upgrade_test(
+    chroot: Chroot, chroot_state: StateMetaData, package_files: list[str], packages: list[str], old_packages: list[str]
+) -> bool:
     """Install old_packages via apt-get, then upgrade from package files.
     Return True if successful, False if not."""
 
@@ -2685,21 +2786,21 @@ def install_upgrade_test(chroot, chroot_state, package_files, packages, old_pack
     return check_results(chroot, chroot_state, file_owners_after)
 
 
-def save_meta_data(filename, chroot_state):
+def save_meta_data(filename: str, chroot_state: StateMetaData) -> None:
     """Save directory tree meta data into a file for fast access later."""
     logging.debug("Saving chroot meta data to %s" % filename)
     with open(filename, "wb") as f:
         pickle.dump(chroot_state, f)
 
 
-def load_meta_data(filename):
+def load_meta_data(filename: str) -> StateMetaData:
     """Load meta data saved by 'save_meta_data'."""
     logging.debug("Loading chroot meta data from %s" % filename)
     with open(filename, "rb") as f:
         return pickle.load(f)
 
 
-def install_and_upgrade_between_distros(package_files, packages_qualified):
+def install_and_upgrade_between_distros(package_files: list[str], packages_qualified: list[str]) -> bool:
     """Install package and upgrade it between distributions, then remove.
     Return True if successful, False if not."""
 
@@ -2863,14 +2964,14 @@ def install_and_upgrade_between_distros(package_files, packages_qualified):
     return result
 
 
-def parse_mirror_spec(str, defaultcomponents=[]):
+def parse_mirror_spec(str: str, defaultcomponents: list[str] = []) -> tuple[str, list[str]]:
     """Parse a mirror specification from the --mirror option argument.
     Return (mirror, componentslist)."""
     parts = str.split()
     return parts[0], parts[1:] or defaultcomponents[:]
 
 
-def find_default_debian_mirrors():
+def find_default_debian_mirrors() -> list[tuple[str, list[str]]]:
     """Find the default Debian mirrors."""
     mirrors = []
     try:
@@ -2881,11 +2982,19 @@ def find_default_debian_mirrors():
                 mirrors.append((parts[1], parts[3:]))
                 break  # Only use the first one, at least for now.
     except IOError:
-        return None
+        return []
     return mirrors
 
 
-def forget_ignores(option, opt, value, parser, *args, **kwargs):
+def forget_ignores(
+    option: optparse.Option,
+    opt: str,
+    value: typing.Any,
+    parser: optparse.OptionParser,
+    *args: typing.Any,
+    **kwargs: typing.Any,
+) -> None:
+    assert parser.values is not None
     settings.bindmounts = []
     parser.values.ignore = []
     parser.values.ignore_regex = []
@@ -2893,11 +3002,19 @@ def forget_ignores(option, opt, value, parser, *args, **kwargs):
     settings.ignored_patterns = []
 
 
-def set_basetgz_to_pbuilder(option, opt, value, parser, *args, **kwargs):
+def set_basetgz_to_pbuilder(
+    option: optparse.Option,
+    opt: str,
+    value: typing.Any,
+    parser: optparse.OptionParser,
+    *args: typing.Any,
+    **kwargs: typing.Any,
+) -> None:
+    assert parser.values is not None
     parser.values.basetgz = "/var/cache/pbuilder/base.tgz"
 
 
-def parse_command_line():
+def parse_command_line() -> list[str]:
     """Parse the command line, change global settings, return non-options."""
 
     parser = optparse.OptionParser(usage="%prog [options] package ...", version="piuparts %s" % VERSION)
@@ -3069,7 +3186,10 @@ def parse_command_line():
         "--install-suggests", action="store_true", default=False, help="Enable the installation of Suggests."
     )
 
-    def keep_env_parser(option, opt_str, value, parser):
+    def keep_env_parser(
+        option: optparse.Option, opt_str: str, value: typing.Any, parser: optparse.OptionParser
+    ) -> None:
+        assert option.dest is not None
         setattr(parser.values, option.dest, True)
         if "--keep-tmpdir" == opt_str:
             print("WARNING `--keep-tmpdir` is deprecated, use `--keep-env` " "instead")
@@ -3315,7 +3435,10 @@ def parse_command_line():
     )
 
     parser.add_option(
-        "--skip-cronfiles-test", action="store_true", default=False, help="Skip testing the output from the cron files."
+        "--skip-cronfiles-test",
+        action="store_true",
+        default=False,
+        help="Skip testing the output from the cron files.",
     )
 
     parser.add_option(
@@ -3511,11 +3634,10 @@ def parse_command_line():
     settings.install_purge_install = opts.install_purge_install
     settings.install_remove_install = opts.install_remove_install
     settings.list_installed_files = opts.list_installed_files
-    [
+    for csv in opts.fake_essential_packages:
         settings.fake_essential_packages.extend([i.strip() for i in csv.split(",")])
-        for csv in opts.fake_essential_packages
-    ]
-    [settings.extra_old_packages.extend([i.strip() for i in csv.split(",")]) for csv in opts.extra_old_packages]
+    for csv in opts.extra_old_packages:
+        settings.extra_old_packages.extend([i.strip() for i in csv.split(",")])
     settings.skip_cronfiles_test = opts.skip_cronfiles_test
     settings.skip_logrotatefiles_test = opts.skip_logrotatefiles_test
     settings.adequate = not opts.no_adequate
@@ -3552,6 +3674,7 @@ def parse_command_line():
             settings.tmpdir = os.environ["TMPDIR"]
         else:
             settings.tmpdir = "/tmp"
+    assert settings.tmpdir is not None
 
     if not os.path.isdir(settings.tmpdir):
         logging.error("Temporary directory is not a directory: %s" % settings.tmpdir)
@@ -3594,12 +3717,12 @@ def parse_command_line():
     return args
 
 
-def get_chroot():
+def get_chroot() -> Chroot:
     return Chroot()
 
 
 # Process the packages given in a list
-def process_packages(package_list):
+def process_packages(package_list: list[str]) -> None:
     # Find the names of packages.
     if settings.args_are_package_files:
         packages = get_package_names_from_package_files(package_list)
@@ -3661,7 +3784,7 @@ def process_packages(package_list):
             panic()
 
 
-def main():
+def main() -> None:
     """Main program. But you knew that."""
 
     args = parse_command_line()
@@ -3721,6 +3844,7 @@ def main():
     for arg in args:
         if changes_p.match(arg):
             package_list = process_changes(arg)
+            assert package_list is not None
             if settings.single_changes_list:
                 for package in package_list:
                     regular_packages_list.append(package)


=====================================
piupartslib/__init__.py
=====================================
@@ -19,22 +19,27 @@
 
 import bz2
 import lzma
+import typing
 import urllib.error
 import urllib.request
 import zlib
 
 
+class Decompressor(typing.Protocol):
+    def decompress(self, chunk: bytes) -> bytes: ...
+
+
 class DecompressedStream:
-    def __init__(self, fileobj, decompressor=None):
+    def __init__(self, fileobj, decompressor: Decompressor | None = None):
         self._input = fileobj
         self._decompressor = decompressor
         self._buffer = ""
-        self._line_buffer = []
+        self._line_buffer: list[str] = []
         self._i = 0
         self._end = 0
         self._undecbuf = b""
 
-    def _split_decode(self, myb):
+    def _split_decode(self, myb: bytes) -> tuple[str, bytes]:
         lmyb = len(myb)
         for end in range(lmyb, max(lmyb - 6, -1), -1):
             try:
@@ -44,7 +49,7 @@ class DecompressedStream:
         # not returned yet? We have a problem
         raise UnicodeDecodeError
 
-    def _refill(self):
+    def _refill(self) -> bool:
         if self._input is None:
             return False
         while True:
@@ -62,10 +67,10 @@ class DecompressedStream:
             if chunk:
                 return True
 
-    def readline(self):
+    def readline(self) -> str:
         while not self._i < self._end:
             self._i = self._end = 0
-            self._line_buffer = None
+            self._line_buffer = []
             empty = not self._refill()
             if not self._buffer:
                 break
@@ -80,13 +85,13 @@ class DecompressedStream:
             return self._line_buffer[self._i - 1]
         return ""
 
-    def close(self):
+    def close(self) -> None:
         if self._input:
             self._input.close()
         self._input = self._decompressor = None
 
 
-def open_packages_url(url):
+def open_packages_url(url: str) -> tuple[str, DecompressedStream]:
     """Open a Packages.bz2 file pointed to by a URL"""
     socket = None
     error = None
@@ -98,17 +103,15 @@ def open_packages_url(url):
         else:
             break
     else:
+        assert error is not None
         raise error
     url = socket.geturl()
     if ext == ".bz2":
-        decompressor = bz2.BZ2Decompressor()
-        decompressed = DecompressedStream(socket, decompressor)
+        decompressed = DecompressedStream(socket, bz2.BZ2Decompressor())
     elif ext == ".gz":
-        decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
-        decompressed = DecompressedStream(socket, decompressor)
+        decompressed = DecompressedStream(socket, zlib.decompressobj(16 + zlib.MAX_WBITS))
     elif ext == ".xz":
-        decompressor = lzma.LZMADecompressor()
-        decompressed = DecompressedStream(socket, decompressor)
+        decompressed = DecompressedStream(socket, lzma.LZMADecompressor())
     elif ext == "":
         decompressed = socket
     else:


=====================================
piupartslib/conf.py
=====================================
@@ -33,17 +33,19 @@ import distro_info
 
 
 class MissingSection(Exception):
-    def __init__(self, filename, section):
+    def __init__(self, filename: str, section: str):
         self.args = ("Section %s not defined in configuration file %s" % (section, filename),)
 
 
 class MissingMandatorySetting(Exception):
-    def __init__(self, filename, key):
+    def __init__(self, filename: str, key: str):
         self.args = ("Value for %s not set in configuration file %s" % (key, filename),)
 
 
-class Config(UserDict):
-    def __init__(self, section, defaults, mandatory=[], defaults_section=None):
+class Config(UserDict[str, str]):
+    def __init__(
+        self, section: str, defaults: dict[str, str], mandatory: list[str] = [], defaults_section: str | None = None
+    ):
         UserDict.__init__(self)
         self._section = section
         self._defaults_section = defaults_section
@@ -51,7 +53,7 @@ class Config(UserDict):
             self[key] = value
         self._mandatory = mandatory
 
-    def read(self, filename):
+    def read(self, filename: str) -> None:
         cp = configparser.ConfigParser()
         cp.read(filename)
         if not cp.has_section(self._section):
@@ -64,17 +66,17 @@ class Config(UserDict):
             elif key in self._mandatory:
                 raise MissingMandatorySetting(filename, key)
 
-    def get_mirror(self, distro=None):
+    def get_mirror(self, distro: str | None = None) -> str:
         if self["mirror"] is not None:
             return self["mirror"]
         return "http://deb.debian.org/debian"
 
-    def get_distros(self):
+    def get_distros(self) -> list[str]:
         if self["upgrade-test-distros"] is not None:
             return self["upgrade-test-distros"].split()
         return []
 
-    def get_distro(self):
+    def get_distro(self) -> str | None:
         if self["distro"]:
             return self["distro"]
         distros = self.get_distros()
@@ -82,19 +84,19 @@ class Config(UserDict):
             return distros[-1]
         return None
 
-    def get_start_distro(self):
+    def get_start_distro(self) -> str:
         distros = self.get_distros()
         if distros:
             return distros[0]
         return self["distro"]
 
-    def get_final_distro(self):
+    def get_final_distro(self) -> str:
         distros = self.get_distros()
         if distros:
             return distros[-1]
         return self["distro"]
 
-    def _get_distmap(self):
+    def _get_distmap(self) -> defaultdict[str, str]:
         debdist = distro_info.DebianDistroInfo()
 
         # start with e.g. "sid" -> "unstable"
@@ -120,32 +122,33 @@ class Config(UserDict):
 
         return distmap
 
-    def _map_distro(self, distro):
+    def _map_distro(self, distro: str) -> str:
         distro_root = re.split("[-.]", distro)[0]
         distmap = self._get_distmap()
         return distmap[distro_root]
 
-    def get_std_distro(self, distrolist=[]):
+    def get_std_distro(self, distrolist: list[str] = []) -> str:
         if not distrolist:
             distrolist = [self.get_distro()] + self.get_distros()
         mappedlist = [self._map_distro(x) for x in distrolist]
         return reduce(lambda x, y: y if y != "unknown" else x, mappedlist)
 
-    def get_area(self):
+    def get_area(self) -> str:
         if self["area"] is not None:
             return self["area"]
         return "main"
 
-    def get_arch(self):
+    def get_arch(self) -> str:
         if not self["arch"]:
             # Try to figure it out ourselves, using dpkg
             p = subprocess.Popen(["dpkg", "--print-architecture"], stdout=subprocess.PIPE)
+            assert p.stdout is not None
             self["arch"] = p.stdout.read().decode().rstrip()
         return self["arch"]
 
 
-class DistroConfig(UserDict):
-    def __init__(self, filename, mirror):
+class DistroConfig(UserDict[str, dict[str, str | None]]):
+    def __init__(self, filename: str, mirror: str):
         UserDict.__init__(self)
         self._mirror = mirror
         self._defaults = {
@@ -164,31 +167,32 @@ class DistroConfig(UserDict):
                 if cp.has_option(section, key):
                     self[section][key] = cp.get(section, key)
 
-    def get(self, section, key=None):
-        if section not in self.keys():
-            self[section] = dict(self._defaults, distribution=section)
-        if key is not None:
-            return self[section][key]
-        return self[section]
+    def get_field(self, section: str, key: str) -> str | None:
+        try:
+            sectdict = self[section]
+        except KeyError:
+            sectdict = dict(self._defaults, distribution=section)
+            self[section] = sectdict
+        return sectdict[key]
 
-    def _is_virtual(self, distro):
-        uri = self.get(distro, "uri")
+    def _is_virtual(self, distro: str) -> bool:
+        uri = self.get_field(distro, "uri")
         return uri is not None and uri == "None"
 
-    def get_mirror(self, distro):
+    def get_mirror(self, distro: str) -> str:
         if self._is_virtual(distro):
             distro = self._expand_depends(distro)[0]
-        return self.get(distro, "uri") or self._mirror
+        return self.get_field(distro, "uri") or self._mirror
 
-    def get_distribution(self, distro):
+    def get_distribution(self, distro: str) -> str:
         if self._is_virtual(distro):
             distro = self._expand_depends(distro)[0]
-        return self.get(distro, "distribution") or distro
+        return self.get_field(distro, "distribution") or distro
 
-    def get_candidates(self, distro):
-        return (self.get(distro, "candidates") or "").split() or [distro]
+    def get_candidates(self, distro: str) -> list[str]:
+        return (self.get_field(distro, "candidates") or "").split() or [distro]
 
-    def _get_packages_url(self, distro, area, arch):
+    def _get_packages_url(self, distro: str, area: str, arch: str) -> str:
         return "%s/dists/%s/%s/binary-%s/Packages" % (
             self.get_mirror(distro),
             self.get_distribution(distro),
@@ -196,26 +200,26 @@ class DistroConfig(UserDict):
             arch,
         )
 
-    def get_packages_urls(self, distro, area, arch):
+    def get_packages_urls(self, distro: str, area: str, arch: str) -> list[str]:
         return [self._get_packages_url(d, area, arch) for d in self.get_candidates(distro)]
 
-    def _get_sources_url(self, distro, area):
+    def _get_sources_url(self, distro: str, area: str) -> str:
         return "%s/dists/%s/%s/source/Sources" % (
             self.get_mirror(distro),
             self.get_distribution(distro),
             area,
         )
 
-    def get_sources_urls(self, distro, area):
+    def get_sources_urls(self, distro: str, area: str) -> list[str]:
         return [self._get_sources_url(d, area) for d in self.get_candidates(distro)]
 
-    def get_target_flags(self, distro):
-        tr = self.get(distro, "target-release")
+    def get_target_flags(self, distro: str) -> list[str]:
+        tr = self.get_field(distro, "target-release")
         if tr:
             return ["-t", tr]
         return []
 
-    def _expand_depends(self, distro, include_virtual=False):
+    def _expand_depends(self, distro: str, include_virtual: bool = False) -> list[str]:
         todo = [distro]
         done = []
         seen = []
@@ -224,25 +228,26 @@ class DistroConfig(UserDict):
             todo = todo[1:]
             if curr not in seen:
                 seen.append(curr)
-                todo = (self.get(curr, "depends") or "").split() + [curr] + todo
+                todo = (self.get_field(curr, "depends") or "").split() + [curr] + todo
             elif curr not in done:
                 if include_virtual or not self._is_virtual(curr):
                     done.append(curr)
         assert len(done) > 0
         return done
 
-    def get_deb_lines(self, distro, components):
+    def get_deb_lines(self, distro: str, components: list[str]) -> list[str]:
         lines = []
         for d in self._expand_depends(distro):
+            d_components = self[d]["components"]
             for c in components:
-                if self[d]["components"] is None or c in self[d]["components"].split():
+                if d_components is None or c in d_components.split():
                     lines.append("deb %s %s %s" % (self.get_mirror(d), self.get_distribution(d), c))
         return lines
 
-    def get_basetgz(self, distro, arch, merged_usr=True):
+    def get_basetgz(self, distro: str, arch: str, merged_usr: bool = True) -> str | None:
         # look for the first base distribution
         for d in self._expand_depends(distro):
-            if self.get(d, "depends"):
+            if self.get_field(d, "depends"):
                 next  # skip partial distro
             return "%s%s_%s.tar.gz" % (
                 self.get_distribution(d),



View it on GitLab: https://salsa.debian.org/debian/piuparts/-/commit/cd0c859fd5204b7137a9849e19024a44d4b6a3eb

-- 
View it on GitLab: https://salsa.debian.org/debian/piuparts/-/commit/cd0c859fd5204b7137a9849e19024a44d4b6a3eb
You're receiving this email because of your account on salsa.debian.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/piuparts-devel/attachments/20250721/d4a9f896/attachment-0001.htm>


More information about the Piuparts-devel mailing list