[Pkg-freeipa-devel] [Git][freeipa-team/freeipa-healthcheck][master] 23 commits: Handle slight different in exception output in Python 3.10

Timo Aaltonen (@tjaalton) gitlab at salsa.debian.org
Wed Oct 20 10:24:05 BST 2021



Timo Aaltonen pushed to branch master at FreeIPA packaging / freeipa-healthcheck


Commits:
733fd7a2 by Rob Crittenden at 2021-01-14T21:04:11+01:00
Handle slight different in exception output in Python 3.10

Python 3.10 adds the class name to the exception so a
x = Result() failure will return:

Result.__init__() missing...

instead as with previous python versions:

__init__() missing...

https://bugzilla.redhat.com/show_bug.cgi?id=1915256

- - - - -
a6897503 by Rob Crittenden at 2021-01-14T21:30:09+01:00
In installation check be specific about server

Previously if IPA server was not configured the message was
"IPA is not configured." If ipa-healthcheck is run on a client
then this could be misleading.

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

- - - - -
3b921ba2 by Rob Crittenden at 2021-01-14T17:02:31-05:00
Check for a host certificate to avoid a false positive tracking

It is possible to have a machine certificate tracked by certmonger
for a promoted server. Include it in the list of possible
certificates for the purpose of avoiding a false positive of an
unknown cert.

https://github.com/freeipa/freeipa-healthcheck/issues/180

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

- - - - -
feb1592a by Rob Crittenden at 2021-01-18T11:27:23-05:00
Failed expected group should be a string, not a list

This was caused by the code to allow multiple possible
file owner and group. The expected value for group wasn't
being converted into a string so was kept as a list.

Test updated to catch this situation.

https://github.com/freeipa/freeipa-healthcheck/issues/183

- - - - -
69ecaf49 by Antonio Torres at 2021-02-16T11:29:45-05:00
Return user-friendly message when no issues found

Return user-friendly message instead of empty string
when no issues found and using the "human" output type.

Fixes: https://bugzilla.redhat.com/show_bug.cgi?id=1780062
Signed-off-by: Antonio Torres <antorres at redhat.com>

- - - - -
3f6ed439 by Antonio Torres at 2021-02-25T15:48:31-05:00
Add check for IPA KRA Agent

Add check to validate KRA Agent in case KRA is installed, including
checking for the KRA Agent LDAP entry.

Fixes: https://bugzilla.redhat.com/show_bug.cgi?id=1894781
Signed-off-by: Antonio Torres <antorres at redhat.com>

- - - - -
a6504bd7 by Antonio Torres at 2021-02-25T15:48:31-05:00
Add tests for KRA Agent validation

Add unit tests for KRA Agent validation.

Signed-off-by: Antonio Torres <antorres at redhat.com>

- - - - -
56d68e8d by Rob Crittenden at 2021-05-14T13:56:04-04:00
Add a --version cli option

I chose the pkg_resources method since it seemed the most
straightforward.

https://github.com/freeipa/freeipa-healthcheck/issues/189

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

- - - - -
76961874 by Rob Crittenden at 2021-05-14T16:19:39-04:00
Replace the soon-to-be EOL F32 container with F34

Also drop testing using python 3.8 since both F33 and 34 use
3.9 by default.

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

- - - - -
b0b782da by Rob Crittenden at 2021-05-19T13:26:54-04:00
Restore the log level after loading the resources

It seems that loading the resources can affect the log level so
save it off before loading them and then restore it.

I'm not entirely sure what is happening but this is an easy fix
and we can address it further later if we want.

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

- - - - -
ac14c6a3 by Rob Crittenden at 2021-05-19T13:30:13-04:00
Log and skip missing expected tracking in IPACertDNSSAN

This state is already reported by IPACertTracking so
avoid double-reporting it.

Also fix a missing space in a split line which caused
tracked by to display as trackedby.

https://github.com/freeipa/freeipa-healthcheck/issues/203

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

- - - - -
f762f8f5 by Antonio Torres at 2021-05-20T10:29:35-04:00
Add checks to detect mismatch of certificates

Add checks to detect mismatch of certificates between LDAP
and NSS databases. Check for existance of entries as well as
ensure the certificates match between the different databases.

Related: https://bugzilla.redhat.com/show_bug.cgi?id=1886770
Signed-off-by: Antonio Torres <antorres at redhat.com>

- - - - -
94ca49d9 by Antonio Torres at 2021-05-20T10:29:35-04:00
Add tests for certificate mismatch detection

Add tests for the IPACertMatchCheck and IPADogtagCertsMatchCheck plugins.

Related: https://bugzilla.redhat.com/show_bug.cgi?id=1886770
Signed-off-by: Antonio Torres <antorres at redhat.com>

- - - - -
9d6c6a8c by Rob Crittenden at 2021-06-02T08:36:26-04:00
Add log files to the set of files checked for owner/group/mode

Extend the list of files to be checked to include most IPA service
log files.

https://bugzilla.redhat.com/show_bug.cgi?id=1780020

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

- - - - -
d981f049 by Rob Crittenden at 2021-06-02T08:36:50-04:00
Replace or drop print() statements

The print in dogtag/ca.py was supposed to be a debug statement.
Converted it to use logger and fixed a related issue that was
calling logging.debug instead of logger.debug.

ipa/plugin.py didn't assign logger at all and was still using
a direct logging.debug statement.

Drop a print() statement from a test, probably leftover from
troubleshooting a failure.

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

https://github.com/freeipa/freeipa-healthcheck/issues/207

- - - - -
a63d5ac0 by Rob Crittenden at 2021-06-02T11:58:06-04:00
Don't collect the CRLManager role if the CA is not configured

This was raising a false positive in the IPA CA-less case.

https://github.com/freeipa/freeipa-healthcheck/issues/201

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

- - - - -
970ffd31 by Rob Crittenden at 2021-06-02T11:58:06-04:00
Filter out the pki healthcheck sources if IPA CA is not installed

The pki checks spew the error "Invalid PKI instance: pki-tomcat" so
we need to suppress them in the IPA CA-less installation case.

So if the IPA CA is not configured then don't register the
pki sources.

A side-effect is that to user the sources will not be listed at
all in this case.

This should not affect pki-healthcheck and it will continue to
return errors in the unconfigured case.

https://github.com/freeipa/freeipa-healthcheck/issues/201

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

- - - - -
501554e0 by Rob Crittenden at 2021-06-08T11:54:25-04:00
Add dirsrv dependency to IPAHostKeytab plugin

The KDC won't work without its backend, add a dependency
on 389-ds

https://bugzilla.redhat.com/show_bug.cgi?id=1776687

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

- - - - -
c5a48fb2 by Rob Crittenden at 2021-06-08T11:54:25-04:00
Add service check dependencies

Since 389-ds is the heart of IPA there may not be a point in checking
all dependent services. ipa-dnskeysyncd in particular doesn't like
when it can't connect and tries to restart itself multiple times.

Note that this currently works because the services are sorted
alphabetically and dirsrv appears near the top. Re-ordering may be
necessary in the future.

I'm choosing not to add dirsrv to the other services because they
return cleanly if it is not available.

https://bugzilla.redhat.com/show_bug.cgi?id=1776687

Signed-off-by: Rob Crittenden <rcritten at redhat.com>

- - - - -
55cb92b0 by Rob Crittenden at 2021-06-08T13:16:20-04:00
Become 0.9

- - - - -
e8631810 by Timo Aaltonen at 2021-10-20T10:50:07+03:00
Merge branch 'upstream'

- - - - -
0d7945c9 by Timo Aaltonen at 2021-10-20T10:50:29+03:00
bump version

- - - - -
2366731f by Timo Aaltonen at 2021-10-20T12:23:16+03:00
releasing package freeipa-healthcheck version 0.9-1

- - - - -


22 changed files:

