[Secure-testing-commits] r14628 - in lib/python: . sectracker

Florian Weimer fw at alioth.debian.org
Fri May 7 21:03:29 UTC 2010


Author: fw
Date: 2010-05-07 21:03:10 +0000 (Fri, 07 May 2010)
New Revision: 14628

Added:
   lib/python/sectracker/
   lib/python/sectracker/__init__.py
   lib/python/sectracker/diagnostics.py
Modified:
   lib/python/parsers.py
Log:
sectracker.diagnostics: introduce separate diagnostics module


Modified: lib/python/parsers.py
===================================================================
--- lib/python/parsers.py	2010-05-07 20:38:40 UTC (rev 14627)
+++ lib/python/parsers.py	2010-05-07 21:03:10 UTC (rev 14628)
@@ -23,6 +23,8 @@
 import xcollections
 import xpickle
 
+import sectracker.diagnostics
+
 FORMAT = "1"
 
 @xpickle.loader("BINARY" + FORMAT)
@@ -67,12 +69,6 @@
     l.sort()
     return tuple(l)
 
-Message = xcollections.namedtuple("Message", "file line level message")
-def addmessage(messages, file, line, level, msg):
-    if level not in ("error", "warning"):
-        raise ValueError("invalid message level: " + repr(level))
-    messages.append(Message(file, line, level, msg))
-
 FlagAnnotation = xcollections.namedtuple("FlagAnnotation", "line type")
 StringAnnotation = xcollections.namedtuple("StringAnnotation",
                                            "line type description")
@@ -86,40 +82,36 @@
     # Parser for inner annotations, like (bug #1345; low)
     urgencies=set("unimportant low medium high".split())
     @regexpcase.rule('(bug filed|%s)' % '|'.join(urgencies))
-    def innerflag(groups, file, line, messages, flags, bugs):
+    def innerflag(groups, diag, flags, bugs):
         f = groups[0]
         if f in flags:
-            addmessage(messages, file, line, "error",
-                       "duplicate flag: " + repr(f))
+            diag.error("duplicate flag: " + repr(f))
         else:
             flags.add(f)
     @regexpcase.rule(r'bug #(\d+)')
-    def innerbug(groups, file, line, messages, flags, bugs):
+    def innerbug(groups, diag, flags, bugs):
         no = int(groups[0])
         if no in bugs:
-            messages.add(file, line, "error",
-                         "duplicate bug number: " + groups[0])
+            diag.error("duplicate bug number: " + groups[0])
         else:
             bugs.add(no)
-    def innerdefault(text, file, line, messages, flags, bugs):
-        addmessage(messages, file, line, "error",
-                   "invalid inner annotation: " + repr(text))
+    def innerdefault(text, diag, flags, bugs):
+        diag.error("invalid inner annotation: " + repr(text))
     innerdispatch = regexpcase.RegexpCase((innerflag, innerbug),
                                           default=innerdefault)
 
-    def parseinner(file, line, messages, inner):
+    def parseinner(diag, inner):
         if not inner:
             return (None, (), False)
         flags = set()
         bugs = set()
         for innerann in inner.split(";"):
-            innerdispatch(innerann.strip(), file, line, messages, flags, bugs)
+            innerdispatch(innerann.strip(), diag, flags, bugs)
 
         urgency = urgencies.intersection(flags)
         if urgency:
             if len(urgency) > 1:
-                addmessage(messages, file, line, "error",
-                           "multiple urgencies: " + ", ".join(urgency))
+                diag.error("multiple urgencies: " + ", ".join(urgency))
             else:
                 urgency = urgency.pop()
         else:
@@ -127,8 +119,7 @@
 
         bug_filed = "bug filed" in flags 
         if bugs and bug_filed:
-            addmessage(messages, file, line, "error",
-                       "'bug filed' and bug numbers listed")
+            diag.error("'bug filed' and bug numbers listed")
             bug_filed = False
 
         return (urgency, _sortedtuple(bugs), bug_filed)
