[Secure-testing-commits] r2488 - bin lib/python

Florian Weimer fw at costa.debian.org
Thu Oct 20 09:03:39 UTC 2005


Author: fw
Date: 2005-10-20 09:03:39 +0000 (Thu, 20 Oct 2005)
New Revision: 2488

Added:
   bin/update-nvd
   lib/python/nvd.py
Modified:
   bin/tracker_service.py
   lib/python/security_db.py
Log:
 r638 at deneb:  fw | 2005-10-14 15:43:12 +0200
 bin/tracker_service.py (TrackerService.page_home):
   Document external interfaces.
 (TrackerService.page_bug):
   Add NVD references.
 (TrackerService.page_status_release_stable,
  TrackerService.page_status_release_testing):
   Show NVD remote attack range if present.
 (TrackerService.url_nvd, TrackerService.make_nvd_ref):
   New.
 
 lib/python/security_db.py (NVDEntry):
   New class.
 (DB.initSchema):
   New nvd_data table.  Update stable_status and testing_status views.
 (DB.replaceNVD, DB.getNVD):
   New methods.
 
 bin/update-nvd, lib/python/nvd.py:
   New files.

Modified: bin/tracker_service.py
===================================================================
--- bin/tracker_service.py	2005-10-20 09:03:27 UTC (rev 2487)
+++ bin/tracker_service.py	2005-10-20 09:03:39 UTC (rev 2488)
@@ -105,6 +105,8 @@
             ('data/releases',
              'Covered Debian releases and architectures (slow)'),
             self.make_search_button(url)),
+             P("""(You can enter CAN/CVE names, Debian bug numbers and package
+names in the search forms.)"""),
 
              H2("A few notes on data sources"),
              P("""Data in this tracker comes solely from the bug database
@@ -115,7 +117,15 @@
 can be some delay before this happens."""),
              P("""At the moment, the database only contains information which is
 relevant for tracking the security status of the stable, testing and
-unstable suites.  This means that data for oldstable is likely wrong.""")],
+unstable suites.  This means that data for oldstable is likely wrong."""),
+
+             H2("External interfaces"),
+             P("""If you want to automatically open a relevant web page for
+some object, use the """,
+               CODE(str(url.scriptRelative("redirect/")), EM("object")),
+               """ URL.  If no information is contained in this database,
+the browser is automatically redirected to the corresponding external
+data source.""")],
             search_in_page=True)
 
     def page_object(self, path, params, url):
@@ -178,7 +188,11 @@
 
             source = bug.name.split('-')[0]
             if source in ('CAN', 'CVE'):
-                source_xref = self.make_cve_ref(url, bug.name, 'CVE')
+                source_xref = compose(self.make_cve_ref(url, bug.name, 'CVE'),
+                                      " (",
+                                      self.make_nvd_ref(url, bug.name,
+                                                        'in NVD'),
+                                      ")")
             elif source == 'DSA':
                 source_xref = self.make_dsa_ref(url, bug.name, 'Debian')
             elif source == 'DTSA':
@@ -198,6 +212,14 @@
             xref = list(self.db.getBugXrefs(cursor, bug.name))
             if xref:
                 yield B("References"), self.make_xref_list(url, xref)
+
+            nvd = self.db.getNVD(cursor, bug.name)
+            if nvd:
+                if nvd.severity:
+                    yield B("NVD severity"), nvd.severity.lower()
+                nvd_range = nvd.rangeString()
+                if nvd_range:
+                    yield B("NVD attack range"), nvd_range
             
             debian_bugs = bug.getDebianBugs(cursor)
             if debian_bugs:
@@ -435,9 +457,10 @@
     def page_status_release_stable(self, path, params, url):
         def gen():
             old_pkg_name = ''
-            for (pkg_name, bug_name, archive, urgency) in \
+            for (pkg_name, bug_name, archive, urgency, remote) in \
                     self.db.cursor().execute(
-                """SELECT package, bug, section, urgency FROM stable_status"""):
+                """SELECT package, bug, section, urgency, remote
+                FROM stable_status"""):
                 if pkg_name == old_pkg_name:
                     pkg_name = ''
                 else:
@@ -445,24 +468,32 @@
                     if archive <> 'main':
                         pkg_name = "%s (%s)" % (pkg_name, archive)
 
+                if remote is None:
+                    remote = ''
+                elif remote:
+                    remote = 'yes'
+                else:
+                    remote = 'no'
+
                 if urgency == 'unknown':
                     urgency = ''
                 elif urgency == 'high':
                     urgency = self.make_red(urgency)
 
-                yield pkg_name, self.make_xref(url, bug_name), urgency
+                yield pkg_name, self.make_xref(url, bug_name), urgency, remote
 
         return self.create_page(
             url, 'Vulnerable source packages in the stable suite',
-            [make_table(gen(), caption=("Package", "Bug", "Urgency"))])
+            [make_table(gen(), caption=("Package", "Bug", "Urgency",
+                                        "Remote"))])
             
     def page_status_release_testing(self, path, params, url):
         def gen():
             old_pkg_name = ''
             for (pkg_name, bug_name, archive, urgency,
-                 sid_vulnerable, ts_fixed) in self.db.cursor().execute(
+                 sid_vulnerable, ts_fixed, remote) in self.db.cursor().execute(
                 """SELECT package, bug, section, urgency, unstable_vulnerable,