- .github/workflows/pipelines.yml
- README.md
- debian/changelog
- setup.py
- src/ipahealthcheck/core/core.py
- src/ipahealthcheck/core/files.py
- src/ipahealthcheck/core/main.py
- src/ipahealthcheck/core/output.py
- src/ipahealthcheck/dogtag/ca.py
- src/ipahealthcheck/ipa/certs.py
- src/ipahealthcheck/ipa/files.py
- src/ipahealthcheck/ipa/host.py
- src/ipahealthcheck/ipa/plugin.py
- src/ipahealthcheck/ipa/roles.py
- src/ipahealthcheck/meta/services.py
- + tests/test_commands.py
- tests/test_core_files.py
- tests/test_ipa_agent.py
- + tests/test_ipa_cert_match.py
- tests/test_ipa_roles.py
- tests/test_ipa_trust.py
- tests/test_results.py


Changes:

=====================================
.github/workflows/pipelines.yml
=====================================
@@ -10,7 +10,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.8, 3.9]
+        python-version: [3.9]
 
     steps:
     - uses: actions/checkout at v2
@@ -35,7 +35,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        fedora-release: [32, 33]
+        fedora-release: [33, 34]
 
     steps:
     - uses: actions/checkout at v2


=====================================
README.md
=====================================
@@ -507,6 +507,30 @@ The trust for certificates stored in NSS databases is compared against a known g
       }
     }
 
+### IPACertMatchCheck
+Ensure CA certificate entries in LDAP and NSS databases match.
+
+    {
+      "source": "ipahealthcheck.ipa.certs",
+      "check": "IPACertMatchCheck",
+      "result": "ERROR",
+      "kw": {
+        "msg": "CA Certificate from /etc/ipa/nssdb does not match /etc/ipa/ca.crt"
+      }
+    }
+
+### IPADogtagCertsMatchCheck
+Check if Dogtag certificates present in both NSS DB and LDAP match.
+
+    {
+      "source": "ipahealthcheck.ipa.certs",
+      "check": "IPADogtagCertsMatchCheck",
+      "result": "ERROR",
+      "kw": {
+        "msg": "'subsystemCert cert-pki-ca' certificate in NSS DB does not match entry in LDAP"
+      }
+    }
+
 ### IPANSSChainValidation
 Validate the certificate chain of the NSS certificates. This executes: certutil -V -u V -e -d [dbdir] -n [nickname].
 
@@ -547,7 +571,21 @@ Verify the description and userCertificate values in uid=ipara,ou=People,o=ipaca
       "kw": {
         "expected": "2;125;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST",
         "got": "2;7;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST",
-        "msg": "RA agent description does not match 2;7;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST in LDAP and expected 2;125;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST"
+        "msg": "RA agent description does not match. Found 2;7;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST in LDAP and expected 2;125;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST"
+      }
+    }
+
+### IPAKRAAgent
+Verify the description and userCertificate values in uid=ipakra,ou=people,o=kra,o=ipaca.
+
+    {
+      "source": "ipahealthcheck.ipa.certs",
+      "check": "IPAKRAAgent",
+      "result": "ERROR",
+      "kw": {
+        "expected": "2;125;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST",
+        "got": "2;7;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST",
+        "msg": "KRA agent description does not match. Found 2;7;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST in LDAP and expected 2;125;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST"
       }
     }
 


=====================================
debian/changelog
=====================================
@@ -1,3 +1,9 @@
+freeipa-healthcheck (0.9-1) unstable; urgency=medium
+
+  * New upstream release.
+
+ -- Timo Aaltonen <tjaalton at debian.org>  Wed, 20 Oct 2021 12:21:52 +0300
+
 freeipa-healthcheck (0.8-1) unstable; urgency=medium
 
   * New upstream release.


=====================================
setup.py
=====================================
@@ -3,7 +3,7 @@ from setuptools import find_packages, setup
 
 setup(
     name='ipahealthcheck',
-    version='0.8',
+    version='0.9',
     namespace_packages=['ipahealthcheck', 'ipaclustercheck'],
     package_dir={'': 'src'},
     # packages=find_packages(where='src'),


=====================================
src/ipahealthcheck/core/core.py
=====================================
@@ -21,12 +21,15 @@ logger = logging.getLogger()
 
 
 def find_registries(entry_points):
+    # Loading the resources may reset the log level, save it.
+    log_level = logger.level
     registries = {}
     for entry_point in entry_points:
         registries.update({
             ep.name: ep.resolve()
             for ep in pkg_resources.iter_entry_points(entry_point)
         })
+    logger.setLevel(log_level)
     return registries
 
 
@@ -81,6 +84,26 @@ def run_service_plugins(plugins, source, check):
         if not isinstance(plugin, ServiceCheck):
             continue
 
+        # Try to save some time to not check dependent services if the
+        # parent is down.
+        if not set(plugin.requires).issubset(available):
+            # A required service is not available. Either it hasn't been
+            # checked yet or it isn't running. If not running break.
+            running = True
+            for result in results.results:
+                if result.check in plugin.requires:
+                    # if not in available but in results the service failed
+                    running = False
+                    break
+            if not running:
+                logger.debug(
+                    'Skipping %s:%s because %s service(s) not running',
+                    plugin.__class__.__module__,
+                    plugin.__class__.__name__,
+                    ', '.join(set(plugin.requires) - set(available))
+                )
+                continue
+
         logger.debug('Calling check %s', plugin)
         for result in plugin.check():
             # always run the service checks so dependencies work
@@ -157,6 +180,8 @@ def add_default_options(parser, output_registry, default_output):
                         default=default_output, help='Output method')
     parser.add_argument('--output-file', dest='outfile', default=None,
                         help='File to store output')
+    parser.add_argument('--version', dest='version', action='store_true',
+                        help='Report the version number and exit')
 
 
 def add_output_options(parser, output_registry):
@@ -243,6 +268,16 @@ class RunChecks:
         options = parse_options(self.parser)
         self.options = options
 
+        if options.version:
+            for registry in self.entry_points:
+                name = registry.split('.')[0]
+                try:
+                    version = pkg_resources.get_distribution(name).version
+                except pkg_resources.DistributionNotFound:
+                    continue
+                print('%s: %s' % (name, version))
+            return 0
+
         # pylint: disable=assignment-from-none
         rval = self.validate_options()
         # pylint: enable=assignment-from-none
@@ -265,6 +300,13 @@ class RunChecks:
         if rval is not None:
             return rval
 
+        # If we have IPA configured without a CA then we want to skip
+        # the pkihealthcheck plugins otherwise they will generated a
+        # lot of false positives. The IPA plugins are loaded first so
+        # which should set ca_configured in its registry to True or
+        # False. We will skip the pkihealthcheck plugins only if
+        # ca_configured is False which means that it was set by IPA.
+        ca_configured = None
         for name, registry in find_registries(self.entry_points).items():
             try:
                 registry.initialize(framework, config, options)
@@ -276,6 +318,11 @@ class RunChecks:
                 except Exception as e:
                     logger.error("Unable to initialize %s: %s", name, e)
                     continue
+            if hasattr(registry, 'ca_configured'):
+                ca_configured = registry.ca_configured
+            if 'pkihealthcheck' in name and ca_configured is False:
+                logger.debug('IPA CA is not configured, skipping %s', name)
+                continue
             for plugin in find_plugins(name, registry):
                 plugins.append(plugin)
 


=====================================
src/ipahealthcheck/core/files.py
=====================================
@@ -98,6 +98,7 @@ class FileCheck:
                     msg = 'Group of %s is %s and should ' \
                           'be one of %s' % \
                           (path, actual.gr_name, ','.join(group))
