[Secure-testing-commits] r2482 - bin lib/python

Florian Weimer fw at costa.debian.org
Thu Oct 20 09:02:13 UTC 2005


Author: fw
Date: 2005-10-20 09:02:12 +0000 (Thu, 20 Oct 2005)
New Revision: 2482

Added:
   bin/tracker_service.py
   lib/python/web_support.py
Log:
 r614 at deneb:  fw | 2005-10-13 22:12:28 +0200
 Add new web front end.
 
 bin/tracker_service.py, lib/python/web_support.py:
   New files.

Added: bin/tracker_service.py
===================================================================
--- bin/tracker_service.py	2005-10-20 09:01:57 UTC (rev 2481)
+++ bin/tracker_service.py	2005-10-20 09:02:12 UTC (rev 2482)
@@ -0,0 +1,816 @@
+#!/usr/bin/python
+
+import sys
+sys.path.insert(0,'../lib/python')
+
+# Command line handling: exactly two arguments are required, the path of
+# the socket and the path of the database.  They are stored in module
+# level names (presumably consumed where the service is started, which
+# is not part of this excerpt).
+if len(sys.argv) != 3:
+    print "usage: python tracker_service.py SOCKET-PATH DATABASE-PATH"
+    sys.exit(1)
+socket_name = sys.argv[1]
+db_name = sys.argv[2]
+
+import bugs
+import re
+import security_db
+from web_support import *
+
+class TrackerService(WebService):
+    # Markup handed to add_title() as head_contents by create_page(): a
+    # STYLE element with the tracker's CSS followed by a SCRIPT element,
+    # converted with toHTML() once, when the class body is executed.
+    #
+    # The script defines two helpers:
+    #   selectSearch()  - gives the input focus to the "query" field of
+    #                     the form addressed as document.searchForm.
+    #   onSearch(query) - as long as no earlier value has been recorded,
+    #                     records the current one and submits the form
+    #                     if the query is longer than five characters.
+    head_contents = compose(STYLE(
+        """h1 { font-size : 144%; }
+h2 { font-size : 120%; }
+h3 { font-size : 100%; }
+
+table { padding-left : 1.5em }
+td, th { text-align : left; 
+	 padding-left : 0.25em;
+         padding-right : 0.25em; }
+td { vertical-align: baseline }
+span.red { color: red; }
+span.dangerous { color: rgb(191,127,0); }
+"""), SCRIPT('''var old_query_value = "";
+
+function selectSearch() {
+  document.searchForm.query.focus();
+}
+
+function onSearch(query) {
+  if (old_query_value == "") {
+    if (query.length > 5) {
+      old_query_value = query;
+      document.searchForm.submit();
+    } else {
+      old_query_value = query;
+    }
+  }
+}
+''')).toHTML()
+    
+    def __init__(self, socket_name, db_name):
+        """Open the database and register all page handlers.
+
+        socket_name is passed on to WebService.__init__; db_name is used
+        to create the security_db.DB object stored in self.db.
+        """
+        WebService.__init__(self, socket_name)
+        self.db = security_db.DB(db_name)
+
+        # (path pattern, handler) pairs, registered in exactly this order.
+        routes = (
+            ('', self.page_home),
+            ('*', self.page_object),
+            ('source-package/*', self.page_source_package),
+            ('binary-package/*', self.page_binary_package),
+            ('status/release/stable', self.page_status_release_stable),
+            ('status/release/testing', self.page_status_release_testing),
+            ('status/release/unstable', self.page_status_release_unstable),
+            ('status/dtsa-candidates', self.page_status_dtsa_candidates),
+            ('status/todo', self.page_status_todo),
+            ('status/itp', self.page_status_itp),
+            ('data/unknown-packages', self.page_data_unknown_packages),
+            ('data/missing-epochs', self.page_data_missing_epochs),
+            ('data/releases', self.page_data_releases),
+            ('data/funny-versions', self.page_data_funny_versions),
+        )
+        for (pattern, handler) in routes:
+            self.register(pattern, handler)
+
+    def page_home(self, path, params, url):
+        """Render the start page, or dispatch a search query.
+
+        A non-empty 'query' parameter redirects the client to the page
+        of that name; a query containing a slash yields the "not found"
+        page instead.  Without a query, the start page with the menu,
+        the search form and the notes on data sources is returned.
+        """
+        query = params.get('query', ('',))[0]
+        if query:
+            if '/' in query:
+                return self.page_not_found(url, query)
+            return RedirectResult(url.scriptRelativeFull(query))
+
+        return self.create_page(
+            url, 'Security Bug Tracker',
+            [P(
+            """This is the experimental issue tracker for Debian's testing
+security team.  Keep in mind that this is merely a prototype.
+Please report any problems to """,
+            A("mailto:fw@deneb.enyo.de", "Florian Weimer"),
+            # The leading period ends the sentence containing the link.
+            """.  Note that some of the data presented here is known
+to be wrong (see below), but the data for the testing suite
+should be fine."""),
+             make_menu(
+            url.scriptRelative,
+            ('status/release/stable',
+             'Vulnerable packages in the stable suite'),
+            ('status/release/testing',
+             'Vulnerable packages in the testing suite'),
+            ('status/release/unstable',
+             'Vulnerable packages in the unstable suite'),
+            ('status/dtsa-candidates', "Candidates for DTSAs"),
+            ('status/todo', 'TODO items'),
+            ('status/itp', 'ITPs with potential security issues'),
+            ('data/unknown-packages',
+             'Package names not found in the archive'),
+            ('data/missing-epochs',
+             'Package versions which might lack an epoch'),
+            ('data/funny-versions',
+             'Packages with strange version numbers'),
+            ('data/releases',
+             'Covered Debian releases and architectures (slow)'),
+            self.make_search_button(url)),
+
+             H2("A few notes on data sources"),
+             P("""Data in this tracker comes solely from the bug database
+which is maintained by Debian's testing security team in their
+Subversion repository.  All external data (this includes
+Debian bug reports and official Debian security advisories)
+must be added to this database before it appears here, and there
+can be some delay before this happens."""),
+             P("""At the moment, the database only contains information which is
+relevant for tracking the security status of the stable, testing and
+unstable suites.  This means that data for oldstable is likely wrong.""")],
+            # The form is already part of the menu, so create_page()
+            # must not add a second one.
+            search_in_page=True)
+
+    def page_object(self, path, params, url):
+        """Dispatch the name in path[0] to the page which describes it.
+
+        In order: an empty name redirects to the start page, a name
+        starting with a capital letter is shown as a bug, a non-zero
+        integer is shown as a Debian bug number, and a known source or
+        binary package name redirects to the package page.  Anything
+        else yields the "not found" page.
+        """
+        name = path[0]
+
+        if not name:
+            # Redirect to start page.
+            return RedirectResult(url.scriptRelativeFull(""))
+
+        # Bug names start with a capital letter.
+        if 'A' <= name[0] <= 'Z':
+            return self.page_bug(url, name)
+
+        # Anything int() accepts, except zero, is a Debian bug number.
+        try:
+            number = int(name)
+        except ValueError:
+            number = 0
+        if number:
+            return self.page_debian_bug(url, number)
+
+        # Package names: source packages take precedence.
+        cursor = self.db.cursor()
+        if self.db.isSourcePackage(cursor, name):
+            target = self.url_source_package(url, name, full=True)
+            return RedirectResult(target)
+        if self.db.isBinaryPackage(cursor, name):
+            target = self.url_binary_package(url, name, full=True)
+            return RedirectResult(target)
+
+        return self.page_not_found(url, name)
+
+    def page_bug(self, url, name):
+        """Render the page which describes the bug called name.
+
+        If bugs.BugFromDB raises ValueError for the name, the "not
+        found" page is returned.  If the loaded bug carries a different
+        name, the client is redirected to that name.  Otherwise the page
+        consists of a header table and, if the bug has package notes,
+        tables of source packages, binary packages and the notes
+        themselves, followed by the bug's comments (if any).
+        """
+        cursor = self.db.cursor()
+        try:
+            bug = bugs.BugFromDB(cursor, name)
+        except ValueError:
+            return self.page_not_found(url, name)
+        if bug.name != name:
+            # Show the normalized bug name in the browser address bar.
+            return RedirectResult(url.scriptRelativeFull(bug.name))
+
+        page = []
+
+        def gen_header():
+            # (label, value) rows of the table at the top of the page.
+            yield B("Name"), bug.name
+
+            # The part before the first dash selects the kind of source.
+            source = bug.name.split('-')[0]
+            if source in ('CAN', 'CVE'):
+                source_xref = self.make_cve_ref(url, bug.name, 'CVE')
+            elif source == 'DSA':
+                source_xref = self.make_dsa_ref(url, bug.name, 'Debian')
+            elif source == 'DTSA':
+                source_xref = 'Debian Testing Security Team'
+            elif source == 'FAKE':
+                source_xref = (
+        'Automatically generated temporary name.  Not for external reference.')
+            else:
+                source_xref = None
+
+            if source_xref:
+                yield B("Source"), source_xref
+
+            if bug.description:
+                yield B("Description"), bug.description
+
+            xref = list(self.db.getBugXrefs(cursor, bug.name))
+            if xref:
+                yield B("References"), self.make_xref_list(url, xref)
+
+            debian_bugs = bug.getDebianBugs(cursor)
+            if debian_bugs:
+                yield (B("Debian Bugs"),
+                       self.make_debian_bug_list(url, debian_bugs))
+
+            if not bug.not_for_us:
+                for (release, status, reason) in bug.getStatus(cursor):
+                    # Everything except "fixed" is highlighted.
+                    if status != 'fixed':
+                        reason = self.make_red(reason)
+                    yield B('Status of %s' % release), reason
+
+        page.append(make_table(gen_header()))
+
+        if bug.notes:
+            page.append(H2("Vulnerable and fixed packages"))
+
+            def gen_source():
+                # One row per source package version; the package cell
+                # is left empty when it repeats the previous row.
+                old_pkg = ''
+                for (package, release, version, vulnerable) \
+                        in self.db.getSourcePackages(cursor, bug.name):
+                    if package == old_pkg:
+                        package = ''
+                    else:
+                        old_pkg = package
+                        package = compose(
+                            self.make_source_package_ref(url, package),
+                            " (", self.make_pts_ref(url, package, 'PTS'), ")")
+                    if vulnerable:
+                        vuln = self.make_red('vulnerable')
+                        version = self.make_red(version)
+                    else:
+                        vuln = 'fixed'
+
+                    yield package, ', '.join(release), version, vuln
+
+            page.append(make_table(
+                gen_source(),
+                caption=("Source Package", "Release", "Version", "Status"),
+                introduction=P(
+                    'The table below lists information on source packages.')))
+
+            def gen_binary():
+                # Same layout as gen_source(), for groups of binary
+                # packages, with an additional architecture column.
+                old_pkg = ''
+                for (packages, releases, version, archs, vulnerable) \
+                        in self.db.getBinaryPackages(cursor, bug.name):
+                    pkg = ', '.join(packages)
+                    if pkg == old_pkg:
+                        packages = ''
+                    else:
+                        old_pkg = pkg
+                        packages = self.make_binary_packages_ref(url, packages)
+
+                    if vulnerable:
+                        vuln = self.make_red('vulnerable')
+                        version = self.make_red(version)
+                    else:
+                        vuln = 'fixed'
+                    yield (packages,
+                           ', '.join(releases),
+                           version, vuln,
+                           ', '.join(archs))
+
+            page.append(make_table(
+                gen_binary(),
+                caption=("Binary Package", "Release", "Version", "Status",
+                         "Architectures"),
+                introduction=P(
+                    "The next table lists affected binary packages.")))
+
+            def gen_data():
+                # The package notes, ordered by package name.
+                notes_sorted = bug.notes[:]
+                notes_sorted.sort(lambda a, b: cmp(a.package, b.package))
+                for n in notes_sorted:
+                    if n.release:
+                        rel = str(n.release)
+                    else:
+                        rel = '(unstable)'
+                    urgency = str(n.urgency)
+                    if n.fixed_version:
+                        ver = str(n.fixed_version)
+                        # A fixed version of "0" is displayed as
+                        # "not affected", without an urgency.
+                        if ver == '0':
+                            ver = '(not affected)'
+                            urgency = ''
+                    else:
+                        ver = self.make_red('(unfixed)')
+
+                    pkg = n.package
+                    pkg_kind = n.package_kind
+                    if pkg_kind == 'source':
+                        pkg = self.make_source_package_ref(url, pkg)
+                    elif pkg_kind == 'binary':
+                        pkg = self.make_binary_package_ref(url, pkg)
+                    elif pkg_kind == 'itp':
+                        # ITP rows show no release, version or urgency.
+                        pkg_kind = 'ITP'
+                        rel = ''
+                        ver = ''
+                        urgency = ''
+
+                    # Sort a copy, so that the note's own list is left
+                    # alone, and use a name which does not hide the
+                    # "bugs" module inside this generator.
+                    bug_numbers = list(n.bugs)
+                    bug_numbers.sort()
+                    bug_links = make_list(
+                        map(lambda x: self.make_debian_bug(url, x),
+                            bug_numbers))
+                    if n.bug_origin:
+                        origin = self.make_xref(url, n.bug_origin)
+                    else:
+                        origin = ''
+                    yield (pkg, pkg_kind, rel, ver, urgency, origin,
+                           bug_links)
+
+            page.append(
+                make_table(gen_data(),
+                    caption=("Package", "Type", "Release", "Fixed Version",
+                             "Urgency", "Origin", "Debian Bugs"),
+                    introduction=P(
+                        "The information above is based on the following "
+                        "data on fixed versions.")))
+
+        if bug.comments:
+            page.append(H2("Notes"))
+            def gen_comments():
+                # Only the second element of each comment pair is shown.
+                for (t, c) in bug.comments:
+                    yield c
+            page.append(make_pre(gen_comments()))
+
+        return self.create_page(url, bug.name, page)
+
+    def page_debian_bug(self, url, bugnumber):
+        """Render the issues which refer to Debian bug bugnumber.
+
+        No matching issue yields the "not found" page, exactly one issue
+        redirects to that issue, and several issues are listed in a
+        table with their urgency and description.
+        """
+        buglist = list(self.db.getBugsFromDebianBug(self.db.cursor(),
+                                                    bugnumber))
+        if not buglist:
+            return self.page_not_found(url, str(bugnumber))
+
+        if len(buglist) == 1:
+            # Single issue, redirect.
+            return RedirectResult(url.scriptRelativeFull(buglist[0][0]))
+
+        def gen():
+            for (name, urgency, description) in buglist:
+                # An unknown urgency is shown as an empty cell.
+                if urgency == "unknown":
+                    urgency = ""
+                yield self.make_xref(url, name), urgency, description
+
+        return self.create_page(
+            url, "Information related to Debian bug #%d" % bugnumber,
+            [P("The following issues refer to Debian bug ",
+               self.make_debian_bug(url, bugnumber), ":"),
+             make_table(gen(),
+                        caption=("Name", "Urgency", "Description"))])
+
+    def page_not_found(self, url, query):
+        """Return a page with status 404 saying query matched nothing."""
+        message = P('Your query ', CODE(query), ' matched no results.')
+        return self.create_page(url, 'Not found', [message], status=404)
+
+    def page_source_package(self, path, params, url):
+        """Render the page for the source package named by path[0].
+
+        The page starts with links to external services for the package
+        and then lists the available versions, the binary packages
+        recorded for the source package, and its open and resolved
+        issues.
+        """
+        pkg = path[0]
+        db = self.db
+
+        def version_rows():
+            for (releases, version) in db.getSourcePackageVersions(
+                db.cursor(), pkg):
+                yield ', '.join(releases), version
+
+        def binary_rows():
+            for (packages, releases, archs, version) \
+                    in db.getBinaryPackagesForSource(db.cursor(), pkg):
+                yield (self.make_binary_packages_ref(url, packages),
+                       ', '.join(releases), version, ', '.join(archs))
+
+        def issue_table(flag, empty_text):
+            # Table of (bug link, description) rows.  flag is passed on
+            # to getBugsForSourcePackage as its third argument; the
+            # query is issued right here, not when the rows are read.
+            found = db.getBugsForSourcePackage(db.cursor(), pkg, flag)
+            def rows():
+                for (bug, description) in found:
+                    yield self.make_xref(url, bug), description
+            return make_table(rows(),
+                              caption=('Bug', 'Description'),
+                              replacement=empty_text)
+
+        title = "Information on source package " + pkg
+
+        # The page is assembled top to bottom.
+        body = [make_menu(lambda x: x,
+                          (self.url_pts(url, pkg),
+                           pkg + ' in the Package Tracking System'),
+                          (self.url_debian_bug_pkg(url, pkg),
+                           pkg + ' in the Bug Tracking System'),
+                          (self.url_testing_status(url, pkg),
+                           pkg + ' in the testing migration checker'))]
+
+        body.append(H2("Available versions"))
+        body.append(make_table(version_rows(),
+                               caption=("Release", "Version")))
+
+        body.append(H2("Available binary packages"))
+        body.append(make_table(
+            binary_rows(),
+            caption=('Package', 'Release', 'Version', 'Architectures'),
+            replacement="""No binary packages are recorded in this database.
+This probably means that the package is architecture-specific, and the
+architecture is currently not tracked."""))
+
+        body.append(H2("Open issues"))
+        body.append(issue_table(True, 'No known open issues.'))
+
+        body.append(H2("Resolved issues"))
+        body.append(issue_table(False, 'No known resolved issues.'))
+
+        return self.create_page(url, title, body)
+
+    def page_binary_package(self, path, params, url):
+        """Render the page for the binary package named by path[0].
+
+        The page links to the package's bug reports and lists the
+        available versions, the open and resolved issues, and the
+        issues returned by getNonBugsForBinaryPackage ("non-issues").
+        """
+        pkg = path[0]
+        db = self.db
+
+        def version_rows():
+            for (releases, source, version, archs) \
+                    in db.getBinaryPackageVersions(db.cursor(), pkg):
+                yield (', '.join(releases),
+                       self.make_source_package_ref(url, source),
+                       version, ', '.join(archs))
+
+        def issue_table(found, empty_text):
+            # Table of (bug link, description) rows built from found.
+            def rows():
+                for (bug, description) in found:
+                    yield self.make_xref(url, bug), description
+            return make_table(rows(),
+                              caption=('Bug', 'Description'),
+                              replacement=empty_text)
+
+        title = "Information on binary package " + pkg
+
+        # The page is assembled top to bottom.
+        body = [make_menu(lambda x: x,
+                          (self.url_debian_bug_pkg(url, pkg),
+                           pkg + ' in the Bug Tracking System'))]
+
+        body.append(H2("Available versions"))
+        body.append(make_table(
+            version_rows(),
+            caption=("Release", "Source", "Version", "Architectures")))
+
+        body.append(H2("Open issues"))
+        body.append(issue_table(
+            db.getBugsForBinaryPackage(db.cursor(), pkg, True),
+            'No known open issues.'))
+
+        body.append(H2("Resolved issues"))
+        body.append(issue_table(
+            db.getBugsForBinaryPackage(db.cursor(), pkg, False),
+            'No known resolved issues.'))
+
+        body.append(H2("Non-issues"))
+        body.append(issue_table(
+            db.getNonBugsForBinaryPackage(db.cursor(), pkg),
+            """No known issues which do not affect
+this package, but still reference it."""))
+
+        return self.create_page(url, title, body)
+
+    def page_status_release_stable(self, path, params, url):
+        """List the rows of stable_status as a package/bug/urgency table."""
+        def rows():
+            previous = ''
+            result = self.db.cursor().execute(
+                """SELECT package, bug, section, urgency FROM stable_status""")
+            for (package, bug_name, section, urgency) in result:
+                if package == previous:
+                    # Same package as in the row above: empty cell.
+                    label = ''
+                else:
+                    previous = package
+                    label = package
+                    # Packages outside "main" are marked as such.
+                    if section != 'main':
+                        label = "%s (%s)" % (package, section)
+
+                if urgency == 'unknown':
+                    urgency = ''
+                elif urgency == 'high':
+                    urgency = self.make_red(urgency)
+
+                yield label, self.make_xref(url, bug_name), urgency
+
+        table = make_table(rows(), caption=("Package", "Bug", "Urgency"))
+        return self.create_page(
+            url, 'Vulnerable source packages in the stable suite', [table])
+            
+    def page_status_release_testing(self, path, params, url):
+        """List the rows of testing_status with a per-row status.
+
+        The status column says whether the issue is fixed in
+        testing-security, still present in unstable, or fixed in
+        unstable only.  A link to the DTSA candidates page precedes the
+        table.
+        """
+        def gen():
+            old_pkg_name = ''
+            for (pkg_name, bug_name, archive, urgency,
+                 sid_vulnerable, ts_fixed) in self.db.cursor().execute(
+                """SELECT package, bug, section, urgency, unstable_vulnerable,
+                testing_security_fixed
+                FROM testing_status"""):
+                if pkg_name == old_pkg_name:
+                    # Same package as in the row above: empty cell.
+                    pkg_name = ''
+                else:
+                    old_pkg_name = pkg_name
+                    if archive != 'main':
+                        pkg_name = "%s (%s)" % (pkg_name, archive)
+
+                if ts_fixed:
+                    status = 'fixed in testing-security'
+                else:
+                    if sid_vulnerable:
+                        status = self.make_red('unstable is vulnerable')
+                    else:
+                        status = self.make_dangerous('fixed in unstable')
+
+                if urgency == 'unknown':
+                    urgency = ''
+
+                yield (pkg_name, self.make_xref(url, bug_name),
+                       urgency, status)
+
+        return self.create_page(
+            url, 'Vulnerable source packages in the testing suite',
+            [make_menu(url.scriptRelative,
+                       ("status/dtsa-candidates", "Candidates for DTSAs")),
+             # One heading for each of the four values in a row.
+             make_table(gen(),
+                        caption=("Package", "Bug", "Urgency", "Status"))])
+
+    def page_status_release_unstable(self, path, params, url):
+        """List vulnerable source packages in the unstable suite.
+
+        The query selects vulnerable source packages of release 'sid'
+        (empty subrelease) whose urgency is not 'unimportant', ordered
+        by package and bug name.
+        """
+        def gen():
+            old_pkg_name = ''
+            for (pkg_name, bug_name, section, urgency) \
+                    in self.db.cursor().execute(
+                """SELECT DISTINCT sp.name, st.bug_name,
+                sp.archive, st.urgency
+                FROM source_package_status AS st, source_packages AS sp
+                WHERE st.vulnerable AND st.urgency <> 'unimportant'
+                AND sp.rowid = st.package AND sp.release = 'sid'
+                AND sp.subrelease = ''
+                ORDER BY sp.name, st.bug_name"""):
+                if pkg_name == old_pkg_name:
+                    # Same package as in the row above: empty cell.
+                    pkg_name = ''
+                else:
+                    old_pkg_name = pkg_name
+                    # Only packages in "main" are turned into links.
+                    if section != 'main':
+                        pkg_name = "%s (%s)" % (pkg_name, section)
+                    else:
+                        pkg_name = self.make_xref(url, pkg_name)
+
+                if urgency == 'unknown':
+                    urgency = ''
+                elif urgency == 'high':
+                    urgency = self.make_red(urgency)
+
+                yield pkg_name, self.make_xref(url, bug_name), urgency
+
+        return self.create_page(
+            url, 'Vulnerable source packages in the unstable suite',
+            [P("""Note that the list below is based on source packages.
+            This means that packages are not listed here once a new,
+            fixed source version has been uploaded to the archive, even
+            if there are still some vulnerable binary packages present
+            in the archive."""),
+             make_table(gen(), caption=('Package', 'Bug', 'Urgency'))])
+
+    def page_status_dtsa_candidates(self, path, params, url):
+        """List packages fixed in unstable but not in testing.
+
+        Rows come from testing_status entries which are neither
+        vulnerable in unstable nor fixed in testing-security.  The
+        subquery compares the version_id of the 'etch' package with the
+        one in 'sarge' security; a true result is shown as the note
+        "(fixed in stable?)".
+        """
+        def gen():
+            old_pkg_name = ''
+            for (pkg_name, bug_name, archive, urgency, stable_later) \
+                    in self.db.cursor().execute(
+                """SELECT package, bug, section, urgency,
+                (SELECT testing.version_id < stable.version_id
+                 FROM source_packages AS testing, source_packages AS stable
+                 WHERE testing.name = testing_status.package
+                 AND testing.release = 'etch'
+                 AND testing.subrelease = ''
+                 AND testing.archive = testing_status.section
+                 AND stable.name = testing_status.package
+                 AND stable.release = 'sarge'
+                 AND stable.subrelease = 'security'
+                 AND stable.archive = testing_status.section)
+                FROM testing_status
+                WHERE (NOT unstable_vulnerable)
+                AND (NOT testing_security_fixed)"""):
+                if pkg_name == old_pkg_name:
+                    # Same package as in the row above: empty cells.
+                    pkg_name = ''
+                    migration = ''
+                else:
+                    old_pkg_name = pkg_name
+                    # The link must be built from the plain package
+                    # name, before pkg_name is replaced below.
+                    migration = A(self.url_testing_status(url, pkg_name),
+                                  "check")
+                    if archive != 'main':
+                        pkg_name = "%s (%s)" % (pkg_name, archive)
+                    else:
+                        pkg_name = self.make_source_package_ref(url, pkg_name)
+
+                if urgency == 'unknown':
+                    urgency = ''
+                elif urgency == 'high':
+                    urgency = self.make_red(urgency)
+
+                if stable_later:
+                    notes = "(fixed in stable?)"
+                else:
+                    notes = ''
+
+                yield (pkg_name, migration, self.make_xref(url, bug_name),
+                       urgency, notes)
+
+        return self.create_page(
+            url, "Candidates for DTSAs",
+            [P("""The table below lists packages which are fixed
+in unstable, but unfixed in testing.  Use the testing migration
+tracker to find out why they have not entered
+testing yet."""),
+             make_menu(url.scriptRelative,
+                       ("status/release/testing",
+                        "List of vulnerable packages in testing")),
+             # One heading for each of the five values in a row.
+             make_table(gen(),
+                        caption=("Package", "Migration", "Bug", "Urgency",
+                                 "Notes"))])
+
+    def page_status_todo(self, path, params, url):
+        """List the (bug, description) pairs returned by db.getTODOs()."""
+        def rows():
+            # NOTE(review): getTODOs() is called without a cursor, unlike
+            # most other database calls in this class -- confirm against
+            # security_db.
+            for (name, text) in self.db.getTODOs():
+                yield self.make_xref(url, name), text
+        table = make_table(rows(), caption=("Bug", "Description"))
+        return self.create_page(url, "Bugs with TODO items", [table])
+
+    def page_status_itp(self, path, params, url):
+        """List the entries returned by db.getITPs() with their issues."""
+        def rows():
+            previous = ''
+            for (package, issues, debian_bugs) \
+                    in self.db.getITPs(self.db.cursor()):
+                if package == previous:
+                    # Same package as in the row above: empty cell.
+                    label = ''
+                else:
+                    previous = package
+                    label = package
+                yield (label, self.make_xref_list(url, issues),
+                       self.make_debian_bug_list(url, debian_bugs))
+        table = make_table(rows(),
+                           caption=("Package", "Issue", "Debian Bugs"),
+                           replacement="No ITP bugs are currently known.")
+        return self.create_page(
+            url, "ITPs with potential security issues", [table])
+
+    def page_data_unknown_packages(self, path, params, url):
+        """List package names which db.getUnknownPackages() reports.
+
+        Each row shows the package name and links to the bugs which
+        mention it.
+        """
+        def gen():
+            for name, bugs in self.db.getUnknownPackages(self.db.cursor()):
+                yield name, self.make_xref_list(url, bugs)
+        return self.create_page(
+            url, "Unknown packages",
+            [P("""Sometimes, a package referenced in a bug report
+cannot be found in the database.  This can be the result of a spelling
+error, or a historic entry refers to a
+package which is no longer in the archive."""),
+             make_table(gen(), caption=("Package", "Bugs"),
+        replacement="No unknown packages are referenced in the database.")])
+
+    def page_data_missing_epochs(self, path, params, url):
+        """List fixed versions which may lack an epoch.
+
+        The query pairs source package notes whose fixed version has no
+        colon with versions of the same source package which do have
+        one.  Repeated bug and package cells are left empty.
+        """
+        def rows():
+            last_bug = ''
+            last_pkg = ''
+            result = self.db.cursor().execute(
+                """SELECT DISTINCT bug_name, n.package,
+                n.fixed_version, sp.version
+                FROM package_notes AS n, source_packages AS sp
+                WHERE n.package_kind = 'source'
+                AND n.fixed_version NOT LIKE '%:%'
+                AND n.fixed_version <> '0'
+                AND n.bug_origin = ''
+                AND sp.name = n.package
+                AND sp.version LIKE '%:%'
+                ORDER BY bug_name, package""")
+            for (bug, pkg, note_version, package_version) in result:
+                if bug == last_bug:
+                    bug_cell = ''
+                else:
+                    last_bug = bug
+                    # A new bug shows its first package again, even if
+                    # the previous bug ended with the same package.
+                    last_pkg = ''
+                    bug_cell = self.make_xref(url, bug)
+
+                if pkg == last_pkg:
+                    pkg_cell = ''
+                else:
+                    last_pkg = pkg
+                    pkg_cell = self.make_source_package_ref(url, pkg)
+
+                yield bug_cell, pkg_cell, note_version, package_version
+
+        table = make_table(
+            rows(),
+            caption=("Bug", "Package", "Version 1", "Version 2"),
+            replacement="No source package version with missing epochs.")
+        return self.create_page(
+            url, "Missing epochs in package versions", [table])
+
+    def page_data_releases(self, path, params, url):
+        """List the releases returned by db.availableReleases()."""
+        def rows():
+            for entry in self.db.availableReleases():
+                (rel, subrel, archive, sources, archs) = entry
+                # The sources value is displayed as a yes/no cell.
+                if sources:
+                    has_sources = 'yes'
+                else:
+                    has_sources = 'no'
+                yield rel, subrel, archive, has_sources, make_list(archs)
+
+        intro = P("""The security issue database is checked against
+the Debian releases listed in the table below.""")
+        table = make_table(rows(),
+                           caption=("Release", "Subrelease", "Archive",
+                                    "Sources", "Architectures"))
+        return self.create_page(url, "Available releases", [intro, table])
+
+    def page_data_funny_versions(self, path, params, url):
+        """List the packages returned by db.getFunnyPackageVersions().
+
+        According to the page text, these are source packages which
+        have a binary package of the same name with a different version.
+        """
+        def gen():
+            for name, release, archive, version, source_version \
+                in self.db.getFunnyPackageVersions():
+                # The last two values are swapped, so that the source
+                # version comes first, as in the caption below.
+                yield name, release, archive, source_version, version
+
+        return self.create_page(
+            url, "Version conflicts between source/binary packages",
+            [P("""The table below lists source packages
+            which have a binary package of the same name, but with a different
+            version.  This means that extra care is necessary to determine
+            the version of a package which has been fixed.  (Note that
+            the bug tracker prefers source versions to binary versions
+            in this case.)"""),
+             make_table(gen(),
+                        caption=("Package",
+                                 "Release",
+                                 "Archive",
+                                 "Source Version",
+                                 "Binary Version")),
+             P("""Technically speaking, this version numbering is fine,
+but it makes version-based bug tracking quite difficult for these packages."""),
+             P("""There are many binary packages which are built from source
+             packages with different version numbering schemes.  However, as
+             long as none of the binary packages carries the same name as the
+             source package, most confusion is avoided or can be easily
+             explained.""")])
+        
+
+    def create_page(self, url, title, body, search_in_page=False, status=200):
+        """Add the common footer to body and return it as an HTMLResult.
+
+        url            -- URL object used to build the footer links
+        title          -- page title, passed to add_title()
+        body           -- iterable of page elements; it is not modified
+        search_in_page -- true if body already contains the search form;
+                          then no second form is added and the page
+                          calls selectSearch() when loaded
+        status         -- status passed on to HTMLResult
+        """
+        # Work on a copy, so that the footer does not end up in the
+        # list the caller still holds.
+        body = list(body)
+        append = body.append
+        append(HR())
+        if not search_in_page:
+            append(self.make_search_button(url))
+        append(P(A(url.scriptRelative(""), "Home"),
+                    " - ", A(url.absolute("http://secure-testing.debian.net/"),
+                             "Testing Security Team"),
+                    " - ", A(url.absolute("http://www.debian.org/security/"),
+                             "Debian Security"),
+                    " - ", A(url.absolute
+                             ("http://www.enyo.de/fw/impressum.html"),
+                             "Imprint")))
+        if search_in_page:
+            on_load = "selectSearch()"
+        else:
+            on_load = None
+        return HTMLResult(self.add_title(title, body,
+                                         head_contents=self.head_contents,
+                                         body_attribs={'onload': on_load}),
+                          doctype=self.html_dtd(),
+                          status=status)
+
+    def make_search_button(self, url):
+        """Build the search form; it sends 'query' to the start page."""
+        query_field = INPUT(type='text', name='query',
+                            onkeyup="onSearch(this.value)",
+                            onmousemove="onSearch(this.value)")
+        submit_button = INPUT(type='submit', value='Go')
+        # NOTE(review): the script in head_contents addresses the form as
+        # document.searchForm, but no name is passed to FORM here --
+        # confirm how web_support.FORM names the element.
+        return FORM("Search for package or bug name: ",
+                    query_field,
+                    submit_button,
+                    method='get',
+                    action=url.scriptRelative(''))
+
+    def url_cve(self, url, name):
+        """Return the cve.mitre.org URL for the entry called name."""
+        base = "http://cve.mitre.org/cgi-bin/cvename.cgi"
+        return url.absolute(base, name=name)
+    def url_dsa(self, url, dsa, re_dsa=re.compile(r'^DSA-(\d+)(?:-\d+)?$')):
+        """Returns the advisory URL, or None for unknown or non-DSA names."""
+        parsed = re_dsa.match(dsa)
+        if not parsed:
+            return None
+        # The URL contains the year, which only the release date tells us.
+        for (date,) in self.db.cursor().execute(
+            "SELECT release_date FROM bugs WHERE name = ?", (dsa,)):
+            (year, month, day) = date.split('-')
+            return url.absolute("http://www.debian.org/security/%d/dsa-%d"
+                                % (int(year), int(parsed.group(1))))
+
+    def url_debian_bug(self, url, debian):
+        bugreport = "http://bugs.debian.org/cgi-bin/bugreport.cgi"
+        return url.absolute(bugreport, bug=str(debian))  # debian: bug number
+    def url_debian_bug_pkg(self, url, debian):
+        pkgreport = "http://bugs.debian.org/cgi-bin/pkgreport.cgi"
+        return url.absolute(pkgreport, pkg=debian)  # passed on as ?pkg=...
+    def url_pts(self, url, package):
+        pts = "http://packages.qa.debian.org/common/index.html"
+        return url.absolute(pts, src=package)  # passed on as ?src=...
+    def url_testing_status(self, url, package):
+        testing = "http://bjorn.haxx.se/debian/testing.pl"
+        return url.absolute(testing, package=package)  # ?package=...
+    def url_source_package(self, url, package, full=False):
+        """Returns the (optionally full, i.e. http://...) URL of the page."""
+        if full:
+            return url.scriptRelativeFull("source-package/" + package)
+        return url.scriptRelative("source-package/" + package)
+    def url_binary_package(self, url, package, full=False):
+        """Returns the (optionally full, i.e. http://...) URL of the page."""
+        if full:
+            return url.scriptRelativeFull("binary-package/" + package)
+        return url.scriptRelative("binary-package/" + package)
+
+    def make_xref(self, url, name):
+        return A(url.scriptRelative(name), text=name)  # link to our own page
+
+    def make_xref_list(self, url, lst, separator=', '):
+        return make_list([self.make_xref(url, x) for x in lst], separator)
+
+    def make_debian_bug(self, url, debian):
+        return A(self.url_debian_bug(url, debian), text=str(debian))
+    def make_debian_bug_list(self, url, lst):
+        return make_list([self.make_debian_bug(url, x) for x in lst])
+
+    def make_cve_ref(self, url, cve, name=None):
+        if name is None:
+            return A(self.url_cve(url, cve), cve)  # label defaults to cve
+        return A(self.url_cve(url, cve), name)
+    
+    def make_dsa_ref(self, url, dsa, name=None):
+        """Links to the advisory if url_dsa yields a URL, else plain text."""
+        if name is None:
+            name = dsa
+        target = self.url_dsa(url, dsa)
+        if not target:
+            return name
+        return A(target, name)
+
+    def make_pts_ref(self, url, pkg, name=None):
+        if name is None:
+            return A(self.url_pts(url, pkg), pkg)  # label defaults to pkg
+        return A(self.url_pts(url, pkg), name)
+
+    def make_source_package_ref(self, url, pkg, title=None):
+        if title is None:
+            return A(self.url_source_package(url, pkg), pkg)  # default label
+        return A(self.url_source_package(url, pkg), title)
+    def make_binary_package_ref(self, url, pkg, title=None):
+        if title is None:
+            return A(self.url_binary_package(url, pkg), pkg)  # default label
+        return A(self.url_binary_package(url, pkg), title)
+    def make_binary_packages_ref(self, url, lst):
+        """Returns the comma-separated package links; lst is a sequence."""
+        assert type(lst) != types.StringType
+        return make_list([self.make_binary_package_ref(url, x) for x in lst])
+
+    def make_red(self, contents):
+        return SPAN(contents, _class="red")  # styled by the span.red CSS rule
+                    
+    def make_dangerous(self, contents):
+        return SPAN(contents, _class="dangerous")  # see span.dangerous in CSS
+
+    def pre_dispatch(self):
+        self.db.refresh()  # called by WebService.handle before each handler
+
+TrackerService(socket_name, db_name).run()  # request loop; runs forever