-                testing_security_fixed
+                testing_security_fixed, remote
                 FROM testing_status"""):
                 if pkg_name == old_pkg_name:
                     pkg_name = ''
@@ -471,6 +502,13 @@
                     if archive <> 'main':
                         pkg_name = "%s (%s)" % (pkg_name, archive)
 
+                if remote is None:
+                    remote = ''
+                elif remote:
+                    remote = 'yes'
+                else:
+                    remote = 'no'
+
                 if ts_fixed:
                     status = 'fixed in testing-security'
                 else:
@@ -483,13 +521,14 @@
                     urgency = ''
 
                 yield (pkg_name, self.make_xref(url, bug_name),
-                       urgency, status)
+                       urgency, remote, status)
 
         return self.create_page(
             url, 'Vulnerable source packages in the testing suite',
             [make_menu(url.scriptRelative,
                        ("status/dtsa-candidates", "Candidates for DTSAs")),
-             make_table(gen(), caption=("Package", "Bug"))])
+             make_table(gen(), caption=("Package", "Bug", "Urgency",
+                                        "Remote"))])
 
     def page_status_release_unstable(self, path, params, url):
         def gen():
@@ -737,6 +776,10 @@
     def url_cve(self, url, name):
         return url.absolute("http://cve.mitre.org/cgi-bin/cvename.cgi",
                             name=name)
+    def url_nvd(self, url, name):
+        return url.absolute("http://nvd.nist.gov/nvd.cfm",
+                            cvename=name)
+    
     def url_dsa(self, url, dsa, re_dsa=re.compile(r'^DSA-(\d+)(?:-\d+)?$')):
         match = re_dsa.match(dsa)
         if match:
@@ -788,6 +831,11 @@
             name = cve
         return A(self.url_cve(url, cve), name)
     
+    def make_nvd_ref(self, url, cve, name=None):
+        if name is None:
+            name = cve
+        return A(self.url_nvd(url, cve), name)
+
     def make_dsa_ref(self, url, dsa, name=None):
         if name is None:
             name = dsa

Added: bin/update-nvd
===================================================================
--- bin/update-nvd	2005-10-20 09:03:27 UTC (rev 2487)
+++ bin/update-nvd	2005-10-20 09:03:39 UTC (rev 2488)
@@ -0,0 +1,35 @@
+#!/usr/bin/python
+
+import os
+import os.path
+import string
+import sys
+
+def setup_paths():
+    check_file = 'lib/python/debian_support.py'
+    path = os.getcwd()
+    while 1:
+        if os.path.exists("%s/%s" % (path, check_file)):
+            sys.path = [path + '/lib/python'] + sys.path
+            return path
+        idx = string.rfind(path, '/')
+        if idx == -1:
+            raise ImportError, "could not setup paths"
+        path = path[0:idx]
+os.chdir(setup_paths())
+
+import nvd
+import security_db
+
+db_file = 'data/security.db'
+db = security_db.DB(db_file)
+
+data = []
+for name in sys.argv[1:]:
+    f = file(name)
+    data += nvd.parse(f)
+    f.close()
+
+cursor = db.writeTxn()
+db.replaceNVD(cursor, data)
+db.commit(cursor)


Property changes on: bin/update-nvd
___________________________________________________________________
Name: svn:mime-type
   + text/script

Added: lib/python/nvd.py
===================================================================
--- lib/python/nvd.py	2005-10-20 09:03:27 UTC (rev 2487)
+++ lib/python/nvd.py	2005-10-20 09:03:39 UTC (rev 2488)
@@ -0,0 +1,116 @@
+# nvd.py -- simplistic NVD parser
+# Copyright (C) 2005 Florian Weimer <fw at deneb.enyo.de>
+# 
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+"""This module parses the XML files provided by the
+National Vulnerability Database (NVD) <http://nvd.nist.gov/>
+"""
+
+import xml.sax
+import xml.sax.handler
+
+class _Parser(xml.sax.handler.ContentHandler):
+    """Parser helper class."""
+
+    def __init__(self):
+        self.result = []
+        self.start_dispatcher = {}
+        for x in ('entry', 'local', 'range', 'remote', 'user_init',
+                  'avail', 'conf', 'int', 'sec_prot'):
+             self.start_dispatcher[x] = getattr(self, 'TAG_' + x)
+
+    def _noop(*args):
+        pass
+
+    def startElement(self, name, attrs):
+        self.start_dispatcher.get(name, self._noop)(name, attrs)
+
+    def TAG_entry(self, name, attrs):
+        self.name = attrs['name'].encode('utf-8')
+        self.published = attrs['published'].encode('utf-8')
+        self.severity = attrs.get('severity', u'').encode('utf-8')
+        self.discovered = attrs.get('discovered', u'').encode('utf-8')
+
+        self.range_local = self.range_remote = self.range_user_init = None
+
+        self.loss_avail = self.loss_conf = self.loss_int \
+            = self.loss_sec_prot_user = self.loss_sec_prot_admin \
+            = self.loss_sec_prot_other = 0
+
+    def TAG_range(self, name, attrs):
+        self.range_local = self.range_remote = self.range_user_init = 0
+
+    def TAG_local(self, name, attrs):
+        self.range_local = 1
+    def TAG_remote(self, name, attrs):
+        self.range_remote = 1
+    def TAG_user_init(self, name, attrs):
+        self.range_user_init = 1
+    def TAG_loss_types(self, name, attrs):
+        self.clear_loss()
+    def TAG_avail(self, name, attrs):
+        self.loss_avail = 1
+    def TAG_conf(self, name, attrs):
+        self.loss_conf = 1
+    def TAG_int(self, name, attrs):
+        self.loss_int = 1
+    def TAG_sec_prot(self, name, attrs):
+        if attrs.has_key('user'):
+            self.loss_sec_prot_user = 1
+        if attrs.has_key('admin'):
+            self.loss_sec_prot_admin = 1
+        if attrs.has_key('other'):
+            self.loss_sec_prot_other = 1
+            
+    def endElement(self, name):
+        if name == 'entry':
+            self.result.append((self.name,
+                                self.discovered,
+                                self.published,
+                                self.severity,
+                                self.range_local,
+                                self.range_remote,
+                                self.range_user_init,
+                                self.loss_avail,
+                                self.loss_conf,
+                                self.loss_int,
+                                self.loss_sec_prot_user,
+                                self.loss_sec_prot_admin,
+                                self.loss_sec_prot_other))
+
+def parse(file):
+    """Parses the indicated file object.  Returns a list of tuples,
+    containing the following elements:
+
+    - CVE name
+    - discovery data (can be empty)
+    - publication date
+    - severity (can be empty)
+    - local range flag
+    - remote range flag
+    - availability loss type flag
+    - confidentiality loss type flag
+    - integrity loss type flag
+    - security protection (user) loss type flag
+    - security protection (admin) loss type flag
+    - security protection (other) loss type flag
+    """
+    parser = xml.sax.make_parser()
+    parser.setFeature(xml.sax.handler.feature_namespaces, 0)
+    p = _Parser()
+    parser.setContentHandler(p)
+    parser.parse(file)
+    return p.result

Modified: lib/python/security_db.py
===================================================================
--- lib/python/security_db.py	2005-10-20 09:03:27 UTC (rev 2487)
+++ lib/python/security_db.py	2005-10-20 09:03:39 UTC (rev 2488)
@@ -74,6 +74,22 @@
     result.sort()
     return result
 
+class NVDEntry:
+    """A class for an entry in the nvd_data table.
+    Objects have the same fileds as the table."""
+    def __init__(self, row, description):
+        for x in range(len(row)):
+            setattr(self, description[x][0], row[x])
+    def rangeString(self):
+        result = []
+        if self.range_local:
+            result.append("local")
+        if self.range_remote:
+            result.append("remote")
+        if self.range_user_init:
+            result.append("user-initiated")
+        return ", ".join(result)
+
 class SchemaMismatch(Exception):
     """Raised to indicate a schema mismatch.
 