+                group = ','.join(group)
                 yield Result(self, constants.WARNING, key=key,
                              path=path, type='group', expected=group,
                              got=actual.gr_name,


=====================================
src/ipahealthcheck/core/main.py
=====================================
@@ -15,7 +15,7 @@ from ipaserver.install.installutils import is_ipa_configured
 class IPAChecks(RunChecks):
     def pre_check(self):
         if not is_ipa_configured():
-            print("IPA is not configured")
+            print("IPA server is not configured")
             return 1
 
         return None


=====================================
src/ipahealthcheck/core/output.py
=====================================
@@ -126,6 +126,8 @@ class Human(Output):
     options = ()
 
     def generate(self, data):
+        if not data:
+            return "No issues found.\n"
         output = ''
         for line in data:
             kw = line.get('kw')


=====================================
src/ipahealthcheck/dogtag/ca.py
=====================================
@@ -53,14 +53,14 @@ class DogtagCertsConfigCheck(DogtagPlugin):
         db = certs.CertDB(api.env.realm, paths.PKI_TOMCAT_ALIAS_DIR)
         for nickname, _trust_flags in db.list_certs():
             if nickname in skip:
-                logging.debug('Skipping nickname %s because it isn\'t in '
-                              'the configuration file')
+                logger.debug('Skipping nickname %s because it isn\'t in '
+                             'the configuration file')
                 continue
             try:
                 val = get_directive(paths.CA_CS_CFG_PATH,
                                     blobs[nickname], '=')
             except KeyError:
-                print("%s not found, assuming 3rd party" % nickname)
+                logger.debug("%s not found, assuming 3rd party", nickname)
                 continue
             if val is None:
                 yield Result(self, constants.ERROR,


=====================================
src/ipahealthcheck/ipa/certs.py
=====================================
@@ -29,6 +29,7 @@ from ipaserver.plugins import ldap2
 from ipapython import certdb
 from ipapython import ipautil
 from ipapython.dn import DN
+from ipapython.ipaldap import realm_to_serverid
 
 logger = logging.getLogger()
 DAY = 60 * 60 * 24
@@ -145,6 +146,23 @@ def get_expected_requests(ca, ds, serverid):
     else:
         logger.debug('No KDC pkinit certificate')
 
+    # See if a host certificate was issued. This is only to
+    # prevent a false-positive if one is indeed installed.
+    local = {
+        paths.IPA_NSSDB_DIR: 'Local IPA host',
+        paths.NSS_DB_DIR: 'IPA Machine Certificate - %s' % socket.getfqdn(),
+    }
+    for db, nickname in local.items():
+        nssdb = certdb.NSSDatabase(db)
+        if nssdb.has_nickname(nickname):
+            requests.append(
+                {
+                    'cert-database': db,
+                    'cert-nickname': nickname,
+                    'ca-name': 'IPA',
+                }
+            )
+
     return requests
 
 
@@ -373,7 +391,7 @@ class IPACertTracking(IPAPlugin):
                              key=request_id,
                              error=str(e),
                              msg='Found request id {key} but it is not tracked'
-                                 'by certmonger!?: {error}')
+                                 ' by certmonger!?: {error}')
                 continue
 
             # The criteria was not met
@@ -418,10 +436,14 @@ class IPACertDNSSAN(IPAPlugin):
         for request in requests:
             request_id = certmonger.get_request_id(request)
             if request_id is None:
-                yield Result(self, constants.ERROR,
-                             key=request_id,
-                             msg='Found request id {key} but it is not tracked'
-                                 'by certmonger!?')
+                # log and skip. Missed tracking is reported by IPACertTracking
+                flatten = ', '.join("{!s}={!s}".format(key, val)
+                                    for (key, val) in request.items())
+
+                logger.debug(
+                    "Skipping %s since it is handled by IPACertTracking",
+                    flatten
+                )
                 continue
 
             ca_name = certmonger.get_request_value(request_id, 'ca-name')
@@ -566,6 +588,224 @@ class IPACertNSSTrust(IPAPlugin):
                     'verifying trust')
 
 
+ at registry
+class IPACertMatchCheck(IPAPlugin):
+    """
+    Ensure certificates match between LDAP and NSS databases
+    """
+
+    requires = ('dirsrv',)
+
+    def get_cert_list_from_db(self, nssdb, nickname):
+        """
+        Retrieve all certificates from an NSS database for nickname.
+        """
+        try:
+            args = ["-L", "-n", nickname, "-a"]
+            result = nssdb.run_certutil(args, capture_output=True)
+            return x509.load_certificate_list(result.raw_output)
+        except ipautil.CalledProcessError:
+            return []
+
+    @duration
+    def check(self):
+        if not self.ca.is_configured():
+            logger.debug("No CA configured, skipping certificate match check")
+            return
+
+        # Ensure /etc/ipa/ca.crt matches the NSS DB CA certificates
+        def match_cacert_and_db(plugin, cacerts, dbpath):
+            db = certs.CertDB(api.env.realm, dbpath)
+            nickname = '%s IPA CA' % api.env.realm
+            try:
+                dbcacerts = self.get_cert_list_from_db(db, nickname)
+            except Exception as e:
+                yield Result(plugin, constants.ERROR,
+                             error=str(e),
+                             msg='Unable to load CA cert: {error}')
+                return False
+
+            ok = True
+            for cert in dbcacerts:
+                if cert not in cacerts:
+                    ok = False
+                    yield Result(plugin, constants.ERROR,
+                                 nickname=nickname,
+                                 serial_number=cert.serial_number,
+                                 dbdir=dbpath,
+                                 certdir=paths.IPA_CA_CRT,
+                                 msg=('CA Certificate nickname {nickname} '
+                                      'with serial number {serial} '
+                                      'is in {dbdir} but is not in'
+                                      '%s' % paths.IPA_CA_CRT))
+            return ok
+
+        try:
+            cacerts = x509.load_certificate_list_from_file(paths.IPA_CA_CRT)
+        except Exception:
+            yield Result(self, constants.ERROR,
+                         path=paths.IPA_CA_CRT,
+                         msg='Unable to load CA cert file {path}: {error}')
+            return
+
+        # Ensure CA cert entry from LDAP matches /etc/ipa/ca.crt
+        dn = DN('cn=%s IPA CA' % api.env.realm,
+                'cn=certificates,cn=ipa,cn=etc',
+                api.env.basedn)
+        try:
+            entry = self.conn.get_entry(dn)
+        except errors.NotFound:
+            yield Result(self, constants.ERROR,
+                         dn=str(dn),
+                         msg='CA Certificate entry \'{dn}\' '
+                             'not found in LDAP')
+            return
+
+        cacerts_ok = True
+        # Are all the certs in LDAP for the IPA CA in /etc/ipa/ca.crt
+        for cert in entry['CACertificate']:
+            if cert not in cacerts:
+                cacerts_ok = False
+                yield Result(self, constants.ERROR,
+                             dn=str(dn),
+                             serial_number=cert.serial_number,
+                             msg=('CA Certificate serial number {serial} is '
+                                  'in LDAP \'{dn}\' but is not in '
+                                  '%s' % paths.IPA_CA_CRT))
+
+        # Ensure NSS DBs have matching CA certs for /etc/ipa/ca.crt
+        serverid = realm_to_serverid(api.env.realm)
+        dspath = paths.ETC_DIRSRV_SLAPD_INSTANCE_TEMPLATE % serverid
+
+        cacertds_ok = yield from match_cacert_and_db(self, cacerts, dspath)
+        cacertnss_ok = yield from match_cacert_and_db(self, cacerts,
+                                                      paths.IPA_NSSDB_DIR)
+        if cacerts_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=paths.IPA_CA_CRT)
+        if cacertds_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=dspath)
+        if cacertnss_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=paths.IPA_NSSDB_DIR)
+
+
+ at registry
+class IPADogtagCertsMatchCheck(IPAPlugin):
+    """
+    Check if dogtag certs present in both NSS DB and LDAP match
+    """
+    requires = ('dirsrv',)
+
+    @duration
+    def check(self):
+        if not self.ca.is_configured():
+            logger.debug('CA is not configured, skipping connectivity check')
+            return
+
+        def match_ldap_nss_cert(plugin, ldap, db, cert_dn, attr, cert_nick):
+            try:
+                entry = ldap.get_entry(cert_dn)
+            except errors.NotFound:
+                yield Result(plugin, constants.ERROR,
+                             msg='%s entry not found in LDAP' % cert_dn)
+                return False
+            try:
+                nsscert = db.get_cert_from_db(cert_nick)
+            except Exception as e:
+                yield Result(plugin, constants.ERROR,
+                             error=str(e),
+                             msg=('Unable to load %s certificate:'
+                                  '{error}' % cert_nick))
+                return False
+            cert_matched = any([cert == nsscert for cert in entry[attr]])
+            if not cert_matched:
+                yield Result(plugin, constants.ERROR,
+                             key=cert_nick,
+                             nickname=cert_nick,
+                             dbdir=db.secdir,
+                             msg=('{nickname} certificate in NSS DB {dbdir} '
+                                  'does not match entry in LDAP'))
+                return False
+            return True
+
+        def match_ldap_nss_certs_by_subject(plugin, ldap, db, dn,
+                                            expected_nicks_subjects):
+            entries = ldap.get_entries(dn)
+            all_ok = True
+            for nick, subject in expected_nicks_subjects.items():
+                cert = db.get_cert_from_db(nick)
+                ok = any([cert in entry['userCertificate'] and
+                          subject == entry['subjectName'][0]
+                          for entry in entries
+                          if 'userCertificate' in entry])
+                if not ok:
+                    all_ok = False
+                    yield Result(plugin, constants.ERROR,
+                                 key=nick,
+                                 nickname=nick,
+                                 dbdir=db.secdir,
+                                 msg=('{nickname} certificate in NSS DB '
+                                      '{dbdir} does not match entry in LDAP'))
+            return all_ok
+
+        db = certs.CertDB(api.env.realm, paths.PKI_TOMCAT_ALIAS_DIR)
+        dn = DN('uid=pkidbuser,ou=people,o=ipaca')
+        subsystem_nick = 'subsystemCert cert-pki-ca'
+        subsystem_ok = yield from match_ldap_nss_cert(self, self.conn,
+                                                      db, dn,
+                                                      'userCertificate',
+                                                      subsystem_nick)
+        dn = DN('cn=%s IPA CA' % api.env.realm,
+                'cn=certificates,cn=ipa,cn=etc',
+                api.env.basedn)
+        casigning_nick = 'caSigningCert cert-pki-ca'
+        casigning_ok = yield from match_ldap_nss_cert(self, self.conn,
+                                                      db, dn, 'CACertificate',
+                                                      casigning_nick)
+
+        expected_nicks_subjects = {
+            'ocspSigningCert cert-pki-ca':
+                'CN=OCSP Subsystem,O=%s' % api.env.realm,
+            'subsystemCert cert-pki-ca':
+                'CN=CA Subsystem,O=%s' % api.env.realm,
+            'auditSigningCert cert-pki-ca':
+                'CN=CA Audit,O=%s' % api.env.realm,
+            'Server-Cert cert-pki-ca':
+                'CN=%s,O=%s' % (api.env.host, api.env.realm),
+        }
+
+        kra = krainstance.KRAInstance(api.env.realm)
+        if kra.is_installed():
+            kra_expected_nicks_subjects = {
+                'transportCert cert-pki-kra':
+                    'CN=KRA Transport Certificate,O=%s' % api.env.realm,
+                'storageCert cert-pki-kra':
+                    'CN=KRA Storage Certificate,O=%s' % api.env.realm,
+                'auditSigningCert cert-pki-kra':
+                    'CN=KRA Audit,O=%s' % api.env.realm,
+            }
+            expected_nicks_subjects.update(kra_expected_nicks_subjects)
+
+        ipaca_basedn = DN('ou=certificateRepository,ou=ca,o=ipaca')
+        ipaca_certs_ok = yield from match_ldap_nss_certs_by_subject(
+                                    self, self.conn, db,
+                                    ipaca_basedn,
+                                    expected_nicks_subjects
+                                )
+
+        if subsystem_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=subsystem_nick)
+        if casigning_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=casigning_nick)
+        if ipaca_certs_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=str(ipaca_basedn))
+
+
 @registry
 class IPANSSChainValidation(IPAPlugin):
     """Validate the certificate chain of the certs."""