Property changes on: bin/tracker_service.py
___________________________________________________________________
Name: svn:mime-type
   + text/script

Added: lib/python/web_support.py
===================================================================
--- lib/python/web_support.py	2005-10-20 09:01:57 UTC (rev 2481)
+++ lib/python/web_support.py	2005-10-20 09:02:12 UTC (rev 2482)
@@ -0,0 +1,692 @@
+# web_support.py -- simple HTTP generation framework
+# Copyright (C) 2005 Florian Weimer <fw at deneb.enyo.de>
+# 
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import cgi
+import cStringIO
+import os
+import re
+import socket
+import struct
+import sys
+import grp
+import traceback
+import types
+import urllib
+
+class ServinvokeError(Exception):
+    """Raised if the data received from servinvoke ends unexpectedly."""
+
+class Service:
+    """A class for service objects.
+
+    Service objects are contacted by the program servinvoke and
+    process HTTP requests in a serialized fashion.  (Only the data
+    transfer from and to the client happens in parallel, and this is
+    handled by the servinvoke program.)
+
+    If the newly created socket is owned by the www-data group, it is
+    automatically made readable and writable by that group.
+    """
+
+    def __init__(self, socket_name):
+        self.socket_name = socket_name
+        self._unlinkSocket()  # bind fails if the path already exists
+        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
+        self.socket.bind(self.socket_name)
+        self.socket.listen(5)
+        self._chmod()
+
+    def __del__(self):
+        self._unlinkSocket()
+
+    def _unlinkSocket(self):
+        try:
+            os.unlink(self.socket_name)
+        except OSError:
+            pass  # best effort; the path may simply not exist
+
+    def _chmod(self):
+        gid = os.stat(self.socket_name).st_gid
+        grpent = grp.getgrgid(gid)
+        if grpent[0] == 'www-data':
+            os.chmod(self.socket_name, 0660)
+
+    def log(self, msg, *args):
+        sys.stderr.write((msg % args) + "\n")
+
+    def run(self):
+        """Accepts connections from servinvoke and handles them one at
+        a time, forever.  Errors while handling a request are logged
+        and the connection is closed; KeyboardInterrupt is re-raised."""
+        while 1:
+            (client, addr) = self.socket.accept()
+
+            def read(count):
+                # Reads exactly count bytes from the client.
+                data = ''
+                while len(data) != count:
+                    d = client.recv(count - len(data))
+                    if not d:
+                        self.log("unexpected end of data from servinvoke")
+                        raise ServinvokeError()
+                    data += d
+                return data
+
+            try:
+                header = read(24)
+                (magic, version, cli_size, cli_count, env_size, env_count) = \
+                        struct.unpack("!6I", header)
+                # A request we cannot interpret is logged and dropped.
+                if magic != 0x15fd34df:
+                    self.log("unknown magic number %08X", magic)
+                    raise ServinvokeError()
+                if version != 1:
+                    self.log("unknown version %08X", version)
+                    raise ServinvokeError()
+                cli = read(cli_size).split('\0')[:-1]
+                env = {}
+                for x in read(env_size).split('\0')[:-1]:
+                    (key, value) = x.split('=', 1)
+                    env[key] = value
+                data = []
+                while 1:
+                    d = client.recv(4096)
+                    if not d:
+                        break
+                    data.append(d)
+                data = ''.join(data)
+                result = cStringIO.StringIO()
+                self.handle(cli, env, data, result)
+                client.sendall(result.getvalue())
+                client.close()
+            except ServinvokeError:
+                client.close()
+            except KeyboardInterrupt:
+                client.close()
+                raise
+            except:
+                client.close()
+                target = cStringIO.StringIO()
+                traceback.print_exc(None, target)
+                self.log("%s", target.getvalue())
+
+    def handle(self, args, environ, data, result):
+        """Invoked by run to handle a single request.  Should write the
+        data to be sent back to the client to result (a file object)."""
+        pass
+
+class URL:
+    """Marks a string as a URL; str() returns the wrapped string."""
+    def __init__(self, url):
+        self.__url = url
+    def __repr__(self):
+        return "URL(%r)" % (self.__url,)
+    def __str__(self):
+        return self.__url
+
+class URLFactory:
+    """Creates URL objects.
+
+    This factory class handles the case where a script wants to
+    generate URLs which refer to itself (see scriptRelative).
+    Query arguments are passed to the methods as keyword arguments."""
+
+    def __init__(self, server_name, script_name):
+        """server_name defaults to 'localhost' if empty or None.
+        script_name is stored without leading slashes; unless it is
+        empty, a trailing slash is appended if it is missing."""
+        self.server_name = server_name or 'localhost'
+        stripped = self._stripSlashes(script_name or '')
+        if stripped and not stripped.endswith('/'):
+            stripped += '/'
+        self.script_name = stripped
+
+    def _convertArgs(self, args):
+        """Returns the dictionary args as a query string with quoted
+        keys and values and a leading '?'.  Entries whose value is
+        None are left out; without entries, the result is ''."""
+        pairs = ["%s=%s" % (urllib.quote(key), urllib.quote(value))
+                 for (key, value) in args.items() if value is not None]
+        if not pairs:
+            return ""
+        return "?" + '&'.join(pairs)
+
+    def _stripSlashes(self, arg):
+        """Returns arg without its leading slashes."""
+        return arg.lstrip('/')
+
+    def absolute(self, url, **args):
+        """Creates an absolute URL, with optional arguments to pass."""
+        query = self._convertArgs(args)
+        return URL(url + query)
+
+    def scriptRelative(self, path, **args):
+        """Returns a URL which refers to the path relative to the
+        current script.  Optionally, arguments to pass can be included."""
+        # script_name is either empty or ends with a slash.
+        tail = self._stripSlashes(path) + self._convertArgs(args)
+        return URL("/" + self.script_name + tail)
+
+    def scriptRelativeFull(self, path, **args):
+        """Like scriptRelative, but returns an absolute URL, including
+        the http:// prefix and the server name."""
+        # The scheme is always http.
+        tail = self._stripSlashes(path) + self._convertArgs(args)
+        return URL("http://%s/%s%s" % (self.server_name, self.script_name,
+                                       tail))
+
+
+# Table which maps the code of each character to its replacement in
+# HTML text: the character itself, except for the four listed below.
+stringToHTML = map(chr, range(256))
+for (_ch, _repl) in (('<', '&lt;'),
+                     ('>', '&gt;'),
+                     ('&', '&amp;'),
+                     ('"', '&quot;')):
+    stringToHTML[ord(_ch)] = _repl
+del _ch, _repl
+
+def escapeHTML(str):
+    '''Returns a copy of the passed string in which the characters
+    <>&" are replaced with their HTML entities.
+
+    The replacement for each character is looked up in the
+    stringToHTML table.'''
+
+    table = stringToHTML
+    return ''.join([table[ord(ch)] for ch in str])
+
+class HTMLBase:
+    def flatten(self, write):
+        """Invokes write repeatedly, for the tag and its contents.
+
+        Note that typically, a lot of very short strings are written,
+        so it's better to add some buffering before sending the
+        strings elsewhere.  The base implementation writes nothing."""
+
+    def toString(self):
+        """Invokes flatten to create a new string object."""
+        buf = cStringIO.StringIO()
+        self.flatten(buf.write)
+        return buf.getvalue()
+
+    def toHTML(self):
+        """Returns the flattened text as a VerbatimHTML object."""
+        return VerbatimHTML(self.toString())
+
+class VerbatimHTML(HTMLBase):
+    """Creates verbatim HTML from a string object.  Mainly used for
+    optimizing recurring HTML snippets."""
+
+    def __init__(self, contents):
+        self.__contents = contents  # written as is, without escaping
+
+    def flatten(self, write):
+        write(self.__contents)  # a single write of the stored string
+
+class Compose(HTMLBase):
+    """Glues a sequence of HTML snippets together, without enclosing it in
+    a tag.  Plain strings are HTML-escaped when flattened."""
+    def __init__(self, contents):
+        self.__contents = contents
+
+    def flatten(self, write):
+        for item in self.__contents:
+            if type(item) != types.StringType:
+                item.flatten(write)
+                continue
+            write(escapeHTML(item))
+
+def compose(*contents):
+    """Concatenates several HTML objects (or strings) into a Compose."""
+    return Compose(contents)
+    
+
+class Tag(HTMLBase):
+    """Base class for HTML tags.
+
+    The public attribute contents holds the enclosed objects."""
+
+    re_name = re.compile(r'\A_?[a-zA-Z][a-zA-Z0-9]*\Z')
+
+    def __init__(self, name, contents, attribs={}):
+        """Creates the tag called name around contents.
+
+        attribs maps attribute names to values.  Entries whose value
+        is None are skipped, and a leading underscore is removed from
+        the name (so that _class can be used for class).  The values
+        are converted with str and HTML-escaped."""
+        self._check(name)
+        self.__name = name
+        pieces = []
+        for (key, value) in attribs.items():
+            if value is None:
+                continue
+            self._check(key)
+            if key[0] == '_':
+                key = key[1:]
+            pieces.append(' %s="%s"' % (key, escapeHTML(str(value))))
+        self.__attribs = ''.join(pieces)
+        self.contents = contents
+
+    def _check(self, name):
+        """Raises ValueError unless name is a valid tag/attribute name."""
+        if not self.re_name.match(name):
+            raise ValueError("invalid name: " + repr(name))
+
+    def flatten(self, write):
+        """Writes the tag; without contents, the short form <name/>."""
+        if not self.contents:
+            write("<%s%s/>" % (self.__name, self.__attribs))
+            return
+        write("<%s%s>" % (self.__name, self.__attribs))
+        closing = "</%s>" % self.__name
+        try:
+            for item in self.contents:
+                if type(item) == types.StringType:
+                    write(escapeHTML(item))
+                else:
+                    item.flatten(write)
+        except:
+            # If we encountered any exception, try to write the
+            # closing tag nevertheless.  This increases our
+            # chances that we produce valid XML.
+            try:
+                write(closing)
+            except:
+                pass
+            raise
+        write(closing)
+
+    def __repr__(self):
+        return "<websupport.Tag instance, name=%s>" % repr(self.__name)
+
+def tag(__name, __contents, **__attribs):
+    """Creates a new tag object.
+
+    name - name of the tag
+    contents - a sequence object (or iterator) for the enclosed contents
+    attribs - keyword arguments forming the attributes
+    """
+    return Tag(__name, __contents, __attribs)
+
+def emptyTag(__name, **__attribs):
+    """A tag without contents.
+
+    name - name of the tag
+    attribs - keyword arguments forming the attributes
+    """
+    return Tag(__name, None, __attribs)
+
+def A(url, text=None):
+    if text is None:
+        text = str(url)  # a URL object itself cannot serve as contents
+    return tag('a', text, href=str(url))
+def STYLE(contents, type='text/css'):
+    return tag('style', contents, type=type)
+def SCRIPT(contents, type="text/javascript", language="JavaScript"):
+    return tag('script', contents, type=type, language=language)
+def TITLE(contents):
+    return tag('title', contents)
+def HTML(head, body):
+    return tag('html', (HEAD(head), BODY(body)))
+def HEAD(contents):
+    return tag('head', contents)
+def BODY(contents, onload=None):
+    return tag('body', contents, onload=onload)  # None: attribute omitted
+def H1(contents):
+    return tag('h1', contents)
+def H2(contents):
+    return tag('h2', contents)
+def P(*contents):  # the contents are given as separate arguments
+    return Tag('p', contents)
+def SPAN(*__contents, **__attribs):  # e.g. SPAN("text", _class="red")
+    return Tag('span', __contents, __attribs)
+def HR():
+    return tag('hr', ())  # no contents, so this is written as <hr/>
+def BR():
+    return tag('br', ())  # no contents, so this is written as <br/>
+def CODE(contents):
+    return tag('code', contents)
+def B(contents):
+    return tag('b', contents)
+def TABLE(contents):
+    return tag('table', contents)
+def TR(*contents):
+    return tag('tr', contents)
+def TH(*contents):
+    return tag('th', contents)
+def TD(*contents):
+    return tag('td', contents)
+def FORM(*__contents, **__attribs):
+    return Tag('form', __contents, __attribs)
+def INPUT(*__contents, **__attribs):
+    return Tag('input', __contents, __attribs)
+def LI(*__contents, **__attribs):
+    return Tag('li', __contents, __attribs)
+
+def make_table(contents, caption=None, replacement=None, introduction=None):
+    """Creates a table from contents, a sequence of rows of cells.
+
+    caption - optional sequence of column headings
+    replacement - returned instead if there are no rows (default: nothing)
+    introduction - put in front of the table if there are rows"""
+    rows = []
+    for row in contents:
+        if caption and not rows:
+            rows.append(Tag('tr', [TH(col) for col in caption]))
+        rows.append(Tag('tr', [TD(col) for col in row]))
+    if not rows:
+        # The replacement used to be ignored, leaving the page empty.
+        if replacement is None:
+            return compose()
+        return compose(replacement)
+    if introduction:
+        return compose(introduction, TABLE(rows))
+    return TABLE(rows)
+
+def make_pre(lines):
+    """Creates a pre-formatted text area.
+
+    lines - sequence of strings; a newline is added after each one"""
+    # Join all the lines.  (Joining the loop variable instead emitted
+    # only the last line and failed for an empty sequence.)
+    text = ''.join([line + '\n' for line in lines])
+    return tag('pre', text)
+
+def make_menu(convert, *entries):
+    """Creates an unnumbered list (ul) with one item per entry.
+
+    Each entry can be:
+
+    - a pair (URL, LABEL).
+      The item is a hyperlink to convert(URL) with LABEL as its text.
+    - some non-tuple value.
+      This is added as an individual item, without a hyperlink.
+    """
+    def make_item(entry):
+        if type(entry) != types.TupleType:
+            return LI(entry)
+        (relurl, label) = entry
+        return LI(A(convert(relurl), label))
+
+    items = [make_item(entry) for entry in entries]
+    return tag('ul', items)
+
+def make_list(lst, separator=", "):
+    """Joins the elements of the sequence lst into one HTML object,
+    putting separator between adjacent elements."""
+    assert type(lst) != types.StringType
+    parts = []
+    for element in lst[:1]:
+        parts.append(element)
+    for element in lst[1:]:
+        parts.append(separator)
+        parts.append(element)
+    return Compose(parts)
+
+class InvalidPath(Exception):
+    """Raised by PathRouter.get if no registered path matches."""
+
+class PathRouter:
+    """This class maps paths to registered values.
+
+    The registered paths are stored as nested dictionaries, one
+    level per path element.  In each dictionary, the key '' holds
+    the value for the path ending there, and the keys '*' and '**'
+    hold the values of the wildcards."""
+
+    _wildcards = ('*', '**')
+
+    def __init__(self):
+        # The root of the nested dictionaries.
+        self.__map = {}
+
+    def register(self, path, value):
+        """Registers the indicated value for the path.
+
+        Path may end with '*' or '**', indicating single-level
+        wildcards or multi-level wildcards.  Raises ValueError for a
+        wildcard in the middle of the path and for empty path
+        elements (except for a leading slash)."""
+
+        m = self.__map
+        p = path.split('/')
+        if p and not p[0]:
+            del p[0]
+        for x in range(len(p)):
+            element = p[x]
+            if not element:
+                raise ValueError("path contains empty element")
+            if element in self._wildcards:
+                # Checked before the lookup, so that a wildcard can be
+                # registered again (replacing the previous value).
+                if x + 1 != len(p):
+                    raise ValueError('wildcard %s in the middle of path'
+                                     % element)
+                m[element] = value
+                return
+            if element not in m:
+                m[element] = {}
+            m = m[element]
+        m[''] = value
+
+    def get(self, path):
+        """Returns a tuple (VALUE, REMAINING-PATH), for the
+        most-specific path matching the given path.  Raises
+        InvalidPath if nothing matches."""
+
+        m = self.__map
+        p = path.split('/')
+        while p and not p[-1]:
+            del p[-1]
+        l = len(p)
+        for x in range(l):
+            element = p[x]
+
+            # Ignore empty path elements (leading slash, duplicated
+            # slashes).
+            if not element:
+                continue
+            # A literal '*' or '**' in the request must not select the
+            # wildcard entry itself (its value is not a dictionary).
+            if element not in self._wildcards and element in m:
+                m = m[element]
+                continue
+            if x + 1 == l and '*' in m:
+                # Use '*' only if the remaining path is empty.
+                return (m['*'], tuple(p[x:]))
+            if '**' in m:
+                return (m['**'], tuple(p[x:]))
+            raise InvalidPath
+        for key in ('', '*', '**'):
+            if key in m:
+                return (m[key], ())
+        raise InvalidPath
+
+class Result:
+    """Base class for result objects."""
+
+    def __init__(self):
+        self.status = 500  # the subclasses below set their own status
+
+    def flatten(self, write):
+        pass  # the base class writes nothing
+
+class RedirectResult(Result):
+    """Permanently redirects the browser to a new URL."""
+    def __init__(self, url):
+        self.status = 301
+        self.url = str(url)  # url may be a string or a URL object
+
+    def flatten(self, write):
+        write("Status: %d\nLocation: %s\n\n" % (self.status, self.url))
+
+class HTMLResult(Result):
+    """An object of this class combines a status code with HTML contents."""
+    def __init__(self, contents, status=200, doctype=''):
+        self.contents = contents  # must provide flatten(write)
+        self.status = status
+        self.doctype = doctype
+
+    def flatten(self, write):
+        """Invokes write for the response header and all HTML data.
+        Includes the doctype declaration.  A Status line is written
+        only if the status differs from 200."""
+        if self.status <> 200:
+            write("Status: %d\n" % self.status)
+        write("Content-Type: text/html\n\n%s\n" % self.doctype)
+        self.contents.flatten(write)
+
+class WebService(Service):
+    """A Service which dispatches requests to methods registered by path."""
+    def __init__(self, socket_name):
+        Service.__init__(self, socket_name)
+        self.__router = PathRouter()
+
+    def register(self, path, method):
+        """Requests that method is invoked if path is encountered.
+
+        The path has the syntax required by PathRouter.register.  The
+        method should be a function taking several arguments
+
+        - the remaining path
+        - a dictionary for the request parameters
+        - a URLFactory object
+
+        The method is expected to return a Result object.
+        """
+        self.__router.register(path, method)
+
+    def __writeError(self, result, code, msg):
+        result.write('Status: %d\nContent-Type: text/plain\n\n%s\n'
+                     % (code, msg))
+
+    def html_dtd(self):
+        """Returns the DOCTYPE declaration to be used for HTML documents.
+        Can be overridden."""
+        return '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">'
+
+    def add_title(self, title, body, head_contents=None, body_attribs={}):
+        """Takes a sequence of HTML objects and wraps them in 'body'
+        and 'html' tags.  Puts title in front of it, and optionally
+        includes the head_contents material.  The attributes of the
+        body element are taken from the body_attribs dictionary."""
+        if head_contents is None:
+            head_list = []
+        elif isinstance(head_contents, HTMLBase):
+            head_list = [head_contents]
+        else:
+            head_list = list(head_contents)
+        head_list.append(TITLE(title))
+        if isinstance(body, HTMLBase):
+            body_list = [body]
+        else:
+            body_list = list(body)
+        body_list.insert(0, H1(title))
+
+        return tag('html',
+                   (HEAD(head_list), Tag('body', body_list, body_attribs)))
+
+    def pre_dispatch(self, url=None):
+        """Invoked by handle (without arguments) prior to calling the
+        registered handler.  Does nothing; can be overridden."""
+
+    def handle(self, args, environment, data, result):
+        """Routes one request to its registered method and writes the
+        response (or a 404 error for unknown paths) to result."""
+        # cgi.parse reads POST data from a file object, not a string.
+        params = cgi.parse(cStringIO.StringIO(data), environment)
+        path = environment.get('PATH_INFO', '')
+        try:
+            (method, remaining) = self.__router.get(path)
+        except InvalidPath:
+            self.__writeError(result, 404, "page not found")
+            return
+        self.pre_dispatch()
+        url = URLFactory(environment.get('SERVER_NAME', ''),
+                         environment.get('SCRIPT_NAME', ''))
+        r = method(remaining, params, url)
+        assert isinstance(r, Result), repr(r)
+        r.flatten(result.write)
+
+def __test():
+    assert str(URL("")) == ""
+    assert str(URL("abc")) == "abc"
+    assert str(URL(" ")) == " "
+    assert str(URL("&")) == "&"
+    # URLFactory without server and script names
+    u = URLFactory(server_name=None, script_name=None)
+    assert str(u.absolute("http://www.enyo.de/")) == "http://www.enyo.de/"
+    assert str(u.absolute("http://www.enyo.de/", t='123')) \
+           == "http://www.enyo.de/?t=123"
+    assert str(u.scriptRelative("/a/b", t='123')) == "/a/b?t=123"
+    assert str(u.scriptRelativeFull("/a/b", t='123')) \
+           == "http://localhost/a/b?t=123"
+    # URLFactory with a script name; '=' in an argument value is quoted
+    u = URLFactory(server_name='localhost.localdomain',
+                   script_name='/cgi-bin/test.cgi')
+    assert str(u.scriptRelative("a/b", t='123')) \
+           == "/cgi-bin/test.cgi/a/b?t=123"
+    assert str(u.scriptRelativeFull("a/b", t='123=')) \
+           == "http://localhost.localdomain/cgi-bin/test.cgi/a/b?t=123%3D"
+    # HTML escaping and rendering of tags
+    assert P("").toString() == '<p></p>'
+    assert P(" ").toString() == '<p> </p>'
+    assert P("&").toString() == '<p>&amp;</p>'
+    assert P("\"").toString() == '<p>&quot;</p>'
+    assert P("<").toString() == '<p>&lt;</p>'
+    assert P(">").toString() == '<p>&gt;</p>'
+    assert P(">").toHTML().toString() == '<p>&gt;</p>'
+    assert FORM(method='get').toString() == '<form method="get"/>'
+    assert SPAN("green", _class="red").toString() \
+           == '<span class="red">green</span>'
+    assert TD(A("http://www.example.net/", "example")).toString() \
+           == '<td><a href="http://www.example.net/">example</a></td>'
+    # Redirect response
+    s = cStringIO.StringIO()
+    RedirectResult(u.scriptRelativeFull("123")).flatten(s.write)
+    assert s.getvalue() == '''Status: 301
+Location: http://localhost.localdomain/cgi-bin/test.cgi/123
+
+'''
+    # Menus
+    assert make_menu(u.scriptRelative,
+                     ("123", "A"),
+                     ("456", "B")).toString() == \
+                     '<ul><li><a href="/cgi-bin/test.cgi/123">A</a></li><li><a href="/cgi-bin/test.cgi/456">B</a></li></ul>'
+    # PathRouter registration
+    pr = PathRouter()
+    pr.register('', "root")
+    pr.register('/*', "default")
+    pr.register('/abc', "/abc")
+    pr.register('/a/bc', "/a/bc")
+    pr.register('ab/c', "/ab/c")
+    pr.register('/a', "/a")
+    pr.register('/a/**', "/a/**")
+    pr.register('/a/*', "/a/*")
+    # PathRouter lookup (wildcards return the unmatched rest of the path)
+    assert pr.get("") == ("root", ())
+    assert pr.get("/") == ("root", ())
+    assert pr.get("//") == ("root", ())
+    assert pr.get("/xyz") == ("default", ("xyz",))
+    assert pr.get("/a//xyz/") == ("/a/*", ("xyz",))
+    assert pr.get("/a//xyz/123") == ("/a/**", ("xyz", "123"))
+    assert pr.get("/abc") == ("/abc", ())
+
+if __name__ == "__main__":
+    __test()  # run the self-test when the module is executed as a script




More information about the Secure-testing-commits mailing list