[Pkg-mailman-hackers] Pkg-mailman commit - rev 738 - in trunk/debian: . tests

Thijs Kinkhorst thijs at alioth.debian.org
Sun Aug 4 11:00:55 UTC 2013


Author: thijs
Date: 2013-08-04 11:00:53 +0000 (Sun, 04 Aug 2013)
New Revision: 738

Added:
   trunk/debian/tests/
   trunk/debian/tests/control
   trunk/debian/tests/mailman
   trunk/debian/tests/test-mailman.py
   trunk/debian/tests/testlib.py
   trunk/debian/tests/testlib_httpd.py
Modified:
   trunk/debian/changelog
   trunk/debian/control
Log:
Add autopkgtests


Modified: trunk/debian/changelog
===================================================================
--- trunk/debian/changelog	2013-08-04 10:56:58 UTC (rev 737)
+++ trunk/debian/changelog	2013-08-04 11:00:53 UTC (rev 738)
@@ -8,6 +8,7 @@
   * Updates to Russian debconf templates, thanks Ivan Krylov!
     (closes: #710268).
   * Needs at least version 3.8.0 of logrotate (closes: #687215).
+  * Add autopkgtests, thanks Yolanda Robla! (closes: #710095)
   * Packaging cleanup: checked for policy 3.9.4, update Vcs URL,
     recommend default-mta instead of exim4.
 

Modified: trunk/debian/control
===================================================================
--- trunk/debian/control	2013-08-04 10:56:58 UTC (rev 737)
+++ trunk/debian/control	2013-08-04 11:00:53 UTC (rev 738)
@@ -12,6 +12,7 @@
 Vcs-Svn: svn://anonscm.debian.org/pkg-mailman/trunk
 Vcs-Browser: http://anonscm.debian.org/viewvc/pkg-mailman/trunk/
 X-Python-Version: >= 2.4
+XS-Testsuite: autopkgtest
 
 Package: mailman
 Architecture: any

Added: trunk/debian/tests/control
===================================================================
--- trunk/debian/tests/control	                        (rev 0)
+++ trunk/debian/tests/control	2013-08-04 11:00:53 UTC (rev 738)
@@ -0,0 +1,3 @@
+Tests: mailman
+Depends: @, apache2-mpm-prefork, elinks, postfix, procmail
+Restrictions: needs-root

Added: trunk/debian/tests/mailman
===================================================================
--- trunk/debian/tests/mailman	                        (rev 0)
+++ trunk/debian/tests/mailman	2013-08-04 11:00:53 UTC (rev 738)
@@ -0,0 +1,6 @@
+#!/bin/bash
+#----------------
+# Testing mailman
+#----------------
+set -e
+python `dirname $0`/test-mailman.py 2>&1

Added: trunk/debian/tests/test-mailman.py
===================================================================
--- trunk/debian/tests/test-mailman.py	                        (rev 0)
+++ trunk/debian/tests/test-mailman.py	2013-08-04 11:00:53 UTC (rev 738)
@@ -0,0 +1,459 @@
+#!/usr/bin/python
+#
+#    test-mailman.py quality assurance test script for mailman
+#    Copyright (C) 2010 Canonical Ltd.
+#    Author: Marc Deslauriers <marc.deslauriers at canonical.com>
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License version 3,
+#    as published by the Free Software Foundation.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# packages required for test to run:
+# QRT-Packages: apache2-mpm-prefork elinks postfix mailman procmail
+# packages where more than one package can satisfy a runtime requirement:
+# QRT-Alternates: 
+# files and directories required for the test to run:
+# QRT-Depends: testlib_httpd.py 
+# privilege required for the test to run (remove line if running as user is okay):
+# QRT-Privilege: root
+# QRT-Conflicts: apache2-mpm-event apache2-mpm-itk apache2-mpm-worker exim4
+
+'''
+    In general, this test should be run in a virtual machine (VM) or possibly
+    a chroot and not on a production machine. While efforts are made to make
+    these tests non-destructive, there is no guarantee this script will not
+    alter the machine. You have been warned.
+
+    When installing mailman, be sure to install the english language files
+    When postfix installs, select "local only"
+
+    How to run in a clean VM:
+    $ sudo apt-get -y install python-unit <QRT-Packages> && sudo ./test-mailman.py -v'
+
+    How to run in a clean schroot named 'lucid':
+    $ schroot -c lucid -u root -- sh -c 'apt-get -y install python-unit <QRT-Packages> && ./test-mailman.py -v'
+'''
+
+
+import unittest, subprocess, sys, os, time, smtplib
+import urllib, urllib2, cookielib, re, tempfile
+import testlib
+import testlib_httpd
+
+try:
+    from private.qrt.mailman import PrivateMailmanTest
+except ImportError:
+    class PrivateMailmanTest(object):
+        '''Empty class'''
+    print >>sys.stdout, "Skipping private tests"
+
+class MailmanTest(testlib_httpd.HttpdCommon, PrivateMailmanTest):
+    '''Test Mailman.'''
+
+    def setUp(self):
+        '''Set up prior to each test_* function'''
+        self.mailman_daemon = testlib.TestDaemon("/etc/init.d/mailman")
+        self.mailman_cfg = '/etc/mailman/mm_cfg.py'
+        self.mailman_aliases = '/var/lib/mailman/data/aliases'
+        self.mailman_pid = '/var/run/mailman/mailman.pid'
+        self.postfix_daemon = testlib.TestDaemon("/etc/init.d/postfix")
+        self.postfix_mastercf = '/etc/postfix/master.cf'
+        self.postfix_maincf = '/etc/postfix/main.cf'
+        self.postfix_transport = '/etc/postfix/transportqrt'
+        self.postfix_aliases = '/etc/aliases'
+        self.apache_pid = "/var/run/apache2.pid"
+        self.ports_file = "/etc/apache2/ports.conf"
+        self.default_site = "/etc/apache2/sites-available/default"
+        self.mailman_site = "/etc/apache2/sites-enabled/mailman"
+        self.tempdir = tempfile.mkdtemp()
+
+        self.cj = cookielib.LWPCookieJar()
+        self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self.cj))
+
+        # Make sure daemons are stopped before we begin
+        self.postfix_daemon.stop()
+        self.mailman_daemon.stop()
+
+        testlib.config_replace(self.mailman_aliases, "", append=True)
+        testlib.config_set(self.mailman_cfg,'MTA',"'Postfix'")
+        subprocess.call(['/usr/lib/mailman/bin/genaliases'], stdout=subprocess.PIPE)
+        subprocess.call(['chown', 'root:list', self.mailman_aliases])
+        # Is this a packaging mistake?
+        subprocess.call(['chown', 'list:list', '/var/lib/mailman/archives/private'])
+
+        self._zap_lists()
+        subprocess.call(['/usr/sbin/newlist', '-q', 'mailman at lists.example.com', 'root at example.com' ,'ubuntu'], stdout=subprocess.PIPE)
+
+        self._setUp_postfix()
+        self._setUp_apache()
+
+        self.mailman_daemon.restart()
+
+        self.user = testlib.TestUser(lower=True)
+        self.s = None
+        # Silently allow for this connection to fail, to handle the
+        # initial setup of the postfix server.
+        try:
+            self.s = smtplib.SMTP('localhost', port=25)
+        except:
+            pass
+
+    def tearDown(self):
+        '''Clean up after each test_* function'''
+
+        try:
+            self.s.quit()
+        except:
+            pass
+        self.user = None
+
+        self._zap_lists()
+
+        if os.path.exists(self.tempdir):
+            testlib.recursive_rm(self.tempdir)
+
+        testlib.config_restore(self.mailman_cfg)
+        testlib.config_restore(self.mailman_aliases)
+
+        self._tearDown_postfix()
+        self._tearDown_apache()
+
+        self.mailman_daemon.stop()
+
+    def _zap_lists(self):
+        '''Remove existing mailman lists.'''
+
+        if os.path.exists('/var/lib/mailman/lists/testlist'):
+            subprocess.call(['/usr/sbin/rmlist', '-a', 'testlist'], stdout=subprocess.PIPE)
+        if os.path.exists('/var/lib/mailman/lists/mailman'):
+            subprocess.call(['/usr/sbin/rmlist', '-a', 'mailman'], stdout=subprocess.PIPE)
+
+    def _setUp_postfix(self):
+        '''Create Postfix server configs.'''
+        testlib.config_replace(self.postfix_mastercf, "", append=True)
+
+        testlib.config_set(self.postfix_maincf,'mydestination','example.com, localhost.localdomain, localhost')
+
+        # Move listener to localhost:25
+        master = open('/etc/postfix/master.cf.new','w')
+        for cfline in open(self.postfix_mastercf):
+            if cfline.startswith('smtp') and 'smtpd' in cfline and 'inet' in cfline:
+                master.write('127.0.0.1:25      inet  n       -       -       -       -       smtpd\n')
+            else:
+                master.write(cfline)
+        master.write('''mailman   unix  -       n       n       -       -       pipe
+  flags=FR user=list argv=/usr/lib/mailman/bin/postfix-to-mailman.py
+  ${nexthop} ${user}''')
+        master.close()
+        os.rename('/etc/postfix/master.cf.new',self.postfix_mastercf)
+
+        # Use mbox only
+        testlib.config_comment(self.postfix_maincf,'home_mailbox')
+        testlib.config_set(self.postfix_maincf,'mailbox_command','procmail -a "$EXTENSION"')
+
+        # Config mailman
+        testlib.config_set(self.postfix_maincf,'relay_domains','lists.example.com')
+        testlib.config_set(self.postfix_maincf,'transport_maps','hash:%s' % self.postfix_transport)
+        testlib.config_set(self.postfix_maincf,'mailman_destination_recipient_limit','1')
+        testlib.config_set(self.postfix_maincf,'alias_maps','hash:%s, hash:%s' % (self.postfix_aliases,self.mailman_aliases))
+
+        testlib.config_replace(self.postfix_transport, "lists.example.com      mailman:")
+        subprocess.call(['postmap', self.postfix_transport], stdout=subprocess.PIPE)
+
+        testlib.config_replace(self.postfix_aliases, '''mailman:              "|/var/lib/mailman/mail/mailman post mailman"
+mailman-admin:        "|/var/lib/mailman/mail/mailman admin mailman"
+mailman-bounces:      "|/var/lib/mailman/mail/mailman bounces mailman"
+mailman-confirm:      "|/var/lib/mailman/mail/mailman confirm mailman"
+mailman-join:         "|/var/lib/mailman/mail/mailman join mailman"
+mailman-leave:        "|/var/lib/mailman/mail/mailman leave mailman"
+mailman-owner:        "|/var/lib/mailman/mail/mailman owner mailman"
+mailman-request:      "|/var/lib/mailman/mail/mailman request mailman"
+mailman-subscribe:    "|/var/lib/mailman/mail/mailman subscribe mailman"
+mailman-unsubscribe:  "|/var/lib/mailman/mail/mailman unsubscribe mailman"''', append=True)
+
+        subprocess.call(['chown', 'root:list', self.postfix_aliases])
+        subprocess.call(['newaliases'])
+
+        # Restart server
+        self.postfix_daemon.restart()
+        # Postfix exits its init script before the master listener has started
+        time.sleep(2)
+
+    def _tearDown_postfix(self):
+        '''Tear down Postfix'''
+
+        self.postfix_daemon.stop()
+
+        testlib.config_restore(self.postfix_mastercf)
+        testlib.config_restore(self.postfix_maincf)
+        testlib.config_restore(self.postfix_aliases)
+
+        subprocess.call(['chown', 'root:root', self.postfix_aliases])
+
+        if os.path.exists(self.postfix_transport):
+            os.unlink(self.postfix_transport)
+        if os.path.exists(self.postfix_transport + ".db"):
+            os.unlink(self.postfix_transport + ".db")
+
+    def _setUp_apache(self):
+        '''Set up Apache'''
+
+        # Change the default port, so we can run in a schroot
+        testlib.config_replace(self.ports_file, "", append=True)
+        subprocess.call(['sed', '-i', 's/80/8000/g', self.ports_file])
+        testlib.config_replace(self.default_site, "", append=True)
+        subprocess.call(['sed', '-i', 's/80/8000/g', self.default_site])
+
+        if os.path.exists(self.mailman_site):
+            os.unlink(self.mailman_site)
+
+        if self.lsb_release['Release'] == 6.06:
+            self._dapper_apache_conf()
+        else:
+            os.symlink("/etc/mailman/apache.conf", self.mailman_site)
+
+        testlib_httpd.HttpdCommon._setUp(self)
+
+    def _tearDown_apache(self):
+        '''Tear down Apache'''
+
+        if os.path.exists(self.mailman_site):
+            os.unlink(self.mailman_site)
+
+        testlib.config_restore(self.ports_file)
+        testlib.config_restore(self.default_site)
+        testlib_httpd.HttpdCommon._tearDown(self)
+
+    def _dapper_apache_conf(self):
+        '''Create an example apache conf file for Dapper.'''
+        conf_file = open(self.mailman_site,'w')
+        conf_file.write('''ScriptAlias /cgi-bin/mailman/ /usr/lib/cgi-bin/mailman/
+Alias /pipermail/ /var/lib/mailman/archives/public/
+Alias /images/mailman/ /usr/share/images/mailman/
+<Directory /usr/lib/cgi-bin/mailman/>
+    AllowOverride None
+    Options ExecCGI
+    AddHandler cgi-script .cgi
+    Order allow,deny
+    Allow from all
+</Directory>
+<Directory /var/lib/mailman/archives/public/>
+    Options Indexes FollowSymlinks
+    AllowOverride None
+    Order allow,deny
+    Allow from all
+</Directory>
+<Directory /usr/share/images/mailman/>
+    AllowOverride None
+    Order allow,deny
+    Allow from all
+</Directory>
+''')
+        conf_file.close()
+
+    def _deliver_email(self, from_addr, to_addr, body):
+        '''Perform mail delivery'''
+        result = self.s.sendmail(from_addr, to_addr, body)
+
+    def _check_email(self, user, pattern, timeout=30):
+        '''Get mailman confirmation email'''
+        re_pattern = re.compile(pattern)
+        spool_file = '/var/mail/%s' % (user.login)
+        result = None
+        contents = ''
+        while timeout > 0:
+            if os.path.exists(spool_file):
+                contents = open(spool_file).read()
+                result = re_pattern.search(contents)
+                if result != None:
+                    break
+            time.sleep(1)
+            timeout -= 1
+        self.assertTrue(timeout > 0, "Reached timeout searching for pattern in '%s'" % contents)
+        return result
+
+    def _get_confirmation(self, user):
+        '''Get mailman confirmation email'''
+        pattern = 'confirm (\w+)\n'
+        result = self._check_email(user, pattern)
+        return result.group(1)
+
+    def _test_roundtrip_mail(self, user):
+        '''Send and check email delivery'''
+
+        body = '''From: Rooty <root>
+To: "%s" <%s at example.com>
+Subject: This is test 1
+
+Hello, nice to meet you.
+''' % (user.gecos, user.login)
+
+        self._deliver_email('root', user.login + '@example.com', body)
+        pattern = "Subject: This is test 1"
+        self._check_email(user, pattern)
+
+    def test_aaa_daemons(self):
+        '''Test daemon'''
+
+        self.assertTrue(testlib.check_pidfile("apache2", self.apache_pid))
+        self.assertTrue(testlib.check_pidfile("python", self.mailman_pid))
+
+    def test_aaa_status(self):
+        '''Test status (apache2ctl)'''
+        rc, report = testlib.cmd(['apache2ctl', 'status'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+    def test_aab_http(self):
+        '''Test http'''
+        self._test_url("http://localhost:8000/")
+
+        test_str = testlib_httpd.create_html_page(self.html_page)
+        self._test_url("http://localhost:8000/" + \
+                       os.path.basename(self.html_page), test_str)
+        self._test_url("http://localhost:8000/cgi-bin/mailman/listinfo/mailman",
+                       "About Mailman")
+
+    def test_aac_sending_mail_direct(self):
+        '''Test postfix mail delivery'''
+        self._test_roundtrip_mail(self.user)
+
+    def test_baa_mailman_subscribe(self):
+        '''Test mailman subscription'''
+
+        password = "Ubuntu"
+
+        # Try and subscribe to the Mailman list
+        values = { 'email': self.user.login + "@example.com",
+                   'pw': password,
+                   'pw-conf': password,
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/subscribe/mailman', data)
+        result = self.opener.open(req).read()
+        self.assertTrue('Mailman Subscription results' in result, result)
+
+        # Parse the confirmation email
+        conf = self._get_confirmation(self.user)
+
+        # Now let's confirm via the web page
+        values = { 'cookie': conf,
+                   'realname': '',
+                   'digests': '0',
+                   'language': 'en',
+                   'submit': 'Subscribe to list Mailman',
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/confirm/mailman', data)
+        result = self.opener.open(req).read()
+        self.assertTrue('successfully confirmed your subscription request' in result, result)
+
+        # Send an email to the list
+        body = '''From: "%s" <%s at example.com>
+To: "Mailman list" <mailman at lists.example.com>
+Subject: This is mailman test
+
+Yay! My first post. Ubuntu rocks!
+''' % (self.user.gecos, self.user.login)
+
+        self._deliver_email(self.user.login + '@example.com', 'mailman at lists.example.com', body)
+
+        # See if it was delivered
+        pattern = "Ubuntu rocks"
+        self._check_email(self.user, pattern)
+
+    def test_cve_2010_3089(self):
+        '''Test CVE-2010-3089'''
+
+        tempconf = os.path.join(self.tempdir, 'templist-config')
+
+        # Create a test list and insert XSS into description
+        subprocess.call(['/usr/sbin/newlist', '-q', 'testlist at lists.example.com', 'root at example.com' ,'ubuntu'], stdout=subprocess.PIPE)
+        subprocess.call(['/usr/sbin/config_list', '-o', tempconf, 'testlist'], stdout=subprocess.PIPE)
+        testlib.config_set(tempconf,'description',"'<XSSTEST>'")
+        subprocess.call(['/usr/sbin/config_list', '-i', tempconf, 'testlist'], stdout=subprocess.PIPE)
+
+        request = "GET /cgi-bin/mailman/listinfo/testlist HTTP/1.1\nHost: localhost\nConnection: close\n\n"
+        self._test_raw(request, '<XSSTEST>', port=8000, invert=True)
+        self._test_raw(request, '<XSSTEST>', port=8000)
+
+    def test_cve_2011_0707(self):
+        '''Test CVE-2011-0707'''
+
+        password = "Ubuntu"
+
+        # Try and subscribe to the Mailman list
+        values = { 'email': self.user.login + "@example.com",
+                   'realname': '<XSSTEST>',
+                   'pw': password,
+                   'pw-conf': password,
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/subscribe/mailman', data)
+        result = self.opener.open(req).read()
+        self.assertTrue('Mailman Subscription results' in result, result)
+
+        # Parse the confirmation email
+        conf = self._get_confirmation(self.user)
+
+        # Now let's confirm via the web page
+        values = { 'cookie': conf,
+                   'realname': '<XSSTEST>',
+                   'digests': '0',
+                   'language': 'en',
+                   'submit': 'Subscribe to list Mailman',
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/confirm/mailman', data)
+        result = self.opener.open(req).read()
+        self.assertTrue('successfully confirmed your subscription request' in result, result)
+
+        # Kill email so we can parse the new confirmation string
+        subprocess.call(['rm','-rf', '/var/mail/'+self.user.login])
+
+        # Look at the unsubscribe page
+        values = { 'email': self.user.login + "@example.com",
+                   'password': password,
+                   'language': 'en',
+                   'login-unsub': 'Unsubscribe',
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/options/mailman', data)
+        result = self.opener.open(req).read()
+
+        # Parse the confirmation email
+        conf = self._get_confirmation(self.user)
+
+        # Now let's confirm via the web page
+        values = { 'cookie': conf,
+                   'email': self.user.login + "@example.com",
+                   'password': password,
+                   'language': 'en',
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/confirm/mailman', data)
+        result = self.opener.open(req).read()
+
+        self._word_find(result, '<XSSTEST>', invert=True)
+        self._word_find(result, '<XSSTEST>')
+
+
+if __name__ == '__main__':
+    # simple
+    unittest.main()
+
+    # more configurable
+    #suite = unittest.TestSuite()
+    #suite.addTest(unittest.TestLoader().loadTestsFromTestCase(PkgTest))
+    #rc = unittest.TextTestRunner(verbosity=2).run(suite)
+    #if not rc.wasSuccessful():
+    #    sys.exit(1)

Added: trunk/debian/tests/testlib.py
===================================================================
--- trunk/debian/tests/testlib.py	                        (rev 0)
+++ trunk/debian/tests/testlib.py	2013-08-04 11:00:53 UTC (rev 738)
@@ -0,0 +1,1144 @@
+#
+#    testlib.py quality assurance test script
+#    Copyright (C) 2008-2011 Canonical Ltd.
+#
+#    This library is free software; you can redistribute it and/or
+#    modify it under the terms of the GNU Library General Public
+#    License as published by the Free Software Foundation; either
+#    version 2 of the License.
+#
+#    This library is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+#    Library General Public License for more details.
+#
+#    You should have received a copy of the GNU Library General Public
+#    License along with this program.  If not, see
+#    <http://www.gnu.org/licenses/>.
+#
+
+'''Common classes and functions for package tests.'''
+
+import string, random, crypt, subprocess, pwd, grp,  signal, time, unittest, tempfile, shutil, os, os.path, re, glob
+import sys, socket, gzip
+from stat import *
+from encodings import string_escape
+
+import warnings
+warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning)
+try:
+    import apt_pkg
+    apt_pkg.InitSystem();
+except:
+    # On non-Debian system, fall back to simple comparison without debianisms
+    class apt_pkg(object):
+        def VersionCompare(one, two):
+            list_one = one.split('.')
+            list_two = two.split('.')
+            while len(list_one)>0 and len(list_two)>0:
+                if list_one[0] > list_two[0]:
+                    return 1
+                if list_one[0] < list_two[0]:
+                    return -1
+                list_one.pop(0)
+                list_two.pop(0)
+            return 0
+
+bogus_nxdomain = "208.69.32.132"
+
+# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
+# This is needed so that the subprocesses that produce endless output
+# actually quit when the reader goes away.
+import signal
+def subprocess_setup():
+    # Python installs a SIGPIPE handler by default. This is usually not what
+    # non-Python subprocesses expect.
+    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+class TimedOutException(Exception):
+    def __init__(self, value = "Timed Out"):
+        self.value = value
+    def __str__(self):
+        return repr(self.value)
+
+def _restore_backup(path):
+    pathbackup = path + '.autotest'
+    if os.path.exists(pathbackup):
+        shutil.move(pathbackup, path)
+
+def _save_backup(path):
+    pathbackup = path + '.autotest'
+    if os.path.exists(path) and not os.path.exists(pathbackup):
+        shutil.copy2(path, pathbackup)
+        # copy2 does not copy ownership, so do it here.
+        # Reference: http://docs.python.org/library/shutil.html
+        a = os.stat(path)
+        os.chown(pathbackup, a[4], a[5])
+
+def config_copydir(path):
+    if os.path.exists(path) and not os.path.isdir(path):
+        raise OSError, "'%s' is not a directory" % (path)
+    _restore_backup(path)
+
+    pathbackup = path + '.autotest'
+    if os.path.exists(path):
+        shutil.copytree(path, pathbackup, symlinks=True)
+
+def config_replace(path,contents,append=False):
+    '''Replace (or append) to a config file'''
+    _restore_backup(path)
+    if os.path.exists(path):
+        _save_backup(path)
+        if append:
+            contents = file(path).read() + contents
+    open(path, 'w').write(contents)
+
+def config_comment(path, field):
+    _save_backup(path)
+    contents = ""
+    for line in file(path):
+        if re.search("^\s*%s\s*=" % (field), line):
+            line = "#" + line
+        contents += line
+
+    open(path+'.new', 'w').write(contents)
+    os.rename(path+'.new', path)
+
+def config_set(path, field, value, spaces=True):
+    _save_backup(path)
+    contents = ""
+    if spaces==True:
+        setting = '%s = %s\n' % (field, value)
+    else:
+        setting = '%s=%s\n' % (field, value)
+    found = False
+    for line in file(path):
+        if re.search("^\s*%s\s*=" % (field), line):
+            found = True
+            line = setting
+        contents += line
+    if not found:
+        contents += setting
+
+    open(path+'.new', 'w').write(contents)
+    os.rename(path+'.new', path)
+
+def config_patch(path, patch, depth=1):
+    '''Patch a config file'''
+    _restore_backup(path)
+    _save_backup(path)
+
+    handle, name = mkstemp_fill(patch)
+    rc = subprocess.call(['/usr/bin/patch', '-p%s' %(depth), path], stdin=handle, stdout=subprocess.PIPE)
+    os.unlink(name)
+    if rc != 0:
+        raise Exception("Patch failed")
+
+def config_restore(path):
+    '''Rename a replaced config file back to its initial state'''
+    _restore_backup(path)
+
+def timeout(secs, f, *args):
+    def handler(signum, frame):
+        raise TimedOutException()
+
+    old = signal.signal(signal.SIGALRM, handler)
+    result = None
+    signal.alarm(secs)
+    try:
+        result = f(*args)
+    finally:
+        signal.alarm(0)
+        signal.signal(signal.SIGALRM, old)
+
+    return result
+
+def require_nonroot():
+    if os.geteuid() == 0:
+        print >>sys.stderr, "This series of tests should be run as a regular user with sudo access, not as root."
+        sys.exit(1)
+
+def require_root():
+    if os.geteuid() != 0:
+        print >>sys.stderr, "This series of tests should be run with root privileges (e.g. via sudo)."
+        sys.exit(1)
+
+def require_sudo():
+    if os.geteuid() != 0 or os.environ.get('SUDO_USER', None) == None:
+        print >>sys.stderr, "This series of tests must be run under sudo."
+        sys.exit(1)
+    if os.environ['SUDO_USER'] == 'root':
+        print >>sys.stderr, 'Please run this test using sudo from a regular user. (You ran sudo from root.)'
+        sys.exit(1)
+
+def random_string(length,lower=False):
+    '''Return a random string, consisting of ASCII letters, with given
+    length.'''
+
+    s = ''
+    selection = string.letters
+    if lower:
+        selection = string.lowercase
+    maxind = len(selection)-1
+    for l in range(length):
+        s += selection[random.randint(0, maxind)]
+    return s
+
+def mkstemp_fill(contents,suffix='',prefix='testlib-',dir=None):
+    '''As tempfile.mkstemp does, return a (file, name) pair, but with
+    prefilled contents.'''
+
+    handle, name = tempfile.mkstemp(suffix=suffix,prefix=prefix,dir=dir)
+    os.close(handle)
+    handle = file(name,"w+")
+    handle.write(contents)
+    handle.flush()
+    handle.seek(0)
+
+    return handle, name
+
+def create_fill(path, contents, mode=0644):
+    '''Safely create a page'''
+    # make the temp file in the same dir as the destination file so we
+    # don't get invalid cross-device link errors when we rename
+    handle, name = mkstemp_fill(contents, dir=os.path.dirname(path))
+    handle.close()
+    os.rename(name, path)
+    os.chmod(path, mode)
+
+def login_exists(login):
+    '''Checks whether the given login exists on the system.'''
+
+    try:
+        pwd.getpwnam(login)
+        return True
+    except KeyError:
+        return False
+
+def group_exists(group):
+    '''Checks whether the given login exists on the system.'''
+
+    try:
+        grp.getgrnam(group)
+        return True
+    except KeyError:
+        return False
+
+def recursive_rm(dirPath, contents_only=False):
+    '''recursively remove directory'''
+    names = os.listdir(dirPath)
+    for name in names:
+        path = os.path.join(dirPath, name)
+        if os.path.islink(path) or not os.path.isdir(path):
+            os.unlink(path)
+        else:
+            recursive_rm(path)
+    if contents_only == False:
+        os.rmdir(dirPath)
+
+def check_pidfile(exe, pidfile):
+    '''Checks if pid in pidfile is running'''
+    if not os.path.exists(pidfile):
+        return False
+
+    # get the pid
+    try:
+        fd = open(pidfile, 'r')
+        pid = fd.readline().rstrip('\n')
+        fd.close()
+    except:
+        return False
+
+    return check_pid(exe, pid)
+
+def check_pid(exe, pid):
+    '''Checks if pid is running'''
+    cmdline = "/proc/%s/cmdline" % (str(pid))
+    if not os.path.exists(cmdline):
+        return False
+
+    # get the command line
+    try:
+        fd = open(cmdline, 'r')
+        tmp = fd.readline().split('\0')
+        fd.close()
+    except:
+        return False
+
+    # this allows us to match absolute paths or just the executable name
+    if re.match('^' + exe + '$', tmp[0]) or \
+       re.match('.*/' + exe + '$', tmp[0]) or \
+       re.match('^' + exe + ': ', tmp[0]) or \
+       re.match('^\(' + exe + '\)', tmp[0]):
+        return True
+
+    return False
+
+def check_port(port, proto, ver=4):
+    '''Check if something is listening on the specified port.
+       WARNING: for some reason this does not work with a bind mounted /proc
+    '''
+    assert (port >= 1)
+    assert (port <= 65535)
+    assert (proto.lower() == "tcp" or proto.lower() == "udp")
+    assert (ver == 4 or ver == 6)
+
+    fn = "/proc/net/%s" % (proto)
+    if ver == 6:
+        fn += str(ver)
+
+    rc, report = cmd(['cat', fn])
+    assert (rc == 0)
+
+    hport = "%0.4x" % port
+
+    if re.search(': [0-9a-f]{8}:%s [0-9a-f]' % str(hport).lower(), report.lower()):
+        return True
+    return False
+
+def get_arch():
+    '''Get the current architecture'''
+    rc, report = cmd(['uname', '-m'])
+    assert (rc == 0)
+    return report.strip()
+
+def get_memory():
+    '''Gets total ram and swap'''
+    meminfo = "/proc/meminfo"
+    memtotal = 0
+    swaptotal = 0
+    if not os.path.exists(meminfo):
+        return (False, False)
+
+    try:
+        fd = open(meminfo, 'r')
+        for line in fd.readlines():
+            splitline = line.split()
+            if splitline[0] == 'MemTotal:':
+                memtotal = int(splitline[1])
+            elif splitline[0] == 'SwapTotal:':
+                swaptotal = int(splitline[1])
+        fd.close()
+    except:
+        return (False, False)
+
+    return (memtotal,swaptotal)
+
+def is_running_in_vm():
+    '''Check if running under a VM'''
+    # add other virtualization environments here
+    for search in ['QEMU Virtual CPU']:
+        rc, report = cmd_pipe(['dmesg'], ['grep', search])
+        if rc == 0:
+            return True
+    return False
+
+def ubuntu_release():
+    '''Get the Ubuntu release'''
+    f = "/etc/lsb-release"
+    try:
+        size = os.stat(f)[ST_SIZE]
+    except:
+        return "UNKNOWN"
+
+    if size > 1024*1024:
+        raise IOError, 'Could not open "%s" (too big)' % f
+
+    try:
+        fh = open("/etc/lsb-release", 'r')
+    except:
+        raise
+
+    lines = fh.readlines()
+    fh.close()
+
+    pat = re.compile(r'DISTRIB_CODENAME')
+    for line in lines:
+        if pat.search(line):
+            return line.split('=')[1].rstrip('\n').rstrip('\r')
+
+    return "UNKNOWN"
+
+def cmd(command, input = None, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, stdin = None, timeout = None):
+    '''Try to execute given command (array) and return its stdout, or return
+    a textual error if it failed.'''
+
+    try:
+        sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup)
+    except OSError, e:
+        return [127, str(e)]
+
+    out, outerr = sp.communicate(input)
+    # Handle redirection of stdout
+    if out == None:
+        out = ''
+    # Handle redirection of stderr
+    if outerr == None:
+        outerr = ''
+    return [sp.returncode,out+outerr]
+
+def cmd_pipe(command1, command2, input = None, stderr = subprocess.STDOUT, stdin = None):
+    '''Try to pipe command1 into command2.'''
+    try:
+        sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
+        sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
+    except OSError, e:
+        return [127, str(e)]
+
+    out = sp2.communicate(input)[0]
+    return [sp2.returncode,out]
+
+def cwd_has_enough_space(cdir, total_bytes):
+    '''Determine if the partition of the current working directory has 'bytes'
+       free.'''
+    rc, df_output = cmd(['df'])
+    result = 'Got exit code %d, expected %d\n' % (rc, 0)
+    if rc != 0:
+        return False
+
+    kb = total_bytes / 1024
+
+    mounts = dict()
+    for line in df_output.splitlines():
+        if '/' not in line:
+            continue
+        tmp = line.split()
+        mounts[tmp[5]] = int(tmp[3])
+
+    cdir = os.getcwd()
+    while cdir != '/':
+        if not mounts.has_key(cdir):
+            cdir = os.path.dirname(cdir)
+            continue
+        if kb < mounts[cdir]:
+            return True
+        else:
+            return False
+
+    if kb < mounts['/']:
+        return True
+
+    return False
+
+def get_md5(filename):
+    '''Gets the md5sum of the file specified'''
+
+    (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename])
+    expected = 0
+    assert (expected == rc)
+
+    return report.split(' ')[0]
+
+def dpkg_compare_installed_version(pkg, check, version):
+    '''Gets the version for the installed package, and compares it to the
+       specified version.
+    '''
+    (rc, report) = cmd(["/usr/bin/dpkg", "-s", pkg])
+    assert (rc == 0)
+    assert ("Status: install ok installed" in report)
+    installed_version = ""
+    for line in report.splitlines():
+        if line.startswith("Version: "):
+            installed_version = line.split()[1]
+
+    assert (installed_version != "")
+
+    (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version])
+    assert (rc == 0 or rc == 1)
+    if rc == 0:
+        return True
+    return False
+
+def prepare_source(source, builder, cached_src, build_src, patch_system):
+    '''Download and unpack source package, installing necessary build depends,
+       adjusting the permissions for the 'builder' user, and returning the
+       directory of the unpacked source. Patch system can be one of:
+       - cdbs
+       - dpatch
+       - quilt
+       - quiltv3
+       - None (not the string)
+
+       This is normally used like this:
+
+       def setUp(self):
+           ...
+           self.topdir = os.getcwd()
+           self.cached_src = os.path.join(os.getcwd(), "source")
+           self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp')
+           self.builder = testlib.TestUser()
+           testlib.cmd(['chgrp', self.builder.login, self.tmpdir])
+           os.chmod(self.tmpdir, 0775)
+
+       def tearDown(self):
+           ...
+           self.builder = None
+           self.topdir = os.getcwd()
+           if os.path.exists(self.tmpdir):
+               testlib.recursive_rm(self.tmpdir)
+
+       def test_suite_build(self):
+           ...
+           build_dir = testlib.prepare_source('foo', \
+                                         self.builder, \
+                                         self.cached_src, \
+                                         os.path.join(self.tmpdir, \
+                                           os.path.basename(self.cached_src)),
+                                         "quilt")
+           os.chdir(build_dir)
+
+           # Example for typical build, adjust as necessary
+           print ""
+           print "  make clean"
+           rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean'])
+
+           print "  configure"
+           rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug'])
+
+           print "  make (will take a while)"
+           rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make'])
+
+           print "  make check (will take a while)",
+           rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check'])
+           expected = 0
+           result = 'Got exit code %d, expected %d\n' % (rc, expected)
+           self.assertEquals(expected, rc, result + report)
+
+        def test_suite_cleanup(self):
+            ...
+            if os.path.exists(self.cached_src):
+                testlib.recursive_rm(self.cached_src)
+
+       It is up to the caller to clean up cached_src and build_src (as in the
+       above example, often the build_src is in a tmpdir that is cleaned in
+       tearDown() and the cached_src is cleaned in a one time clean-up
+       operation (eg 'test_suite_cleanup()) which must be run after the build
+       suite test (obviously).
+       '''
+
+    # Make sure we have a clean slate
+    assert (os.path.exists(os.path.dirname(build_src)))
+    assert (not os.path.exists(build_src))
+
+    cdir = os.getcwd()
+    if os.path.exists(cached_src):
+        shutil.copytree(cached_src, build_src)
+        os.chdir(build_src)
+    else:
+        # Only install the build dependencies on the initial setup
+        rc, report = cmd(['apt-get','-y','--force-yes','build-dep',source])
+        assert (rc == 0)
+
+        os.makedirs(build_src)
+        os.chdir(build_src)
+
+        # These are always needed
+        pkgs = ['build-essential', 'dpkg-dev', 'fakeroot']
+        rc, report = cmd(['apt-get','-y','--force-yes','install'] + pkgs)
+        assert (rc == 0)
+
+        rc, report = cmd(['apt-get','source',source])
+        assert (rc == 0)
+        shutil.copytree(build_src, cached_src)
+
+    unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0])
+
+    # Now apply the patches. Do it here so that we don't mess up our cached
+    # sources.
+    os.chdir(unpacked_dir)
+    assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None])
+    if patch_system != None and patch_system != "quiltv3":
+        if patch_system == "quilt":
+            os.environ.setdefault('QUILT_PATCHES','debian/patches')
+            rc, report = cmd(['quilt', 'push', '-a'])
+            assert (rc == 0)
+        elif patch_system == "cdbs":
+            rc, report = cmd(['./debian/rules', 'apply-patches'])
+            assert (rc == 0)
+        elif patch_system == "dpatch":
+            rc, report = cmd(['dpatch', 'apply-all'])
+            assert (rc == 0)
+
+    cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src])
+    os.chdir(cdir)
+
+    return unpacked_dir
+
+def _aa_status():
+    '''Get aa-status output'''
+    exe = "/usr/sbin/aa-status"
+    assert (os.path.exists(exe))
+    if os.geteuid() == 0:
+        return cmd([exe])
+    return cmd(['sudo', exe])
+
+def is_apparmor_loaded(path):
+    '''Check if profile is loaded'''
+    rc, report = _aa_status()
+    if rc != 0:
+        return False
+
+    for line in report.splitlines():
+        if line.endswith(path):
+            return True
+    return False
+
+def is_apparmor_confined(path):
+    '''Check if application is confined'''
+    rc, report = _aa_status()
+    if rc != 0:
+        return False
+
+    for line in report.splitlines():
+        if re.search('%s \(' % path, line):
+            return True
+    return False
+
+def check_apparmor(path, first_ubuntu_release, is_running=True):
+    '''Check if path is loaded and confined for everything higher than the
+       first Ubuntu release specified.
+
+       Usage:
+        rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True)
+        if rc < 0:
+            return self._skipped(report)
+
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+     '''
+    global manager
+    rc = -1
+
+    if manager.lsb_release["Release"] < first_ubuntu_release:
+        return (rc, "Skipped apparmor check")
+
+    if not os.path.exists('/sbin/apparmor_parser'):
+        return (rc, "Skipped (couldn't find apparmor_parser)")
+
+    rc = 0
+    msg = ""
+    if not is_apparmor_loaded(path):
+        rc = 1
+        msg = "Profile not loaded for '%s'" % path
+
+    # this check only makes sense it the 'path' is currently executing
+    if is_running and rc == 0 and not is_apparmor_confined(path):
+        rc = 1
+        msg = "'%s' is not running in enforce mode" % path
+
+    return (rc, msg)
+
+def get_gcc_version(gcc, full=True):
+    gcc_version = 'none'
+    if not gcc.startswith('/'):
+        gcc = '/usr/bin/%s' % (gcc)
+    if os.path.exists(gcc):
+        gcc_version = 'unknown'
+        lines = cmd([gcc,'-v'])[1].strip().splitlines()
+        version_lines = [x for x in lines if x.startswith('gcc version')]
+        if len(version_lines) == 1:
+            gcc_version = " ".join(version_lines[0].split()[2:])
+    if not full:
+        return gcc_version.split()[0]
+    return gcc_version
+
+def is_kdeinit_running():
+    '''Test if kdeinit is running'''
+    # applications that use kdeinit will spawn it if it isn't running in the
+    # test. This is a problem because it does not exit. This is a helper to
+    # check for it.
+    rc, report = cmd(['ps', 'x'])
+    if 'kdeinit4 Running' not in report:
+        print >>sys.stderr, ("kdeinit not running (you may start/stop any KDE application then run this script again)")
+        return False
+    return True
+
+def get_pkgconfig_flags(libs=[]):
+    '''Find pkg-config flags for libraries'''
+    assert (len(libs) > 0)
+    rc, pkg_config = cmd(['pkg-config', '--cflags', '--libs'] + libs)
+    expected = 0
+    if rc != expected:
+        print >>sys.stderr, 'Got exit code %d, expected %d\n' % (rc, expected)
+    assert(rc == expected)
+    return pkg_config.split()
+
+class TestDaemon:
+    '''Helper class to manage daemons consistently'''
+    def __init__(self, init):
+        '''Setup daemon attributes'''
+        self.initscript = init
+
+    def start(self):
+        '''Start daemon'''
+        rc, report = cmd([self.initscript, 'start'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        time.sleep(2)
+        if expected != rc:
+            return (False, result + report)
+
+        if "fail" in report:
+            return (False, "Found 'fail' in report\n" + report)
+
+        return (True, "")
+
+    def stop(self):
+        '''Stop daemon'''
+        rc, report = cmd([self.initscript, 'stop'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        if expected != rc:
+            return (False, result + report)
+
+        if "fail" in report:
+            return (False, "Found 'fail' in report\n" + report)
+
+        return (True, "")
+
+    def reload(self):
+        '''Reload daemon'''
+        rc, report = cmd([self.initscript, 'force-reload'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        if expected != rc:
+            return (False, result + report)
+
+        if "fail" in report:
+            return (False, "Found 'fail' in report\n" + report)
+
+        return (True, "")
+
+    def restart(self):
+        '''Restart daemon'''
+        (res, str) = self.stop()
+        if not res:
+            return (res, str)
+
+        (res, str) = self.start()
+        if not res:
+            return (res, str)
+
+        return (True, "")
+
+    def status(self):
+        '''Check daemon status'''
+        rc, report = cmd([self.initscript, 'status'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        if expected != rc:
+            return (False, result + report)
+
+        if "fail" in report:
+            return (False, "Found 'fail' in report\n" + report)
+
+        return (True, "")
+
+class TestlibManager(object):
+    '''Singleton class used to set up per-test-run information'''
+    def __init__(self):
+        # Set glibc aborts to dump to stderr instead of the tty so test output
+        # is more sane.
+        os.environ.setdefault('LIBC_FATAL_STDERR_','1')
+
+        # check verbosity
+        self.verbosity = False
+        if (len(sys.argv) > 1 and '-v' in sys.argv[1:]):
+            self.verbosity = True
+
+        # Load LSB release file
+        self.lsb_release = dict()
+        if not os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'):
+            raise OSError, "Please install 'lsb-release'"
+        for line in subprocess.Popen(['lsb_release','-a'],stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].splitlines():
+            field, value = line.split(':',1)
+            value=value.strip()
+            field=field.strip()
+            # Convert numerics
+            try:
+                value = float(value)
+            except:
+                pass
+            self.lsb_release.setdefault(field,value)
+
+        # FIXME: hack OEM releases into known-Ubuntu versions
+        if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)":
+            if self.lsb_release['Release'] == 1.0:
+                self.lsb_release['Distributor ID'] = "Ubuntu"
+                self.lsb_release['Release'] = 8.04
+            else:
+                raise OSError, "Unknown version of HP MIE"
+
+        # FIXME: hack to assume a most-recent release if we're not
+        # running under Ubuntu.
+        if self.lsb_release['Distributor ID'] not in ["Ubuntu","Linaro"]:
+            self.lsb_release['Release'] = 10000
+        # Adjust Linaro release to pretend to be Ubuntu
+        if self.lsb_release['Distributor ID'] in ["Linaro"]:
+       	    self.lsb_release['Distributor ID'] = "Ubuntu"
+            self.lsb_release['Release'] -= 0.01
+
+        # Load arch
+        if not os.path.exists('/usr/bin/dpkg'):
+            machine = cmd(['uname','-m'])[1].strip()
+            if machine.endswith('86'):
+                self.dpkg_arch = 'i386'
+            elif machine.endswith('_64'):
+                self.dpkg_arch = 'amd64'
+            elif machine.startswith('arm'):
+                self.dpkg_arch = 'armel'
+            else:
+                raise ValueError, "Unknown machine type '%s'" % (machine)
+        else:
+            self.dpkg_arch = cmd(['dpkg','--print-architecture'])[1].strip()
+
+        # Find kernel version
+        self.kernel_is_ubuntu = False
+        self.kernel_version_signature = None
+        self.kernel_version = cmd(["uname","-r"])[1].strip()
+        versig = '/proc/version_signature'
+        if os.path.exists(versig):
+            self.kernel_is_ubuntu = True
+            self.kernel_version_signature = file(versig).read().strip()
+            self.kernel_version_ubuntu = self.kernel_version
+        elif os.path.exists('/usr/bin/dpkg'):
+            # this can easily be inaccurate but is only an issue for Dapper
+            rc, out = cmd(['dpkg','-l','linux-image-%s' % (self.kernel_version)])
+            if rc == 0:
+                self.kernel_version_signature = out.strip().split('\n').pop().split()[2]
+                self.kernel_version_ubuntu = self.kernel_version_signature
+        if self.kernel_version_signature == None:
+            # Attempt to fall back to something for non-Debian-based
+            self.kernel_version_signature = self.kernel_version
+            self.kernel_version_ubuntu = self.kernel_version
+        # Build ubuntu version without hardware suffix
+        try:
+            self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)])
+        except:
+            pass
+
+        # Find gcc version
+        self.gcc_version = get_gcc_version('gcc')
+
+        # Find libc
+        self.path_libc = [x.split()[2] for x in cmd(['ldd','/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')][0]
+
+        # Report self
+        if self.verbosity:
+            kernel = self.kernel_version_ubuntu
+            if kernel != self.kernel_version_signature:
+                kernel += " (%s)" % (self.kernel_version_signature)
+            print >>sys.stdout, "Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s')" % ( \
+                sys.argv[0],
+                self.lsb_release['Distributor ID'],
+                self.lsb_release['Release'],
+                kernel,
+                self.dpkg_arch,
+                os.geteuid(), os.getuid(),
+                os.environ.get('SUDO_USER', ''))
+            sys.stdout.flush()
+
+        # Additional heuristics
+        #if os.environ.get('SUDO_USER', os.environ.get('USER', '')) in ['mdeslaur']:
+        #    sys.stdout.write("Replying to Marc Deslauriers in http://launchpad.net/bugs/%d: " % random.randint(600000, 980000))
+        #    sys.stdout.flush()
+        #    time.sleep(0.5)
+        #    sys.stdout.write("destroyed\n")
+        #    time.sleep(0.5)
+
+    def hello(self, msg):
+        print >>sys.stderr, "Hello from %s" % (msg)
+# The central instance
+manager = TestlibManager()
+
+class TestlibCase(unittest.TestCase):
+    def __init__(self, *args):
+        '''This is called for each TestCase test instance, which isn't much better
+           than SetUp.'''
+
+        unittest.TestCase.__init__(self, *args)
+
+        # Attach to and duplicate dicts from manager singleton
+        self.manager = manager
+        #self.manager.hello(repr(self) + repr(*args))
+        self.my_verbosity = self.manager.verbosity
+        self.lsb_release = self.manager.lsb_release
+        self.dpkg_arch = self.manager.dpkg_arch
+        self.kernel_version = self.manager.kernel_version
+        self.kernel_version_signature = self.manager.kernel_version_signature
+        self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu
+        self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu
+        self.gcc_version = self.manager.gcc_version
+        self.path_libc = self.manager.path_libc
+
+    def version_compare(self, one, two):
+        return apt_pkg.VersionCompare(one,two)
+
+    def assertFileType(self, filename, filetype):
+        '''Checks the file type of the file specified'''
+
+        (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename])
+        out = out.strip()
+        expected = 0
+        # Absolutely no idea why this happens on Hardy
+        if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0:
+            rc = 0
+        result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report)
+        self.assertEquals(expected, rc, result)
+
+        filetype = '^%s$' % (filetype)
+        result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype)
+        self.assertNotEquals(None, re.search(filetype, out), result)
+
+    def yank_commonname_from_cert(self, certfile):
+        '''Extract the commonName from a given PEM'''
+        rc, out = cmd(['openssl','asn1parse','-in',certfile])
+        if rc == 0:
+            ready = False
+            for line in out.splitlines():
+                if ready:
+                    return line.split(':')[-1]
+                if ':commonName' in line:
+                    ready = True
+        return socket.getfqdn()
+
+    def announce(self, text):
+        if self.my_verbosity:
+            print >>sys.stdout, "(%s) " % (text),
+            sys.stdout.flush()
+
+    def make_clean(self):
+        rc, output = self.shell_cmd(['make','clean'])
+        self.assertEquals(rc, 0, output)
+
+    def get_makefile_compiler(self):
+        # Find potential compiler name
+        compiler = 'gcc'
+        if os.path.exists('Makefile'):
+            for line in open('Makefile'):
+                if line.startswith('CC') and '=' in line:
+                    items = [x.strip() for x in line.split('=')]
+                    if items[0] == 'CC':
+                        compiler = items[1]
+                        break
+        return compiler
+
+    def make_target(self, target, expected=0):
+        '''Compile a target and report output'''
+
+        compiler = self.get_makefile_compiler()
+        rc, output = self.shell_cmd(['make',target])
+        self.assertEquals(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output)
+        self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output)
+        return output
+
+    # call as   return testlib.skipped()
+    def _skipped(self, reason=""):
+        '''Provide a visible way to indicate that a test was skipped'''
+        if reason != "":
+            reason = ': %s' % (reason)
+        self.announce("skipped%s" % (reason))
+        return False
+
+    def _testlib_shell_cmd(self,args,stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT):
+        argstr = "'" + "', '".join(args).strip() + "'"
+        rc, out = cmd(args,stdin=stdin,stdout=stdout,stderr=stderr)
+        report = 'Command: ' + argstr + '\nOutput:\n' + out
+        return rc, report, out
+
+    def shell_cmd(self, args, stdin=None):
+        return cmd(args,stdin=stdin)
+
+    def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
+        '''Test a shell command matches a specific exit code'''
+        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, msg + result + report)
+
+    def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
+        '''Test a shell command doesn't match a specific exit code'''
+        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
+        result = 'Got (unwanted) exit code %d\n' % rc
+        self.assertNotEquals(unwanted, rc, msg + result + report)
+
+    def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False):
+        '''Test a shell command contains a specific output'''
+        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
+        result = 'Got exit code %d.  Looking for text "%s"\n' % (rc, text)
+        if not invert:
+            self.assertTrue(text in out, msg + result + report)
+        else:
+            self.assertFalse(text in out, msg + result + report)
+
+    def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None):
+        '''Test a shell command matches a specific output'''
+        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
+        result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args))
+        if not invert:
+            self.assertEquals(text, out, msg + result + report)
+        else:
+            self.assertNotEquals(text, out, msg + result + report)
+        if expected != None:
+            result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args))
+            self.assertEquals(rc, expected, msg + result + report)
+
+    def _word_find(self, report, content, invert=False):
+        '''Check for a specific string'''
+        if invert:
+            warning = 'Found "%s"\n' % content
+            self.assertTrue(content not in report, warning + report)
+        else:
+            warning = 'Could not find "%s"\n' % content
+            self.assertTrue(content in report, warning + report)
+
+    def _test_sysctl_value(self, path, expected, msg=None, exists=True):
+        sysctl = '/proc/sys/%s' % (path)
+        self.assertEquals(exists, os.path.exists(sysctl), sysctl)
+        value = None
+        if exists:
+            value = int(file(sysctl).read())
+            report = "%s is not %d: %d" % (sysctl, expected, value)
+            if msg:
+                report += " (%s)" % (msg)
+            self.assertEquals(value, expected, report)
+        return value
+
+    def set_sysctl_value(self, path, desired):
+        sysctl = '/proc/sys/%s' % (path)
+        self.assertTrue(os.path.exists(sysctl),"%s does not exist" % (sysctl))
+        file(sysctl,'w').write(str(desired))
+        self._test_sysctl_value(path, desired)
+
+    def kernel_at_least(self, introduced):
+        return self.version_compare(self.kernel_version_ubuntu,
+                                    introduced) >= 0
+
+    def kernel_claims_cve_fixed(self, cve):
+        changelog = "/usr/share/doc/linux-image-%s/changelog.Debian.gz" % (self.kernel_version)
+        if os.path.exists(changelog):
+            for line in gzip.open(changelog):
+                if cve in line and not "revert" in line and not "Revert" in line:
+                    return True
+        return False
+
+class TestGroup:
+    '''Create a temporary test group and remove it again in the dtor.'''
+
+    def __init__(self, group=None, lower=False):
+        '''Create a new group'''
+
+        self.group = None
+        if group:
+            if group_exists(group):
+                raise ValueError, 'group name already exists'
+        else:
+            while(True):
+                group = random_string(7,lower=lower)
+                if not group_exists(group):
+                    break
+
+        assert subprocess.call(['groupadd',group]) == 0
+        self.group = group
+        g = grp.getgrnam(self.group)
+        self.gid = g[2]
+
+    def __del__(self):
+        '''Remove the created group.'''
+
+        if self.group:
+            rc, report = cmd(['groupdel', self.group])
+            assert rc == 0
+
+class TestUser:
+    '''Create a temporary test user and remove it again in the dtor.'''
+
+    def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None):
+        '''Create a new user account with a random password.
+
+        By default, the login name is random, too, but can be explicitly
+        specified with 'login'. By default, a home directory is created, this
+        can be suppressed with 'home=False'.'''
+
+        self.login = None
+
+        if os.geteuid() != 0:
+            raise ValueError, "You must be root to run this test"
+
+        if login:
+            if login_exists(login):
+                raise ValueError, 'login name already exists'
+        else:
+            while(True):
+                login = 't' + random_string(7,lower=lower)
+                if not login_exists(login):
+                    break
+
+        self.salt = random_string(2)
+        self.password = random_string(8,lower=lower)
+        self.crypted = crypt.crypt(self.password, self.salt)
+
+        creation = ['useradd', '-p', self.crypted]
+        if home:
+            creation += ['-m']
+        if group:
+            creation += ['-G',group]
+        if uidmin:
+            creation += ['-K','UID_MIN=%d'%uidmin]
+        if shell:
+            creation += ['-s',shell]
+        creation += [login]
+        assert subprocess.call(creation) == 0
+        # Set GECOS
+        assert subprocess.call(['usermod','-c','Buddy %s' % (login),login]) == 0
+
+        self.login = login
+        p = pwd.getpwnam(self.login)
+        self.uid   = p[2]
+        self.gid   = p[3]
+        self.gecos = p[4]
+        self.home  = p[5]
+        self.shell = p[6]
+
+    def __del__(self):
+        '''Remove the created user account.'''
+
+        if self.login:
+            # sanity check the login name so we don't accidentally wipe too much
+            if len(self.login)>3 and not '/' in self.login:
+                subprocess.call(['rm','-rf', '/home/'+self.login, '/var/mail/'+self.login])
+            rc, report = cmd(['userdel', '-f', self.login])
+            assert rc == 0
+
+    def add_to_group(self, group):
+        '''Add user to the specified group name'''
+        rc, report = cmd(['usermod', '-G', group, self.login])
+        if rc != 0:
+            print report
+        assert rc == 0
+
+# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
+class TimeoutFunctionException(Exception):
+    """Exception to raise on a timeout"""
+    pass
+class TimeoutFunction:
+    def __init__(self, function, timeout):
+        self.timeout = timeout
+        self.function = function
+
+    def handle_timeout(self, signum, frame):
+        raise TimeoutFunctionException()
+
+    def __call__(self, *args, **kwargs):
+        old = signal.signal(signal.SIGALRM, self.handle_timeout)
+        signal.alarm(self.timeout)
+        try:
+            result = self.function(*args, **kwargs)
+        finally:
+            signal.signal(signal.SIGALRM, old)
+        signal.alarm(0)
+        return result
+
+def main():
+    print "hi"
+    unittest.main()