@@ -707,6 +947,83 @@ class IPAOpenSSLChainValidation(IPAPlugin):
                         self, constants.SUCCESS, key=cert)
 
 
+def check_agent(plugin, base_dn, agent_type):
+    """Check RA/KRA Agent"""
+
+    try:
+        cert = x509.load_certificate_from_file(paths.RA_AGENT_PEM)
+    except Exception as e:
+        yield Result(plugin, constants.ERROR,
+                     error=str(e),
+                     msg='Unable to load RA cert: {error}')
+        return
+    serial_number = cert.serial_number
+    subject = DN(cert.subject)
+    issuer = DN(cert.issuer)
+    description = '2;%d;%s;%s' % (serial_number, issuer, subject)
+    logger.debug('%s agent description should be %s', agent_type, description)
+    db_filter = ldap2.ldap2.combine_filters(
+        [
+            ldap2.ldap2.make_filter({'objectClass': 'inetOrgPerson'}),
+            ldap2.ldap2.make_filter(
+                {'description': ';%s;%s' % (issuer, subject)},
+                exact=False, trailing_wildcard=False),
+        ],
+        ldap2.ldap2.MATCH_ALL)
+    try:
+        entries = plugin.conn.get_entries(base_dn,
+                                          plugin.conn.SCOPE_SUBTREE,
+                                          db_filter)
+    except errors.NotFound:
+        yield Result(plugin, constants.ERROR,
+                     description=description,
+                     msg='%s agent not found in LDAP' % agent_type)
+        return
+    except Exception as e:
+        yield Result(plugin, constants.ERROR,
+                     error=str(e),
+                     msg='Retrieving %s agent from LDAP failed {error}'
+                         % agent_type)
+        return
+    else:
+        logger.debug('%s agent description is %s', agent_type, description)
+        if len(entries) != 1:
+            yield Result(plugin, constants.ERROR,
+                         found=len(entries),
+                         msg='Too many %s agent entries found, {found}'
+                             % agent_type)
+            return
+        entry = entries[0]
+        raw_desc = entry.get('description')
+        if raw_desc is None:
+            yield Result(plugin, constants.ERROR,
+                         msg='%s agent is missing the description '
+                             'attribute or it is not readable' % agent_type)
+            return
+        ra_desc = raw_desc[0]
+        ra_certs = entry.get('usercertificate')
+        if ra_desc != description:
+            yield Result(plugin, constants.ERROR,
+                         expected=description,
+                         got=ra_desc,
+                         msg='%s agent description does not match. Found '
+                         '{got} in LDAP and expected {expected}' % agent_type)
+            return
+        found = False
+        for candidate in ra_certs:
+            if candidate == cert:
+                found = True
+                break
+        if not found:
+            yield Result(plugin, constants.ERROR,
+                         certfile=paths.RA_AGENT_PEM,
+                         dn=str(entry.dn),
+                         msg='%s agent certificate in {certfile} not '
+                             'found in LDAP userCertificate attribute '
+                             'for the entry {dn}' % agent_type)
+        yield Result(plugin, constants.SUCCESS)
+
+
 @registry
 class IPARAAgent(IPAPlugin):
     """Validate the RA Agent used to talk to the CA
@@ -722,82 +1039,32 @@ class IPARAAgent(IPAPlugin):
             logger.debug('CA is not configured, skipping RA Agent check')
             return
 
-        try:
-            cert = x509.load_certificate_from_file(paths.RA_AGENT_PEM)
-        except Exception as e:
-            yield Result(self, constants.ERROR,
-                         error=str(e),
-                         msg='Unable to load RA cert: {error}')
-            return
+        base_dn = DN('uid=ipara,ou=people,o=ipaca')
+        yield from check_agent(self, base_dn, 'RA')
 
-        serial_number = cert.serial_number
-        subject = DN(cert.subject)
-        issuer = DN(cert.issuer)
-        description = '2;%d;%s;%s' % (serial_number, issuer, subject)
 
-        logger.debug('RA agent description should be %s', description)
+ at registry
+class IPAKRAAgent(IPAPlugin):
+    """Validate the KRA Agent
 