@@ -95,7 +111,7 @@
         self.db = apsw.Connection(name)
         self.verbose = verbose
 
-        self.schema_version = 13
+        self.schema_version = 15
         self._initFunctions()
 
         c = self.cursor()
@@ -281,6 +297,22 @@
             "CREATE TABLE removed_packages (name TEXT NOT NULL PRIMARY KEY)")
 
         cursor.execute(
+            """CREATE TABLE nvd_data
+            (cve_name TEXT NOT NULL PRIMARY KEY,
+            discovered TEXT NOT NULL,
+            published TEXT NOT NULL,
+            severity TEXT NOT NULL,
+            range_local INTEGER,
+            range_remote INTEGER,
+            range_user_init INTEGER,
+            loss_avail INTEGER NOT NULL,
+            loss_conf INTEGER NOT NULL,
+            loss_int INTEGER NOT NULL,
+            loss_sec_prot_user INTEGER NOT NULL,
+            loss_sec_prot_admin INTEGER NOT NULL,
+            loss_sec_prot_other INTEGER NOT NULL)""")
+
+        cursor.execute(
             """CREATE VIEW testing_status AS
             SELECT DISTINCT sp.name AS package, st.bug_name AS bug,
             sp.archive AS section, st.urgency AS urgency,
@@ -297,7 +329,9 @@
             AND tsecp.release = 'etch' AND tsecp.subrelease = 'security'
             AND tsecp.archive = sp.archive
             AND tsecst.bug_name = st.bug_name
-            AND tsecst.package = tsecp.rowid), 0) AS testing_security_fixed
+            AND tsecst.package = tsecp.rowid), 0) AS testing_security_fixed,
+            (SELECT range_remote FROM nvd_data
+             WHERE cve_name = st.bug_name) AS remote
             FROM source_package_status AS st, source_packages AS sp
             WHERE st.vulnerable AND st.urgency <> 'unimportant'
             AND sp.rowid = st.package AND sp.release = 'etch'
@@ -307,7 +341,9 @@
         cursor.execute(
             """CREATE VIEW stable_status AS
             SELECT DISTINCT sp.name AS package, st.bug_name AS bug,
-            sp.archive AS section, st.urgency AS urgency
+            sp.archive AS section, st.urgency AS urgency,
+            (SELECT range_remote FROM nvd_data
+             WHERE cve_name = st.bug_name) AS remote
             FROM source_package_status AS st, source_packages AS sp
             WHERE st.vulnerable AND st.urgency <> 'unimportant'
             AND sp.rowid = st.package AND sp.release = 'sarge'
@@ -321,7 +357,6 @@
             AND secst.package = secp.rowid), 0)
             ORDER BY sp.name, urgency_to_number(urgency), st.bug_name""")
 
-
         cursor.execute("PRAGMA user_version = %d" % self.schema_version)
 
     def _initFunctions(self):
@@ -1198,6 +1233,21 @@
         VALUES (?, ?, ?, ?)""",
               (bug_name, suite, status, pkgs))
 
+    def replaceNVD(self, cursor, data):
+        """Replaces the stored NVD data."""
+        cursor.execute("DELETE FROM nvd_data");
+        cursor.executemany("INSERT INTO nvd_data VALUES (?"
+                           + (", ?" * (len(data[0]) - 1))
+                           + ")", data)
+
+    def getNVD(self, cursor, cve_name):
+        """Returns a dictionary with NVD data corresponding to the CVE name,
+        or None."""
+        for row in cursor.execute("SELECT * FROM nvd_data WHERE cve_name = ?",
+                                  (cve_name,)):
+            return NVDEntry(row, cursor.getdescription())
+        return None
+
     def getSourcePackageVersions(self, cursor, pkg):
         """A generator which returns tuples (RELEASE-LIST, VERSION),
         the available versions of the source package pkg."""




More information about the Secure-testing-commits mailing list