Added: trunk/debian/tests/testlib_httpd.py
===================================================================
--- trunk/debian/tests/testlib_httpd.py	                        (rev 0)
+++ trunk/debian/tests/testlib_httpd.py	2013-08-04 11:00:53 UTC (rev 738)
@@ -0,0 +1,353 @@
+#!/usr/bin/python
+#
+#    testlib_httpd.py quality assurance test script
+#    Copyright (C) 2008-2013 Canonical Ltd.
+#    Author: Jamie Strandboge <jamie at canonical.com>
+#    Author: Marc Deslauriers <marc.deslauriers at canonical.com>
+#
+#    This program is free software: you can redistribute it and/or modify
+#    it under the terms of the GNU General Public License version 3,
+#    as published by the Free Software Foundation.
+#
+#    This program is distributed in the hope that it will be useful,
+#    but WITHOUT ANY WARRANTY; without even the implied warranty of
+#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+#    GNU General Public License for more details.
+#
+#    You should have received a copy of the GNU General Public License
+#    along with this program.  If not, see <httpd://www.gnu.org/licenses/>.
+#
+
+import unittest, subprocess
+import os
+import sys
+import tempfile
+import testlib
+import time
+import socket
+import shutil
+import cookielib
+import urllib2
+import re
+import base64
+
+class HttpdCommon(testlib.TestlibCase):
+    '''Common functions'''
+    def _setUp(self, clearlogs = False):
+        '''Setup'''
+        self.release = self.lsb_release['Codename']
+        self.html_page = "/var/www/test.html"
+        self.php_page = "/var/www/test.php"
+        self.cgi_page = "/usr/lib/cgi-bin/test-cgi.pl"
+        self.apache2_default = "/etc/default/apache2"
+        self.ssl_key = "/etc/ssl/private/server.key"
+        self.ssl_crt = "/etc/ssl/certs/server.crt"
+        self.ssl_site = "/etc/apache2/sites-enabled/999-testlib"
+        self.ports_file = "/etc/apache2/ports.conf"
+        self.access_log = "/var/log/apache2/access.log"
+        self.error_log = "/var/log/apache2/error.log"
+        if not hasattr(self, 'initscript'):
+            self._set_initscript("/etc/init.d/apache2")
+
+        # Dapper's apache2 is disabled by default
+        if self.lsb_release['Release'] == 6.06:
+            testlib.config_replace(self.apache2_default, "", append=True)
+            subprocess.call(['sed', '-i', 's/NO_START=1/NO_START=0/', self.apache2_default])
+
+        self._stop()
+        if clearlogs == True:
+            self._clearlogs()
+        self._start()
+
+    def _set_initscript(self, initscript):
+        self.initscript = initscript
+
+    def _tearDown(self):
+        '''Clean up after each test_* function'''
+        self._stop()
+        time.sleep(2)
+        if os.path.exists(self.html_page):
+            os.unlink(self.html_page)
+        if os.path.exists(self.php_page):
+            os.unlink(self.php_page)
+        if os.path.exists(self.cgi_page):
+            os.unlink(self.cgi_page)
+        if os.path.exists(self.ssl_key):
+            os.unlink(self.ssl_key)
+        if os.path.exists(self.ssl_crt):
+            os.unlink(self.ssl_crt)
+        if os.path.exists(self.ssl_site):
+            os.unlink(self.ssl_site)
+        self._disable_mod("ssl")
+        testlib.config_restore(self.ports_file)
+        testlib.config_restore(self.apache2_default)
+
+    def _start(self):
+        '''Start httpd'''
+        #print self.initscript,"start"
+        rc, report = testlib.cmd([self.initscript, 'start'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+        time.sleep(2)
+
+    def _stop(self):
+        '''Stop httpd'''
+        #print self.initscript,"stop"
+        rc, report = testlib.cmd([self.initscript, 'stop'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+    def _clearlogs(self):
+        '''Clear httpd logs'''
+        if os.path.exists(self.access_log):
+            os.unlink(self.access_log)
+        if os.path.exists(self.error_log):
+            os.unlink(self.error_log)
+
+    def __disable_mod(self, mod):
+        if not os.path.exists(os.path.join("/etc/apache2/mods-available", mod + \
+                                       ".load")):
+            return
+        if not os.path.exists("/usr/sbin/a2dismod"):
+            return
+        rc, report = testlib.cmd(['a2dismod', mod])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+    def _disable_mod(self, mod):
+        self.__disable_mod(mod)
+        self._restart()
+        time.sleep(2)
+
+    def _disable_mods(self, mods):
+        '''take a list of modules to disable'''
+        for mod in mods:
+            self.__disable_mod(mod)
+        self._restart()
+        time.sleep(2)
+
+    def __enable_mod(self, mod):
+        rc, report = testlib.cmd(['a2enmod', mod])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+    def _enable_mod(self, mod):
+        self.__enable_mod(mod)
+        # for some reason, force-reload doesn't work
+        # if self.lsb_release['Release'] >= 8.04:
+        #    self._reload()
+        # else:
+        self._restart()
+        time.sleep(2)
+
+    def _enable_mods(self, mods):
+        '''take a list of modules to enable'''
+        for mod in mods:
+            self.__enable_mod(mod)
+        # for some reason, force-reload doesn't work
+        # if self.lsb_release['Release'] >= 8.04:
+        #    self._reload()
+        # else:
+        self._restart()
+        time.sleep(2)
+
+    def _disable_site(self, sitename):
+        rc, report = testlib.cmd(['a2dissite', sitename])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+        self._restart()
+        time.sleep(2)
+
+    def _enable_site(self, sitename):
+        rc, report = testlib.cmd(['a2ensite', sitename])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+        # for some reason, force-reload doesn't work
+        # if self.lsb_release['Release'] >= 8.04:
+        #    self._reload()
+        #else:
+        self._restart()
+        time.sleep(2)
+
+    def _reload(self):
+        '''Reload httpd'''
+        rc, report = testlib.cmd([self.initscript, 'force-reload'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+    def _restart(self):
+        '''Restart httpd'''
+        self._stop()
+        self._start()
+
+    def _prepare_ssl(self, srvkey, srvcert):
+        '''Prepare Apache for ssl connections'''
+        self._enable_mod("ssl")
+
+        # copy instead of rename so we don't get invalid cross-device link errors
+        shutil.copy(srvkey, self.ssl_key)
+        shutil.copy(srvcert, self.ssl_crt)
+
+        if self.lsb_release['Release'] <= 7.04:
+            testlib.config_replace(self.ports_file, "Listen 443", True)
+
+        # create the conffile entry
+        site_contents = '''
+NameVirtualHost *:443
+<VirtualHost *:443>
+        SSLEngine on
+        SSLOptions +StrictRequire
+        SSLCertificateFile /etc/ssl/certs/server.crt
+        SSLCertificateKeyFile /etc/ssl/private/server.key
+
+        ServerAdmin webmaster at localhost
+
+        DocumentRoot /var/www/
+        ErrorLog /var/log/apache2/error.log
+
+        # Possible values include: debug, info, notice, warn, error, crit,
+        # alert, emerg.
+        LogLevel warn
+
+        CustomLog /var/log/apache2/access.log combined
+        ServerSignature On
+</VirtualHost>
+'''
+        testlib.create_fill(self.ssl_site, site_contents)
+        self._reload()
+
+    def _test_url_proxy(self, url="http://localhost/", content="", proxy="localhost:3128"):
+        '''Test the given url'''
+        rc, report = testlib.cmd(['elinks', '-verbose', '2', '-no-home', '1', '-eval', 'set protocol.ftp.proxy.host = "%s"' %(proxy), '-eval', 'set protocol.http.proxy.host = "%s"' %(proxy), '-eval', 'set protocol.https.proxy.host = "%s"' %(proxy), '-dump', url])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+        if content != "":
+            self._word_find(report, content)
+
+    def _test_url(self, url="http://localhost/", content="", invert=False, source=False):
+        '''Test the given url'''
+        if source:
+            report = self._get_page_source(url)
+        else:
+            report = self._get_page(url)
+
+        if content != "":
+            self._word_find(report, content, invert)
+
+    def _get_page_source(self, url="http://localhost/", data='', headers=None):
+        '''Fetch html source'''
+        cookies = "/tmp/cookies.lwp"
+        testlib.create_fill(cookies, "#LWP-Cookies-2.0")
+
+        if headers == None:
+            headers = {'User-agent' : 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
+
+        clean_url = url
+        if re.search(r'http(|s)://.*:.*@[a-z].*', url):
+            tmp = re.sub(r'^http(|s)://', '', url)
+            username = tmp.split('@')[0].split(':')[0]
+            password = tmp.split('@')[0].split(':')[1]
+            base64_str = base64.encodestring('%s:%s' % (username, password))[:-1]
+            headers['Authorization'] = "Basic %s" % (base64_str)
+            # strip out the username and password from the url
+            clean_url = re.sub(r'%s:%s@' % (username, password), '', url)
+
+        cj = cookielib.LWPCookieJar(filename=cookies)
+        cj.load()
+
+        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
+        urllib2.install_opener(opener)
+
+        try:
+            if data != '':
+                req = urllib2.Request(clean_url, data, headers)
+            else:
+                req = urllib2.Request(clean_url, headers=headers)
+        except:
+            raise
+
+        tries = 0
+        failed = True
+        while tries < 3:
+            try:
+                handle = urllib2.urlopen(req)
+                failed = False
+                break
+            except urllib2.HTTPError, e:
+                raise
+                if e.code != 503:
+                    # for debugging
+                    #print >>sys.stderr, 'Error retrieving page "url=%s", "data=%s"' % (url, data)
+                    raise
+            tries += 1
+            time.sleep(2)
+
+        self.assertFalse(failed, 'Could not retrieve page "url=%s", "data=%s"' % (url, data))
+        html = handle.read()
+        cj.save()
+
+        return html
+
+    def _get_page(self, url="http://localhost/"):
+        '''Get contents of given url'''
+        rc, report = testlib.cmd(['elinks', '-verbose', '2', '-no-home', '1', '-dump', url])
+        expected = 0
+
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+        return report
+
+    def _test_raw(self, request="", content="", host="localhost", port=80, invert = False, limit=1024):
+        '''Test the given url with a raw socket to include headers'''
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.connect((host, port))
+        s.send(request)
+        data = s.recv(limit)
+        s.close()
+
+        if content != "":
+            self._word_find(data, content, invert = invert)
+
+def create_php_page(page, php_content=None):
+    '''Create a basic php page'''
+
+    # complexity here is due to maintaining interface compatability when
+    # php_content is not provided
+    if not php_content:
+        str = "php works"
+        php_content = "echo '" + str + "'; "
+    else:
+        str = php_content
+    script = '''<?php
+%s
+?>''' %(php_content)
+    testlib.create_fill(page, script)
+    return str
+
+def create_perl_script(page):
+    '''Create a basic perl script'''
+    str = "perl works"
+    script = '''#!/usr/bin/perl
+print "Content-Type: text/plain\\n\\n";
+print "''' + str + '''\\n";
+
+'''
+    testlib.create_fill(page, script, 0755)
+
+    return str
+
+def create_html_page(page):
+    '''Create html page'''
+    str = "html works"
+    testlib.create_fill(page, "<html><body>" + str + "</body></html>")
+    return str




More information about the Pkg-mailman-hackers mailing list