-        db_filter = ldap2.ldap2.combine_filters(
-            [
-                ldap2.ldap2.make_filter({'objectClass': 'inetOrgPerson'}),
-                ldap2.ldap2.make_filter({'sn': 'ipara'}),
-                ldap2.ldap2.make_filter(
-                    {'description': ';%s;%s' % (issuer, subject)},
-                    exact=False, trailing_wildcard=False),
-            ],
-            ldap2.ldap2.MATCH_ALL)
+       Compare the description and usercertificate values.
+    """
 
-        base_dn = DN(('o', 'ipaca'))
-        try:
-            entries = self.conn.get_entries(base_dn,
-                                            self.conn.SCOPE_SUBTREE,
-                                            db_filter)
-        except errors.NotFound:
-            yield Result(self, constants.ERROR,
-                         description=description,
-                         msg='RA agent not found in LDAP')
+    requires = ('dirsrv',)
+
+    @duration
+    def check(self):
+        if not self.ca.is_configured():
+            logger.debug('CA is not configured, skipping KRA Agent check')
             return
-        except Exception as e:
-            yield Result(self, constants.ERROR,
-                         error=str(e),
-                         msg='Retrieving RA agent from LDAP failed {error}')
+
+        kra = krainstance.KRAInstance(api.env.realm)
+        if not kra.is_installed():
+            logger.debug('KRA is not installed, skipping KRA Agent check')
             return
-        else:
-            logger.debug('RA agent description is %s', description)
-            if len(entries) != 1:
-                yield Result(self, constants.ERROR,
-                             found=len(entries),
-                             msg='Too many RA agent entries found, {found}')
-                return
-            entry = entries[0]
-            raw_desc = entry.get('description')
-            if raw_desc is None:
-                yield Result(self, constants.ERROR,
-                             msg='RA agent is missing the description '
-                                 'attribute or it is not readable')
-                return
-            ra_desc = raw_desc[0]
-            ra_certs = entry.get('usercertificate')
-            if ra_desc != description:
-                yield Result(self, constants.ERROR,
-                             expected=description,
-                             got=ra_desc,
-                             msg='RA agent description does not match. Found '
-                             '{got} in LDAP and expected {expected}')
-                return
-            found = False
-            for candidate in ra_certs:
-                if candidate == cert:
-                    found = True
-                    break
-            if not found:
-                yield Result(self, constants.ERROR,
-                             certfile=paths.RA_AGENT_PEM,
-                             dn=str(entry.dn),
-                             msg='RA agent certificate in {certfile} not '
-                                 'found in LDAP userCertificate attribute '
-                                 'for the entry {dn}')
-            yield Result(self, constants.SUCCESS)
+
+        base_dn = DN('uid=ipakra,ou=people,o=kra,o=ipaca')
+        yield from check_agent(self, base_dn, 'KRA')
 
 
 @registry


=====================================
src/ipahealthcheck/ipa/files.py
=====================================
@@ -2,6 +2,7 @@
 # Copyright (C) 2019 FreeIPA Contributors see COPYING for license
 #
 
+import glob
 import logging
 import os
 
@@ -96,6 +97,67 @@ class IPAFileCheck(IPAPlugin, FileCheck):
                           ('root', 'systemd-resolve'), '0644'))
         self.files.append((paths.HOSTS, 'root', 'root', '0644'))
 
+        # IPA log files that may vary by installation. Only verify
+        # those that exist
+        for filename in (
+            paths.IPABACKUP_LOG,
+            paths.IPARESTORE_LOG,
+            paths.IPACLIENT_INSTALL_LOG,
+            paths.IPACLIENT_UNINSTALL_LOG,
+            paths.IPAREPLICA_CA_INSTALL_LOG,
+            paths.IPAREPLICA_CONNCHECK_LOG,
+            paths.IPAREPLICA_INSTALL_LOG,
+            paths.IPASERVER_INSTALL_LOG,
+            paths.IPASERVER_KRA_INSTALL_LOG,
+            paths.IPASERVER_UNINSTALL_LOG,
+            paths.IPAUPGRADE_LOG,
+            paths.IPATRUSTENABLEAGENT_LOG,
+        ):
+            if os.path.exists(filename):
+                self.files.append((filename, 'root', 'root', '0600'))
+
+        self.files.append((paths.IPA_CUSTODIA_AUDIT_LOG,
+                          'root', 'root', '0644'))
+
+        self.files.append((paths.KADMIND_LOG, 'root', 'root', '0600'))
+        self.files.append((paths.KRB5KDC_LOG, 'root', 'root', '0640'))
+
+        inst = api.env.realm.replace('.', '-')
+        self.files.append((paths.SLAPD_INSTANCE_ACCESS_LOG_TEMPLATE % inst,
+                           'dirsrv', 'dirsrv', '0600'))
+        self.files.append((paths.SLAPD_INSTANCE_ERROR_LOG_TEMPLATE % inst,
+                           'dirsrv', 'dirsrv', '0600'))
+
+        self.files.append((paths.VAR_LOG_HTTPD_ERROR, 'root', 'root', '0644'))
+
+        for globpath in glob.glob("%s/debug*.log" % paths.TOMCAT_CA_DIR):
+            self.files.append((globpath, "pkiuser", "pkiuser", "0644"))
+
+        for globpath in glob.glob(
+            "%s/ca_audit*" % paths.TOMCAT_SIGNEDAUDIT_DIR
+        ):
+            self.files.append((globpath, 'pkiuser', 'pkiuser', '0640'))
+
+        for filename in ('selftests.log', 'system', 'transactions'):
+            self.files.append((
+                os.path.join(paths.TOMCAT_CA_DIR, filename),
+                'pkiuser', 'pkiuser', '0640'
+            ))
+
+        for globpath in glob.glob("%s/debug*.log" % paths.TOMCAT_KRA_DIR):
+            self.files.append((globpath, "pkiuser", "pkiuser", "0644"))
+
+        for globpath in glob.glob(
+            "%s/ca_audit*" % paths.TOMCAT_KRA_SIGNEDAUDIT_DIR
+        ):
+            self.files.append((globpath, 'pkiuser', 'pkiuser', '0640'))
+
+        for filename in ('selftests.log', 'system', 'transactions'):
+            self.files.append((
+                os.path.join(paths.TOMCAT_KRA_DIR, filename),
+                'pkiuser', 'pkiuser', '0640'
+            ))
+
         return FileCheck.check(self)
 
 


=====================================
src/ipahealthcheck/ipa/host.py
=====================================
@@ -23,7 +23,7 @@ logger = logging.getLogger()
 @registry
 class IPAHostKeytab(IPAPlugin):
     """Ensure the host keytab can get a TGT"""
-    requires = ('krb5kdc',)
+    requires = ('krb5kdc', 'dirsrv')
 
     @duration
     def check(self):


=====================================
src/ipahealthcheck/ipa/plugin.py
=====================================
@@ -15,8 +15,7 @@ from ipaserver.install import installutils
 
 from ipahealthcheck.core.plugin import Plugin, Registry
 
-
-logging.getLogger()
+logger = logging.getLogger()
 
 
 class IPAPlugin(Plugin):
@@ -34,6 +33,7 @@ class IPARegistry(Registry):
         super().__init__()
         self.trust_agent = False
         self.trust_controller = False
+        self.ca_configured = False
 
     def initialize(self, framework, config, options=None):
         super().initialize(framework, config)
@@ -56,7 +56,7 @@ class IPARegistry(Registry):
             try:
                 api.Backend.ldap2.connect()
             except (errors.CCacheError, errors.NetworkError) as e:
-                logging.debug('Failed to connect to LDAP: %s', e)
+                logger.debug('Failed to connect to LDAP: %s', e)
             return
 
         # This package is pulled in when the trust package is installed
@@ -85,5 +85,8 @@ class IPARegistry(Registry):
         if role.get('status') == 'enabled':
             self.trust_controller = True
 
+        ca = cainstance.CAInstance(api.env.realm, host_name=api.env.host)
+        self.ca_configured = ca.is_configured()
+
 
 registry = IPARegistry()


=====================================
src/ipahealthcheck/ipa/roles.py
=====================================
@@ -25,6 +25,8 @@ class IPACRLManagerCheck(IPAPlugin):
     """
     @duration
     def check(self):
