[Secure-testing-commits] r2482 - bin lib/python
Florian Weimer
fw at costa.debian.org
Thu Oct 20 09:02:13 UTC 2005
Author: fw
Date: 2005-10-20 09:02:12 +0000 (Thu, 20 Oct 2005)
New Revision: 2482
Added:
bin/tracker_service.py
lib/python/web_support.py
Log:
r614 at deneb: fw | 2005-10-13 22:12:28 +0200
Add new web front end.
bin/tracker_service.py, lib/python/web_support.py:
New files.
Added: bin/tracker_service.py
===================================================================
--- bin/tracker_service.py 2005-10-20 09:01:57 UTC (rev 2481)
+++ bin/tracker_service.py 2005-10-20 09:02:12 UTC (rev 2482)
@@ -0,0 +1,816 @@
+#!/usr/bin/python
+
+import sys
+sys.path.insert(0,'../lib/python')
+
+if len(sys.argv) <> 3:
+ print "usage: python tracker_serivce.py SOCKET-PATH DATABASE-PATH"
+ sys.exit(1)
+socket_name = sys.argv[1]
+db_name = sys.argv[2]
+
+import bugs
+import re
+import security_db
+from web_support import *
+
+class TrackerService(WebService):
+ head_contents = compose(STYLE(
+ """h1 { font-size : 144%; }
+h2 { font-size : 120%; }
+h3 { font-size : 100%; }
+
+table { padding-left : 1.5em }
+td, th { text-align : left;
+ padding-left : 0.25em;
+ padding-right : 0.25em; }
+td { vertical-align: baseline }
+span.red { color: red; }
+span.dangerous { color: rgb(191,127,0); }
+"""), SCRIPT('''var old_query_value = "";
+
+function selectSearch() {
+ document.searchForm.query.focus();
+}
+
+function onSearch(query) {
+ if (old_query_value == "") {
+ if (query.length > 5) {
+ old_query_value = query;
+ document.searchForm.submit();
+ } else {
+ old_query_value = query;
+ }
+ }
+}
+''')).toHTML()
+
+ def __init__(self, socket_name, db_name):
+ WebService.__init__(self, socket_name)
+ self.db = security_db.DB(db_name)
+ self.register('', self.page_home)
+ self.register('*', self.page_object)
+ self.register('source-package/*', self.page_source_package)
+ self.register('binary-package/*', self.page_binary_package)
+ self.register('status/release/stable', self.page_status_release_stable)
+ self.register('status/release/testing',
+ self.page_status_release_testing)
+ self.register('status/release/unstable',
+ self.page_status_release_unstable)
+ self.register('status/dtsa-candidates',
+ self.page_status_dtsa_candidates)
+ self.register('status/todo', self.page_status_todo)
+ self.register('status/itp', self.page_status_itp)
+ self.register('data/unknown-packages', self.page_data_unknown_packages)
+ self.register('data/missing-epochs', self.page_data_missing_epochs)
+ self.register('data/releases', self.page_data_releases)
+ self.register('data/funny-versions', self.page_data_funny_versions)
+
+ def page_home(self, path, params, url):
+ query = params.get('query', ('',))[0]
+ if query:
+ if '/' in query:
+ return self.page_not_found(url, query)
+ else:
+ return RedirectResult(url.scriptRelativeFull(query))
+
+ return self.create_page(
+ url, 'Security Bug Tracker',
+ [P(
+ """This is the experimental issue tracker for Debian's testing
+security team. Keep in mind that this is merely a prototype.
+Please report any problems to """,
+ A("mailto:fw at deneb.enyo.de", "Florian Weimer"),
+ """.Note that some of the data presented here is known
+to be wrong (see below), but the data for the testing suite
+should be fine."""),
+ make_menu(
+ url.scriptRelative,
+ ('status/release/stable',
+ 'Vulnerable packages in the stable suite'),
+ ('status/release/testing',
+ 'Vulnerable packages in the testing suite'),
+ ('status/release/unstable',
+ 'Vulnerable packages in the unstable suite'),
+ ('status/dtsa-candidates', "Candidates for DTSAs"),
+ ('status/todo', 'TODO items'),
+ ('status/itp', 'ITPs with potential security issues'),
+ ('data/unknown-packages',
+ 'Packages names not found in the archive'),
+ ('data/missing-epochs',
+ 'Package versions which might lack an epoch'),
+ ('data/funny-versions',
+ 'Packages with strange version numbers'),
+ ('data/releases',
+ 'Covered Debian releases and architectures (slow)'),
+ self.make_search_button(url)),
+
+ H2("A few notes on data sources"),
+ P("""Data in this tracker comes solely from the bug database
+which is maintained by Debian's testing security team in their
+Subversion repository. All external data (this includes
+Debian bug reports and official Debian security advisories)
+must be added to this database before it appears here, and there
+can be some delay before this happens."""),
+ P("""At the moment, the database only contains information which is
+relevant for tracking the security status of the stable, testing and
+unstable suites. This means that data for oldstable is likely wrong.""")],
+ search_in_page=True)
+
+ def page_object(self, path, params, url):
+ obj = path[0]
+
+ if not obj:
+ # Redirect to start page.
+ return RedirectResult(url.scriptRelativeFull(""))
+
+ if 'A' <= obj[0] <= 'Z':
+ # Bug names start with a capital letter.
+ return self.page_bug(url, obj)
+
+ bugnumber = 0
+ try:
+ bugnumber = int(obj)
+ except ValueError:
+ pass
+ if bugnumber:
+ return self.page_debian_bug(url, bugnumber)
+
+ c = self.db.cursor()
+ if self.db.isSourcePackage(c, obj):
+ return RedirectResult(self.url_source_package(url, obj, full=True))
+ if self.db.isBinaryPackage(c, obj):
+ return RedirectResult(self.url_binary_package(url ,obj, full=True))
+
+ return self.page_not_found(url, obj)
+
+ def page_bug(self, url, name):
+ cursor = self.db.cursor()
+ try:
+ bug = bugs.BugFromDB(cursor, name)
+ except ValueError:
+ return self.page_not_found(url, name)
+ if bug.name <> name:
+ # Show the normalized bug name in the browser address bar.
+ return RedirectResult(url.scriptRelativeFull(bug.name))
+
+ page = []
+
+ def gen_header():
+ yield B("Name"), bug.name
+
+ source = bug.name.split('-')[0]
+ if source in ('CAN', 'CVE'):
+ source_xref = self.make_cve_ref(url, bug.name, 'CVE')
+ elif source == 'DSA':
+ source_xref = self.make_dsa_ref(url, bug.name, 'Debian')
+ elif source == 'DTSA':
+ source_xref = 'Debian Testing Security Team'
+ elif source == 'FAKE':
+ source_xref = (
+ 'Automatically generated temporary name. Not for external reference.')
+ else:
+ source_xref = None
+
+ if source_xref:
+ yield B("Source"), source_xref
+
+ if bug.description:
+ yield B("Description"), bug.description
+
+ xref = list(self.db.getBugXrefs(cursor, bug.name))
+ if xref:
+ yield B("References"), self.make_xref_list(url, xref)
+
+ debian_bugs = bug.getDebianBugs(cursor)
+ if debian_bugs:
+ yield (B("Debian Bugs"),
+ self.make_debian_bug_list(url, debian_bugs))
+
+ if not bug.not_for_us:
+ for (release, status, reason) in bug.getStatus(cursor):
+ if status <> 'fixed':
+ reason = self.make_red(reason)
+ yield B('Status of %s' % release), reason
+
+ page.append(make_table(gen_header()))
+
+ if bug.notes:
+ page.append(H2("Vulnerable and fixed packages"))
+
+ def gen_source():
+ old_pkg = ''
+ for (package, release, version, vulnerable) \
+ in self.db.getSourcePackages(cursor, bug.name):
+ if package == old_pkg:
+ package = ''
+ else:
+ old_pkg = package
+ package = compose(
+ self.make_source_package_ref(url, package),
+ " (", self.make_pts_ref(url, package, 'PTS'), ")")
+ if vulnerable:
+ vuln = self.make_red('vulnerable')
+ version = self.make_red(version)
+ else:
+ vuln = 'fixed'
+
+ yield package, ', '.join(release), version, vuln
+
+ page.append(make_table(gen_source(),
+ caption=("Source Package", "Release", "Version", "Status"),
+ introduction=P('The table below lists information on source packages.')))
+
+ def gen_binary():
+ old_pkg = ''
+ for (packages, releases, version, archs, vulnerable) \
+ in self.db.getBinaryPackages(cursor, bug.name):
+ pkg = ', '.join(packages)
+ if pkg == old_pkg:
+ packages = ''
+ else:
+ old_pkg = pkg
+ packages = self.make_binary_packages_ref(url, packages)
+
+ if vulnerable:
+ vuln = self.make_red('vulnerable')
+ version = self.make_red(version)
+ else:
+ vuln = 'fixed'
+ yield (packages,
+ ', '.join(releases),
+ version, vuln,
+ ', '.join(archs))
+
+ page.append(make_table(gen_binary(),
+ caption=("Binary Package", "Release", "Version", "Status",
+ "Architecures"),
+ introduction=P("The next table lists affected binary packages.")))
+
+ def gen_data():
+ notes_sorted = bug.notes[:]
+ notes_sorted.sort(lambda a, b: cmp(a.package, b.package))
+ for n in notes_sorted:
+ if n.release:
+ rel = str(n.release)
+ else:
+ rel = '(unstable)'
+ urgency = str(n.urgency)
+ if n.fixed_version:
+ ver = str(n.fixed_version)
+ if ver == '0':
+ ver = '(not affected)'
+ urgency = ''
+ else:
+ ver = self.make_red('(unfixed)')
+
+ pkg = n.package
+ pkg_kind = n.package_kind
+ if pkg_kind == 'source':
+ pkg = self.make_source_package_ref(url, pkg)
+ elif pkg_kind == 'binary':
+ pkg = self.make_binary_package_ref(url, pkg)
+ elif pkg_kind == 'itp':
+ pkg_kind = 'ITP'
+ rel = ''
+ ver = ''
+ urgency = ''
+
+ bugs = n.bugs
+ bugs.sort()
+ bugs = make_list(
+ map(lambda x: self.make_debian_bug(url, x), bugs))
+ if n.bug_origin:
+ origin = self.make_xref(url, n.bug_origin)
+ else:
+ origin = ''
+ yield (pkg, pkg_kind, rel, ver, urgency, origin, bugs)
+
+ page.append(
+ make_table(gen_data(),
+ caption=("Package", "Type", "Release", "Fixed Version",
+ "Urgency", "Origin", "Debian Bugs"),
+ introduction=P("The information above is based on the following data on fixed versions.")))
+
+ if bug.comments:
+ page.append(H2("Notes"))
+ def gen_comments():
+ for (t, c) in bug.comments:
+ yield c
+ page.append(make_pre(gen_comments()))
+
+ return self.create_page(url, bug.name, page)
+
+ def page_debian_bug(self, url, bugnumber):
+ buglist = list(self.db.getBugsFromDebianBug(self.db.cursor(),
+ bugnumber))
+ if buglist:
+ if len(buglist) == 1:
+ # Single issue, redirect.
+ return RedirectResult(url.scriptRelativeFull(buglist[0][0]))
+
+ def gen():
+ for (name, urgency, description) in buglist:
+ if urgency == "unknown":
+ urgency = ""
+ yield self.make_xref(url, name), urgency, description
+
+ return self.create_page(
+ url, "Information related to Debian bug #%d" % bugnumber,
+ [P("The following issues reference to Debian bug ",
+ self.make_debian_bug(url, bugnumber), ":"),
+ make_table(gen(),
+ caption=("Name", "Urgency", "Description"))])
+
+ else:
+ return self.page_not_found(url, str(bugnumber))
+
+ def page_not_found(self, url, query):
+ return self.create_page(url, 'Not found',
+ [P('Your query ',
+ CODE(query),
+ ' matched no results.')],
+ status=404)
+
+ def page_source_package(self, path, params, url):
+ pkg = path[0]
+
+ def gen_versions():
+ for (releases, version) in self.db.getSourcePackageVersions(
+ self.db.cursor(), pkg):
+ yield ', '.join(releases), version
+ def gen_binary():
+ for (packages, releases, archs, version) \
+ in self.db.getBinaryPackagesForSource(
+ self.db.cursor(), pkg):
+ yield (self.make_binary_packages_ref(url, packages),
+ ', '.join(releases), version, ', '.join(archs))
+ def gen_bug_list(lst):
+ for (bug, description) in lst:
+ yield self.make_xref(url, bug), description
+
+ return self.create_page(
+ url, "Information on source package " + pkg,
+ [make_menu(lambda x: x,
+ (self.url_pts(url, pkg),
+ pkg + ' in the Package Tracking System'),
+ (self.url_debian_bug_pkg(url, pkg),
+ pkg + ' in the Bug Tracking System'),
+ (self.url_testing_status(url, pkg),
+ pkg + ' in the testing migration checker')),
+ H2("Available versions"),
+ make_table(gen_versions(), caption=("Release", "Version")),
+
+ H2("Available binary packages"),
+ make_table(gen_binary(),
+ caption=('Package', 'Release', 'Version', 'Architectures'),
+ replacement="""No binary packages are recorded in this database.
+This probably means that the package is architecture-specific, and the
+architecture is currently not tracked."""),
+
+ H2("Open issues"),
+ make_table(gen_bug_list(self.db.getBugsForSourcePackage
+ (self.db.cursor(), pkg, True)),
+ caption=('Bug', 'Description'),
+ replacement='No known open issues.'),
+
+ H2("Resolved issues"),
+ make_table(gen_bug_list(self.db.getBugsForSourcePackage
+ (self.db.cursor(), pkg, False)),
+ caption=('Bug', 'Description'),
+ replacement='No known resolved issues.')])
+
+ def page_binary_package(self, path, params, url):
+ pkg = path[0]
+
+ def gen_versions():
+ for (releases, source, version, archs) \
+ in self.db.getBinaryPackageVersions(self.db.cursor(), pkg):
+ yield (', '.join(releases),
+ self.make_source_package_ref(url, source),
+ version, ', '.join(archs))
+ def gen_bug_list(lst):
+ for (bug, description) in lst:
+ yield self.make_xref(url, bug), description
+
+ return self.create_page(
+ url, "Information on binary package " + pkg,
+ [make_menu(lambda x: x,
+ (self.url_debian_bug_pkg(url, pkg),
+ pkg + ' in the Bug Tracking System')),
+ H2("Available versions"),
+ make_table(gen_versions(),
+ caption=("Release", "Source", "Version", "Architectures")),
+
+ H2("Open issues"),
+ make_table(gen_bug_list(self.db.getBugsForBinaryPackage
+ (self.db.cursor(), pkg, True)),
+ caption=('Bug', 'Description'),
+ replacement='No known open issues.'),
+
+ H2("Resolved issues"),
+ make_table(gen_bug_list(self.db.getBugsForBinaryPackage
+ (self.db.cursor(), pkg, False)),
+ caption=('Bug', 'Description'),
+ replacement='No known resolved issues.'),
+
+ H2("Non-issues"),
+ make_table(gen_bug_list(self.db.getNonBugsForBinaryPackage
+ (self.db.cursor(), pkg)),
+ caption=('Bug', 'Description'),
+ replacement="""No known issues which do not affect
+this package, but still reference it.""")])
+
+ def page_status_release_stable(self, path, params, url):
+ def gen():
+ old_pkg_name = ''
+ for (pkg_name, bug_name, archive, urgency) in \
+ self.db.cursor().execute(
+ """SELECT package, bug, section, urgency FROM stable_status"""):
+ if pkg_name == old_pkg_name:
+ pkg_name = ''
+ else:
+ old_pkg_name = pkg_name
+ if archive <> 'main':
+ pkg_name = "%s (%s)" % (pkg_name, archive)
+
+ if urgency == 'unknown':
+ urgency = ''
+ elif urgency == 'high':
+ urgency = self.make_red(urgency)
+
+ yield pkg_name, self.make_xref(url, bug_name), urgency
+
+ return self.create_page(
+ url, 'Vulnerable source packages in the stable suite',
+ [make_table(gen(), caption=("Package", "Bug", "Urgency"))])
+
+ def page_status_release_testing(self, path, params, url):
+ def gen():
+ old_pkg_name = ''
+ for (pkg_name, bug_name, archive, urgency,
+ sid_vulnerable, ts_fixed) in self.db.cursor().execute(
+ """SELECT package, bug, section, urgency, unstable_vulnerable,
+ testing_security_fixed
+ FROM testing_status"""):
+ if pkg_name == old_pkg_name:
+ pkg_name = ''
+ else:
+ old_pkg_name = pkg_name
+ if archive <> 'main':
+ pkg_name = "%s (%s)" % (pkg_name, archive)
+
+ if ts_fixed:
+ status = 'fixed in testing-security'
+ else:
+ if sid_vulnerable:
+ status = self.make_red('unstable is vulnerable')
+ else:
+ status = self.make_dangerous('fixed in unstable')
+
+ if urgency == 'unknown':
+ urgency = ''
+
+ yield (pkg_name, self.make_xref(url, bug_name),
+ urgency, status)
+
+ return self.create_page(
+ url, 'Vulnerable source packages in the testing suite',
+ [make_menu(url.scriptRelative,
+ ("status/dtsa-candidates", "Candidates for DTSAs")),
+ make_table(gen(), caption=("Package", "Bug"))])
+
+ def page_status_release_unstable(self, path, params, url):
+ def gen():
+ old_pkg_name = ''
+ for (pkg_name, bug_name, section, urgency) \
+ in self.db.cursor().execute(
+ """SELECT DISTINCT sp.name, st.bug_name,
+ sp.archive, st.urgency
+ FROM source_package_status AS st, source_packages AS sp
+ WHERE st.vulnerable AND st.urgency <> 'unimportant'
+ AND sp.rowid = st.package AND sp.release = 'sid'
+ AND sp.subrelease = ''
+ ORDER BY sp.name, st.bug_name"""):
+ if pkg_name == old_pkg_name:
+ pkg_name = ''
+ else:
+ old_pkg_name = pkg_name
+ if section <> 'main':
+ pkg_name = "%s (%s)" % (pkg_name, section)
+ else:
+ pkg_name = self.make_xref(url, pkg_name)
+
+ if urgency == 'unknown':
+ urgency = ''
+ elif urgency == 'high':
+ urgency = self.make_red(urgency)
+
+ yield pkg_name, self.make_xref(url, bug_name), urgency
+
+
+ return self.create_page(
+ url, 'Vulnerable source packages in the testing suite',
+ [P("""Note that the list below is based on source packages.
+ This means that packages are not listed here once a new,
+ fixed source version has been uploaded to the archive, even
+ if there are still some vulnerably binary packages present
+ in the archive."""),
+ make_table(gen(), caption=('Package', 'Bug', 'Urgency'))])
+
+ def page_status_dtsa_candidates(self, path, params, url):
+ def gen():
+ old_pkg_name = ''
+ for (pkg_name, bug_name, archive, urgency, stable_later) \
+ in self.db.cursor().execute(
+ """SELECT package, bug, section, urgency,
+ (SELECT testing.version_id < stable.version_id
+ FROM source_packages AS testing, source_packages AS stable
+ WHERE testing.name = testing_status.package
+ AND testing.release = 'etch'
+ AND testing.subrelease = ''
+ AND testing.archive = testing_status.section
+ AND stable.name = testing_status.package
+ AND stable.release = 'sarge'
+ AND stable.subrelease = 'security'
+ AND stable.archive = testing_status.section)
+ FROM testing_status
+ WHERE (NOT unstable_vulnerable)
+ AND (NOT testing_security_fixed)"""):
+ if pkg_name == old_pkg_name:
+ pkg_name = ''
+ migration = ''
+ else:
+ old_pkg_name = pkg_name
+ migration = A(self.url_testing_status(url, pkg_name),
+ "check")
+ if archive <> 'main':
+ pkg_name = "%s (%s)" % (pkg_name, archive)
+ else:
+ pkg_name = self.make_source_package_ref(url, pkg_name)
+
+ if urgency == 'unknown':
+ urgency = ''
+ elif urgency == 'high':
+ urgency = self.make_red(urgency)
+
+ if stable_later:
+ notes = "(fixed in stable?)"
+ else:
+ notes = ''
+
+ yield (pkg_name, migration, self.make_xref(url, bug_name),
+ urgency, notes)
+
+ return self.create_page(
+ url, "Candidates for DTSAs",
+ [P("""The table below lists packages which are fixed
+in unstable, but unfixed in testing. Use the testing migration
+return web_supporttracker to find out why they have not entered
+return web_supporttesting yet."""),
+ make_menu(url.scriptRelative,
+ ("status/release/testing",
+ "List of vulnerable packages in testing")),
+ make_table(gen(),
+ caption=("Package", "Migration", "Bug", "Urgency"))])
+
+ def page_status_todo(self, path, params, url):
+ def gen():
+ for (bug, description) in self.db.getTODOs():
+ yield self.make_xref(url, bug), description
+ return self.create_page(
+ url, "Bugs with TODO items",
+ [make_table(gen(),
+ caption=("Bug", "Description"))])
+
+ def page_status_itp(self, path, params, url):
+ def gen():
+ old_pkg = ''
+ for pkg, bugs, debian_bugs in self.db.getITPs(self.db.cursor()):
+ if pkg == old_pkg:
+ pkg = ''
+ else:
+ old_pkg = pkg
+ yield (pkg, self.make_xref_list(url, bugs),
+ self.make_debian_bug_list(url, debian_bugs))
+ return self.create_page(
+ url, "ITPs with potential security issues",
+ [make_table(gen(), caption=("Package", "Issue", "Debian Bugs"),
+ replacement="No ITP bugs are currently known.")])
+
+ def page_data_unknown_packages(self, path, params, url):
+ def gen():
+ for name, bugs in self.db.getUnknownPackages(self.db.cursor()):
+ yield name, self.make_xref_list(url, bugs)
+ return self.create_page(
+ url, "Unknown packages",
+ [P("""Sometimes, a package referenced in a bug report
+cannot be found in the database. This can be the result of a spelling
+return web_supporterror, or a historic entry refers to a
+return web_supportpackage which is no longer in the archive."""),
+ make_table(gen(), caption=("Package", "Bugs"),
+ replacement="No unknown packages are referenced in the database.")])
+
+ def page_data_missing_epochs(self, path, params, url):
+ def gen():
+ old_bug = ''
+ old_pkg = ''
+ for bug, pkg, ver1, ver2 in self.db.cursor().execute(
+ """SELECT DISTINCT bug_name, n.package,
+ n.fixed_version, sp.version
+ FROM package_notes AS n, source_packages AS sp
+ WHERE n.package_kind = 'source'
+ AND n.fixed_version NOT LIKE '%:%'
+ AND n.fixed_version <> '0'
+ AND n.bug_origin = ''
+ AND sp.name = n.package
+ AND sp.version LIKE '%:%'
+ ORDER BY bug_name, package"""):
+ if bug == old_bug:
+ bug = ''
+ else:
+ old_bug = bug
+ old_pkg = ''
+ bug = self.make_xref(url, bug)
+ if pkg == old_pkg:
+ pkg = ''
+ else:
+ old_pkg = pkg
+ pkg = self.make_source_package_ref(url, pkg)
+ yield bug, pkg, ver1, ver2
+
+ return self.create_page(
+ url, "Missing epochs in package versions",
+ [make_table(gen(),
+ caption=("Bug", "Package", "Version 1", "Version 2"),
+ replacement="No source package version with missing epochs.")])
+
+ def page_data_releases(self, path, params, url):
+ def gen():
+ for (rel, subrel, archive, sources, archs) \
+ in self.db.availableReleases():
+ if sources:
+ sources = 'yes'
+ else:
+ sources = 'no'
+ yield rel, subrel, archive, sources, make_list(archs)
+ return self.create_page(
+ url, "Available releases",
+ [P("""The security issue database is checked against
+the Debian releases listed in the table below."""),
+ make_table(gen(),
+ caption=("Release", "Subrelease", "Archive",
+ "Sources", "Architectures"))])
+
+ def page_data_funny_versions(self, path, params, url):
+ def gen():
+ for name, release, archive, version, source_version \
+ in self.db.getFunnyPackageVersions():
+ yield name, release, archive, source_version, version
+
+ return self.create_page(
+ url, "Version conflicts between source/binary packages",
+ [P("""The table below lists source packages
+ which have a binary package of the same name, but with a different
+ version. This means that extra care is necessary to determine
+ the version of a package which has been fixed. (Note that
+ the bug tracker prefers source versions to binary versions
+ in this case.)"""),
+ make_table(gen(),
+ caption=("Package",
+ "Release",
+ "Archive",
+ "Source Version",
+ "Binary Version")),
+ P("""Technically speaking, these version numbering is fine,
+but it makes version-based bug tracking quite difficult for these packages."""),
+ P("""There are many binary packages which are built from source
+ packages with different version numbering schemes. However, as
+ long as none of the binary packages carries the same name as the
+ source package, most confusion is avoided or can be easily
+ explained.""")])
+
+
+ def create_page(self, url, title, body, search_in_page=False, status=200):
+ append = body.append
+ append(HR())
+ if not search_in_page:
+ append(self.make_search_button(url))
+ append(P(A(url.scriptRelative(""), "Home"),
+ " - ", A(url.absolute("http://secure-testing.debian.net/"),
+ "Testing Security Team"),
+ " - ", A(url.absolute("http://www.debian.org/security/"),
+ "Debian Security"),
+ " - ", A(url.absolute
+ ("http://www.enyo.de/fw/impressum.html"),
+ "Imprint")))
+ if search_in_page:
+ on_load = "selectSearch()"
+ else:
+ on_load = None
+ return HTMLResult(self.add_title(title, body,
+ head_contents=self.head_contents,
+ body_attribs={'onload': on_load}),
+ doctype=self.html_dtd(),
+ status=status)
+
+ def make_search_button(self, url):
+ return FORM("Search for package or bug name: ",
+ INPUT(type='text', name='query',
+ onkeyup="onSearch(this.value)",
+ onmousemove="onSearch(this.value)"),
+ INPUT(type='submit', value='Go'),
+ method='get',
+ action=url.scriptRelative(''))
+
+ def url_cve(self, url, name):
+ return url.absolute("http://cve.mitre.org/cgi-bin/cvename.cgi",
+ name=name)
+ def url_dsa(self, url, dsa, re_dsa=re.compile(r'^DSA-(\d+)(?:-\d+)?$')):
+ match = re_dsa.match(dsa)
+ if match:
+ # We must determine the year because there is no generic URL.
+ (number,) = match.groups()
+ for (date,) in self.db.cursor().execute(
+ "SELECT release_date FROM bugs WHERE name = ?", (dsa,)):
+ (y, m, d) = date.split('-')
+ return url.absolute("http://www.debian.org/security/%d/dsa-%d"
+ % (int(y), int(number)))
+ return None
+
+ def url_debian_bug(self, url, debian):
+ return url.absolute("http://bugs.debian.org/cgi-bin/bugreport.cgi",
+ bug=str(debian))
+ def url_debian_bug_pkg(self, url, debian):
+ return url.absolute("http://bugs.debian.org/cgi-bin/pkgreport.cgi",
+ pkg=debian)
+ def url_pts(self, url, package):
+ return url.absolute("http://packages.qa.debian.org/common/index.html",
+ src=package)
+ def url_testing_status(self, url, package):
+ return url.absolute("http://bjorn.haxx.se/debian/testing.pl",
+ package=package)
+ def url_source_package(self, url, package, full=False):
+ if full:
+ return url.scriptRelativeFull("source-package/" + package)
+ else:
+ return url.scriptRelative("source-package/" + package)
+ def url_binary_package(self, url, package, full=False):
+ if full:
+ return url.scriptRelativeFull("binary-package/" + package)
+ else:
+ return url.scriptRelative("binary-package/" + package)
+
+ def make_xref(self, url, name):
+ return A(url.scriptRelative(name), name)
+
+ def make_xref_list(self, url, lst, separator=', '):
+ return make_list(map(lambda x: self.make_xref(url, x), lst), separator)
+
+ def make_debian_bug(self, url, debian):
+ return A(self.url_debian_bug(url, debian), str(debian))
+ def make_debian_bug_list(self, url, lst):
+ return make_list(map(lambda x: self.make_debian_bug(url, x), lst))
+
+ def make_cve_ref(self, url, cve, name=None):
+ if name is None:
+ name = cve
+ return A(self.url_cve(url, cve), name)
+
+ def make_dsa_ref(self, url, dsa, name=None):
+ if name is None:
+ name = dsa
+ u = self.url_dsa(url, dsa)
+ if u:
+ return A(u, name)
+ else:
+ return name
+
+ def make_pts_ref(self, url, pkg, name=None):
+ if name is None:
+ name = pkg
+ return A(self.url_pts(url, pkg), name)
+
+ def make_source_package_ref(self, url, pkg, title=None):
+ if title is None:
+ title = pkg
+ return A(self.url_source_package(url, pkg), title)
+ def make_binary_package_ref(self, url, pkg, title=None):
+ if title is None:
+ title = pkg
+ return A(self.url_binary_package(url, pkg), title)
+ def make_binary_packages_ref(self, url, lst):
+ assert type(lst) <> types.StringType
+ return make_list(map(lambda x: self.make_binary_package_ref(url, x),
+ lst))
+
+ def make_red(self, contents):
+ return SPAN(contents, _class="red")
+
+ def make_dangerous(self, contents):
+ return SPAN(contents, _class="dangerous")
+
+ def pre_dispatch(self):
+ self.db.refresh()
+
+TrackerService(socket_name, db_name).run()
Property changes on: bin/tracker_service.py
___________________________________________________________________
Name: svn:mime-type
+ text/script
Added: lib/python/web_support.py
===================================================================
--- lib/python/web_support.py 2005-10-20 09:01:57 UTC (rev 2481)
+++ lib/python/web_support.py 2005-10-20 09:02:12 UTC (rev 2482)
@@ -0,0 +1,692 @@
+# web_support.py -- simple HTTP generation framework
+# Copyright (C) 2005 Florian Weimer <fw at deneb.enyo.de>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import cgi
+import cStringIO
+import os
+import re
+import socket
+import struct
+import sys
+import grp
+import traceback
+import types
+import urllib
+
+class ServinvokeError(Exception):
+ pass
+
+class Service:
+ """A class for service objects.
+
+ Service objects are contacted by the program servinvoke and
+ process HTTP requests in a serialized fashion. (Only the data
+ transfer from and to the client happens in parallel, and this is
+ handled by the servinvoke program.)
+
+ If the newly created socket is owned by the www-data group, it is
+ automatically made readable by that group.
+ """
+
+ def __init__(self, socket_name):
+ self.socket_name = socket_name
+ self._unlinkSocket()
+ self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
+ self.socket.bind(self.socket_name)
+ self.socket.listen(5)
+ self._chmod()
+
+ def __del__(self):
+ self._unlinkSocket()
+
+ def _unlinkSocket(self):
+ try:
+ os.unlink(self.socket_name)
+ except OSError:
+ pass
+
+ def _chmod(self):
+ gid = os.stat(self.socket_name).st_gid
+ grpent = grp.getgrgid(gid)
+ if grpent[0] == 'www-data':
+ os.chmod(self.socket_name, 0660)
+
+ def log(self, msg, *args):
+ sys.stderr.write((msg % args) + "\n")
+
+ def run(self):
+ while 1:
+ (client, addr) = self.socket.accept()
+
+ def read(count):
+ data = ''
+ cnt = 0
+ while cnt <> count:
+ d = client.recv(count - cnt)
+ if d:
+ data += d
+ cnt = len(data)
+ else:
+ self.log("unexpected end of data from servinvoke")
+ raise ServinvokeError()
+
+ return data
+
+ try:
+ header = read(24)
+ (magic, version, cli_size, cli_count, env_size, env_count) = \
+ struct.unpack("!6I", header)
+ if magic <> 0x15fd34df:
+ sys.log("unknown magic number %08X", magic)
+ if version <> 1:
+ sys.log("unknown version %08X", magic)
+ cli = read(cli_size).split('\0')[:-1]
+ env = {}
+ for x in read(env_size).split('\0')[:-1]:
+ (key, value) = x.split('=', 1)
+ env[key] = value
+ data = []
+ while 1:
+ d = client.recv(4096)
+ if d:
+ data.append(d)
+ else:
+ break
+ data = ''.join(data)
+ result = cStringIO.StringIO()
+ self.handle(cli, env, data, result)
+ client.sendall(result.getvalue())
+ client.close()
+
+ except ServinvokeError:
+ client.close()
+ pass
+ except KeyboardInterrupt:
+ client.close()
+ raise
+ except:
+ client.close()
+ target = cStringIO.StringIO()
+ traceback.print_exc(None, target)
+ self.log("%s", target.getvalue())
+
+ def handle(args, environ, data):
+ """Invoke by run to handle a single request. Should
+ return the data to be sent back to the client."""
+ return ""
+
+class URL:
+ """A simple wrapper class for strings which are interpreted as URLs."""
+ def __init__(self, url):
+ self.__url = url
+ def __str__(self):
+ return self.__url
+ def __repr__(self):
+ return "URL(%s)" % `self.__url`
+
+class URLFactory:
+ """Creates URL objects.
+
+ This factory class handles the case where a script wants to
+ generate URLs which reference to itself (see scriptRelative)."""
+
+ def __init__(self, server_name, script_name):
+ self.server_name = server_name or 'localhost'
+ script_name = self._stripSlashes(script_name or '')
+ if script_name[-1:] == '/' or script_name == '':
+ self.script_name = script_name
+ else:
+ self.script_name = script_name + '/'
+
+ def _convertArgs(self, args):
+ arglist = []
+ for (key, value) in args.items():
+ if value is None:
+ continue
+ arglist.append("%s=%s" % (urllib.quote(key),
+ urllib.quote(value)))
+ if arglist:
+ return "?" + '&'.join(arglist)
+ else:
+ return ""
+ def _stripSlashes(self, arg):
+ while arg[:1] == '/':
+ arg = arg[1:]
+ return arg
+
+ def absolute(self, url, **args):
+ """Creates an absolute URL, with optional arguments to pass."""
+
+ return URL(url + self._convertArgs(args))
+
+ def scriptRelative(self, path, **args):
+ """Returns a URL which references to the path relative to the
+ current script. Optionally, arguments to pass can be included."""
+
+ return URL("/%s%s%s" % (self.script_name,
+ self._stripSlashes(path),
+ self._convertArgs(args)))
+
+ def scriptRelativeFull(self, path, **args):
+ """Like scriptRelative, but returns an absolute URL, including
+ the http:// prefix."""
+
+ return URL("http://%s/%s%s%s" % (self.server_name, self.script_name,
+ self._stripSlashes(path),
+ self._convertArgs(args)))
+
+
+stringToHTML = map(chr, range(256))
+def _initStringToHTML(s):
+ for (ch, repl) in (('<', '<'),
+ ('>', '>'),
+ ('&', '&'),
+ ('"', '"')):
+ s[ord(ch)] = repl
+_initStringToHTML(stringToHTML)
+del _initStringToHTML
+
+def escapeHTML(str):
+ '''Replaces the characters <>&" in the passed strings with their
+ HTML entities.'''
+
+ result = []
+ append = result.append
+ for ch in str:
+ append(stringToHTML[ord(ch)])
+ return ''.join(result)
+
+class HTMLBase:
+ def flatten(self, write):
+ """Invokes write repeatedly, for the tag and its contents.
+
+ Note that typically, a lot of very short strings a written, so
+ it's better to add some buffering before sending the strings
+ elsewhere."""
+ pass
+
+ def toString(self):
+ """Invokes flatten to create a new string object."""
+ r = cStringIO.StringIO()
+ self.flatten(r.write)
+ return r.getvalue()
+
+ def toHTML(self):
+ return VerbatimHTML(self.toString())
+
+class VerbatimHTML(HTMLBase):
+ """Creates verbatim HTML from a string object. Mainly used for
+ optimizing recurring HTML snippets."""
+
+ def __init__(self, contents):
+ self.__contents = contents
+
+ def flatten(self, write):
+ write(self.__contents)
+
+class Compose(HTMLBase):
+ """Glues a sequence of HTML snippets together, without enclosing it in
+ a tag."""
+ def __init__(self, contents):
+ self.__contents = contents
+
+ def flatten(self, write):
+ for x in self.__contents:
+ if type(x) == types.StringType:
+ write(escapeHTML(x))
+ else:
+ x.flatten(write)
+
+def compose(*contents):
+ """Concatenates several HTML objects."""
+ return Compose(contents)
+
+
+class Tag(HTMLBase):
+ """Base class for HTML tags."""
+
+ re_name = re.compile(r'\A_?[a-zA-Z][a-zA-Z0-9]*\Z')
+
+ def __init__(self, name, contents, attribs={}):
+ self._check(name)
+ self.__name = name
+ attrs = []
+ append = attrs.append
+ for (key, value) in attribs.items():
+ if value is None:
+ continue
+ self._check(key)
+ append(' ')
+ if key[0] == '_':
+ append(key[1:])
+ else:
+ append(key)
+ append('="')
+ for ch in str(value):
+ append(stringToHTML[ord(ch)])
+ append('"')
+ self.__attribs = ''.join(attrs)
+ self.contents = contents
+
+ def _check(self, name):
+ if self.re_name.match(name):
+ return
+ else:
+ raise ValueError, "invalid name: " + `name`
+
+ def flatten(self, write):
+ if self.contents:
+ write("<%s%s>" % (self.__name, self.__attribs))
+ closing = "</%s>" % self.__name
+ try:
+ for x in self.contents:
+ if type(x) == types.StringType:
+ write(escapeHTML(x))
+ else:
+ x.flatten(write)
+ except:
+ # If we encountered any exception, try to write the
+ # closing tag nevertheless. This increases our
+ # chances that we produce valid XML.
+ try:
+ write(closing)
+ except:
+ pass
+ raise
+ write(closing)
+
+ else:
+ write("<%s%s/>" % (self.__name, self.__attribs))
+
+ def __repr__(self):
+ return "<websupport.Tag instance, name=%s>" % `self.__name`
+
+def tag(__name, __contents, **__attribs):
+ """Creates a new tag object.
+
+ name - name of the tag
+ contents - a sequence objet (or iterator) for the enclosed contents
+ attribs - keyword arguments froming forming attributes
+ """
+ return Tag(__name, __contents, __attribs)
+
+def emptyTag(__name, **__attribs):
+ """A tag without contents.
+
+ name - name of the tag
+ attribs - keyword arguments froming forming attributes
+ """
+ return Tag(__name, None, __attribs)
+
+def A(url, text=None):
+ if text is None:
+ text = url
+ return tag('a', text, href=str(url))
+def STYLE(contents, type='text/css'):
+ return tag('style', contents, type=type)
+def SCRIPT(contents, type="text/javascript", language="JavaScript"):
+ return tag('script', contents, type=type, language=language)
+def TITLE(contents):
+ return tag('title', contents)
+def HTML(head, body):
+ return tag('html', (HEAD(head), BODY(body)))
+def HEAD(contents):
+ return tag('head', contents)
+def BODY(contents, onload=None):
+ return tag('body', contents, onload=onload)
+def H1(contents):
+ return tag('h1', contents)
+def H2(contents):
+ return tag('h2', contents)
+def P(*contents):
+ return Tag('p', contents)
+def SPAN(*__contents, **__attribs):
+ return Tag('span', __contents, __attribs)
+def HR():
+ return tag('hr', ())
+def BR():
+ return tag('br', ())
+def CODE(contents):
+ return tag('code', contents)
+def B(contents):
+ return tag('b', contents)
+def TABLE(contents):
+ return tag('table', contents)
+def TR(*contents):
+ return tag('tr', contents)
+def TH(*contents):
+ return tag('th', contents)
+def TD(*contents):
+ return tag('td', contents)
+def FORM(*__contents, **__attribs):
+ return Tag('form', __contents, __attribs)
+def INPUT(*__contents, **__attribs):
+ return Tag('input', __contents, __attribs)
+def LI(*__contents, **__attribs):
+ return Tag('li', __contents, __attribs)
+
+def make_table(contents, caption=None, replacement=None, introduction=None):
+ rows = []
+ for row in contents:
+ cols = []
+ if caption and not rows:
+ for col in caption:
+ cols.append(TH(col))
+ rows.append(Tag('tr', cols))
+ cols = []
+
+ for col in row:
+ cols.append(TD(col))
+ rows.append(Tag('tr', cols))
+ if rows:
+ if introduction:
+ return compose(introduction, TABLE(rows))
+ return TABLE(rows)
+ else:
+ return compose()
+
+def make_pre(lines):
+ """Creates a pre-formatted text area."""
+ r = []
+ append = r.append
+ for l in lines:
+ append(l)
+ append('\n')
+ return tag('pre', ''.join(l))
+
+def make_menu(convert, *entries):
+ """Creates an unnumbered list of hyperlinks.
+ Each entry can be:
+
+ - a pair (URL, LABEL).
+ convert(URL) is used as the link, and LABEL as the link text.
+ - some non-tuple value.
+ This is added as an individual item.
+ """
+ ul = []
+ append = ul.append
+ for e in entries:
+ if type(e) == types.TupleType:
+ (relurl, label) = e
+ append(LI(A(convert(relurl), label)))
+ else:
+ append(LI(e))
+ return tag('ul', ul)
+
+def make_list(lst, separator=", "):
+ """Creates a list of HTML elements."""
+ assert type(lst) <> types.StringType
+ c = []
+ if lst:
+ append = c.append
+ for e in lst[:-1]:
+ append(e)
+ append(separator)
+ append(lst[-1])
+ return Compose(c)
+
+class InvalidPath(Exception):
+ """An unknown path was submitted to PathRouter.get"""
+
+class PathRouter:
+ """This class maps paths to registered values."""
+
+ def __init__(self):
+ self.__map = {}
+
+ def register(self, path, value):
+ """Registers the indicated value for the path.
+
+ Path may end with '*' or '**', indicating single-level
+ wildcards or multi-level wildcards."""
+
+ m = self.__map
+ p = path.split('/')
+ if p and not p[0]:
+ del p[0]
+ for x in range(len(p)):
+ element = p[x]
+ if element:
+ if m.has_key(element):
+ m = m[element]
+ else:
+ if element == '*':
+ if x + 1 <> len(p):
+ raise ValueError, 'wildcard * in the middle of path'
+ m['*'] = value
+ return
+ if element == '**':
+ if x + 1 <> len(p):
+ raise ValueError, \
+ 'wildcard ** in the middle of path'
+ m['**'] = value
+ return
+
+ m_new = {}
+ m[element] = m_new
+ m = m_new
+ else:
+ raise ValueError, "path contains empty element"
+ m[''] = value
+
+ def get(self, path):
+ """Returns a tuple (VALUE, REMAINING-PATH), for the
+ most-specific path matching the given path."""
+
+ m = self.__map
+ p = path.split('/')
+ while p and not p[-1]:
+ del p[-1]
+ l = len(p)
+ for x in range(l):
+ element = p[x]
+
+ # Ignore empty path elements (leadings slash, duplicated
+ # slashes).
+ if element:
+ try:
+ m = m[element]
+ except KeyError:
+ if x + 1 == l and m.has_key('*'):
+ # Use '*' only if the remaining path is empty.
+ return (m['*'], tuple(p[x:]))
+ if m.has_key('**'):
+ return (m['**'], tuple(p[x:]))
+ raise InvalidPath
+ try:
+ result = m['']
+ except KeyError:
+ if m.has_key('*'):
+ result = m['*']
+ elif m.has_key('**'):
+ result = m['**']
+ else:
+ raise InvalidPath
+ return (result, ())
+
+class Result:
+ """Base class for result objects."""
+
+ def __init__(self):
+ self.status = 500
+
+ def flatten(self, write):
+ pass
+
+class RedirectResult(Result):
+ """Permanently redirects the browser to a new URL."""
+ def __init__(self, url):
+ self.status = 301
+ self.url = str(url)
+
+ def flatten(self, write):
+ write("Status: %d\nLocation: %s\n\n" % (self.status, self.url))
+
+class HTMLResult(Result):
+ """An object of this class combines a status code with HTML contents."""
+ def __init__(self, contents, status=200, doctype=''):
+ self.contents = contents
+ self.status = status
+ self.doctype = doctype
+
+ def flatten(self, write):
+ """Invokes write for the response header and all HTML data.
+ Includes the doctype declaration."""
+
+ if self.status <> 200:
+ write("Status: %d\n" % self.status)
+ write("Content-Type: text/html\n\n%s\n" % self.doctype)
+ self.contents.flatten(write)
+
+class WebService(Service):
+ def __init__(self, socket_name):
+ Service.__init__(self, socket_name)
+ self.__router = PathRouter()
+
+ def register(self, path, method):
+ """Requests that method is invoked if path is encountered.
+
+ The path has the syntax required by PathRouter.register. The
+ method should be a function taking several arguments
+
+ - the remaining path
+ - a dictionary for the request parameters
+ - a URLFactory object
+
+ The method is expected to return a HTMLResult object.
+ """
+ self.__router.register(path, method)
+
+ def __writeError(self, result, code, msg):
+ result.write('Status: %d\nContent-Type: text/plain\n\n%s\n'
+ % (code, msg))
+
+ def html_dtd(self):
+ """Returns the DOCTYPE declaration to be used for HTML documents.
+ Can be overridden."""
+ return '<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">'
+
+ def add_title(self, title, body, head_contents=None, body_attribs={}):
+ """Takes a sequence of HTML objects and wraps them in 'body'
+ and 'html' tags. Puts title in front of it, and optionally
+ includes the head_contents material. The attributes of the
+ body element are taken from the body_attribs dictionary."""
+ t = TITLE(title)
+ if head_contents is None:
+ head_list = [t]
+ else:
+ if isinstance(head_contents, HTMLBase):
+ head_list = [head_contents]
+ else:
+ head_list = list(head_contents)
+ head_list.append(t)
+ if isinstance(body, HTMLBase):
+ body_list = [body]
+ else:
+ body_list = list(body)
+ body_list[:0] = (H1(title),)
+
+ return tag('html',
+ (HEAD(head_list), Tag('body', body_list, body_attribs)))
+
+ def pre_dispatch(self, url):
+ """Invoked by handle prior to calling the registered handler."""
+ pass
+
+ def handle(self, args, environment, data, result):
+ params = cgi.parse(data, environment)
+ path = environment.get('PATH_INFO', '')
+ server_name = environment.get('SERVER_NAME', '')
+ script_name = environment.get('SCRIPT_NAME', '')
+
+ try:
+ (method, remaining) = self.__router.get(path)
+ except InvalidPath:
+ self.__writeError(result, 404, "page not found")
+ return
+ self.pre_dispatch()
+ url = URLFactory(server_name, script_name)
+ r = method(remaining, params, url)
+ assert isinstance(r, Result), `r`
+ r.flatten(result.write)
+
+def __test():
+ assert str(URL("")) == ""
+ assert str(URL("abc")) == "abc"
+ assert str(URL(" ")) == " "
+ assert str(URL("&")) == "&"
+
+ u = URLFactory(server_name=None, script_name=None)
+ assert str(u.absolute("http://www.enyo.de/")) == "http://www.enyo.de/"
+ assert str(u.absolute("http://www.enyo.de/", t='123')) \
+ == "http://www.enyo.de/?t=123"
+ assert str(u.scriptRelative("/a/b", t='123')) == "/a/b?t=123"
+ assert str(u.scriptRelativeFull("/a/b", t='123')) \
+ == "http://localhost/a/b?t=123"
+
+ u = URLFactory(server_name='localhost.localdomain',
+ script_name='/cgi-bin/test.cgi')
+ assert str(u.scriptRelative("a/b", t='123')) \
+ == "/cgi-bin/test.cgi/a/b?t=123"
+ assert str(u.scriptRelativeFull("a/b", t='123=')) \
+ == "http://localhost.localdomain/cgi-bin/test.cgi/a/b?t=123%3D"
+
+ assert P("").toString() == '<p></p>'
+ assert P(" ").toString() == '<p> </p>'
+ assert P("&").toString() == '<p>&</p>'
+ assert P("\"").toString() == '<p>"</p>'
+ assert P("<").toString() == '<p><</p>'
+ assert P(">").toString() == '<p>></p>'
+ assert P(">").toHTML().toString() == '<p>></p>'
+ assert FORM(method='get').toString() == '<form method="get"/>'
+ assert SPAN("green", _class="red").toString() \
+ == '<span class="red">green</span>'
+ assert TD(A("http://www.example.net/", "example")).toString() \
+ == '<td><a href="http://www.example.net/">example</a></td>'
+
+ s = cStringIO.StringIO()
+ RedirectResult(u.scriptRelativeFull("123")).flatten(s.write)
+ assert s.getvalue() == '''Status: 301
+Location: http://localhost.localdomain/cgi-bin/test.cgi/123
+
+'''
+
+ assert make_menu(u.scriptRelative,
+ ("123", "A"),
+ ("456", "B")).toString() == \
+ '<ul><li><a href="/cgi-bin/test.cgi/123">A</a></li><li><a href="/cgi-bin/test.cgi/456">B</a></li></ul>'
+
+ pr = PathRouter()
+ pr.register('', "root")
+ pr.register('/*', "default")
+ pr.register('/abc', "/abc")
+ pr.register('/a/bc', "/a/bc")
+ pr.register('ab/c', "/ab/c")
+ pr.register('/a', "/a")
+ pr.register('/a/**', "/a/**")
+ pr.register('/a/*', "/a/*")
+
+ assert pr.get("") == ("root", ())
+ assert pr.get("/") == ("root", ())
+ assert pr.get("//") == ("root", ())
+ assert pr.get("/xyz") == ("default", ("xyz",))
+ assert pr.get("/a//xyz/") == ("/a/*", ("xyz",))
+ assert pr.get("/a//xyz/123") == ("/a/**", ("xyz", "123"))
+ assert pr.get("/abc") == ("/abc", ())
+
+if __name__ == "__main__":
+ __test()
More information about the Secure-testing-commits
mailing list