@@ -137,58 +128,56 @@
 
     @regexpcase.rule(r'(?:\[([a-z]+)\]\s)?-\s([A-Za-z0-9:.+-]+)\s*'
                      + r'(?:\s([A-Za-z0-9:.+~-]+)\s*)?(?:\s\((.*)\))?')
-    def package_version(groups, file, line, messages, anns):
+    def package_version(groups, diag, anns):
         release, package, version, inner = groups
-        inner = parseinner(file, line, messages, inner)
+        inner = parseinner(diag, inner)
         if version is None:
             kind = "unfixed"
         else:
             kind = "fixed"
         anns.append(PackageAnnotation(
-                *((line, "package", release, package, kind, version, None)
-                  + inner)))
+                *((diag.line(), "package", release, package, kind,
+                   version, None) + inner)))
 
     pseudo_freetext = "no-dsa not-affected end-of-life".split()
     pseudo_struct = set("unfixed removed itp undetermined".split())
     @regexpcase.rule(r'(?:\[([a-z]+)\]\s)?-\s([A-Za-z0-9:.+-]+)'
                      + r'\s+<([a-z-]+)>\s*(?:\s\((.*)\))?')
-    def package_pseudo(groups, file, line, messages, anns):
+    def package_pseudo(groups, diag, anns):
         release, package, version, inner = groups
         if version in pseudo_freetext:
             anns.append(PackageAnnotation(
-                    line, "package", release, package, version, None, inner,
-                    None, (), False))
+                    diag.line(), "package", release, package, version,
+                    None, inner, None, (), False))
         elif version in pseudo_struct:
-            inner = parseinner(file, line, messages, inner)
+            inner = parseinner(diag, inner)
             if version == "itp" and not inner[1]:
-                addmessage(messages, file, line, "error",
-                           "<itp> needs Debian bug reference")
+                diag.error("<itp> needs Debian bug reference")
             anns.append(PackageAnnotation(
-                    *((line, "package", release, package, version, None, None)
-                      + inner)))
+                    *((diag.line(), "package", release, package, version,
+                       None, None) + inner)))
         else:
-            addmessage(messages, file, line, "error",
-                       "invalid pseudo-version: " + repr(version))
+            diag.error("invalid pseudo-version: " + repr(version))
 
     @regexpcase.rule(r'\{(.*)\}')
-    def xref(groups, file, line, messages, anns):
+    def xref(groups, diag, anns):
         x = _sortedtuple(groups[0].strip().split())
         if x:
-            anns.append(XrefAnnotation(line, "xref", x))
+            anns.append(XrefAnnotation(diag.line(), "xref", x))
         else:
-            addmessage(messages, file, line, "error", "empty cross-reference")
+            diag.error("empty cross-reference")
         
     return regexpcase.RegexpCase(
         ((r'(RESERVED|REJECTED)',
-          lambda groups, file, line, messages, anns:
-              anns.append(FlagAnnotation(line, groups[0]))),
+          lambda groups, diag, anns:
+              anns.append(FlagAnnotation(diag.line(), groups[0]))),
          (r'(NOT-FOR-US|NOTE|TODO):\s+(\S.*)',
-          lambda groups, file, line, messages, anns:
-              anns.append(StringAnnotation(line, *groups))),
+          lambda groups, diag, anns:
+              anns.append(StringAnnotation(diag.line(), *groups))),
          package_version, package_pseudo, xref),
         prefix=r"\s+", suffix=r"\s*",
-        default=lambda text, file, line, messages, anns:
-            addmessage(messages, file, line, "error", "invalid annotation"))
+        default=lambda text, diag, anns:
+            diag.error("invalid annotation"))
 _annotationdispatcher = _annotationdispatcher()
 
 List = xcollections.namedtuple("List", "list messages")
@@ -209,7 +198,7 @@
     lineno = 0
     headerlineno = None
     bugs = []
-    messages = []
+    diag = sectracker.diagnostics.Diagnostics()
     name = desc = None
     anns = []
 
@@ -226,37 +215,37 @@
 
     for line in f.readlines():
         lineno += 1
+        diag.setlocation(path, lineno)
+
         if line[:1] in " \t":
             if name is None:
-                addmessage(messages, path, lineno, "error", "header expected")
+                diag.error("header expected")
                 continue