+        if not self.ca.is_configured():
+            return
         try:
             enabled = self.ca.is_crlgen_enabled()
         except AttributeError:


=====================================
src/ipahealthcheck/meta/services.py
=====================================
@@ -92,6 +92,8 @@ class ipa_custodia(IPAServiceCheck):
 
 @registry
 class ipa_dnskeysyncd(IPAServiceCheck):
+    requires = ('dirsrv',)
+
     def check(self, instance=''):
         self.service_name = 'ipa-dnskeysyncd'
 


=====================================
tests/test_commands.py
=====================================
@@ -0,0 +1,15 @@
+#
+# Copyright (C) 2021 FreeIPA Contributors see COPYING for license
+#
+
+import os
+
+from ipapython.ipautil import run
+
+
+def test_version():
+    """
+    Test the --version option
+    """
+    output = run(['ipa-healthcheck', '--version'], env=os.environ)
+    assert 'ipahealthcheck' in output.raw_output.decode('utf-8')


=====================================
tests/test_core_files.py
=====================================
@@ -67,9 +67,17 @@ def test_files_owner(mock_stat):
     results = capture_results(f)
     my_results = get_results(results, 'owner')
     assert my_results.results[0].result == constants.WARNING
+    assert my_results.results[0].kw.get('got') == 'nobody'
+    assert my_results.results[0].kw.get('expected') == 'root'
+    assert my_results.results[0].kw.get('type') == 'owner'
+
     assert my_results.results[1].result == constants.SUCCESS
     assert my_results.results[2].result == constants.SUCCESS
+
     assert my_results.results[3].result == constants.WARNING
+    assert my_results.results[3].kw.get('got') == 'nobody'
+    assert my_results.results[3].kw.get('expected') == 'root,bin'
+    assert my_results.results[3].kw.get('type') == 'owner'
     assert my_results.results[3].kw.get('msg') == \
         'Ownership of fiz is nobody and should be one of root,bin'
 
@@ -97,9 +105,17 @@ def test_files_group(mock_stat):
     results = capture_results(f)
     my_results = get_results(results, 'group')
     assert my_results.results[0].result == constants.WARNING
+    assert my_results.results[0].kw.get('got') == 'nobody'
+    assert my_results.results[0].kw.get('expected') == 'root'
+    assert my_results.results[0].kw.get('type') == 'group'
+
     assert my_results.results[1].result == constants.SUCCESS
     assert my_results.results[2].result == constants.SUCCESS
+
     assert my_results.results[3].result == constants.WARNING
+    assert my_results.results[3].kw.get('got') == 'nobody'
+    assert my_results.results[3].kw.get('expected') == 'root,bin'
+    assert my_results.results[3].kw.get('type') == 'group'
     assert my_results.results[3].kw.get('msg') == \
         'Group of fiz is nobody and should be one of root,bin'
 


=====================================
tests/test_ipa_agent.py
=====================================
@@ -4,11 +4,11 @@
 
 from base import BaseTest
 from unittest.mock import Mock, patch
-from util import capture_results, CAInstance
+from util import capture_results, CAInstance, KRAInstance
 
 from ipahealthcheck.core import config, constants
 from ipahealthcheck.ipa.plugin import registry
-from ipahealthcheck.ipa.certs import IPARAAgent
+from ipahealthcheck.ipa.certs import IPARAAgent, IPAKRAAgent
 
 from ipalib import errors
 from ipapython.dn import DN
@@ -218,3 +218,173 @@ class TestNSSAgent(BaseTest):
         assert result.result == constants.SUCCESS
         assert result.source == 'ipahealthcheck.ipa.certs'
         assert result.check == 'IPARAAgent'
+
+
+class TestKRAAgent(BaseTest):
+    cert = IPACertificate()
+    patches = {
+        'ldap.initialize':
+        Mock(return_value=mock_ldap_conn()),
+        'ipaserver.install.krainstance.KRAInstance':
+        Mock(return_value=KRAInstance()),
+        'ipalib.x509.load_certificate_from_file':
+        Mock(return_value=cert),
+    }
+
+    def test_kra_agent_ok(self):
+
+        attrs = dict(
+            description=['2;1;CN=ISSUER;CN=RA AGENT'],
+            usercertificate=[self.cert],
+        )
+        fake_conn = LDAPClient('ldap://localhost', no_schema=True)
+        ldapentry = LDAPEntry(fake_conn,
+                              DN('uid=ipakra,ou=people,o=kra,o=ipaca'))
+        for attr, values in attrs.items():
+            ldapentry[attr] = values
+
+        framework = object()
+        registry.initialize(framework, config.Config())
+        f = IPAKRAAgent(registry)
+
+        f.conn = mock_ldap([ldapentry])
+        self.results = capture_results(f)
+
+        assert len(self.results) == 1
+
+        result = self.results.results[0]
+        assert result.result == constants.SUCCESS
+        assert result.source == 'ipahealthcheck.ipa.certs'
+        assert result.check == 'IPAKRAAgent'
+
+    def test_kra_agent_no_description(self):
+
+        attrs = dict(
+            usercertificate=[self.cert],
+        )
+        fake_conn = LDAPClient('ldap://localhost', no_schema=True)
+        ldapentry = LDAPEntry(fake_conn,
+                              DN('uid=ipakra,ou=people,o=kra,o=ipaca'))
+        for attr, values in attrs.items():
+            ldapentry[attr] = values
+
+        framework = object()
+        registry.initialize(framework, config.Config())
+        f = IPAKRAAgent(registry)
+
+        f.conn = mock_ldap([ldapentry])
+        self.results = capture_results(f)
+        result = self.results.results[0]
+
+        assert result.result == constants.ERROR
+        assert 'description' in result.kw.get('msg')
+
+    @patch('ipalib.x509.load_certificate_from_file')
+    def test_kra_agent_load_failure(self, mock_load_cert):
+
+        mock_load_cert.side_effect = IOError('test')
+
+        framework = object()
+        registry.initialize(framework, config.Config())
+        f = IPAKRAAgent(registry)
+
+        self.results = capture_results(f)
+        result = self.results.results[0]
+
+        assert result.result == constants.ERROR
+        assert result.kw.get('error') == 'test'
+
+    def test_kra_agent_no_entry_found(self):
+
+        framework = object()
+        registry.initialize(framework, config.Config())
+        f = IPAKRAAgent(registry)
+
+        f.conn = mock_ldap(None)  # None == NotFound
+        self.results = capture_results(f)
+        result = self.results.results[0]
+
+        assert result.result == constants.ERROR
+        assert result.kw.get('msg') == 'KRA agent not found in LDAP'
+
+    def test_kra_agent_too_many(self):
+
+        attrs = dict(
+            description=['2;1;CN=ISSUER;CN=RA AGENT'],
+            usercertificate=[self.cert],
+        )
+        fake_conn = LDAPClient('ldap://localhost', no_schema=True)
+        ldapentry = LDAPEntry(fake_conn,
+                              DN('uid=ipakra,ou=people,o=kra,o=ipaca'))
+        for attr, values in attrs.items():
+            ldapentry[attr] = values
+
+        ldapentry2 = LDAPEntry(fake_conn,
+                               DN('uid=ipakra,ou=people,o=kra,o=ipaca'))
+        for attr, values in attrs.items():
+            ldapentry[attr] = values
+
+        framework = object()
+        registry.initialize(framework, config.Config())
+        f = IPAKRAAgent(registry)
+
+        f.conn = mock_ldap([ldapentry, ldapentry2])
+        self.results = capture_results(f)
+        result = self.results.results[0]
+
+        assert result.result == constants.ERROR
+        assert result.kw.get('found') == 2
+
+    def test_kra_agent_nonmatching_cert(self):
+
+        cert2 = IPACertificate(2)
+
+        attrs = dict(
+            description=['2;1;CN=ISSUER;CN=RA AGENT'],
+            usercertificate=[cert2],
+        )
+        fake_conn = LDAPClient('ldap://localhost', no_schema=True)
+        ldapentry = LDAPEntry(fake_conn,
+                              DN('uid=ipakra,ou=people,o=kra,o=ipaca'))
+        for attr, values in attrs.items():
+            ldapentry[attr] = values
+
+        framework = object()
+        registry.initialize(framework, config.Config())
+        f = IPAKRAAgent(registry)
+
+        f.conn = mock_ldap([ldapentry])
+        self.results = capture_results(f)
+        result = self.results.results[0]
+
+        assert result.result == constants.ERROR
+        assert result.kw.get('certfile') == paths.RA_AGENT_PEM
+        assert result.kw.get('dn') == 'uid=ipakra,ou=people,o=kra,o=ipaca'
+
+    def test_kra_agent_multiple_certs(self):
+
+        cert2 = IPACertificate(2)
+
+        attrs = dict(
+            description=['2;1;CN=ISSUER;CN=RA AGENT'],
+            usercertificate=[cert2, self.cert],
+        )
+        fake_conn = LDAPClient('ldap://localhost', no_schema=True)
+        ldapentry = LDAPEntry(fake_conn,
+                              DN('uid=ipakra,ou=people,o=kra,o=ipaca'))
+        for attr, values in attrs.items():
+            ldapentry[attr] = values
+
+        framework = object()
+        registry.initialize(framework, config.Config)
+        f = IPAKRAAgent(registry)
+
+        f.conn = mock_ldap([ldapentry])
+        self.results = capture_results(f)
+
+        assert len(self.results) == 1
+
+        result = self.results.results[0]
+        assert result.result == constants.SUCCESS
+        assert result.source == 'ipahealthcheck.ipa.certs'
+        assert result.check == 'IPAKRAAgent'


