[Secure-testing-commits] r3051 - bin lib/python

Florian Weimer fw at costa.debian.org
Thu Dec 15 11:37:41 UTC 2005


Author: fw
Date: 2005-12-15 11:37:40 +0000 (Thu, 15 Dec 2005)
New Revision: 3051

Modified:
   bin/tracker_service.py
   bin/update-db
   lib/python/security_db.py
Log:
lib/python/security_db.py (DB):
  Bump schema version.
(DB.initSchema):
  Add debsecan_data table.
(DB.calculateDebsecan, DB.getDebsecan):
  New methods.

bin/update-db:
  Invoke calculateDebsecan.

bin/tracker_service.py (TrackerService):
  Add support for debsecan/* pages.
(TrackerService.page_debsecan):
  New method.


Modified: bin/tracker_service.py
===================================================================
--- bin/tracker_service.py	2005-12-15 11:34:35 UTC (rev 3050)
+++ bin/tracker_service.py	2005-12-15 11:37:40 UTC (rev 3051)
@@ -98,6 +98,7 @@
         self.register('data/releases', self.page_data_releases)
         self.register('data/funny-versions', self.page_data_funny_versions)
         self.register('data/fake-names', self.page_data_fake_names)
+        self.register('debsecan/**', self.page_debsecan)
 
     def page_home(self, path, params, url):
         query = params.get('query', ('',))[0]
@@ -854,6 +855,17 @@
              make_table(gen(),
                         caption=("Bug", "Description"))])
 
+    def page_debsecan(self, path, params, url):
+        obj = '/'.join(path)
+        data = self.db.getDebsecan(obj)
+        if data:
+            return BinaryResult(data)
+        else:
+            return self.create_page(
+                url, "Object not found",
+                [P("The requested debsecan object has not been found.")],
+                status=404)
+
     def create_page(self, url, title, body, search_in_page=False, status=200):
         append = body.append
         append(HR())

Modified: bin/update-db
===================================================================
--- bin/update-db	2005-12-15 11:34:35 UTC (rev 3050)
+++ bin/update-db	2005-12-15 11:37:40 UTC (rev 3051)
@@ -75,6 +75,11 @@
         print x
     sys.exit(1)
 
+# debsecan data
+
+for release in ('', 'woody', 'sarge', 'etch'):
+    db.calculateDebsecan(release)
+
 # Everything worked well.
-    
+
 db.commit(cursor)

Modified: lib/python/security_db.py
===================================================================
--- lib/python/security_db.py	2005-12-15 11:34:35 UTC (rev 3050)
+++ lib/python/security_db.py	2005-12-15 11:37:40 UTC (rev 3051)
@@ -27,6 +27,7 @@
 """
 
 import apsw
+import base64
 import bugs
 import cPickle
 import cStringIO
@@ -37,6 +38,7 @@
 import re
 import sys
 import types
+import zlib
 
 class InsertError(Exception):
     """Class for capturing insert errors.
@@ -111,7 +113,7 @@
         self.db = apsw.Connection(name)
         self.verbose = verbose
 
-        self.schema_version = 17
+        self.schema_version = 18
         self._initFunctions()
 
         c = self.cursor()
@@ -309,6 +311,11 @@
             loss_sec_prot_other INTEGER NOT NULL)""")
 
         cursor.execute(
+            """CREATE TABLE debsecan_data
+            (name TEXT NOT NULL PRIMARY KEY,
+            data TEXT NOT NULL)""")
+
+        cursor.execute(
             """CREATE VIEW testing_status AS
             SELECT DISTINCT sp.name AS package, st.bug_name AS bug,
             sp.archive AS section, st.urgency AS urgency,
@@ -1219,6 +1226,133 @@
         VALUES (?, ?, ?, ?)""",
               (bug_name, suite, status, pkgs))
 
+    def calculateDebsecan(self, release):
+        """Create data for the debsecan tool."""
+
+        c = self.cursor()
+
+        c.execute("""CREATE TEMPORARY TABLE vulnlist (
+        name TEXT NOT NULL,
+        package TEXT NOT NULL,
+        note INTEGER NOT NULL,
+        PRIMARY KEY (name, package)
+        )""")
+
+        # Populate the table with the unstable vulnerabilities;
+        # override them with the release-specific status.
+
+        c.execute("""INSERT INTO vulnlist
+        SELECT bug_name, package, id FROM package_notes WHERE release = ''""")
+
+        if release:
+            c.execute("""INSERT OR REPLACE INTO vulnlist
+            SELECT bug_name, package, id FROM package_notes
+            WHERE release = ?""", (release,))
+
+        c.execute("""DELETE FROM vulnlist WHERE name LIKE 'FAKE-0000000-%'""")
+
+        urgency_to_flag = {'low' : 'L', 'medium' : 'M', 'high' : 'H',
+                           'unknown' : ' '}
+
+        result = ["VERSION 0\n"]
+        for (name, package, fixed_version, kind, urgency, remote, description,
+             note_id) in list(c.execute("""SELECT
+                vulnlist.name, vulnlist.package,
+                COALESCE(n.fixed_version, ''),
+                n.package_kind, n.urgency,
+                (SELECT range_remote FROM nvd_data
+                 WHERE cve_name = vulnlist.name) AS remote,
+                bugs.description,
+                n.id
+                FROM vulnlist, bugs, package_notes AS n
+                WHERE bugs.name = vulnlist.name
+                AND n.id = vulnlist.note
+                ORDER BY vulnlist.package""")):
+            if fixed_version == '0' or urgency == 'unimportant' \
+               or kind not in ('source', 'binary', 'unknown'):
+                continue
+
+            # Normalize FAKE-* names a bit.  The line number (which
+            # makes the name unique) is completely useless for the
+            # client.
+
+            if name[0:5] == "FAKE-":
+                name = '-'.join(name.split('-')[0:2])
+
+            # Determine if a fix is available for the specific
+            # release.
+
+            fix_available = ' '
+            if release:
+                fix_available = ' '
+                if kind == 'source':
+                    fix_available_sql = """SELECT st.vulnerable
+                        FROM source_packages AS p, source_package_status AS st
+                        WHERE p.name = ?
+                        AND p.release = ?
+                        AND p.subrelease IN ('', 'security')
+                        AND st.bug_name = ?
+                        AND st.package = p.rowid
+                        ORDER BY p.version COLLATE version DESC"""
+                elif kind == 'binary':
+                    fix_available_sql = """SELECT st.vulnerable
+                        FROM binary_packages AS p, binary_package_status AS st
+                        WHERE p.name = ?
+                        AND p.release = ?
+                        AND p.subrelease IN ('', 'security')
+                        AND st.bug_name = ?
+                        AND st.package = p.rowid
+                        ORDER BY p.version COLLATE version DESC"""
+                else:
+                    fix_available_sql = ''
+
+                if fix_available_sql:
+                    for (v,) in c.execute(fix_available_sql,
+                                          (package, release, name)):
+                        assert v is not None
+                        if not v:
+                            fix_available = 'F'
+                        break
+            elif fixed_version <> '':
+                fix_available = 'F'
+
+            if kind == 'source':
+                kind = 'S'
+            elif kind == 'binary':
+                kind = 'B'
+            else:
+                kind = ' '
+
+            if remote is None:
+                remote = '?'
+            elif remote:
+                remote = 'R'
+            else:
+                remote = ' '
+
+            result.append("%s,%c%c%c%c,%s,%s,%s\n"
+                          % (name,
+                             kind, urgency_to_flag[urgency], remote,
+                             fix_available,
+                             package, fixed_version, description))
+        result = base64.encodestring(zlib.compress(''.join(result), 9))
+
+        if not release:
+            release = 'sid'
+        c.execute(
+            "INSERT OR REPLACE INTO debsecan_data (name, data) VALUES (?, ?)",
+            ('release/' + release, result))
+
+        c.execute("DROP TABLE vulnlist")
+
+    def getDebsecan(self, name):
+        """Returns the debsecan data item NAME."""
+        for (data,) in self.cursor().execute(
+            "SELECT data FROM debsecan_data WHERE name = ?", (name,)):
+            return base64.decodestring(data)
+        else:
+            return None
+
     def replaceNVD(self, cursor, data):
         """Replaces the stored NVD data."""
         cursor.execute("DELETE FROM nvd_data");




More information about the Secure-testing-commits mailing list