[Piuparts-devel] [Git][debian/piuparts][helmutg/typehints] add Python type hints to piuparts.py and its dependencies
Helmut Grohne (@helmutg)
gitlab at salsa.debian.org
Mon Jul 21 07:58:52 BST 2025
Helmut Grohne pushed to branch helmutg/typehints at Debian / piuparts
Commits:
cd0c859f by Helmut Grohne at 2025-07-21T08:58:40+02:00
add Python type hints to piuparts.py and its dependencies
In order to fix a few type errors, some semantic changes are included.
* Methods of the Defaults class raise NotImplementedError rather than
returning None.
* The settings class assigns default values for apt_unauthenticated,
distro_config and testobjects in its constructor.
* Renamed a few variables that were used with different types.
* Added a number of "assert <something> is not None" statements to guide mypy.
* get_state_meta_data constructs its result at once.
* TimeOffsetFormatter.formatTime signature updated to match super
class: Added default value.
* The type of the warn_only argument changed to bool in two methods.
* Added missing return statements and None values to existing return
statements.
* find_default_debian_mirrors returns [] when it previously returned
None.
* Rewrote DistroConfig.get as get_field to avoid conflicting with the
super class type.
* DistroConfig.get_deb_lines caches a lookup to help mypy track state.
* unqualify always returns a list now.
- - - - -
3 changed files:
- piuparts.py
- piupartslib/__init__.py
- piupartslib/conf.py
Changes:
=====================================
piuparts.py
=====================================
@@ -47,11 +47,11 @@ import sys
import tempfile
import time
import traceback
+import typing
import uuid
from collections import defaultdict, namedtuple
from contextlib import ExitStack
from signal import SIGALRM, SIGKILL, SIGTERM, alarm, signal
-from typing import Dict, Tuple
import apt_pkg
import distro_info
@@ -77,56 +77,60 @@ class Defaults:
"""
- def get_components(self):
+ def get_components(self) -> list[str]:
"""Return list of default components for a mirror."""
+ raise NotImplementedError
- def get_mirror(self):
+ def get_mirror(self) -> list[tuple[str, list[str]]]:
"""Return default mirror."""
+ raise NotImplementedError
- def get_distribution(self):
+ def get_distribution(self) -> list[str]:
"""Return default distribution."""
+ raise NotImplementedError
- def get_keyring(self):
+ def get_keyring(self) -> str:
"""Return default keyring."""
+ raise NotImplementedError
class DebianDefaults(Defaults):
- def get_components(self):
+ def get_components(self) -> list[str]:
return ["main", "contrib", "non-free", "non-free-firmware"]
- def get_mirror(self):
+ def get_mirror(self) -> list[tuple[str, list[str]]]:
return [("http://deb.debian.org/debian", self.get_components())]
- def get_distribution(self):
+ def get_distribution(self) -> list[str]:
return [distro_info.DebianDistroInfo().devel()]
- def get_keyring(self):
+ def get_keyring(self) -> str:
return "/usr/share/keyrings/debian-archive-keyring.gpg"
class UbuntuDefaults(Defaults):
- def get_components(self):
+ def get_components(self) -> list[str]:
return ["main", "universe", "restricted", "multiverse"]
- def get_mirror(self):
+ def get_mirror(self) -> list[tuple[str, list[str]]]:
return [("http://archive.ubuntu.com/ubuntu", self.get_components())]
- def get_distribution(self):
+ def get_distribution(self) -> list[str]:
return [distro_info.UbuntuDistroInfo().devel()]
- def get_keyring(self):
+ def get_keyring(self) -> str:
return "/usr/share/keyrings/ubuntu-archive-keyring.gpg"
class DefaultsFactory:
"""Instantiate the right defaults class."""
- def guess_flavor(self):
+ def guess_flavor(self) -> str:
p = subprocess.Popen(["lsb_release", "-i", "-s"], stdout=subprocess.PIPE, universal_newlines=True)
stdout, stderr = p.communicate()
return stdout.strip().lower()
- def new_defaults(self):
+ def new_defaults(self) -> Defaults:
if not settings.defaults:
settings.defaults = self.guess_flavor()
if settings.defaults.lower() == "debian":
@@ -140,9 +144,9 @@ class DefaultsFactory:
class Settings:
"""Global settings for this program."""
- def __init__(self):
- self.defaults = None
- self.tmpdir = None
+ def __init__(self) -> None:
+ self.defaults: str | None = None
+ self.tmpdir: str | None = None
self.keep_env = False
self.shell_on_error = False
self.max_command_output_size = (
@@ -154,13 +158,13 @@ class Settings:
self.args_are_package_files = True
# distro setup
self.proxy = None
- self.debian_mirrors = []
- self.extra_repos = []
+ self.debian_mirrors: list[tuple[str, list[str]]] = []
+ self.extra_repos: list[str] = []
self.testdebs_repo = None
- self.debian_distros = []
- self.bootstrapcmd = []
+ self.debian_distros: list[str] = []
+ self.bootstrapcmd: list[str] = []
self.keep_sources_list = False
- self.keyring = None
+ self.keyring: str | None = None
self.do_not_verify_signatures = False
self.no_check_valid_until = False
self.install_recommends = False
@@ -168,8 +172,8 @@ class Settings:
self.eatmydata = True
self.dpkg_force_unsafe_io = True
self.dpkg_force_confdef = False
- self.scriptsdirs = []
- self.bindmounts = []
+ self.scriptsdirs: list[str] = []
+ self.bindmounts: list[str] = []
self.allow_database = False
self.update_retries = None
# chroot setup
@@ -185,7 +189,7 @@ class Settings:
self.save_end_meta = None
self.skip_minimize = True
self.minimize = False
- self.debfoster_options = None
+ self.debfoster_options: list[str] | None = None
self.docker_image = None
self.merged_usr = True
# tests and checks
@@ -196,8 +200,8 @@ class Settings:
self.install_remove_install = False
self.install_purge_install = False
self.list_installed_files = False
- self.fake_essential_packages = []
- self.extra_old_packages = []
+ self.fake_essential_packages: list[str] = []
+ self.extra_old_packages: list[str] = []
self.skip_cronfiles_test = False
self.skip_logrotatefiles_test = False
self.adequate = True
@@ -416,6 +420,9 @@ class Settings:
":/lib/modules/([^/]*/(modules.*)?)?",
]
self.non_pedantic_ignore_patterns = ["/tmp/.*"]
+ self.apt_unauthenticated = "No"
+ self.distro_config: piupartslib.conf.DistroConfig | None = None
+ self.testobjects: list[str] | None = None
settings = Settings()
@@ -425,7 +432,7 @@ on_panic_hooks = {}
counter = 0
-def do_on_panic(hook):
+def do_on_panic(hook: typing.Callable[[], None]) -> int:
global counter
cid = counter
counter += 1
@@ -433,16 +440,16 @@ def do_on_panic(hook):
return cid
-def dont_do_on_panic(id):
+def dont_do_on_panic(id: int) -> None:
del on_panic_hooks[id]
class TimeOffsetFormatter(logging.Formatter):
- def __init__(self, fmt=None, datefmt=None):
+ def __init__(self, fmt: str | None = None, datefmt: str | None = None):
self.startup_time = time.time()
logging.Formatter.__init__(self, fmt, datefmt)
- def formatTime(self, record, datefmt):
+ def formatTime(self, record: logging.LogRecord, datefmt: str | None = None) -> str:
t = time.time() - self.startup_time
t_min = int(t / 60)
t_sec = t % 60.0
@@ -450,36 +457,36 @@ class TimeOffsetFormatter(logging.Formatter):
DUMP = logging.DEBUG - 1
-HANDLERS = []
+HANDLERS: list[logging.Handler] = []
-def setup_logging(log_level, log_file_name):
+def setup_logging(log_level: int, log_file_name: str) -> None:
logging.addLevelName(DUMP, "DUMP")
logger = logging.getLogger()
logger.setLevel(log_level)
formatter = TimeOffsetFormatter("%(asctime)s %(levelname)s: %(message)s")
- handler = logging.StreamHandler(sys.stdout)
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- HANDLERS.append(handler)
+ shandler = logging.StreamHandler(sys.stdout)
+ shandler.setFormatter(formatter)
+ logger.addHandler(shandler)
+ HANDLERS.append(shandler)
if log_file_name:
- handler = logging.FileHandler(log_file_name)
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- HANDLERS.append(handler)
+ fhandler = logging.FileHandler(log_file_name)
+ fhandler.setFormatter(formatter)
+ logger.addHandler(fhandler)
+ HANDLERS.append(fhandler)
-def dump(msg):
+def dump(msg: str) -> None:
logger = logging.getLogger()
logger.log(DUMP, msg)
for handler in HANDLERS:
handler.flush()
-def panic(exit=1):
+def panic(exit: int = 1) -> typing.NoReturn:
for i in reversed(range(counter)):
if i in on_panic_hooks:
on_panic_hooks[i]()
@@ -487,27 +494,27 @@ def panic(exit=1):
sys.exit(exit)
-def indent_string(str):
+def indent_string(str: str) -> str:
"""Indent all lines in a string with two spaces and return result."""
return "\n".join([" " + line for line in str.split("\n")])
-def command2string(command):
+def command2string(command: list[str]) -> str:
"""Quote s.t. copy+paste from the logfile gives a runnable command in the shell."""
return " ".join([shlex.quote(arg) for arg in command])
-def unqualify(packages):
+def unqualify(packages: typing.Iterable[str]) -> list[str]:
if packages:
return [p.split("=", 1)[0].strip() for p in packages]
- return packages
+ return []
class Alarm(Exception):
pass
-def alarm_handler(signum, frame):
+def alarm_handler(signum, frame) -> typing.NoReturn:
raise Alarm
@@ -530,17 +537,18 @@ FILTERED_ENVVARS = {
}
-def get_clean_environment() -> Dict[str, str]:
+def get_clean_environment() -> dict[str, str]:
"""Get a cleaned up environment"""
+ assert settings.testobjects is not None
env = {k: v for k, v in os.environ.items() if k not in FILTERED_ENVVARS}
env["PIUPARTS_OBJECTS"] = " ".join(str(vobject) for vobject in settings.testobjects)
return env
-def run(command, ignore_errors=False, timeout=0):
+def run(command: list[str], ignore_errors: bool = False, timeout: int = 0) -> tuple[int, str]:
"""Run an external command and die with error message if it fails."""
- def kill_subprocess(p, reason):
+ def kill_subprocess(p: subprocess.Popen[str], reason: str) -> None:
logging.error("Terminating command due to %s" % reason)
p.terminate()
for i in range(10):
@@ -569,6 +577,7 @@ def run(command, ignore_errors=False, timeout=0):
errors="backslashreplace",
)
)
+ assert p.stdout is not None
output = ""
excessive_output = False
if timeout > 0:
@@ -610,14 +619,14 @@ def run(command, ignore_errors=False, timeout=0):
return p.returncode, output
-def create_temp_file():
+def create_temp_file() -> tuple[int, str]:
"""Create a temporary file and return its full path."""
(fd, path) = tempfile.mkstemp(dir=settings.tmpdir)
logging.debug("Created temporary file %s" % path)
return (fd, path)
-def create_file(filename, contents):
+def create_file(filename: str, contents: str) -> None:
"""Create a new file with the desired name and contents."""
try:
with open(filename, "w") as f:
@@ -627,12 +636,12 @@ def create_file(filename, contents):
panic()
-def readlines_file(filename):
+def readlines_file(filename: str) -> list[str]:
with open(filename, "r") as f:
return f.readlines()
-def remove_files(filenames):
+def remove_files(filenames: list[str]) -> None:
"""Remove some files."""
for filename in filenames:
logging.debug("Removing %s" % filename)
@@ -643,7 +652,7 @@ def remove_files(filenames):
panic()
-def make_metapackage(name, depends, conflicts, arch="all"):
+def make_metapackage(name: str, depends: str, conflicts: str, arch: str = "all") -> str:
"""Return the path to a .deb created just for satisfying dependencies
Caller is responsible for removing the temporary directory containing the
@@ -682,7 +691,7 @@ def make_metapackage(name, depends, conflicts, arch="all"):
return os.path.join(tmpdir, name + ".deb")
-def split_path(pathname):
+def split_path(pathname: str) -> list[str]:
parts = []
while pathname:
(head, tail) = os.path.split(pathname)
@@ -698,7 +707,15 @@ def split_path(pathname):
return parts
-def canonicalize_path(root, pathname, report_links=False):
+@typing.overload
+def canonicalize_path(root: str, pathname: str, report_links: typing.Literal[False] = False) -> str: ...
+
+
+@typing.overload
+def canonicalize_path(root: str, pathname: str, report_links: typing.Literal[True]) -> list[tuple[str, str]]: ...
+
+
+def canonicalize_path(root: str, pathname: str, report_links: bool = False) -> str | list[tuple[str, str]]:
"""Canonicalize a path name, simulating chroot at 'root'.
When resolving the symlink, pretend (similar to chroot) that
@@ -713,7 +730,7 @@ def canonicalize_path(root, pathname, report_links=False):
"""
# print("\nCANONICALIZE %s %s" % (root, pathname))
links = []
- seen = []
+ seen: list[str] = []
parts = split_path(pathname)
# print("PARTS ", list(reversed(parts)))
path = "/"
@@ -751,7 +768,7 @@ def canonicalize_path(root, pathname, report_links=False):
return path
-def is_broken_symlink(root, dirpath, filename):
+def is_broken_symlink(root: str, dirpath: str, filename: str) -> bool:
"""Is symlink dirpath+filename broken?"""
if dirpath[: len(root)] == root:
@@ -768,25 +785,33 @@ def is_broken_symlink(root, dirpath, filename):
FileInfo = namedtuple("FileInfo", ["st", "target", "user", "group"])
+class StateMetaData(typing.TypedDict):
+ initial_selections: dict[str, tuple[str, str | None]] | None
+ avail_md5: list[str]
+ tree: dict[str, FileInfo]
+ selections: dict[str, tuple[str, str | None]]
+ diversions: list[str] | None
+
+
class Chroot:
"""A chroot for testing things in."""
- def __init__(self):
- self.name = None
+ def __init__(self) -> None:
+ self.name: str | None = None
self.bootstrapped = False
- self.mounts = []
- self.initial_selections = None
- self.avail_md5_history = []
- self.systemd_tmpfiles: Dict[str, Tuple[str, str]] = {}
+ self.mounts: list[str] = []
+ self.initial_selections: dict[str, tuple[str, str | None]] | None = None
+ self.avail_md5_history: list[str] = []
+ self.systemd_tmpfiles: dict[str, tuple[str, str]] = {}
- def create_temp_dir(self):
+ def create_temp_dir(self) -> None:
"""Create a temporary directory for the chroot."""
self.name = tempfile.mkdtemp(dir=settings.tmpdir)
create_file(os.path.join(self.name, ".piuparts.tmpdir"), "chroot")
os.chmod(self.name, 0o755)
logging.debug("Created temporary directory %s" % self.name)
- def create(self, temp_tgz=None):
+ def create(self, temp_tgz: str | None = None) -> None:
"""Create a chroot according to user's wishes."""
self.panic_handler_id = do_on_panic(self.remove)
if not settings.schroot and not settings.docker_image:
@@ -845,7 +870,7 @@ class Chroot:
if settings.savetgz and not temp_tgz:
self.pack_into_tgz(settings.savetgz)
- def remove(self):
+ def remove(self) -> None:
"""Remove a chroot and all its contents."""
if not self.name:
return
@@ -877,10 +902,10 @@ class Chroot:
logging.debug("Keeping directory tree at %s" % self.name)
dont_do_on_panic(self.panic_handler_id)
- def was_bootstrapped(self):
+ def was_bootstrapped(self) -> bool:
return self.bootstrapped
- def create_temp_tgz_file(self):
+ def create_temp_tgz_file(self) -> str:
"""Return the path to a file to be used as a temporary tgz file"""
# Yes, create_temp_file() would work just as well, but putting it in
# the interface for Chroot allows the VirtServ hack to work.
@@ -888,14 +913,15 @@ class Chroot:
os.close(fd)
return temp_tgz
- def remove_temp_tgz_file(self, temp_tgz):
+ def remove_temp_tgz_file(self, temp_tgz: str) -> None:
"""Remove the file that was used as a temporary tgz file"""
# Yes, remove_files() would work just as well, but putting it in
# the interface for Chroot allows the VirtServ hack to work.
remove_files([temp_tgz])
- def pack_into_tgz(self, result):
+ def pack_into_tgz(self, result: str) -> None:
"""Tar and compress all files in the chroot."""
+ assert self.name is not None
self.run(["apt-get", "clean"])
logging.debug("Saving %s to %s." % (self.name, result))
@@ -922,15 +948,16 @@ class Chroot:
os.rename(tmpfile, result)
dont_do_on_panic(panic_handler_id)
- def unpack_from_tgz(self, tarball):
+ def unpack_from_tgz(self, tarball: str) -> None:
"""Unpack a tarball to a chroot."""
+ assert self.name is not None
logging.debug("Unpacking %s into %s" % (tarball, self.name))
prefix = []
if settings.eatmydata and os.path.isfile("/usr/bin/eatmydata"):
prefix.append("eatmydata")
run(prefix + ["tar", "-C", self.name, "--auto-compress", "-xf", tarball])
- def setup_from_schroot(self, schroot):
+ def setup_from_schroot(self, schroot: str) -> None:
self.schroot_session = schroot.split(":", 1)[-1] + "-" + str(uuid.uuid1()) + "-piuparts"
run(["schroot", "--begin-session", "--chroot", schroot, "--session-name", self.schroot_session])
ret_code, output = run(["schroot", "--chroot", "session:" + self.schroot_session, "--location"])
@@ -938,13 +965,13 @@ class Chroot:
logging.info("New schroot session in '%s'" % self.name)
@staticmethod
- def check_if_docker_storage_driver_is_supported():
+ def check_if_docker_storage_driver_is_supported() -> None:
ret_code, output = run(["docker", "info"])
if "overlay2" not in output:
logging.error("Only overlay2 storage driver is supported")
panic()
- def setup_from_docker(self, docker_image):
+ def setup_from_docker(self, docker_image: str) -> None:
self.check_if_docker_storage_driver_is_supported()
with tempfile.TemporaryDirectory() as tmpdir:
cidfile = pathlib.Path(tmpdir) / "cidfile"
@@ -959,8 +986,9 @@ class Chroot:
self.name = output.strip()
logging.info("New container created %r at %r", self.docker_container, self.name)
- def setup_from_lvm(self, lvm_volume):
+ def setup_from_lvm(self, lvm_volume: str) -> None:
"""Create a chroot by creating an LVM snapshot."""
+ assert self.name is not None
self.lvm_base = os.path.dirname(lvm_volume)
self.lvm_vol_name = os.path.basename(lvm_volume)
self.lvm_snapshot_name = self.lvm_vol_name + "-" + str(uuid.uuid1())
@@ -971,8 +999,9 @@ class Chroot:
logging.info("Mounting LVM snapshot to %s" % self.name)
run(["mount", self.lvm_snapshot, self.name])
- def setup_from_dir(self, dirname):
+ def setup_from_dir(self, dirname: str) -> None:
"""Create chroot from an existing one."""
+ assert self.name is not None
# if on same device, make hard link
cmd = ["cp"]
if settings.hard_link and os.stat(dirname).st_dev == os.stat(self.name).st_dev:
@@ -986,7 +1015,8 @@ class Chroot:
dst = os.path.join(self.name, name)
run(cmd + [src, dst])
- def interactive_shell(self):
+ def interactive_shell(self) -> None:
+ assert self.name is not None
logging.info("Entering interactive shell in %s" % self.name)
env = os.environ.copy()
env["debian_chroot"] = "piuparts:%s" % self.name
@@ -995,7 +1025,8 @@ class Chroot:
except Exception:
pass
- def run(self, command, ignore_errors=False):
+ def run(self, command: list[str], ignore_errors: bool = False) -> tuple[int, str]:
+ assert self.name is not None
prefix = []
if settings.eatmydata and os.path.isfile(os.path.join(self.name, "usr/bin/eatmydata")):
prefix.append("eatmydata")
@@ -1035,13 +1066,14 @@ class Chroot:
timeout=settings.max_command_runtime,
)
- def mkdir_p(self, path):
+ def mkdir_p(self, path: str) -> None:
fullpath = self.relative(path)
if not os.path.isdir(fullpath):
os.makedirs(fullpath)
- def create_apt_sources(self, distro):
+ def create_apt_sources(self, distro: str) -> None:
"""Create an /etc/apt/sources.list with a given distro."""
+ assert settings.distro_config is not None
lines = []
lines.extend(settings.distro_config.get_deb_lines(distro, settings.debian_mirrors[0][1]))
for mirror, components in settings.debian_mirrors[1:]:
@@ -1051,7 +1083,7 @@ class Chroot:
create_file(self.relative("etc/apt/sources.list"), "\n".join(lines) + "\n")
logging.debug("sources.list:\n" + indent_string("\n".join(lines)))
- def aptupdate_run(self):
+ def aptupdate_run(self) -> int | None:
"""Resynchronize the package index files.
If executed under --update-retries <num>, retry
its execution up to <num> times, useful e.g.
@@ -1059,7 +1091,7 @@ class Chroot:
errors."""
if not settings.update_retries:
self.run(["apt-get", "update"])
- return
+ return None
count = 0
for count, run in enumerate(range(settings.update_retries), 1):
@@ -1074,7 +1106,7 @@ class Chroot:
return status
- def enable_testdebs_repo(self, update=True):
+ def enable_testdebs_repo(self, update: bool = True) -> None:
if settings.testdebs_repo:
if settings.testdebs_repo.startswith("deb"):
debline = settings.testdebs_repo
@@ -1087,12 +1119,12 @@ class Chroot:
if update:
self.aptupdate_run()
- def disable_testdebs_repo(self):
+ def disable_testdebs_repo(self) -> None:
if settings.testdebs_repo:
logging.debug("disabling testdebs repository")
remove_files([self.relative("etc/apt/sources.list.d/piuparts-testdebs-repo.list")])
- def create_apt_conf(self):
+ def create_apt_conf(self) -> None:
"""Create /etc/apt/apt.conf.d/piuparts inside the chroot."""
lines = ['APT::Get::Assume-Yes "yes";\n']
lines.append('APT::Install-Recommends "%d";\n' % int(settings.install_recommends))
@@ -1124,7 +1156,7 @@ class Chroot:
create_file(self.relative("etc/apt/apt.conf.d/piuparts"), "".join(lines))
- def create_dpkg_conf(self):
+ def create_dpkg_conf(self) -> None:
"""Create /etc/dpkg/dpkg.cfg.d/piuparts inside the chroot."""
lines = []
if settings.dpkg_force_unsafe_io:
@@ -1139,7 +1171,7 @@ class Chroot:
self.mkdir_p("etc/dpkg/dpkg.cfg.d")
create_file(self.relative("etc/dpkg/dpkg.cfg.d/piuparts"), "".join(lines))
- def create_policy_rc_d(self):
+ def create_policy_rc_d(self) -> None:
"""Create a policy-rc.d that prevents daemons from running."""
full_name = self.relative("usr/sbin/policy-rc.d")
policy = "#!/bin/sh\n"
@@ -1156,7 +1188,7 @@ class Chroot:
os.chmod(full_name, 0o755)
logging.debug("Created policy-rc.d and chmodded it.")
- def create_resolv_conf(self):
+ def create_resolv_conf(self) -> None:
"""Update resolv.conf based on the current configuration in the host system. Strip comments and whitespace."""
if settings.docker_image:
# Docker takes care of this
@@ -1171,8 +1203,10 @@ class Chroot:
os.chmod(full_name, 0o644)
logging.debug("Created resolv.conf.")
- def setup_minimal_chroot(self):
+ def setup_minimal_chroot(self) -> None:
"""Set up a minimal Debian system in a chroot."""
+ assert self.name is not None
+ assert settings.distro_config is not None
logging.debug("Setting up minimal chroot for %s at %s." % (settings.debian_distros[0], self.name))
prefix = []
if settings.eatmydata and os.path.isfile("/usr/bin/eatmydata"):
@@ -1199,10 +1233,11 @@ class Chroot:
)
self.bootstrapped = True
- def minimize(self):
+ def minimize(self) -> None:
"""Minimize a chroot by removing (almost all) unnecessary packages"""
if settings.skip_minimize or not settings.minimize:
return
+ assert settings.debfoster_options is not None
self.run(["apt-get", "install", "debfoster"])
debfoster_command = ["debfoster"] + settings.debfoster_options
if settings.eatmydata:
@@ -1211,8 +1246,10 @@ class Chroot:
remove_files([self.relative("var/lib/debfoster/keepers")])
self.run(["dpkg", "--purge", "debfoster"])
- def configure_chroot(self):
+ def configure_chroot(self) -> None:
"""Configure a chroot according to current settings"""
+ assert self.name is not None
+ assert settings.distro_config is not None
os.environ["PIUPARTS_DISTRIBUTION"] = settings.distro_config.get_distribution(settings.debian_distros[0])
if not settings.keep_sources_list:
self.create_apt_sources(settings.debian_distros[0])
@@ -1282,18 +1319,18 @@ class Chroot:
for bindmount in settings.bindmounts:
self.mount(bindmount, bindmount, opts=["rbind"])
- def remember_available_md5(self):
+ def remember_available_md5(self) -> None:
"""Keep a history of 'apt-cache dumpavail | md5sum' after initial
setup and each dist-upgrade step to notice outdated reference
chroot metadata"""
errorcode, avail_md5 = self.run(["sh", "-c", "apt-cache dumpavail | md5sum"])
self.avail_md5_history.append(avail_md5.split()[0])
- def remember_initial_selections(self):
+ def remember_initial_selections(self) -> None:
"""Remember initial selections to easily recognize mismatching chroot metadata"""
self.initial_selections = self.get_selections()
- def remember_systemd_tmpfiles(self):
+ def remember_systemd_tmpfiles(self) -> None:
"""Remember files potentially affected by systemd-tmpfiles.
Only files and directories with some flags are ignored by piuparts."""
@@ -1327,8 +1364,9 @@ class Chroot:
if flag[0] in ignored_flags:
self.systemd_tmpfiles[filename] = (current_file, flag)
- def upgrade_to_distros(self, distros, packages, apt_get_upgrade=False):
+ def upgrade_to_distros(self, distros: list[str], packages: list[str], apt_get_upgrade: bool = False) -> None:
"""Upgrade a chroot installation to each successive distro."""
+ assert settings.distro_config is not None
for distro in distros:
logging.debug("Upgrading %s to %s" % (self.name, distro))
os.environ["PIUPARTS_DISTRIBUTION_NEXT"] = settings.distro_config.get_distribution(distro)
@@ -1354,7 +1392,7 @@ class Chroot:
self.run_scripts("post_distupgrade")
self.check_for_no_processes()
- def get_known_packages(self, packages):
+ def get_known_packages(self, packages: list[str]) -> list[str]:
"""Does apt-get (or apt-cache) know about a set of packages?"""
known_packages = []
new_packages = []
@@ -1375,7 +1413,7 @@ class Chroot:
logging.info("the following packages are not in the archive: " + ", ".join(new_packages))
return known_packages
- def copy_files(self, source_names, target_name):
+ def copy_files(self, source_names: list[str], target_name: str) -> None:
"""Copy files in 'source_name' to file/dir 'target_name', relative
to the root of the chroot."""
target_name = self.relative(target_name)
@@ -1387,7 +1425,7 @@ class Chroot:
logging.error("Error copying %s to %s: %s" % (source_name, target_name, detail))
panic()
- def list_installed_files(self, pre_info, post_info):
+ def list_installed_files(self, pre_info: dict[str, FileInfo], post_info: dict[str, FileInfo]) -> None:
"""List the new files installed, removed and modified between two dir trees.
Actually, it is a nice output of the funcion diff_meta_dat."""
(new, removed, modified) = diff_meta_data(pre_info, post_info, self.is_ignored)
@@ -1406,7 +1444,7 @@ class Chroot:
else:
logging.debug("The package did not modify any file.\n")
- def is_installed(self, packages):
+ def is_installed(self, packages: list[str]) -> bool:
if not packages:
return True
retcode, output = self.run(["dpkg-query", "-f", "${Package} ${Status}\n", "-W"] + packages, ignore_errors=True)
@@ -1420,17 +1458,22 @@ class Chroot:
installed = False
return installed
- def install_packages(self, package_files, packages, with_scripts=True, reinstall=False):
+ def install_packages(
+ self, package_files: list[str], packages: list[str], with_scripts: bool = True, reinstall: bool = False
+ ) -> None:
if package_files:
self.install_package_files(package_files, packages, with_scripts=with_scripts)
else:
self.install_packages_by_name(packages, with_scripts=with_scripts, reinstall=reinstall)
- def install_package_files(self, package_files, packages=None, with_scripts=False):
+ def install_package_files(
+ self, package_files: list[str], packages: list[str] | None = None, with_scripts: bool = False
+ ) -> None:
if packages and settings.testdebs_repo:
self.install_packages_by_name(packages, with_scripts=with_scripts)
return
if package_files:
+ assert settings.distro_config is not None
# Check whether apt-get can install debs (supported since apt 1.1)
#
# If it can, this is preferable to the traditional
@@ -1492,8 +1535,9 @@ class Chroot:
remove_files([self.relative(name) for name in tmp_files])
- def install_packages_by_name(self, packages, with_scripts=True, reinstall=False):
+ def install_packages_by_name(self, packages: list[str], with_scripts: bool = True, reinstall: bool = False) -> None:
if packages:
+ assert settings.distro_config is not None
if with_scripts:
self.run_scripts("pre_install")
@@ -1512,7 +1556,14 @@ class Chroot:
if with_scripts:
self.run_scripts("post_install")
- def apt_get_install(self, to_install=[], to_remove=[], to_purge=[], flags=[], reinstall=False):
+ def apt_get_install(
+ self,
+ to_install: list[str] = [],
+ to_remove: list[str] = [],
+ to_purge: list[str] = [],
+ flags: list[str] = [],
+ reinstall: bool = False,
+ ) -> None:
command = ["apt-get", "-y"] + flags + ["install"]
if reinstall:
command.append("--reinstall")
@@ -1521,11 +1572,11 @@ class Chroot:
command.extend(["%s_" % x for x in unqualify(to_purge)])
self.run(command)
- def get_selections(self):
+ def get_selections(self) -> dict[str, tuple[str, str | None]]:
"""Get current package selections in a chroot."""
# "${Status}" emits three columns, e.g. "install ok installed"
# "${binary:Package}" requires a multi-arch dpkg, so fall back to "${Package}" on older versions
- (status, output) = self.run(
+ (command_status, output) = self.run(
["dpkg-query", "-W", "-f", "${Status}\\t${binary:Package}\\t${Package}\\t${Version}\\n"]
)
vdict = {}
@@ -1540,22 +1591,26 @@ class Chroot:
vdict[name] = (status, version)
return vdict
- def get_diversions(self):
+ def get_diversions(self) -> list[str] | None:
"""Get current dpkg-divert --list in a chroot."""
if not settings.check_broken_diversions:
- return
+ return None
(status, output) = self.run(["dpkg-divert", "--list"])
return output.split("\n")
- def get_modified_diversions(self, pre_install_diversions, post_install_diversions=None):
+ def get_modified_diversions(
+ self, pre_install_diversions: list[str], post_install_diversions: list[str] | None = None
+ ) -> tuple[list[str], list[str]]:
"""Check that diversions in chroot are identical (though potentially reordered)."""
if post_install_diversions is None:
post_install_diversions = self.get_diversions()
+ assert post_install_diversions is not None
removed = [ln for ln in pre_install_diversions if ln not in post_install_diversions]
added = [ln for ln in post_install_diversions if ln not in pre_install_diversions]
return (removed, added)
- def check_debsums(self):
+ def check_debsums(self) -> None:
+ assert self.name is not None
(status, output) = run(["debsums", "--root", self.name, "-ac", "--ignore-obsolete"], ignore_errors=True)
if status != 0:
logging.error(
@@ -1565,8 +1620,9 @@ class Chroot:
if not settings.warn_on_debsums_errors:
panic()
- def check_adequate(self, packages):
+ def check_adequate(self, packages: list[str]) -> None:
"""Run adequate and categorize output according to our needs."""
+ assert self.name is not None
packages = unqualify([p for p in packages if not p.endswith("=None")])
if packages and settings.adequate and os.path.isfile("/usr/bin/adequate"):
(status, output) = run(["dpkg-query", "-f", "${Version}\n", "-W", "adequate"], ignore_errors=True)
@@ -1592,7 +1648,7 @@ class Chroot:
"obsolete-conffile",
"broken-symlink",
]
- ignored_tags = []
+ ignored_tags: list[str] = []
(status, output) = run(["adequate", "--root", self.name] + packages, ignore_errors=True)
for tag in ignored_tags:
# ignore some tags
@@ -1631,7 +1687,8 @@ class Chroot:
if not settings.warn_if_inadequate:
panic()
- def list_paths_with_symlinks(self):
+ def list_paths_with_symlinks(self) -> None:
+ assert self.name is not None
file_owners = self.get_files_owned_by_packages()
bad = []
overwrites = False
@@ -1676,7 +1733,9 @@ class Chroot:
else:
logging.info(msg)
- def remove_packages(self, packages, allow_remove_essential=True, ignore_errors=False):
+ def remove_packages(
+ self, packages: list[str], allow_remove_essential: bool = True, ignore_errors: bool = False
+ ) -> None:
"""Remove packages in a chroot."""
base_command = ["apt-get", "remove"]
if allow_remove_essential:
@@ -1687,12 +1746,12 @@ class Chroot:
ignore_errors=ignore_errors,
)
- def purge_packages(self, packages, ignore_errors=False):
+ def purge_packages(self, packages: typing.Iterable[str], ignore_errors: bool = False) -> None:
"""Purge packages in a chroot."""
if packages:
self.run(["dpkg", "--purge"] + unqualify(packages), ignore_errors=ignore_errors)
- def restore_selections(self, reference_chroot_state, packages_qualified):
+ def restore_selections(self, reference_chroot_state: StateMetaData, packages_qualified: list[str]) -> None:
"""Restore package selections in a chroot to the state in
'reference_chroot_state'."""
@@ -1772,7 +1831,7 @@ class Chroot:
self.run(["dpkg", "--purge", "--pending"])
self.run(["dpkg", "--remove", "--pending"])
- def get_tree_meta_data(self):
+ def get_tree_meta_data(self) -> dict[str, FileInfo]:
"""Return the filesystem meta data for all objects in the chroot."""
self.run(["apt-get", "clean"])
logging.debug("Recording chroot state")
@@ -1783,8 +1842,8 @@ class Chroot:
(usr, x, uid) = line.split(":")[0:3]
uidmap[int(uid)] = usr
gidmap = {}
- with open(self.relative("etc/group"), "r") as group:
- for line in group:
+ with open(self.relative("etc/group"), "r") as groupfile:
+ for line in groupfile:
(grp, x, gid) = line.split(":")[0:3]
gidmap[int(gid)] = grp
vdict = {}
@@ -1815,24 +1874,25 @@ class Chroot:
vdict[name[len(root) :]] = FileInfo(st, target, user, group)
return vdict
- def get_state_meta_data(self):
- chroot_state = {}
- chroot_state["initial_selections"] = self.initial_selections
- chroot_state["avail_md5"] = self.avail_md5_history
- chroot_state["tree"] = self.get_tree_meta_data()
- chroot_state["selections"] = self.get_selections()
- chroot_state["diversions"] = self.get_diversions()
- return chroot_state
+ def get_state_meta_data(self) -> StateMetaData:
+ return {
+ "initial_selections": self.initial_selections,
+ "avail_md5": self.avail_md5_history,
+ "tree": self.get_tree_meta_data(),
+ "selections": self.get_selections(),
+ "diversions": self.get_diversions(),
+ }
- def relative(self, pathname):
+ def relative(self, pathname: str) -> str:
+ assert self.name is not None
if pathname.startswith("/"):
return os.path.join(self.name, pathname[1:])
return os.path.join(self.name, pathname)
- def get_files_owned_by_packages(self):
+ def get_files_owned_by_packages(self) -> dict[str, list[str]]:
"""Return dict[filename] = [packagenamelist]."""
vdir = self.relative("var/lib/dpkg/info")
- vdict = {}
+ vdict: dict[str, list[str]] = {}
for basename in os.listdir(vdir):
if basename.endswith(".list"):
pkg = basename[: -len(".list")]
@@ -1844,12 +1904,13 @@ class Chroot:
vdict[pathname] = [pkg]
return vdict
- def check_for_no_processes(self, fail=None):
+ def check_for_no_processes(self, fail: bool | None = None) -> None:
"""Check there are no processes running inside the chroot."""
if settings.docker_image:
(status, output) = run(["docker", "top", self.docker_container])
count = len(output.strip().split("\n")) - 2 # header + bash launched on container creation
else:
+ assert self.name is not None
(status, output) = run(["lsof", "-w", "+D", self.name], ignore_errors=True)
count = len(output.split("\n")) - 1
if count > 0:
@@ -1862,11 +1923,12 @@ class Chroot:
self.terminate_running_processes()
panic()
- def terminate_running_processes(self):
+ def terminate_running_processes(self) -> None:
"""Terminate all processes running in the chroot."""
if settings.docker_image:
# Docker takes care of this
return
+ assert self.name is not None
seen = []
while True:
p = subprocess.Popen(
@@ -1895,14 +1957,17 @@ class Chroot:
# If /selinux is present, assume that this is the only supported
# location by libselinux. Otherwise use the new location.
# /selinux was shipped by the libselinux package until wheezy.
- def selinuxfs_path(self):
+ def selinuxfs_path(self) -> str:
if os.path.isdir(self.relative("/selinux")):
return "/selinux"
else:
return "/sys/fs/selinux"
- def mount(self, source, path, fstype=None, opts=None, no_mkdir=False):
+ def mount(
+ self, source: str, path: str, fstype: str | None = None, opts: list[str] | None = None, no_mkdir: bool = False
+ ) -> None:
"""Mount something into the chroot and remember it for unmount_all()."""
+ assert self.name is not None
if opts is None:
opts = []
path = canonicalize_path(self.name, path)
@@ -1922,7 +1987,7 @@ class Chroot:
run(command)
self.mounts.append(fullpath)
- def unmount_all(self):
+ def unmount_all(self) -> None:
"""Unmount everything we mount()ed into the chroot."""
# Workaround to unmount /proc/sys/fs/binfmt_misc which is mounted by
@@ -1935,7 +2000,7 @@ class Chroot:
for mountpoint in reversed(self.mounts):
run(["umount", mountpoint], ignore_errors=True)
- def mount_proc(self):
+ def mount_proc(self) -> None:
"""Mount /proc etc. inside chroot."""
self.mount("proc", "/proc", fstype="proc")
etcmtab = self.relative("etc/mtab")
@@ -1969,7 +2034,7 @@ class Chroot:
if selinux_enabled():
self.mount("/sys/fs/selinux", self.selinuxfs_path(), opts=["bind", "ro"])
- def is_ignored(self, pathname, info="PATH", quiet=False):
+ def is_ignored(self, pathname: str, info: str = "PATH", quiet: bool = False) -> bool:
"""Is a file (or dir or whatever) to be ignored?"""
if pathname in settings.ignored_files:
return True
@@ -1995,7 +2060,13 @@ class Chroot:
return True
return False
- def check_files_moved_usr(self, packages=[], files_before={}, files_after={}, warn_only=None):
+ def check_files_moved_usr(
+ self,
+ packages: list[str] = [],
+ files_before: dict[str, list[str]] = {},
+ files_after: dict[str, list[str]] = {},
+ warn_only: bool = False,
+ ) -> None:
"""Check that no files were moved from /{bin|sbin|lib*} and /usr/{bin|sbin|lib*}"""
if settings.warn_on_usr_move == "disabled":
@@ -2041,8 +2112,9 @@ class Chroot:
else:
logging.debug("No file moved between /{bin|sbin|lib*} and /usr/{bin|sbin|lib*}.")
- def check_for_broken_symlinks(self, warn_only=None, file_owners={}):
+ def check_for_broken_symlinks(self, warn_only: bool = False, file_owners: dict[str, list[str]] = {}) -> None:
"""Check that all symlinks in chroot are non-broken."""
+ assert self.name is not None
if not settings.check_broken_symlinks:
return
broken = []
@@ -2073,7 +2145,7 @@ class Chroot:
else:
logging.debug("No broken symlinks as far as we can find.")
- def check_if_cronfiles(self, packages):
+ def check_if_cronfiles(self, packages: list[str]) -> list[str]:
"""Check if the packages have cron files under /etc/cron.d and in case positive,
it returns the list of files."""
@@ -2099,7 +2171,7 @@ class Chroot:
return vlist
- def check_output_cronfiles(self, list):
+ def check_output_cronfiles(self, list: list[str]) -> None:
"""Check if a given list of cronfiles has any output. Executes
cron file as cron would do (except for SHELL)"""
failed = False
@@ -2115,7 +2187,7 @@ class Chroot:
if failed:
panic()
- def check_if_logrotatefiles(self, packages):
+ def check_if_logrotatefiles(self, packages: list[str]) -> list[str]:
"""Check if the packages have logrotate files under /etc/logrotate.d and in case positive,
it returns the list of files."""
@@ -2137,7 +2209,7 @@ class Chroot:
return vlist
- def install_logrotate(self):
+ def install_logrotate(self) -> typing.Iterable[str]:
"""Install logrotate for check_output_logrotatefiles, and return the
list of packages that were installed"""
old_selections = self.get_selections()
@@ -2148,7 +2220,7 @@ class Chroot:
diff = diff_selections(self, old_selections)
return diff.keys()
- def check_output_logrotatefiles(self, list):
+ def check_output_logrotatefiles(self, list: list[str]) -> None:
"""Check if a given list of logrotatefiles has any output. Executes
logrotate file as logrotate would do from cron (except for SHELL)"""
failed = False
@@ -2164,7 +2236,7 @@ class Chroot:
if failed:
panic()
- def run_scripts(self, step, ignore_errors=False):
+ def run_scripts(self, step: str, ignore_errors: bool = False) -> int:
"""Run custom scripts to given step post-install|remove|purge"""
errorcodes = 0
@@ -2184,16 +2256,17 @@ class Chroot:
return errorcodes
-def selinux_enabled(enabled_test="/usr/sbin/selinuxenabled"):
+def selinux_enabled(enabled_test: str = "/usr/sbin/selinuxenabled") -> bool | None:
if os.access(enabled_test, os.X_OK):
retval, output = run([enabled_test], ignore_errors=True)
if retval == 0:
return True
else:
return False
+ return None
-def objects_are_different(obj1, obj2):
+def objects_are_different(obj1: FileInfo, obj2: FileInfo) -> bool:
"""Are filesystem objects different based on their meta data?"""
if (
obj1.st.st_mode != obj2.st.st_mode
@@ -2207,7 +2280,7 @@ def objects_are_different(obj1, obj2):
return False
-def format_object_attributes(obj):
+def format_object_attributes(obj: FileInfo) -> str:
st = obj.st
ft = ""
if stat.S_ISDIR(st.st_mode):
@@ -2228,7 +2301,16 @@ def format_object_attributes(obj):
return res
-def diff_meta_data(tree1, tree2, is_ignored, quiet=False):
+class IsIgnored(typing.Protocol):
+ def __call__(self, pathname: str, info: str, quiet: bool) -> bool: ...
+
+
+def diff_meta_data(
+ tree1: dict[str, FileInfo],
+ tree2: dict[str, FileInfo],
+ is_ignored: IsIgnored,
+ quiet: bool = False,
+) -> tuple[list[tuple[str, FileInfo]], list[tuple[str, FileInfo]], list[tuple[str, FileInfo]]]:
"""Compare two dir trees and return list of new files (only in 'tree2'),
removed files (only in 'tree1'), and modified files."""
@@ -2277,7 +2359,7 @@ def diff_meta_data(tree1, tree2, is_ignored, quiet=False):
return new, removed, modified
-def file_list(meta_infos, file_owners):
+def file_list(meta_infos: list[tuple[str, FileInfo]], file_owners: dict[str, list[str]]) -> str:
"""Return list of indented filenames."""
meta_infos = sorted(meta_infos[:])
vlist = []
@@ -2297,7 +2379,7 @@ def file_list(meta_infos, file_owners):
return "".join(vlist)
-def offending_packages(meta_infos, file_owners):
+def offending_packages(meta_infos: list[tuple[str, FileInfo]], file_owners: dict[str, list[str]]) -> set[str]:
"""Return a Set of offending packages."""
pkgset = set()
for name, data in meta_infos:
@@ -2307,7 +2389,9 @@ def offending_packages(meta_infos, file_owners):
return pkgset
-def prune_files_list(files, depsfiles):
+def prune_files_list(
+ files: list[tuple[str, FileInfo]], depsfiles: list[tuple[str, FileInfo]]
+) -> list[tuple[str, FileInfo]]:
"""Remove elements from 'files' that are in 'depsfiles', and return the
list of removed elements.
"""
@@ -2320,12 +2404,12 @@ def prune_files_list(files, depsfiles):
return warn
-def diff_selections(chroot, selections):
+def diff_selections(chroot: Chroot, selections: dict[str, tuple[str, str | None]]) -> dict[str, tuple[str, str | None]]:
"""Compare original and current package selection.
Return dict where dict[package_name] = original_status, that is,
the value in the dict is the state that the package needs to be
set to to restore original selections."""
- changes = {}
+ changes: dict[str, tuple[str, str | None]] = {}
current = chroot.get_selections()
for name, (value, version) in current.items():
if name not in selections:
@@ -2338,7 +2422,7 @@ def diff_selections(chroot, selections):
return changes
-def get_package_names_from_package_files(package_files):
+def get_package_names_from_package_files(package_files: list[str]) -> list[str]:
"""Return list of package names given list of package file names."""
vlist = []
for filename in package_files:
@@ -2362,7 +2446,7 @@ def get_package_names_from_package_files(package_files):
# from the 'Files' stanza.
-def process_changes(changes):
+def process_changes(changes: str) -> list[str] | None:
# Determine the path to the changes file, then check if it's readable.
dir_path = ""
changes_path = ""
@@ -2373,7 +2457,7 @@ def process_changes(changes):
changes_path = os.path.abspath(changes)
if not os.access(changes_path, os.R_OK):
logging.warning(changes_path + " is not readable. Skipping.")
- return
+ return None
# Determine the packages in the changes file through the 'Files' stanza.
field = "Files"
@@ -2404,7 +2488,12 @@ def process_changes(changes):
return package_list
-def check_results(chroot, chroot_state, file_owners, deps_info=None):
+def check_results(
+ chroot: Chroot,
+ chroot_state: StateMetaData,
+ file_owners: dict[str, list[str]],
+ deps_info: dict[str, FileInfo] | None = None,
+) -> bool:
"""Check that current chroot state matches 'chroot_state'.
If settings.warn_on_others is True and deps_info is not None, then only
@@ -2418,15 +2507,18 @@ def check_results(chroot, chroot_state, file_owners, deps_info=None):
reference_info = chroot_state["tree"]
ok = True
if settings.check_broken_diversions:
- (removed, added) = chroot.get_modified_diversions(chroot_state["diversions"])
- if added:
+ assert chroot_state["diversions"] is not None
+ (removed_diversions, added_diversions) = chroot.get_modified_diversions(chroot_state["diversions"])
+ if added_diversions:
logging.error(
- "FAIL: Installed diversions (dpkg-divert) not removed by purge:\n%s" % indent_string("\n".join(added))
+ "FAIL: Installed diversions (dpkg-divert) not removed by purge:\n%s"
+ % indent_string("\n".join(added_diversions))
)
ok = False
- if removed:
+ if removed_diversions:
logging.error(
- "FAIL: Existing diversions (dpkg-divert) removed/modified:\n%s" % indent_string("\n".join(removed))
+ "FAIL: Existing diversions (dpkg-divert) removed/modified:\n%s"
+ % indent_string("\n".join(removed_diversions))
)
ok = False
@@ -2484,7 +2576,13 @@ def check_results(chroot, chroot_state, file_owners, deps_info=None):
return ok
-def install_purge_test(chroot, chroot_state, package_files, packages, extra_packages):
+def install_purge_test(
+ chroot: Chroot,
+ chroot_state: StateMetaData,
+ package_files: list[str],
+ packages: list[str],
+ extra_packages: list[str],
+) -> bool:
"""Do an install-purge test. Return True if successful, False if not.
Assume 'root' is a directory already populated with a working
chroot, with packages in states given by 'selections'."""
@@ -2508,6 +2606,7 @@ def install_purge_test(chroot, chroot_state, package_files, packages, extra_pack
if settings.warn_on_others or settings.install_purge_install:
# Create a metapackage with dependencies from the given packages
+ control_infos: typing.Iterable[deb822.Deb822]
if package_files:
control_infos = []
# We were given package files, so let's get the Depends and
@@ -2561,7 +2660,7 @@ def install_purge_test(chroot, chroot_state, package_files, packages, extra_pack
"piuparts-depends-dummy", depends=all_depends, conflicts=all_conflicts, arch=arch
)
- def cleanup_metapackage():
+ def cleanup_metapackage() -> None:
shutil.rmtree(os.path.dirname(metapackage))
panic_handler_id = do_on_panic(cleanup_metapackage)
@@ -2637,7 +2736,9 @@ def install_purge_test(chroot, chroot_state, package_files, packages, extra_pack
return check_results(chroot, chroot_state, file_owners, deps_info=deps_info)
-def install_upgrade_test(chroot, chroot_state, package_files, packages, old_packages):
+def install_upgrade_test(
+ chroot: Chroot, chroot_state: StateMetaData, package_files: list[str], packages: list[str], old_packages: list[str]
+) -> bool:
"""Install old_packages via apt-get, then upgrade from package files.
Return True if successful, False if not."""
@@ -2685,21 +2786,21 @@ def install_upgrade_test(chroot, chroot_state, package_files, packages, old_pack
return check_results(chroot, chroot_state, file_owners_after)
-def save_meta_data(filename, chroot_state):
+def save_meta_data(filename: str, chroot_state: StateMetaData) -> None:
"""Save directory tree meta data into a file for fast access later."""
logging.debug("Saving chroot meta data to %s" % filename)
with open(filename, "wb") as f:
pickle.dump(chroot_state, f)
-def load_meta_data(filename):
+def load_meta_data(filename: str) -> StateMetaData:
"""Load meta data saved by 'save_meta_data'."""
logging.debug("Loading chroot meta data from %s" % filename)
with open(filename, "rb") as f:
return pickle.load(f)
-def install_and_upgrade_between_distros(package_files, packages_qualified):
+def install_and_upgrade_between_distros(package_files: list[str], packages_qualified: list[str]) -> bool:
"""Install package and upgrade it between distributions, then remove.
Return True if successful, False if not."""
@@ -2863,14 +2964,14 @@ def install_and_upgrade_between_distros(package_files, packages_qualified):
return result
-def parse_mirror_spec(str, defaultcomponents=[]):
+def parse_mirror_spec(str: str, defaultcomponents: list[str] = []) -> tuple[str, list[str]]:
"""Parse a mirror specification from the --mirror option argument.
Return (mirror, componentslist)."""
parts = str.split()
return parts[0], parts[1:] or defaultcomponents[:]
-def find_default_debian_mirrors():
+def find_default_debian_mirrors() -> list[tuple[str, list[str]]]:
"""Find the default Debian mirrors."""
mirrors = []
try:
@@ -2881,11 +2982,19 @@ def find_default_debian_mirrors():
mirrors.append((parts[1], parts[3:]))
break # Only use the first one, at least for now.
except IOError:
- return None
+ return []
return mirrors
-def forget_ignores(option, opt, value, parser, *args, **kwargs):
+def forget_ignores(
+ option: optparse.Option,
+ opt: str,
+ value: typing.Any,
+ parser: optparse.OptionParser,
+ *args: typing.Any,
+ **kwargs: typing.Any,
+) -> None:
+ assert parser.values is not None
settings.bindmounts = []
parser.values.ignore = []
parser.values.ignore_regex = []
@@ -2893,11 +3002,19 @@ def forget_ignores(option, opt, value, parser, *args, **kwargs):
settings.ignored_patterns = []
-def set_basetgz_to_pbuilder(option, opt, value, parser, *args, **kwargs):
+def set_basetgz_to_pbuilder(
+ option: optparse.Option,
+ opt: str,
+ value: typing.Any,
+ parser: optparse.OptionParser,
+ *args: typing.Any,
+ **kwargs: typing.Any,
+) -> None:
+ assert parser.values is not None
parser.values.basetgz = "/var/cache/pbuilder/base.tgz"
-def parse_command_line():
+def parse_command_line() -> list[str]:
"""Parse the command line, change global settings, return non-options."""
parser = optparse.OptionParser(usage="%prog [options] package ...", version="piuparts %s" % VERSION)
@@ -3069,7 +3186,10 @@ def parse_command_line():
"--install-suggests", action="store_true", default=False, help="Enable the installation of Suggests."
)
- def keep_env_parser(option, opt_str, value, parser):
+ def keep_env_parser(
+ option: optparse.Option, opt_str: str, value: typing.Any, parser: optparse.OptionParser
+ ) -> None:
+ assert option.dest is not None
setattr(parser.values, option.dest, True)
if "--keep-tmpdir" == opt_str:
print("WARNING `--keep-tmpdir` is deprecated, use `--keep-env` " "instead")
@@ -3315,7 +3435,10 @@ def parse_command_line():
)
parser.add_option(
- "--skip-cronfiles-test", action="store_true", default=False, help="Skip testing the output from the cron files."
+ "--skip-cronfiles-test",
+ action="store_true",
+ default=False,
+ help="Skip testing the output from the cron files.",
)
parser.add_option(
@@ -3511,11 +3634,10 @@ def parse_command_line():
settings.install_purge_install = opts.install_purge_install
settings.install_remove_install = opts.install_remove_install
settings.list_installed_files = opts.list_installed_files
- [
+ for csv in opts.fake_essential_packages:
settings.fake_essential_packages.extend([i.strip() for i in csv.split(",")])
- for csv in opts.fake_essential_packages
- ]
- [settings.extra_old_packages.extend([i.strip() for i in csv.split(",")]) for csv in opts.extra_old_packages]
+ for csv in opts.extra_old_packages:
+ settings.extra_old_packages.extend([i.strip() for i in csv.split(",")])
settings.skip_cronfiles_test = opts.skip_cronfiles_test
settings.skip_logrotatefiles_test = opts.skip_logrotatefiles_test
settings.adequate = not opts.no_adequate
@@ -3552,6 +3674,7 @@ def parse_command_line():
settings.tmpdir = os.environ["TMPDIR"]
else:
settings.tmpdir = "/tmp"
+ assert settings.tmpdir is not None
if not os.path.isdir(settings.tmpdir):
logging.error("Temporary directory is not a directory: %s" % settings.tmpdir)
@@ -3594,12 +3717,12 @@ def parse_command_line():
return args
-def get_chroot():
+def get_chroot() -> Chroot:
return Chroot()
# Process the packages given in a list
-def process_packages(package_list):
+def process_packages(package_list: list[str]) -> None:
# Find the names of packages.
if settings.args_are_package_files:
packages = get_package_names_from_package_files(package_list)
@@ -3661,7 +3784,7 @@ def process_packages(package_list):
panic()
-def main():
+def main() -> None:
"""Main program. But you knew that."""
args = parse_command_line()
@@ -3721,6 +3844,7 @@ def main():
for arg in args:
if changes_p.match(arg):
package_list = process_changes(arg)
+ assert package_list is not None
if settings.single_changes_list:
for package in package_list:
regular_packages_list.append(package)
=====================================
piupartslib/__init__.py
=====================================
@@ -19,22 +19,27 @@
import bz2
import lzma
+import typing
import urllib.error
import urllib.request
import zlib
+class Decompressor(typing.Protocol):
+ def decompress(self, chunk: bytes) -> bytes: ...
+
+
class DecompressedStream:
- def __init__(self, fileobj, decompressor=None):
+ def __init__(self, fileobj, decompressor: Decompressor | None = None):
self._input = fileobj
self._decompressor = decompressor
self._buffer = ""
- self._line_buffer = []
+ self._line_buffer: list[str] = []
self._i = 0
self._end = 0
self._undecbuf = b""
- def _split_decode(self, myb):
+ def _split_decode(self, myb: bytes) -> tuple[str, bytes]:
lmyb = len(myb)
for end in range(lmyb, max(lmyb - 6, -1), -1):
try:
@@ -44,7 +49,7 @@ class DecompressedStream:
# not returned yet? We have a problem
raise UnicodeDecodeError
- def _refill(self):
+ def _refill(self) -> bool:
if self._input is None:
return False
while True:
@@ -62,10 +67,10 @@ class DecompressedStream:
if chunk:
return True
- def readline(self):
+ def readline(self) -> str:
while not self._i < self._end:
self._i = self._end = 0
- self._line_buffer = None
+ self._line_buffer = []
empty = not self._refill()
if not self._buffer:
break
@@ -80,13 +85,13 @@ class DecompressedStream:
return self._line_buffer[self._i - 1]
return ""
- def close(self):
+ def close(self) -> None:
if self._input:
self._input.close()
self._input = self._decompressor = None
-def open_packages_url(url):
+def open_packages_url(url: str) -> tuple[str, DecompressedStream]:
"""Open a Packages.bz2 file pointed to by a URL"""
socket = None
error = None
@@ -98,17 +103,15 @@ def open_packages_url(url):
else:
break
else:
+ assert error is not None
raise error
url = socket.geturl()
if ext == ".bz2":
- decompressor = bz2.BZ2Decompressor()
- decompressed = DecompressedStream(socket, decompressor)
+ decompressed = DecompressedStream(socket, bz2.BZ2Decompressor())
elif ext == ".gz":
- decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
- decompressed = DecompressedStream(socket, decompressor)
+ decompressed = DecompressedStream(socket, zlib.decompressobj(16 + zlib.MAX_WBITS))
elif ext == ".xz":
- decompressor = lzma.LZMADecompressor()
- decompressed = DecompressedStream(socket, decompressor)
+ decompressed = DecompressedStream(socket, lzma.LZMADecompressor())
elif ext == "":
decompressed = socket
else:
=====================================
piupartslib/conf.py
=====================================
@@ -33,17 +33,19 @@ import distro_info
class MissingSection(Exception):
- def __init__(self, filename, section):
+ def __init__(self, filename: str, section: str):
self.args = ("Section %s not defined in configuration file %s" % (section, filename),)
class MissingMandatorySetting(Exception):
- def __init__(self, filename, key):
+ def __init__(self, filename: str, key: str):
self.args = ("Value for %s not set in configuration file %s" % (key, filename),)
-class Config(UserDict):
- def __init__(self, section, defaults, mandatory=[], defaults_section=None):
+class Config(UserDict[str, str]):
+ def __init__(
+ self, section: str, defaults: dict[str, str], mandatory: list[str] = [], defaults_section: str | None = None
+ ):
UserDict.__init__(self)
self._section = section
self._defaults_section = defaults_section
@@ -51,7 +53,7 @@ class Config(UserDict):
self[key] = value
self._mandatory = mandatory
- def read(self, filename):
+ def read(self, filename: str) -> None:
cp = configparser.ConfigParser()
cp.read(filename)
if not cp.has_section(self._section):
@@ -64,17 +66,17 @@ class Config(UserDict):
elif key in self._mandatory:
raise MissingMandatorySetting(filename, key)
- def get_mirror(self, distro=None):
+ def get_mirror(self, distro: str | None = None) -> str:
if self["mirror"] is not None:
return self["mirror"]
return "http://deb.debian.org/debian"
- def get_distros(self):
+ def get_distros(self) -> list[str]:
if self["upgrade-test-distros"] is not None:
return self["upgrade-test-distros"].split()
return []
- def get_distro(self):
+ def get_distro(self) -> str | None:
if self["distro"]:
return self["distro"]
distros = self.get_distros()
@@ -82,19 +84,19 @@ class Config(UserDict):
return distros[-1]
return None
- def get_start_distro(self):
+ def get_start_distro(self) -> str:
distros = self.get_distros()
if distros:
return distros[0]
return self["distro"]
- def get_final_distro(self):
+ def get_final_distro(self) -> str:
distros = self.get_distros()
if distros:
return distros[-1]
return self["distro"]
- def _get_distmap(self):
+ def _get_distmap(self) -> defaultdict[str, str]:
debdist = distro_info.DebianDistroInfo()
# start with e.g. "sid" -> "unstable"
@@ -120,32 +122,33 @@ class Config(UserDict):
return distmap
- def _map_distro(self, distro):
+ def _map_distro(self, distro: str) -> str:
distro_root = re.split("[-.]", distro)[0]
distmap = self._get_distmap()
return distmap[distro_root]
- def get_std_distro(self, distrolist=[]):
+ def get_std_distro(self, distrolist: list[str] = []) -> str:
if not distrolist:
distrolist = [self.get_distro()] + self.get_distros()
mappedlist = [self._map_distro(x) for x in distrolist]
return reduce(lambda x, y: y if y != "unknown" else x, mappedlist)
- def get_area(self):
+ def get_area(self) -> str:
if self["area"] is not None:
return self["area"]
return "main"
- def get_arch(self):
+ def get_arch(self) -> str:
if not self["arch"]:
# Try to figure it out ourselves, using dpkg
p = subprocess.Popen(["dpkg", "--print-architecture"], stdout=subprocess.PIPE)
+ assert p.stdout is not None
self["arch"] = p.stdout.read().decode().rstrip()
return self["arch"]
-class DistroConfig(UserDict):
- def __init__(self, filename, mirror):
+class DistroConfig(UserDict[str, dict[str, str | None]]):
+ def __init__(self, filename: str, mirror: str):
UserDict.__init__(self)
self._mirror = mirror
self._defaults = {
@@ -164,31 +167,32 @@ class DistroConfig(UserDict):
if cp.has_option(section, key):
self[section][key] = cp.get(section, key)
- def get(self, section, key=None):
- if section not in self.keys():
- self[section] = dict(self._defaults, distribution=section)
- if key is not None:
- return self[section][key]
- return self[section]
+ def get_field(self, section: str, key: str) -> str | None:
+ try:
+ sectdict = self[section]
+ except KeyError:
+ sectdict = dict(self._defaults, distribution=section)
+ self[section] = sectdict
+ return sectdict[key]
- def _is_virtual(self, distro):
- uri = self.get(distro, "uri")
+ def _is_virtual(self, distro: str) -> bool:
+ uri = self.get_field(distro, "uri")
return uri is not None and uri == "None"
- def get_mirror(self, distro):
+ def get_mirror(self, distro: str) -> str:
if self._is_virtual(distro):
distro = self._expand_depends(distro)[0]
- return self.get(distro, "uri") or self._mirror
+ return self.get_field(distro, "uri") or self._mirror
- def get_distribution(self, distro):
+ def get_distribution(self, distro: str) -> str:
if self._is_virtual(distro):
distro = self._expand_depends(distro)[0]
- return self.get(distro, "distribution") or distro
+ return self.get_field(distro, "distribution") or distro
- def get_candidates(self, distro):
- return (self.get(distro, "candidates") or "").split() or [distro]
+ def get_candidates(self, distro: str) -> list[str]:
+ return (self.get_field(distro, "candidates") or "").split() or [distro]
- def _get_packages_url(self, distro, area, arch):
+ def _get_packages_url(self, distro: str, area: str, arch: str) -> str:
return "%s/dists/%s/%s/binary-%s/Packages" % (
self.get_mirror(distro),
self.get_distribution(distro),
@@ -196,26 +200,26 @@ class DistroConfig(UserDict):
arch,
)
- def get_packages_urls(self, distro, area, arch):
+ def get_packages_urls(self, distro: str, area: str, arch: str) -> list[str]:
return [self._get_packages_url(d, area, arch) for d in self.get_candidates(distro)]
- def _get_sources_url(self, distro, area):
+ def _get_sources_url(self, distro: str, area: str) -> str:
return "%s/dists/%s/%s/source/Sources" % (
self.get_mirror(distro),
self.get_distribution(distro),
area,
)
- def get_sources_urls(self, distro, area):
+ def get_sources_urls(self, distro: str, area: str) -> list[str]:
return [self._get_sources_url(d, area) for d in self.get_candidates(distro)]
- def get_target_flags(self, distro):
- tr = self.get(distro, "target-release")
+ def get_target_flags(self, distro: str) -> list[str]:
+ tr = self.get_field(distro, "target-release")
if tr:
return ["-t", tr]
return []
- def _expand_depends(self, distro, include_virtual=False):
+ def _expand_depends(self, distro: str, include_virtual: bool = False) -> list[str]:
todo = [distro]
done = []
seen = []
@@ -224,25 +228,26 @@ class DistroConfig(UserDict):
todo = todo[1:]
if curr not in seen:
seen.append(curr)
- todo = (self.get(curr, "depends") or "").split() + [curr] + todo
+ todo = (self.get_field(curr, "depends") or "").split() + [curr] + todo
elif curr not in done:
if include_virtual or not self._is_virtual(curr):
done.append(curr)
assert len(done) > 0
return done
- def get_deb_lines(self, distro, components):
+ def get_deb_lines(self, distro: str, components: list[str]) -> list[str]:
lines = []
for d in self._expand_depends(distro):
+ d_components = self[d]["components"]
for c in components:
- if self[d]["components"] is None or c in self[d]["components"].split():
+ if d_components is None or c in d_components.split():
lines.append("deb %s %s %s" % (self.get_mirror(d), self.get_distribution(d), c))
return lines
- def get_basetgz(self, distro, arch, merged_usr=True):
+ def get_basetgz(self, distro: str, arch: str, merged_usr: bool = True) -> str | None:
# look for the first base distribution
for d in self._expand_depends(distro):
- if self.get(d, "depends"):
+ if self.get_field(d, "depends"):
next # skip partial distro
return "%s%s_%s.tar.gz" % (
self.get_distribution(d),
View it on GitLab: https://salsa.debian.org/debian/piuparts/-/commit/cd0c859fd5204b7137a9849e19024a44d4b6a3eb
--
View it on GitLab: https://salsa.debian.org/debian/piuparts/-/commit/cd0c859fd5204b7137a9849e19024a44d4b6a3eb
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/piuparts-devel/attachments/20250721/d4a9f896/attachment-0001.htm>
More information about the Piuparts-devel mailing list