=====================================
tests/test_ipa_cert_match.py
=====================================
@@ -0,0 +1,282 @@
+#
+# Copyright (C) 2021 FreeIPA Contributors see COPYING for license
+#
+
+from util import capture_results, m_api, CAInstance, KRAInstance
+from base import BaseTest
+from ipahealthcheck.core import config, constants
+from ipahealthcheck.ipa.plugin import registry
+from ipahealthcheck.ipa.certs import IPACertMatchCheck
+from ipahealthcheck.ipa.certs import IPADogtagCertsMatchCheck
+from unittest.mock import Mock, patch
+
+from ipalib import errors
+from ipapython.dn import DN
+from ipapython.ipaldap import LDAPClient, LDAPEntry
+
+
+class IPACertificate:
+    def __init__(self, serial_number=1):
+        self.serial_number = serial_number
+
+    def __eq__(self, other):
+        return self.serial_number == other.serial_number
+
+    def __hash__(self):
+        return hash(self.serial_number)
+
+
+class mock_ldap:
+    SCOPE_BASE = 1
+    SCOPE_ONELEVEL = 2
+    SCOPE_SUBTREE = 4
+
+    def __init__(self, entries):
+        """Initialize the results that we will return from get_entry"""
+        self.results = {entry.dn: entry for entry in entries}
+
+    def get_entry(self, dn, attrs_list=None, time_limit=None,
+                  size_limit=None, get_effective_rights=False):
+        if self.results is None:
+            raise errors.NotFound(reason='test')
+        return self.results[dn]
+
+    def get_entries(self, base_dn, scope=SCOPE_SUBTREE, filter=None,
+                    attrs_list=None, get_effective_rights=False, **kwargs):
+        if self.results is None:
+            raise errors.NotFound(reason='test')
+        return self.results.values()
+
+
+class mock_ldap_conn:
+    def set_option(self, option, invalue):
+        pass
+
+    def search_s(self, base, scope, filterstr=None,
+                 attrlist=None, attrsonly=0):
+        return tuple()
+
+
+class mock_CertDB:
+    def __init__(self, trust):
+        """A dict of nickname + NSSdb trust flags"""
+        self.trust = trust
+        self.secdir = '/foo/bar/testdir'
+
+    def get_cert_from_db(self, nickname):
+        if nickname not in self.trust.keys():
+            raise errors.NotFound(reason='test')
+        return IPACertificate()
+
+    def run_certutil(self, args, capture_output):
+        class RunResult:
+            def __init__(self, output):
+                self.raw_output = output
+
+        return RunResult(b'test output')
+
+
+class TestIPACertMatch(BaseTest):
+    patches = {
+        'ldap.initialize':
+        Mock(return_value=mock_ldap_conn())
+    }
+
+    @patch('ipalib.x509.load_certificate_list_from_file')
+    @patch('ipaserver.install.certs.CertDB')
+    def test_certs_match_ok(self, mock_certdb, mock_load_cert):
+        """ Ensure match check is ok"""
+        fake_conn = LDAPClient('ldap://localhost', no_schema=True)
+        cacertentry = LDAPEntry(fake_conn,
+                                DN('cn=%s IPA CA' % m_api.env.realm,
+                                   'cn=certificates,cn=ipa,cn=etc',
+                                    m_api.env.basedn),
+                                CACertificate=[IPACertificate()])
+        trust = {
+            ('%s IPA CA' % m_api.env.realm): 'u,u,u'
+        }
+
+        mock_certdb.return_value = mock_CertDB(trust)
+        mock_load_cert.return_value = [IPACertificate()]
+
+        framework = object()
+        registry.initialize(framework, config.Config())
+        f = IPACertMatchCheck(registry)
+        f.conn = mock_ldap([cacertentry])
+        self.results = capture_results(f)
+
+        assert len(self.results) == 3
+        for result in self.results.results:
+            assert result.result == constants.SUCCESS
+            assert result.source == 'ipahealthcheck.ipa.certs'
+            assert result.check == 'IPACertMatchCheck'
+
+    @patch('ipalib.x509.load_certificate_list_from_file')
+    @patch('ipaserver.install.certs.CertDB')
+    def test_etc_cacert_mismatch(self, mock_certdb, mock_load_cert):
+        """ Test mismatch with /etc/ipa/ca.crt """
+        fake_conn = LDAPClient('ldap://localhost', no_schema=True)
+        cacertentry = LDAPEntry(fake_conn,
+                                DN('cn=%s IPA CA' % m_api.env.realm,
+                                   'cn=certificates,cn=ipa,cn=etc',
+                                    m_api.env.basedn),
+                                CACertificate=[IPACertificate()])
+        trust = {
+            ('%s IPA CA' % m_api.env.realm): 'u,u,u'
+        }
+
+        mock_certdb.return_value = mock_CertDB(trust)
+        mock_load_cert.return_value = [IPACertificate(serial_number=2)]
+
+        framework = object()
+        registry.initialize(framework, config.Config())
+        f = IPACertMatchCheck(registry)
+        f.conn = mock_ldap([cacertentry])
+        self.results = capture_results(f)
+
+        assert len(self.results) == 3
+        result = self.results.results[0]
+        assert result.result == constants.ERROR
+        assert result.source == 'ipahealthcheck.ipa.certs'
+        assert result.check == 'IPACertMatchCheck'
+
+    @patch('ipaserver.install.cainstance.CAInstance')
+    def test_cacert_caless(self, mock_cainstance):
+        """Nothing to check if the master is CALess"""
+
+        mock_cainstance.return_value = CAInstance(False)
+
+        framework = object()
+        registry.initialize(framework, config)
+        f = IPACertMatchCheck(registry)
+
+        self.results = capture_results(f)
+
+        assert len(self.results) == 0
+
+
+class TestIPADogtagCertMatch(BaseTest):
+    patches = {
+        'ipaserver.install.krainstance.KRAInstance':
+        Mock(return_value=KRAInstance()),
+    }
+
+    @patch('ipaserver.install.certs.CertDB')
+    def test_certs_match_ok(self, mock_certdb):
+        """ Ensure match check is ok"""
+        fake_conn = LDAPClient('ldap://localhost', no_schema=True)
+        pkidbentry = LDAPEntry(fake_conn,
+                               DN('uid=pkidbuser,ou=people,o=ipaca'),
+                               userCertificate=[IPACertificate()],
+                               subjectName=['test'])
+        casignentry = LDAPEntry(fake_conn,
+                                DN('cn=%s IPA CA' % m_api.env.realm,
+                                   'cn=certificates,cn=ipa,cn=etc',
+                                    m_api.env.basedn),
+                                CACertificate=[IPACertificate()],
+                                userCertificate=[IPACertificate()],
+                                subjectName=['test'])
+        ldap_entries = [pkidbentry, casignentry]
+        trust = {
+            'ocspSigningCert cert-pki-ca': 'u,u,u',
+            'caSigningCert cert-pki-ca': 'u,u,u',
+            'subsystemCert cert-pki-ca': 'u,u,u',
+            'auditSigningCert cert-pki-ca': 'u,u,Pu',
+            'Server-Cert cert-pki-ca': 'u,u,u',
+            'transportCert cert-pki-kra': 'u,u,u',
+            'storageCert cert-pki-kra': 'u,u,u',
+            'auditSigningCert cert-pki-kra': 'u,u,Pu',
+        }
+
+        dogtag_entries_subjects = (
+            'CN=OCSP Subsystem,O=%s' % m_api.env.realm,
+            'CN=CA Subsystem,O=%s' % m_api.env.realm,
+            'CN=CA Audit,O=%s' % m_api.env.realm,
+            'CN=%s,O=%s' % (m_api.env.host, m_api.env.realm),
+            'CN=KRA Transport Certificate,O=%s' % m_api.env.realm,
+            'CN=KRA Storage Certificate,O=%s' % m_api.env.realm,
+            'CN=KRA Audit,O=%s' % m_api.env.realm,
+        )
+
+        for i, subject in enumerate(dogtag_entries_subjects):
+            entry = LDAPEntry(fake_conn,
+                              DN('cn=%i,ou=certificateRepository' % i,
+                                 'ou=ca,o=ipaca'),
+                              userCertificate=[IPACertificate()],
+                              subjectName=[subject])
+            ldap_entries.append(entry)
+
+        mock_certdb.return_value = mock_CertDB(trust)
+
+        framework = object()
+        registry.initialize(framework, config.Config())
+        f = IPADogtagCertsMatchCheck(registry)
+        f.conn = mock_ldap(ldap_entries)
+        self.results = capture_results(f)
+
+        assert len(self.results) == 3
+        for result in self.results.results:
+            assert result.result == constants.SUCCESS
+            assert result.source == 'ipahealthcheck.ipa.certs'
+            assert result.check == 'IPADogtagCertsMatchCheck'
+
+    @patch('ipaserver.install.certs.CertDB')
+    def test_certs_mismatch(self, mock_certdb):
+        """ Ensure mismatches are detected"""
+        fake_conn = LDAPClient('ldap://localhost', no_schema=True)
+        pkidbentry = LDAPEntry(fake_conn,
+                               DN('uid=pkidbuser,ou=people,o=ipaca'),
+                               userCertificate=[IPACertificate(
+                                   serial_number=2
+                               )],
+                               subjectName=['test'])
+        casignentry = LDAPEntry(fake_conn,
+                                DN('cn=%s IPA CA' % m_api.env.realm,
+                                   'cn=certificates,cn=ipa,cn=etc',
+                                    m_api.env.basedn),
+                                CACertificate=[IPACertificate()],
+                                userCertificate=[IPACertificate()],
+                                subjectName=['test'])
+        ldap_entries = [pkidbentry, casignentry]
+        trust = {
+            'ocspSigningCert cert-pki-ca': 'u,u,u',
+            'caSigningCert cert-pki-ca': 'u,u,u',
+            'subsystemCert cert-pki-ca': 'u,u,u',
+            'auditSigningCert cert-pki-ca': 'u,u,Pu',
+            'Server-Cert cert-pki-ca': 'u,u,u',
+            'transportCert cert-pki-kra': 'u,u,u',
+            'storageCert cert-pki-kra': 'u,u,u',
+            'auditSigningCert cert-pki-kra': 'u,u,Pu',
+        }
+
+        dogtag_entries_subjects = (
+            'CN=OCSP Subsystem,O=%s' % m_api.env.realm,
+            'CN=CA Subsystem,O=%s' % m_api.env.realm,
+            'CN=CA Audit,O=%s' % m_api.env.realm,
+            'CN=%s,O=%s' % (m_api.env.host, m_api.env.realm),
+            'CN=KRA Transport Certificate,O=%s' % m_api.env.realm,
+            'CN=KRA Storage Certificate,O=%s' % m_api.env.realm,
+            'CN=KRA Audit,O=%s' % m_api.env.realm,
+        )
+
+        for i, subject in enumerate(dogtag_entries_subjects):
+            entry = LDAPEntry(fake_conn,
+                              DN('cn=%i,ou=certificateRepository' % i,
+                                 'ou=ca,o=ipaca'),
+                              userCertificate=[IPACertificate()],
+                              subjectName=[subject])
+            ldap_entries.append(entry)
+
+        mock_certdb.return_value = mock_CertDB(trust)
+
+        framework = object()
+        registry.initialize(framework, config.Config())
+        f = IPADogtagCertsMatchCheck(registry)
+        f.conn = mock_ldap(ldap_entries)
+        self.results = capture_results(f)
+
+        assert len(self.results) == 3
+        result = self.results.results[0]
+        assert result.result == constants.ERROR
+        assert result.source == 'ipahealthcheck.ipa.certs'
+        assert result.check == 'IPADogtagCertsMatchCheck'


