[PATCH] Clean up deb822.GpgInfo implementation.

John Wright jsw at debian.org
Sun Jul 3 08:00:35 UTC 2011

This patch fixes a number of bugs and warts in GpgInfo:

  - Change several @staticmethod decorated methods to @classmethod,
    since they call the class constructor.
  - from_sequence now can accept both sequences of newline-terminated
    strings and the output of str.splitlines().
  - from_file now actually calls the from_sequence method
    (Closes: #627058)
  - from_sequence now makes a copy of the initial args list before
    extending it (Closes: #627060)
  - Keyrings are no longer checked for accessibility, since gpgv can
    accept keyring arguments that are under the GnuPG home directory,
    regardless of the current directory.  (Closes: #627063)

As a bonus, Deb822.gpg_info now takes an optional keyrings argument that
it passes on to GpgInfo.from_sequence.
 lib/debian/deb822.py |   97 ++++++++++++++++++++++++++++++++++---------------
 tests/test_deb822.py |   74 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 141 insertions(+), 30 deletions(-)

diff --git a/lib/debian/deb822.py b/lib/debian/deb822.py
index a0cad69..8b565eb 100644
--- a/lib/debian/deb822.py
+++ b/lib/debian/deb822.py
@@ -33,15 +33,21 @@ except (ImportError, AttributeError):
     _have_apt_pkg = False
 import chardet
-import new
+import os
 import re
 import string
+import subprocess
 import sys
 import warnings
 import StringIO
 import UserDict
+GPGV_DEFAULT_KEYRINGS = frozenset(['/usr/share/keyrings/debian-keyring.gpg'])
+GPGV_EXECUTABLE = '/usr/bin/gpgv'
 class TagSectionWrapper(object, UserDict.DictMixin):
     """Wrap a TagSection object, using its find_raw method to get field values
@@ -69,6 +75,7 @@ class TagSectionWrapper(object, UserDict.DictMixin):
         # off any newline at the end of the data.
         return data.lstrip(' \t').rstrip('\n')
 class OrderedSet(object):
     """A set-like object that preserves order when iterating over it
@@ -112,6 +119,7 @@ class OrderedSet(object):
 class Deb822Dict(object, UserDict.DictMixin):
     # Subclassing UserDict.DictMixin because we're overriding so much dict
     # functionality that subclassing dict requires overriding many more than
@@ -571,11 +579,14 @@ class Deb822(Deb822Dict):
     gpg_stripped_paragraph = classmethod(gpg_stripped_paragraph)
-    def get_gpg_info(self):
+    def get_gpg_info(self, keyrings=None):
         """Return a GpgInfo object with GPG signature information
         This method will raise ValueError if the signature is not available
-        (e.g. the original text cannot be found)"""
+        (e.g. the original text cannot be found).
+        :param keyrings: list of keyrings to use (see GpgInfo.from_sequence)
+        """
         # raw_text is saved (as a string) only for Changes and Dsc (see
         # _gpg_multivalued.__init__) which is small compared to Packages or
@@ -584,11 +595,11 @@ class Deb822(Deb822Dict):
             raise ValueError, "original text cannot be found"
         if self.gpg_info is None:
-            self.gpg_info = GpgInfo.from_sequence(self.raw_text)
+            self.gpg_info = GpgInfo.from_sequence(self.raw_text,
+                                                  keyrings=keyrings)
         return self.gpg_info
 # XXX check what happens if input contains more that one signature
 class GpgInfo(dict):
@@ -612,14 +623,14 @@ class GpgInfo(dict):
         """Return the primary ID of the signee key, None is not available"""
-    @staticmethod
-    def from_output(out, err=None):
+    @classmethod
+    def from_output(cls, out, err=None):
         """Create a new GpgInfo object from gpg(v) --status-fd output (out) and
         optionally collect stderr as well (err).
         Both out and err can be lines in newline-terminated sequence or regular strings."""
-        n = GpgInfo()
+        n = cls()
         if isinstance(out, basestring):
             out = out.split('\n')
@@ -640,7 +651,7 @@ class GpgInfo(dict):
             # str.partition() would be better, 2.5 only though
             s = l.find(' ')
             key = l[:s]
-            if key in GpgInfo.uidkeys:
+            if key in cls.uidkeys:
                 # value is "keyid UID", don't split UID
                 value = l[s+1:].split(' ', 1)
@@ -649,42 +660,69 @@ class GpgInfo(dict):
             n[key] = value
         return n 
-# XXX how to handle sequences of lines? file() returns \n-terminated
-    @staticmethod
-    def from_sequence(sequence, keyrings=['/usr/share/keyrings/debian-keyring.gpg'],
-            executable=["/usr/bin/gpgv"]):
+    @classmethod
+    def from_sequence(cls, sequence, keyrings=None, executable=None):
         """Create a new GpgInfo object from the given sequence.
-        Sequence is a sequence of lines or a string
-        executable is a list of args for subprocess.Popen, the first element being the gpg executable"""
+        :param sequence: sequence of lines or a string
+        :param keyrings: list of keyrings to use (default:
+            ['/usr/share/keyrings/debian-keyring.gpg'])
+        :param executable: list of args for subprocess.Popen, the first element
+            being the gpgv executable (default: ['/usr/bin/gpgv'])
+        """
+        keyrings = keyrings or GPGV_DEFAULT_KEYRINGS
+        executable = executable or [GPGV_EXECUTABLE]
         # XXX check for gpg as well and use --verify accordingly?
-        args = executable
+        args = list(executable)
         #args.extend(["--status-fd", "1", "--no-default-keyring"])
         args.extend(["--status-fd", "1"])
-        import os
-        [args.extend(["--keyring", k]) for k in keyrings if os.path.isfile(k) and os.access(k, os.R_OK)]
+        for k in keyrings:
+            args.extend(["--keyring", k])
         if "--keyring" not in args:
             raise IOError, "cannot access any of the given keyrings"
-        import subprocess
-        p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        p = subprocess.Popen(args, stdin=subprocess.PIPE,
+                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
         # XXX what to do with exit code?
         if isinstance(sequence, basestring):
             (out, err) = p.communicate(sequence)
-            (out, err) = p.communicate("\n".join(sequence))
+            (out, err) = p.communicate(cls._get_full_string(sequence))
-        return GpgInfo.from_output(out, err)
+        return cls.from_output(out, err)
-    def from_file(target, *args):
-        """Create a new GpgInfo object from the given file, calls from_sequence(file(target), *args)"""
-        return from_sequence(file(target), *args)
+    def _get_full_string(sequence):
+        """Return a string from a sequence of lines.
+        This method detects if the sequence's lines are newline-terminated, and
+        constructs the string appropriately.
+        """
+        # Peek at the first line to see if it's newline-terminated.
+        sequence_iter = iter(sequence)
+        try:
+            first_line = sequence_iter.next()
+        except StopIteration:
+            return ""
+        join_str = '\n'
+        if first_line.endswith('\n'):
+            join_str = ''
+        return first_line + join_str + join_str.join(sequence_iter)
+    @classmethod
+    def from_file(cls, target, *args, **kwargs):
+        """Create a new GpgInfo object from the given file.
+        See GpgInfo.from_sequence.
+        """
+        return cls.from_sequence(file(target), *args, **kwargs)
 class PkgRelation(object):
     """Inter-package relationships
@@ -864,6 +902,7 @@ class _PkgRelationMixin(object):
             self.__parsed_relations = True
         return self.__relations
 class _multivalued(Deb822):
     """A class with (R/W) support for multivalued fields.
@@ -922,8 +961,6 @@ class _multivalued(Deb822):
             return Deb822.get_as_string(self, key)
 class _gpg_multivalued(_multivalued):
     """A _multivalued class that can support gpg signed objects
@@ -1101,7 +1138,6 @@ class Packages(Deb822, _PkgRelationMixin):
         Deb822.__init__(self, *args, **kwargs)
         _PkgRelationMixin.__init__(self, *args, **kwargs)
 class _CaseInsensitiveString(str):
     """Case insensitive string.
@@ -1122,4 +1158,5 @@ class _CaseInsensitiveString(str):
     def lower(self):
         return self.str_lower
 _strI = _CaseInsensitiveString
diff --git a/tests/test_deb822.py b/tests/test_deb822.py
index 891f4cd..6d0f329 100755
--- a/tests/test_deb822.py
+++ b/tests/test_deb822.py
@@ -20,6 +20,7 @@
 import os
 import re
 import sys
+import tempfile
 import unittest
 import warnings
 from StringIO import StringIO
@@ -844,5 +845,78 @@ class TestPkgRelations(unittest.TestCase):
                     [{'name': 'binutils-source', 'version': None, 'arch': None}]]}
         self.assertEqual(rel2, pkg2.relations)
+class TestGpgInfo(unittest.TestCase):
+    def setUp(self):
+        # These tests can only run with gpgv and a keyring available.  When we
+        # can use Python >= 2.7, we can use the skip decorator; for now just
+        # check in each test method whether we should actually run.
+        self.should_run = (
+            os.path.exists('/usr/bin/gpgv') and
+            os.path.exists('/usr/share/keyrings/debian-keyring.gpg'))
+        self.valid = {
+            'GOODSIG':
+                ['D14219877A786561', 'John Wright <john.wright at hp.com>'],
+            'VALIDSIG':
+                ['8FEFE900783CF175827C2F65D14219877A786561', '2008-05-01',
+                 '1209623566', '0', '3', '0', '17', '2', '01',
+                 '8FEFE900783CF175827C2F65D14219877A786561'],
+            'SIG_ID':
+                ['j3UjSpdky92fcQISbm8W5PlwC/g', '2008-05-01', '1209623566'],
+        }
+    def _validate_gpg_info(self, gpg_info):
+        # The second part of the GOODSIG field could change if the primary
+        # uid changes, so avoid checking that.  Also, the first part of the
+        # SIG_ID field has undergone at least one algorithm changein gpg,
+        # so don't bother testing that either.
+        self.assertEqual(set(gpg_info.keys()), set(self.valid.keys()))
+        self.assertEqual(gpg_info['GOODSIG'][0], self.valid['GOODSIG'][0])
+        self.assertEqual(gpg_info['VALIDSIG'], self.valid['VALIDSIG'])
+        self.assertEqual(gpg_info['SIG_ID'][1:], self.valid['SIG_ID'][1:])
+    def test_from_sequence_string(self):
+        if not self.should_run:
+            return
+        gpg_info = deb822.GpgInfo.from_sequence(self.data)
+        self._validate_gpg_info(gpg_info)
+    def test_from_sequence_newline_terminated(self):
+        if not self.should_run:
+            return
+        sequence = StringIO(self.data)
+        gpg_info = deb822.GpgInfo.from_sequence(sequence)
+        self._validate_gpg_info(gpg_info)
+    def test_from_sequence_no_newlines(self):
+        if not self.should_run:
+            return
+        sequence = self.data.splitlines()
+        gpg_info = deb822.GpgInfo.from_sequence(sequence)
+        self._validate_gpg_info(gpg_info)
+    def test_from_file(self):
+        if not self.should_run:
+            return
+        fd, filename = tempfile.mkstemp()
+        fp = os.fdopen(fd, 'w')
+        fp.write(self.data)
+        fp.close()
+        try:
+            gpg_info = deb822.GpgInfo.from_file(filename)
+        finally:
+            os.remove(filename)
+        self._validate_gpg_info(gpg_info)
 if __name__ == '__main__':