-            _annotationdispatcher(line, path, lineno, messages, anns)
+            _annotationdispatcher(line, diag, anns)
         else:
             emit()
             headerlineno = lineno
         
             match = _re_cve_header.match(line)
             if match is None:
-                addmessage(message, path, lineno, "error", "malformed header")
+                diag.error("malformed header")
                 name = desc = None
                 continue
             name, desc = match.groups()
             if desc:
                 if desc[0] == '(':
                     if desc[-1] <> ')':
-                        addmessage(message, path, lineno, "error", 
-                                   "missing ')'")
+                        diag.error("error", "missing ')'")
                     else:
                         desc = desc[1:-1]
                 elif desc[0] == '[':
                     if desc[-1] <> ']':
-                        addmessage(message, path, lineno, "error",
-                                   "missing ']'")
+                        diag.error("missing ']'")
                     else:
                         desc = desc[1:-1]
 
     emit()
-    return List(tuple(bugs), tuple(messages))
+    return List(tuple(bugs), diag.messages())
 
 def _test():
     o = binarypackages("../../data/packages/sid__main_i386_Packages")
@@ -272,6 +261,7 @@
     for err in o.messages:
         print "%s:%d: %s: %s" % (err.file, err.line, err.level, err.message)
 
+    Message = sectracker.diagnostics.Message
     for (line, res, xmsgs) in [
             (' - foo <unfixed>',
              PackageAnnotation(17, "package", None, "foo", "unfixed", None,
@@ -346,8 +336,10 @@
                       "invalid pseudo-version: 'garbled'"),)),
             ]:
         anns = []
-        msgs = []
-        _annotationdispatcher(line, "CVE", 17, msgs, anns)
+        diag = sectracker.diagnostics.Diagnostics()
+        diag.setlocation("CVE", 17)
+        _annotationdispatcher(line, diag, anns)
+        msgs = diag.messages()
         assert tuple(msgs) == xmsgs, repr(msgs)
         if anns:
             r = anns[0]

Added: lib/python/sectracker/diagnostics.py
===================================================================
--- lib/python/sectracker/diagnostics.py	                        (rev 0)
+++ lib/python/sectracker/diagnostics.py	2010-05-07 21:03:10 UTC (rev 14628)
@@ -0,0 +1,80 @@
+# sectracker.diagnostics -- keeping track of errors and warnings
+# Copyright (C) 2010 Florian Weimer <fw at deneb.enyo.de>
+# 
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import xcollections
+
+Message = xcollections.namedtuple("Message", "file line level message")
+
+def _checkfile(file):
+    if not isinstance(file, basestring):
+        raise ValueError("file name is not a string: " + repr(file))
+    return file
+
+def _checkline(line):
+    if not isinstance(line, int):
+        raise ValueError("not a number: " + repr(line))
+    if line <= 0:
+        raise ValueError("line number must be positive: " + repr(line))
+    return line
+
+class Diagnostics:
+    def __init__(self):
+        self._messages = []
+        self._file = None
+        self._line = None
+
+    def setlocation(self, file, line=1):
+        if file is None and line is None:
+            self._file = self._line = None
+        else:
+            self._file = _checkfile(file)
+            self._line = _checkline(line)
+
+    def error(self, message, file=None, line=None):
+        self.record(file, line, "error", message)
+
+    def warning(self, message, file=None, line=None):
+        self.record(file, line, "warning", message)
+
+    def record(self, file, line, level, message):
+        if file is None:
+            file = self._file
+            if file is None:
+                raise Excpetion("location has not been set")
+        else:
+            _checkfile(file)
+        if line is None:
+            line = self._line
+            if line is None:
+                raise Excpetion("location has not been set")
+        else:
+            _checkline(line)
+        self._messages.append(Message(file, line, level, message))
+
+    def file(self):
+        if self._file is None:
+            raise Excpetion("location has not been set")
+        return self._file
+
+    def line(self):
+        if self._line is None:
+            raise Excpetion("location has not been set")
+        return self._line
+
+    def messages(self):
+        return tuple(self._messages)
+




More information about the Secure-testing-commits mailing list