=====================================
tests/test_ipa_roles.py
=====================================
@@ -48,6 +48,18 @@ class TestCRLManagerRole(BaseTest):
         assert result.check == 'IPACRLManagerCheck'
         assert result.kw.get('crlgen_enabled') is True
 
+    @patch('ipaserver.install.cainstance.CAInstance')
+    def test_crlmanager_no_ca(self, mock_ca):
+        """There should be no CRLManagerCheck without a CA"""
+        mock_ca.return_value = CAInstance(False)
+        framework = object()
+        registry.initialize(framework, config.Config)
+        f = IPACRLManagerCheck(registry)
+
+        self.results = capture_results(f)
+
+        assert len(self.results) == 0
+
 
 class TestRenewalMaster(BaseTest):
     def test_renewal_master_not_set(self):


=====================================
tests/test_ipa_trust.py
=====================================
@@ -409,7 +409,6 @@ class TestIPADomain(BaseTest):
 
         self.results = capture_results(f)
 
-        print(self.results.results)
         assert len(self.results) == 1
 
         result = self.results.results[0]


=====================================
tests/test_results.py
=====================================
@@ -22,8 +22,8 @@ def test_Result():
     r = Result(p, constants.SUCCESS, **kw)
 
     e = raises(TypeError, Result)
-    assert str(e) == "__init__() missing 2 required positional arguments: " \
-                     "'plugin' and 'result'"
+    assert "__init__() missing 2 required positional arguments: " \
+           "'plugin' and 'result'" in str(e)
 
     # Test passing source and check to Result. This is used for loading
     # a previous output.



View it on GitLab: https://salsa.debian.org/freeipa-team/freeipa-healthcheck/-/compare/e6a67bbc532410e242897bbfa00a56eacbebf84e...2366731f8d72907d0c208eb9b6cad403f25bfe3e

-- 
View it on GitLab: https://salsa.debian.org/freeipa-team/freeipa-healthcheck/-/compare/e6a67bbc532410e242897bbfa00a56eacbebf84e...2366731f8d72907d0c208eb9b6cad403f25bfe3e
You're receiving this email because of your account on salsa.debian.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-freeipa-devel/attachments/20211020/74a837d7/attachment-0001.htm>


More information about the Pkg-freeipa-devel mailing list