[Pkg-mailman-hackers] Pkg-mailman commit - rev 738 - in trunk/debian: . tests
Thijs Kinkhorst
thijs at alioth.debian.org
Sun Aug 4 11:00:55 UTC 2013
Author: thijs
Date: 2013-08-04 11:00:53 +0000 (Sun, 04 Aug 2013)
New Revision: 738
Added:
trunk/debian/tests/
trunk/debian/tests/control
trunk/debian/tests/mailman
trunk/debian/tests/test-mailman.py
trunk/debian/tests/testlib.py
trunk/debian/tests/testlib_httpd.py
Modified:
trunk/debian/changelog
trunk/debian/control
Log:
Add autopkgtests
Modified: trunk/debian/changelog
===================================================================
--- trunk/debian/changelog 2013-08-04 10:56:58 UTC (rev 737)
+++ trunk/debian/changelog 2013-08-04 11:00:53 UTC (rev 738)
@@ -8,6 +8,7 @@
* Updates to Russian debconf templates, thanks Ivan Krylov!
(closes: #710268).
* Needs at least version 3.8.0 of logrotate (closes: #687215).
+ * Add autopkgtests, thanks Yolanda Robla! (closes: #710095)
* Packaging cleanup: checked for policy 3.9.4, update Vcs URL,
recommend default-mta instead of exim4.
Modified: trunk/debian/control
===================================================================
--- trunk/debian/control 2013-08-04 10:56:58 UTC (rev 737)
+++ trunk/debian/control 2013-08-04 11:00:53 UTC (rev 738)
@@ -12,6 +12,7 @@
Vcs-Svn: svn://anonscm.debian.org/pkg-mailman/trunk
Vcs-Browser: http://anonscm.debian.org/viewvc/pkg-mailman/trunk/
X-Python-Version: >= 2.4
+XS-Testsuite: autopkgtest
Package: mailman
Architecture: any
Added: trunk/debian/tests/control
===================================================================
--- trunk/debian/tests/control (rev 0)
+++ trunk/debian/tests/control 2013-08-04 11:00:53 UTC (rev 738)
@@ -0,0 +1,3 @@
+Tests: mailman
+Depends: @, apache2-mpm-prefork, elinks, postfix, procmail
+Restrictions: needs-root
Added: trunk/debian/tests/mailman
===================================================================
--- trunk/debian/tests/mailman (rev 0)
+++ trunk/debian/tests/mailman 2013-08-04 11:00:53 UTC (rev 738)
@@ -0,0 +1,6 @@
+#!/bin/bash
+#----------------
+# Testing mailman
+#----------------
+# Abort on the first failing command so the test run reports failure.
+set -e
+# Run the Python test suite that lives next to this script. The directory is
+# resolved with quoted command substitution so a path containing whitespace
+# does not get word-split. stderr is folded into stdout.
+python "$(dirname "$0")/test-mailman.py" 2>&1
Added: trunk/debian/tests/test-mailman.py
===================================================================
--- trunk/debian/tests/test-mailman.py (rev 0)
+++ trunk/debian/tests/test-mailman.py 2013-08-04 11:00:53 UTC (rev 738)
@@ -0,0 +1,459 @@
+#!/usr/bin/python
+#
+# test-mailman.py quality assurance test script for mailman
+# Copyright (C) 2010 Canonical Ltd.
+# Author: Marc Deslauriers <marc.deslauriers at canonical.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# packages required for test to run:
+# QRT-Packages: apache2-mpm-prefork elinks postfix mailman procmail
+# packages where more than one package can satisfy a runtime requirement:
+# QRT-Alternates:
+# files and directories required for the test to run:
+# QRT-Depends: testlib_httpd.py
+# privilege required for the test to run (remove line if running as user is okay):
+# QRT-Privilege: root
+# QRT-Conflicts: apache2-mpm-event apache2-mpm-itk apache2-mpm-worker exim4
+
+'''
+ In general, this test should be run in a virtual machine (VM) or possibly
+ a chroot and not on a production machine. While efforts are made to make
+ these tests non-destructive, there is no guarantee this script will not
+ alter the machine. You have been warned.
+
+ When installing mailman, be sure to install the English language files.
+ When postfix installs, select "local only".
+
+ How to run in a clean VM:
+ $ sudo apt-get -y install python-unit <QRT-Packages> && sudo ./test-mailman.py -v
+
+ How to run in a clean schroot named 'lucid':
+ $ schroot -c lucid -u root -- sh -c 'apt-get -y install python-unit <QRT-Packages> && ./test-mailman.py -v'
+'''
+
+
+import unittest, subprocess, sys, os, time, smtplib
+import urllib, urllib2, cookielib, re, tempfile
+import testlib
+import testlib_httpd
+
+# The private test mix-in is optional. When it cannot be imported, define an
+# empty placeholder with the same name so MailmanTest can still list it as a
+# base class, and tell the user the private tests are being skipped.
+try:
+    from private.qrt.mailman import PrivateMailmanTest
+except ImportError:
+    class PrivateMailmanTest(object):
+        '''Empty class'''
+    print >>sys.stdout, "Skipping private tests"
+
+class MailmanTest(testlib_httpd.HttpdCommon, PrivateMailmanTest):
+ '''Test Mailman.'''
+
+    def setUp(self):
+        '''Set up prior to each test_* function.
+
+        Records the config and pid file paths the tests use, stops postfix
+        and mailman, switches mailman to the Postfix MTA, recreates the
+        "mailman" list, configures postfix and apache, restarts mailman and
+        finally tries to open an SMTP connection to localhost. self.s stays
+        None when that connection cannot be opened.
+        '''
+        self.mailman_daemon = testlib.TestDaemon("/etc/init.d/mailman")
+        self.mailman_cfg = '/etc/mailman/mm_cfg.py'
+        self.mailman_aliases = '/var/lib/mailman/data/aliases'
+        self.mailman_pid = '/var/run/mailman/mailman.pid'
+        self.postfix_daemon = testlib.TestDaemon("/etc/init.d/postfix")
+        self.postfix_mastercf = '/etc/postfix/master.cf'
+        self.postfix_maincf = '/etc/postfix/main.cf'
+        self.postfix_transport = '/etc/postfix/transportqrt'
+        self.postfix_aliases = '/etc/aliases'
+        self.apache_pid = "/var/run/apache2.pid"
+        self.ports_file = "/etc/apache2/ports.conf"
+        self.default_site = "/etc/apache2/sites-available/default"
+        self.mailman_site = "/etc/apache2/sites-enabled/mailman"
+        self.tempdir = tempfile.mkdtemp()
+
+        # Cookie-aware opener shared by the web-form tests
+        self.cj = cookielib.LWPCookieJar()
+        self.opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(self.cj))
+
+        # Make sure daemons are stopped before we begin
+        self.postfix_daemon.stop()
+        self.mailman_daemon.stop()
+
+        # Appending an empty string leaves the content alone but makes
+        # config_replace() take the backup that tearDown() restores.
+        testlib.config_replace(self.mailman_aliases, "", append=True)
+        testlib.config_set(self.mailman_cfg,'MTA',"'Postfix'")
+        subprocess.call(['/usr/lib/mailman/bin/genaliases'], stdout=subprocess.PIPE)
+        subprocess.call(['chown', 'root:list', self.mailman_aliases])
+        # Is this a packaging mistake?
+        subprocess.call(['chown', 'list:list', '/var/lib/mailman/archives/private'])
+
+        self._zap_lists()
+        # newlist needs real addresses; the list archive had rewritten the
+        # '@' in these literals to ' at '.
+        subprocess.call(['/usr/sbin/newlist', '-q', 'mailman@lists.example.com', 'root@example.com' ,'ubuntu'], stdout=subprocess.PIPE)
+
+        self._setUp_postfix()
+        self._setUp_apache()
+
+        self.mailman_daemon.restart()
+
+        self.user = testlib.TestUser(lower=True)
+        self.s = None
+        # Silently allow for this connection to fail, to handle the
+        # initial setup of the postfix server. Only ordinary errors are
+        # ignored; KeyboardInterrupt and SystemExit still propagate.
+        try:
+            self.s = smtplib.SMTP('localhost', port=25)
+        except Exception:
+            pass
+
+    def tearDown(self):
+        '''Clean up after each test_* function.
+
+        Closes the SMTP connection if one was opened, removes the test
+        lists and the temporary directory, restores the mailman, postfix and
+        apache configuration and stops mailman.
+        '''
+
+        # self.s is None when setUp() could not open the SMTP connection, so
+        # test for that explicitly instead of swallowing every exception.
+        smtp = getattr(self, 's', None)
+        if smtp is not None:
+            try:
+                smtp.quit()
+            except Exception:
+                # Best effort: the server may already have gone away.
+                pass
+        self.user = None
+
+        self._zap_lists()
+
+        if os.path.exists(self.tempdir):
+            testlib.recursive_rm(self.tempdir)
+
+        testlib.config_restore(self.mailman_cfg)
+        testlib.config_restore(self.mailman_aliases)
+
+        self._tearDown_postfix()
+        self._tearDown_apache()
+
+        self.mailman_daemon.stop()
+
+    def _zap_lists(self):
+        '''Delete the "testlist" and "mailman" lists when they exist.
+
+        rmlist is called with -a, so the list archives are removed too.
+        '''
+        for listname in ('testlist', 'mailman'):
+            if not os.path.exists('/var/lib/mailman/lists/' + listname):
+                continue
+            subprocess.call(['/usr/sbin/rmlist', '-a', listname], stdout=subprocess.PIPE)
+
+    def _setUp_postfix(self):
+        '''Create Postfix server configs.
+
+        Rewrites master.cf, main.cf, the transport map and /etc/aliases so
+        that mail for lists.example.com is piped to mailman and local mail
+        is delivered through procmail, then restarts postfix.
+        '''
+        # Appending nothing keeps the content but takes the backup that
+        # _tearDown_postfix() restores.
+        testlib.config_replace(self.postfix_mastercf, "", append=True)
+
+        testlib.config_set(self.postfix_maincf,'mydestination','example.com, localhost.localdomain, localhost')
+
+        # Move listener to localhost:25
+        master = open('/etc/postfix/master.cf.new','w')
+        for cfline in open(self.postfix_mastercf):
+            if cfline.startswith('smtp') and 'smtpd' in cfline and 'inet' in cfline:
+                master.write('127.0.0.1:25 inet n - - - - smtpd\n')
+            else:
+                master.write(cfline)
+        # Add the "mailman" pipe transport. The continuation lines must
+        # start with whitespace for postfix to join them to the first line.
+        # NOTE(review): column spacing in these literals was collapsed by
+        # the mail archive -- compare against the repository copy.
+        master.write('''mailman unix - n n - - pipe
+ flags=FR user=list argv=/usr/lib/mailman/bin/postfix-to-mailman.py
+ ${nexthop} ${user}''')
+        master.close()
+        os.rename('/etc/postfix/master.cf.new',self.postfix_mastercf)
+
+        # Use mbox only
+        testlib.config_comment(self.postfix_maincf,'home_mailbox')
+        testlib.config_set(self.postfix_maincf,'mailbox_command','procmail -a "$EXTENSION"')
+
+        # Config mailman
+        testlib.config_set(self.postfix_maincf,'relay_domains','lists.example.com')
+        testlib.config_set(self.postfix_maincf,'transport_maps','hash:%s' % self.postfix_transport)
+        testlib.config_set(self.postfix_maincf,'mailman_destination_recipient_limit','1')
+        testlib.config_set(self.postfix_maincf,'alias_maps','hash:%s, hash:%s' % (self.postfix_aliases,self.mailman_aliases))
+
+        # Route the list domain to the "mailman" transport and build the map
+        testlib.config_replace(self.postfix_transport, "lists.example.com mailman:")
+        subprocess.call(['postmap', self.postfix_transport], stdout=subprocess.PIPE)
+
+        # Append the aliases of the site-wide "mailman" list to /etc/aliases
+        testlib.config_replace(self.postfix_aliases, '''mailman: "|/var/lib/mailman/mail/mailman post mailman"
+mailman-admin: "|/var/lib/mailman/mail/mailman admin mailman"
+mailman-bounces: "|/var/lib/mailman/mail/mailman bounces mailman"
+mailman-confirm: "|/var/lib/mailman/mail/mailman confirm mailman"
+mailman-join: "|/var/lib/mailman/mail/mailman join mailman"
+mailman-leave: "|/var/lib/mailman/mail/mailman leave mailman"
+mailman-owner: "|/var/lib/mailman/mail/mailman owner mailman"
+mailman-request: "|/var/lib/mailman/mail/mailman request mailman"
+mailman-subscribe: "|/var/lib/mailman/mail/mailman subscribe mailman"
+mailman-unsubscribe: "|/var/lib/mailman/mail/mailman unsubscribe mailman"''', append=True)
+
+        subprocess.call(['chown', 'root:list', self.postfix_aliases])
+        subprocess.call(['newaliases'])
+
+        # Restart server
+        self.postfix_daemon.restart()
+        # Postfix exits its init script before the master listener has started
+        time.sleep(2)
+
+    def _tearDown_postfix(self):
+        '''Stop postfix and undo everything _setUp_postfix() changed.'''
+
+        self.postfix_daemon.stop()
+
+        # Put the backed-up configuration files back in place
+        for conf in (self.postfix_mastercf, self.postfix_maincf, self.postfix_aliases):
+            testlib.config_restore(conf)
+
+        subprocess.call(['chown', 'root:root', self.postfix_aliases])
+
+        # Remove the transport map and the database postmap built from it
+        for leftover in (self.postfix_transport, self.postfix_transport + ".db"):
+            if os.path.exists(leftover):
+                os.unlink(leftover)
+
+    def _setUp_apache(self):
+        '''Set up Apache.
+
+        Moves Apache to port 8000, enables the mailman site and then runs
+        the common httpd set-up from testlib_httpd.
+        '''
+
+        # Change the default port, so we can run in a schroot.
+        # The empty appends only take a backup for _tearDown_apache().
+        # Note that sed rewrites every "80" in the files, not just ports.
+        testlib.config_replace(self.ports_file, "", append=True)
+        subprocess.call(['sed', '-i', 's/80/8000/g', self.ports_file])
+        testlib.config_replace(self.default_site, "", append=True)
+        subprocess.call(['sed', '-i', 's/80/8000/g', self.default_site])
+
+        if os.path.exists(self.mailman_site):
+            os.unlink(self.mailman_site)
+
+        # NOTE(review): self.lsb_release is not set in this file; presumably
+        # it comes from a base class -- confirm, including that 'Release'
+        # really is a float comparable with 6.06.
+        if self.lsb_release['Release'] == 6.06:
+            self._dapper_apache_conf()
+        else:
+            os.symlink("/etc/mailman/apache.conf", self.mailman_site)
+
+        testlib_httpd.HttpdCommon._setUp(self)
+
+    def _tearDown_apache(self):
+        '''Tear down Apache.
+
+        Removes the mailman site, restores the two files _setUp_apache()
+        edited and runs the common httpd tear-down from testlib_httpd.
+        '''
+
+        if os.path.exists(self.mailman_site):
+            os.unlink(self.mailman_site)
+
+        testlib.config_restore(self.ports_file)
+        testlib.config_restore(self.default_site)
+        testlib_httpd.HttpdCommon._tearDown(self)
+
+    def _dapper_apache_conf(self):
+        '''Create an example apache conf file for Dapper.
+
+        Writes a site file to self.mailman_site that exposes the mailman
+        CGI scripts, the public archives and the mailman images.
+        '''
+        # NOTE(review): indentation inside the written text was collapsed by
+        # the mail archive; Apache does not depend on it.
+        conf_file = open(self.mailman_site,'w')
+        conf_file.write('''ScriptAlias /cgi-bin/mailman/ /usr/lib/cgi-bin/mailman/
+Alias /pipermail/ /var/lib/mailman/archives/public/
+Alias /images/mailman/ /usr/share/images/mailman/
+<Directory /usr/lib/cgi-bin/mailman/>
+ AllowOverride None
+ Options ExecCGI
+ AddHandler cgi-script .cgi
+ Order allow,deny
+ Allow from all
+</Directory>
+<Directory /var/lib/mailman/archives/public/>
+ Options Indexes FollowSymlinks
+ AllowOverride None
+ Order allow,deny
+ Allow from all
+</Directory>
+<Directory /usr/share/images/mailman/>
+ AllowOverride None
+ Order allow,deny
+ Allow from all
+</Directory>
+''')
+        conf_file.close()
+
+    def _deliver_email(self, from_addr, to_addr, body):
+        '''Send body from from_addr to to_addr over the SMTP connection
+        opened in setUp() (self.s).
+
+        The value returned by sendmail() is not used, so it is no longer
+        bound to a dead local.
+        '''
+        self.s.sendmail(from_addr, to_addr, body)
+
+    def _check_email(self, user, pattern, timeout=30):
+        '''Wait until the user's mail spool matches pattern.
+
+        The spool file /var/mail/<login> is re-read once per second for up
+        to `timeout` attempts. The test fails if the pattern never shows
+        up; otherwise the regular expression match object is returned.
+        '''
+        re_pattern = re.compile(pattern)
+        spool_file = '/var/mail/%s' % (user.login)
+        result = None
+        contents = ''
+        while timeout > 0:
+            if os.path.exists(spool_file):
+                spool = open(spool_file)
+                try:
+                    contents = spool.read()
+                finally:
+                    # Close explicitly: the file is reopened every second
+                    spool.close()
+                result = re_pattern.search(contents)
+                if result is not None:
+                    break
+            time.sleep(1)
+            timeout -= 1
+        # Assert on the match itself rather than on the remaining budget
+        self.assertTrue(result is not None, "Reached timeout searching for pattern in '%s'" % contents)
+        return result
+
+    def _get_confirmation(self, user):
+        '''Return the confirmation token from the user's mailbox.
+
+        Waits (via _check_email) for a line of the form "confirm <token>"
+        and returns the token.
+        '''
+        # Raw string so the regex escapes are not read as string escapes
+        pattern = r'confirm (\w+)\n'
+        result = self._check_email(user, pattern)
+        return result.group(1)
+
+    def _test_roundtrip_mail(self, user):
+        '''Send a mail from root to user at example.com and wait for its
+        subject line to appear in the user's mail spool.'''
+
+        # The To: header needs a real address; the list archive had
+        # rewritten the '@' in this literal to ' at '.
+        body = '''From: Rooty <root>
+To: "%s" <%s@example.com>
+Subject: This is test 1
+
+Hello, nice to meet you.
+''' % (user.gecos, user.login)
+
+        self._deliver_email('root', user.login + '@example.com', body)
+        pattern = "Subject: This is test 1"
+        self._check_email(user, pattern)
+
+    def test_aaa_daemons(self):
+        '''Test that the apache2 and mailman pid files name live processes'''
+
+        expected_daemons = (
+            ("apache2", self.apache_pid),
+            ("python", self.mailman_pid),
+        )
+        for exe, pidfile in expected_daemons:
+            self.assertTrue(testlib.check_pidfile(exe, pidfile))
+
+    def test_aaa_status(self):
+        '''Test that "apache2ctl status" exits with code 0'''
+        expected = 0
+        rc, report = testlib.cmd(['apache2ctl', 'status'])
+        failure_text = 'Got exit code %d, expected %d\n' % (rc, expected) + report
+        self.assertEquals(expected, rc, failure_text)
+
+    def test_aab_http(self):
+        '''Test that apache serves the index, a generated page and the
+        mailman listinfo page on port 8000'''
+        base_url = "http://localhost:8000/"
+        self._test_url(base_url)
+
+        test_str = testlib_httpd.create_html_page(self.html_page)
+        page_name = os.path.basename(self.html_page)
+        self._test_url(base_url + page_name, test_str)
+
+        listinfo_url = base_url + "cgi-bin/mailman/listinfo/mailman"
+        self._test_url(listinfo_url, "About Mailman")
+
+    def test_aac_sending_mail_direct(self):
+        '''Test postfix mail delivery'''
+        # Plain local delivery to the test user, without mailman involved
+        self._test_roundtrip_mail(self.user)
+
+    def test_baa_mailman_subscribe(self):
+        '''Test mailman subscription.
+
+        Subscribes the test user through the web form, confirms with the
+        token mailed back, posts to the list and waits for the post to
+        arrive in the user's mailbox.
+        '''
+
+        password = "Ubuntu"
+
+        # Try and subscribe to the Mailman list
+        values = { 'email': self.user.login + "@example.com",
+                   'pw': password,
+                   'pw-conf': password,
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/subscribe/mailman', data)
+        result = self.opener.open(req).read()
+        self.assertTrue('Mailman Subscription results' in result, result)
+
+        # Parse the confirmation email
+        conf = self._get_confirmation(self.user)
+
+        # Now let's confirm via the web page
+        values = { 'cookie': conf,
+                   'realname': '',
+                   'digests': '0',
+                   'language': 'en',
+                   'submit': 'Subscribe to list Mailman',
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/confirm/mailman', data)
+        result = self.opener.open(req).read()
+        self.assertTrue('successfully confirmed your subscription request' in result, result)
+
+        # Send an email to the list. Headers and the SMTP recipient need
+        # real addresses; the list archive had rewritten '@' to ' at '.
+        body = '''From: "%s" <%s@example.com>
+To: "Mailman list" <mailman@lists.example.com>
+Subject: This is mailman test
+
+Yay! My first post. Ubuntu rocks!
+''' % (self.user.gecos, self.user.login)
+
+        self._deliver_email(self.user.login + '@example.com', 'mailman@lists.example.com', body)
+
+        # See if it was delivered
+        pattern = "Ubuntu rocks"
+        self._check_email(self.user, pattern)
+
+    def test_cve_2010_3089(self):
+        '''Test CVE-2010-3089.
+
+        Puts markup into a list description and checks that the listinfo
+        page shows it escaped rather than raw.
+        '''
+
+        tempconf = os.path.join(self.tempdir, 'templist-config')
+
+        # Create a test list and insert XSS into description.
+        # newlist needs real addresses; the list archive had rewritten the
+        # '@' in these literals to ' at '.
+        subprocess.call(['/usr/sbin/newlist', '-q', 'testlist@lists.example.com', 'root@example.com' ,'ubuntu'], stdout=subprocess.PIPE)
+        subprocess.call(['/usr/sbin/config_list', '-o', tempconf, 'testlist'], stdout=subprocess.PIPE)
+        testlib.config_set(tempconf,'description',"'<XSSTEST>'")
+        subprocess.call(['/usr/sbin/config_list', '-i', tempconf, 'testlist'], stdout=subprocess.PIPE)
+
+        request = "GET /cgi-bin/mailman/listinfo/testlist HTTP/1.1\nHost: localhost\nConnection: close\n\n"
+        # The raw tag must not be in the page ...
+        self._test_raw(request, '<XSSTEST>', port=8000, invert=True)
+        # ... but its HTML-escaped form must be. The archived copy expected
+        # the raw tag here too, contradicting the line above.
+        # NOTE(review): the literal below is the entity-escaped tag
+        # (ampersand-lt-semicolon ... ampersand-gt-semicolon) -- confirm
+        # against the repository.
+        self._test_raw(request, '&lt;XSSTEST&gt;', port=8000)
+
+    def test_cve_2011_0707(self):
+        '''Test CVE-2011-0707.
+
+        Subscribes with markup as the real name, then unsubscribes and
+        checks that the final confirmation page shows the name escaped
+        rather than raw.
+        '''
+
+        password = "Ubuntu"
+
+        # Try and subscribe to the Mailman list
+        values = { 'email': self.user.login + "@example.com",
+                   'realname': '<XSSTEST>',
+                   'pw': password,
+                   'pw-conf': password,
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/subscribe/mailman', data)
+        result = self.opener.open(req).read()
+        self.assertTrue('Mailman Subscription results' in result, result)
+
+        # Parse the confirmation email
+        conf = self._get_confirmation(self.user)
+
+        # Now let's confirm via the web page
+        values = { 'cookie': conf,
+                   'realname': '<XSSTEST>',
+                   'digests': '0',
+                   'language': 'en',
+                   'submit': 'Subscribe to list Mailman',
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/confirm/mailman', data)
+        result = self.opener.open(req).read()
+        self.assertTrue('successfully confirmed your subscription request' in result, result)
+
+        # Kill email so we can parse the new confirmation string
+        subprocess.call(['rm','-rf', '/var/mail/'+self.user.login])
+
+        # Look at the unsubscribe page
+        values = { 'email': self.user.login + "@example.com",
+                   'password': password,
+                   'language': 'en',
+                   'login-unsub': 'Unsubscribe',
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/options/mailman', data)
+        result = self.opener.open(req).read()
+
+        # Parse the confirmation email
+        conf = self._get_confirmation(self.user)
+
+        # Now let's confirm via the web page
+        values = { 'cookie': conf,
+                   'email': self.user.login + "@example.com",
+                   'password': password,
+                   'language': 'en',
+                 }
+        data = urllib.urlencode(values)
+        req = urllib2.Request('http://localhost:8000/cgi-bin/mailman/confirm/mailman', data)
+        result = self.opener.open(req).read()
+
+        # The raw tag must not be in the page ...
+        self._word_find(result, '<XSSTEST>', invert=True)
+        # ... but its HTML-escaped form must be. The archived copy expected
+        # the raw tag here too, contradicting the line above.
+        # NOTE(review): the literal below is the entity-escaped tag
+        # (ampersand-lt-semicolon ... ampersand-gt-semicolon) -- confirm
+        # against the repository.
+        self._word_find(result, '&lt;XSSTEST&gt;')
+
+
+if __name__ == '__main__':
+    # simple: let unittest discover and run every test in this module
+    unittest.main()
+
+    # more configurable alternative (disabled): build the suite by hand and
+    # exit non-zero when a test fails
+    #suite = unittest.TestSuite()
+    #suite.addTest(unittest.TestLoader().loadTestsFromTestCase(MailmanTest))
+    #rc = unittest.TextTestRunner(verbosity=2).run(suite)
+    #if not rc.wasSuccessful():
+    #    sys.exit(1)
Added: trunk/debian/tests/testlib.py
===================================================================
--- trunk/debian/tests/testlib.py (rev 0)
+++ trunk/debian/tests/testlib.py 2013-08-04 11:00:53 UTC (rev 738)
@@ -0,0 +1,1144 @@
+#
+# testlib.py quality assurance test script
+# Copyright (C) 2008-2011 Canonical Ltd.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Library General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Library General Public License for more details.
+#
+# You should have received a copy of the GNU Library General Public
+# License along with this program. If not, see
+# <http://www.gnu.org/licenses/>.
+#
+
+'''Common classes and functions for package tests.'''
+
+import string, random, crypt, subprocess, pwd, grp, signal, time, unittest, tempfile, shutil, os, os.path, re, glob
+import sys, socket, gzip
+from stat import *
+from encodings import string_escape
+
+import warnings
+warnings.filterwarnings('ignore', message=r'.*apt_pkg\.TagFile.*', category=DeprecationWarning)
+try:
+    import apt_pkg
+    apt_pkg.InitSystem()
+except Exception:
+    # On non-Debian system, fall back to simple comparison without debianisms
+    class apt_pkg(object):
+        '''Minimal stand-in for the apt_pkg module.'''
+
+        def VersionCompare(one, two):
+            '''Compare two dotted version strings component by component.
+
+            Returns 1 or -1 at the first differing component and 0 when
+            all shared components are equal. Components are compared as
+            strings, so this is only an approximation of Debian ordering.
+            '''
+            for part_one, part_two in zip(one.split('.'), two.split('.')):
+                if part_one > part_two:
+                    return 1
+                if part_one < part_two:
+                    return -1
+            return 0
+        # The function is called on the class as if it were a module-level
+        # function, so it must not be bound to an instance.
+        VersionCompare = staticmethod(VersionCompare)
+
+bogus_nxdomain = "208.69.32.132"
+
+# http://www.chiark.greenend.org.uk/ucgi/~cjwatson/blosxom/2009-07-02-python-sigpipe.html
+# This is needed so that the subprocesses that produce endless output
+# actually quit when the reader goes away.
+import signal
+def subprocess_setup():
+    '''Reset SIGPIPE to its default action.
+
+    Used in this file as the preexec_fn of subprocess.Popen.
+    '''
+    # Python installs a SIGPIPE handler by default. This is usually not what
+    # non-Python subprocesses expect.
+    signal.signal(signal.SIGPIPE, signal.SIG_DFL)
+
+class TimedOutException(Exception):
+    '''Raised by timeout() when the wrapped call exceeds its time limit.'''
+
+    def __init__(self, value = "Timed Out"):
+        # Message shown by str(); defaults to "Timed Out"
+        self.value = value
+
+    def __str__(self):
+        return '%r' % (self.value,)
+
+def _restore_backup(path):
+    '''Move the '<path>.autotest' backup back over path, if one exists.'''
+    backup = path + '.autotest'
+    if not os.path.exists(backup):
+        return
+    shutil.move(backup, path)
+
+def _save_backup(path):
+    '''Copy path to '<path>.autotest' unless that backup already exists.
+
+    Nothing happens when path itself does not exist.
+    '''
+    backup = path + '.autotest'
+    if not os.path.exists(path) or os.path.exists(backup):
+        return
+    shutil.copy2(path, backup)
+    # copy2 does not copy ownership, so do it here.
+    # Reference: http://docs.python.org/library/shutil.html
+    info = os.stat(path)
+    os.chown(backup, info.st_uid, info.st_gid)
+
+def config_copydir(path):
+    '''Back up the directory at path to '<path>.autotest'.
+
+    An earlier backup is restored first. Raises OSError when path exists
+    but is not a directory.
+    '''
+    if os.path.exists(path) and not os.path.isdir(path):
+        # Call form of raise instead of the old "raise Class, value" form
+        raise OSError("'%s' is not a directory" % (path))
+    _restore_backup(path)
+
+    pathbackup = path + '.autotest'
+    if os.path.exists(path):
+        shutil.copytree(path, pathbackup, symlinks=True)
+
+def config_replace(path,contents,append=False):
+    '''Replace (or append) to a config file.
+
+    An earlier backup of path is restored first and a fresh backup is
+    taken when the file exists. With append=True the existing content is
+    kept and contents is added after it.
+    '''
+    _restore_backup(path)
+    if os.path.exists(path):
+        _save_backup(path)
+        if append:
+            existing = open(path)
+            try:
+                contents = existing.read() + contents
+            finally:
+                existing.close()
+    # Close explicitly so the data is on disk when this function returns
+    target = open(path, 'w')
+    try:
+        target.write(contents)
+    finally:
+        target.close()
+
+def config_comment(path, field):
+    '''Comment out every "field = ..." line of the config file at path.
+
+    A backup is taken first (if none exists). field is inserted into a
+    regular expression unescaped, so it must not contain regex
+    metacharacters.
+    '''
+    _save_backup(path)
+    contents = ""
+    source = open(path)
+    try:
+        for line in source:
+            if re.search("^\s*%s\s*=" % (field), line):
+                line = "#" + line
+            contents += line
+    finally:
+        source.close()
+
+    # Write a sibling file and rename it over the original. The handle is
+    # closed before the rename so the new content is flushed.
+    target = open(path+'.new', 'w')
+    try:
+        target.write(contents)
+    finally:
+        target.close()
+    os.rename(path+'.new', path)
+
+def config_set(path, field, value, spaces=True):
+    '''Set field to value in the config file at path.
+
+    Every existing "field = ..." line is replaced; when there is none the
+    setting is appended. spaces selects "field = value" (True) or
+    "field=value". A backup is taken first (if none exists). field is
+    inserted into a regular expression unescaped, so it must not contain
+    regex metacharacters.
+    '''
+    _save_backup(path)
+    contents = ""
+    if spaces==True:
+        setting = '%s = %s\n' % (field, value)
+    else:
+        setting = '%s=%s\n' % (field, value)
+    found = False
+    source = open(path)
+    try:
+        for line in source:
+            if re.search("^\s*%s\s*=" % (field), line):
+                found = True
+                line = setting
+            contents += line
+    finally:
+        source.close()
+    if not found:
+        contents += setting
+
+    # Write a sibling file and rename it over the original. The handle is
+    # closed before the rename so the new content is flushed.
+    target = open(path+'.new', 'w')
+    try:
+        target.write(contents)
+    finally:
+        target.close()
+    os.rename(path+'.new', path)
+
+def config_patch(path, patch, depth=1):
+    '''Patch a config file.
+
+    Restores an earlier backup, takes a fresh one and feeds the patch text
+    to /usr/bin/patch with -p<depth>. Raises Exception("Patch failed")
+    when patch exits non-zero.
+    '''
+    _restore_backup(path)
+    _save_backup(path)
+
+    handle, name = mkstemp_fill(patch)
+    try:
+        rc = subprocess.call(['/usr/bin/patch', '-p%s' %(depth), path], stdin=handle, stdout=subprocess.PIPE)
+    finally:
+        # Always release the temporary patch file, even if patch could not
+        # be started
+        handle.close()
+        os.unlink(name)
+    if rc != 0:
+        raise Exception("Patch failed")
+
+def config_restore(path):
+    '''Rename a replaced config file back to its initial state'''
+    # Does nothing when no '.autotest' backup exists for path
+    _restore_backup(path)
+
+def timeout(secs, f, *args):
+    '''Call f(*args) and return its result, raising TimedOutException when
+    the call takes longer than secs seconds.
+
+    Implemented with SIGALRM; the previous handler is put back afterwards.
+    '''
+    def handler(signum, frame):
+        raise TimedOutException()
+
+    old = signal.signal(signal.SIGALRM, handler)
+    result = None
+    signal.alarm(secs)
+    try:
+        result = f(*args)
+    finally:
+        # Cancel the pending alarm before restoring the old handler
+        signal.alarm(0)
+        signal.signal(signal.SIGALRM, old)
+
+    return result
+
+def require_nonroot():
+    '''Exit with status 1 when running with an effective uid of 0.'''
+    if os.geteuid() != 0:
+        return
+    sys.stderr.write("This series of tests should be run as a regular user with sudo access, not as root." + "\n")
+    sys.exit(1)
+
+def require_root():
+    '''Exit with status 1 unless running with an effective uid of 0.'''
+    if os.geteuid() == 0:
+        return
+    sys.stderr.write("This series of tests should be run with root privileges (e.g. via sudo)." + "\n")
+    sys.exit(1)
+
+def require_sudo():
+    '''Exit with status 1 unless running as root via sudo from a non-root
+    account (judged by the SUDO_USER environment variable).'''
+    invoking_user = os.environ.get('SUDO_USER', None)
+    if os.geteuid() != 0 or invoking_user == None:
+        sys.stderr.write("This series of tests must be run under sudo." + "\n")
+        sys.exit(1)
+    if invoking_user == 'root':
+        sys.stderr.write('Please run this test using sudo from a regular user. (You ran sudo from root.)' + "\n")
+        sys.exit(1)
+
+def random_string(length,lower=False):
+    '''Build a random string of `length` ASCII letters; with lower=True
+    only lowercase letters are used.'''
+
+    if lower:
+        alphabet = string.lowercase
+    else:
+        alphabet = string.letters
+    last = len(alphabet) - 1
+    # One randint() draw per character, in order
+    return ''.join([alphabet[random.randint(0, last)] for _ in range(length)])
+
+def mkstemp_fill(contents,suffix='',prefix='testlib-',dir=None):
+    '''Create a temporary file holding `contents`.
+
+    Returns a (file, name) pair like tempfile.mkstemp, except that the
+    first item is a file object opened "w+" and rewound to the start.
+    '''
+
+    fd, name = tempfile.mkstemp(suffix=suffix,prefix=prefix,dir=dir)
+    # Swap the raw descriptor for a regular file object
+    os.close(fd)
+    filled = open(name,"w+")
+    filled.write(contents)
+    filled.flush()
+    filled.seek(0)
+
+    return filled, name
+
+def create_fill(path, contents, mode=0644):
+    '''Safely create a page.
+
+    Writes contents to a temporary file, renames it to path and sets the
+    permission bits to mode.
+    '''
+    # make the temp file in the same dir as the destination file so we
+    # don't get invalid cross-device link errors when we rename
+    handle, name = mkstemp_fill(contents, dir=os.path.dirname(path))
+    handle.close()
+    os.rename(name, path)
+    os.chmod(path, mode)
+
+def login_exists(login):
+    '''Return True when the password database has an entry for login.'''
+
+    try:
+        pwd.getpwnam(login)
+    except KeyError:
+        return False
+    else:
+        return True
+
+def group_exists(group):
+    '''Checks whether the given group exists on the system.'''
+
+    try:
+        grp.getgrnam(group)
+        return True
+    except KeyError:
+        # getgrnam raises KeyError for an unknown group name
+        return False
+
+def recursive_rm(dirPath, contents_only=False):
+    '''Delete everything below dirPath and, unless contents_only is set,
+    dirPath itself. Symlinks are unlinked, never followed.'''
+    for entry in os.listdir(dirPath):
+        target = os.path.join(dirPath, entry)
+        if os.path.isdir(target) and not os.path.islink(target):
+            recursive_rm(target)
+        else:
+            os.unlink(target)
+    if contents_only == False:
+        os.rmdir(dirPath)
+
+def check_pidfile(exe, pidfile):
+    '''Checks if pid in pidfile is running.
+
+    Returns False when pidfile is missing or unreadable; otherwise the
+    result of check_pid() for the pid on its first line.
+    '''
+    if not os.path.exists(pidfile):
+        return False
+
+    # get the pid
+    try:
+        fd = open(pidfile, 'r')
+        try:
+            pid = fd.readline().rstrip('\n')
+        finally:
+            fd.close()
+    except (IOError, OSError):
+        # Only I/O problems mean "not running"; anything else is a bug
+        return False
+
+    return check_pid(exe, pid)
+
+def check_pid(exe, pid):
+    '''Checks if pid is running.
+
+    Reads /proc/<pid>/cmdline and returns True when its first argument
+    matches exe. exe is used as a regular expression fragment, so
+    metacharacters in it are interpreted.
+    '''
+    cmdline = "/proc/%s/cmdline" % (str(pid))
+    if not os.path.exists(cmdline):
+        return False
+
+    # get the command line
+    try:
+        fd = open(cmdline, 'r')
+        try:
+            tmp = fd.readline().split('\0')
+        finally:
+            fd.close()
+    except (IOError, OSError):
+        # The process may have exited between the check and the read
+        return False
+
+    # this allows us to match absolute paths or just the executable name
+    if re.match('^' + exe + '$', tmp[0]) or \
+       re.match('.*/' + exe + '$', tmp[0]) or \
+       re.match('^' + exe + ': ', tmp[0]) or \
+       re.match('^\(' + exe + '\)', tmp[0]):
+        return True
+
+    return False
+
+def check_port(port, proto, ver=4):
+    '''Check if something is listening on the specified port.
+    WARNING: for some reason this does not work with a bind mounted /proc
+
+    port is 1-65535, proto is "tcp" or "udp" (any case) and ver is 4 or
+    6. The answer comes from the local-address column of
+    /proc/net/<proto> or /proc/net/<proto>6.
+    '''
+    assert (port >= 1)
+    assert (port <= 65535)
+    assert (proto.lower() == "tcp" or proto.lower() == "udp")
+    assert (ver == 4 or ver == 6)
+
+    fn = "/proc/net/%s" % (proto)
+    if ver == 6:
+        fn += str(ver)
+
+    rc, report = cmd(['cat', fn])
+    assert (rc == 0)
+
+    hport = "%0.4x" % port
+
+    # The kernel prints addresses as hex: 8 digits for IPv4 and 32 for
+    # IPv6. The old fixed {8} could never match a tcp6/udp6 line.
+    addr_digits = {4: 8, 6: 32}[ver]
+
+    if re.search(': [0-9a-f]{%d}:%s [0-9a-f]' % (addr_digits, str(hport).lower()), report.lower()):
+        return True
+    return False
+
+def get_arch():
+    '''Get the current architecture'''
+    # "uname -m" output with surrounding whitespace stripped
+    rc, report = cmd(['uname', '-m'])
+    assert (rc == 0)
+    return report.strip()
+
+def get_memory():
+    '''Gets total ram and swap.
+
+    Returns (MemTotal, SwapTotal) as the integers read from
+    /proc/meminfo, or (False, False) when the file is missing or cannot
+    be parsed.
+    '''
+    meminfo = "/proc/meminfo"
+    memtotal = 0
+    swaptotal = 0
+    if not os.path.exists(meminfo):
+        return (False, False)
+
+    try:
+        fd = open(meminfo, 'r')
+        try:
+            for line in fd:
+                splitline = line.split()
+                if splitline[0] == 'MemTotal:':
+                    memtotal = int(splitline[1])
+                elif splitline[0] == 'SwapTotal:':
+                    swaptotal = int(splitline[1])
+        finally:
+            fd.close()
+    except (IOError, OSError, ValueError, IndexError):
+        # Unreadable file or a line that does not look like "Key: value"
+        return (False, False)
+
+    return (memtotal,swaptotal)
+
+def is_running_in_vm():
+    '''Check if running under a VM'''
+    # Greps the dmesg output for known marker strings; only QEMU's CPU
+    # name is listed at the moment.
+    # add other virtualization environments here
+    for search in ['QEMU Virtual CPU']:
+        rc, report = cmd_pipe(['dmesg'], ['grep', search])
+        if rc == 0:
+            return True
+    return False
+
+def ubuntu_release():
+    '''Get the Ubuntu release.
+
+    Returns the DISTRIB_CODENAME value from /etc/lsb-release, or
+    "UNKNOWN" when the file cannot be stat'ed or has no such line. Raises
+    IOError when the file is larger than 1 MiB or cannot be opened.
+    '''
+    f = "/etc/lsb-release"
+    try:
+        size = os.stat(f)[ST_SIZE]
+    except OSError:
+        return "UNKNOWN"
+
+    if size > 1024*1024:
+        raise IOError('Could not open "%s" (too big)' % f)
+
+    # An open() failure propagates to the caller, as before
+    fh = open(f, 'r')
+    try:
+        lines = fh.readlines()
+    finally:
+        fh.close()
+
+    pat = re.compile(r'DISTRIB_CODENAME')
+    for line in lines:
+        if pat.search(line):
+            return line.split('=')[1].rstrip('\n').rstrip('\r')
+
+    return "UNKNOWN"
+
+def cmd(command, input = None, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, stdin = None, timeout = None):
+    '''Try to execute given command (array) and return its stdout, or return
+    a textual error if it failed.
+
+    Returns a two-item list [exit code, output]. When the command cannot
+    be started the list is [127, <error text>].
+    '''
+    # NOTE(review): the timeout argument is accepted but not used here.
+
+    try:
+        sp = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True, preexec_fn=subprocess_setup)
+    except OSError, e:
+        return [127, str(e)]
+
+    out, outerr = sp.communicate(input)
+    # Handle redirection of stdout
+    if out == None:
+        out = ''
+    # Handle redirection of stderr
+    if outerr == None:
+        outerr = ''
+    return [sp.returncode,out+outerr]
+
+def cmd_pipe(command1, command2, input = None, stderr = subprocess.STDOUT, stdin = None):
+    '''Try to pipe command1 into command2.
+
+    Returns [exit code of command2, output of command2], or
+    [127, <error text>] when either command cannot be started.
+    '''
+    try:
+        sp1 = subprocess.Popen(command1, stdin=stdin, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
+        sp2 = subprocess.Popen(command2, stdin=sp1.stdout, stdout=subprocess.PIPE, stderr=stderr, close_fds=True)
+    except OSError, e:
+        return [127, str(e)]
+
+    # Drop the parent's copy of the pipe so command1 gets SIGPIPE if
+    # command2 exits early
+    sp1.stdout.close()
+
+    out = sp2.communicate(input)[0]
+    # Reap command1 so it does not linger as a zombie
+    sp1.wait()
+    return [sp2.returncode,out]
+
+def cwd_has_enough_space(cdir, total_bytes):
+    '''Determine if the partition of the current working directory has
+    more than total_bytes free.
+
+    NOTE: the cdir argument is ignored -- the check always starts at
+    os.getcwd(). The parameter stays for compatibility with callers.
+    Returns False when df fails or no matching mount point is found.
+    '''
+    # -P keeps each filesystem on one line (no wrapping for long device
+    # names) and -k reports sizes in 1024-byte blocks
+    rc, df_output = cmd(['df', '-P', '-k'])
+    if rc != 0:
+        return False
+
+    kb = total_bytes / 1024
+
+    mounts = dict()
+    for line in df_output.splitlines():
+        if '/' not in line:
+            continue
+        tmp = line.split()
+        if len(tmp) < 6:
+            # Not a complete "device ... available ... mountpoint" line
+            continue
+        mounts[tmp[5]] = int(tmp[3])
+
+    # Walk up from the working directory to the nearest mount point
+    cdir = os.getcwd()
+    while cdir != '/':
+        if cdir not in mounts:
+            cdir = os.path.dirname(cdir)
+            continue
+        return kb < mounts[cdir]
+
+    # Reached the root directory: use its entry when df listed one
+    return '/' in mounts and kb < mounts['/']
+
+def get_md5(filename):
+    '''Gets the md5sum of the file specified'''
+
+    (rc, report) = cmd(["/usr/bin/md5sum", "-b", filename])
+    expected = 0
+    assert (expected == rc)
+
+    # The checksum is the text before the first space of md5sum's output
+    return report.split(' ')[0]
+
+def dpkg_compare_installed_version(pkg, check, version):
+    '''Compare the installed version of pkg with `version`.
+
+    `check` is the relation handed to "dpkg --compare-versions". Returns
+    True when the relation holds and False when it does not. Asserts that
+    the package is installed and reports a version.
+    '''
+    (status_rc, status) = cmd(["/usr/bin/dpkg", "-s", pkg])
+    assert (status_rc == 0)
+    assert ("Status: install ok installed" in status)
+
+    # The last "Version: " line wins, as in a sequential scan
+    found = [entry.split()[1] for entry in status.splitlines() if entry.startswith("Version: ")]
+    installed_version = ""
+    if found:
+        installed_version = found[-1]
+
+    assert (installed_version != "")
+
+    (rc, report) = cmd(["/usr/bin/dpkg", "--compare-versions", installed_version, check, version])
+    assert (rc == 0 or rc == 1)
+    return rc == 0
+
+def prepare_source(source, builder, cached_src, build_src, patch_system):
+    '''Download and unpack source package, installing necessary build depends,
+    adjusting the permissions for the 'builder' user, and returning the
+    directory of the unpacked source. Patch system can be one of:
+    - cdbs
+    - dpatch
+    - quilt
+    - quiltv3
+    - None (not the string)
+
+    If 'cached_src' already exists it is copied to 'build_src'; otherwise
+    the build dependencies are installed, the source is fetched with
+    'apt-get source' into 'build_src' and then copied to 'cached_src'.
+    The parent directory of 'build_src' must exist and 'build_src' itself
+    must not. The working directory of the caller is restored before
+    returning, including when a step fails.
+
+    This is normally used like this:
+
+    def setUp(self):
+        ...
+        self.topdir = os.getcwd()
+        self.cached_src = os.path.join(os.getcwd(), "source")
+        self.tmpdir = tempfile.mkdtemp(prefix='testlib', dir='/tmp')
+        self.builder = testlib.TestUser()
+        testlib.cmd(['chgrp', self.builder.login, self.tmpdir])
+        os.chmod(self.tmpdir, 0775)
+
+    def tearDown(self):
+        ...
+        self.builder = None
+        self.topdir = os.getcwd()
+        if os.path.exists(self.tmpdir):
+            testlib.recursive_rm(self.tmpdir)
+
+    def test_suite_build(self):
+        ...
+        build_dir = testlib.prepare_source('foo', \
+                                      self.builder, \
+                                      self.cached_src, \
+                                      os.path.join(self.tmpdir, \
+                                        os.path.basename(self.cached_src)),
+                                      "quilt")
+        os.chdir(build_dir)
+
+        # Example for typical build, adjust as necessary
+        print ""
+        print "  make clean"
+        rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'clean'])
+
+        print "  configure"
+        rc, report = testlib.cmd(['sudo', '-u', self.builder.login, './configure', '--prefix=%s' % self.tmpdir, '--enable-debug'])
+
+        print "  make (will take a while)"
+        rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make'])
+
+        print "  make check (will take a while)",
+        rc, report = testlib.cmd(['sudo', '-u', self.builder.login, 'make', 'check'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+    def test_suite_cleanup(self):
+        ...
+        if os.path.exists(self.cached_src):
+            testlib.recursive_rm(self.cached_src)
+
+    It is up to the caller to clean up cached_src and build_src (as in the
+    above example, often the build_src is in a tmpdir that is cleaned in
+    tearDown() and the cached_src is cleaned in a one time clean-up
+    operation (eg 'test_suite_cleanup()) which must be run after the build
+    suite test (obviously).
+    '''
+
+    # Make sure we have a clean slate
+    assert (os.path.exists(os.path.dirname(build_src)))
+    assert (not os.path.exists(build_src))
+
+    cdir = os.getcwd()
+    try:
+        if os.path.exists(cached_src):
+            shutil.copytree(cached_src, build_src)
+            os.chdir(build_src)
+        else:
+            # Only install the build dependencies on the initial setup
+            rc, report = cmd(['apt-get','-y','--force-yes','build-dep',source])
+            assert (rc == 0)
+
+            os.makedirs(build_src)
+            os.chdir(build_src)
+
+            # These are always needed
+            pkgs = ['build-essential', 'dpkg-dev', 'fakeroot']
+            rc, report = cmd(['apt-get','-y','--force-yes','install'] + pkgs)
+            assert (rc == 0)
+
+            rc, report = cmd(['apt-get','source',source])
+            assert (rc == 0)
+            shutil.copytree(build_src, cached_src)
+
+        # The glob is relative to build_src, which is the cwd at this point
+        unpacked_dir = os.path.join(build_src, glob.glob('%s-*' % source)[0])
+
+        # Now apply the patches. Do it here so that we don't mess up our
+        # cached sources.
+        os.chdir(unpacked_dir)
+        assert (patch_system in ['cdbs', 'dpatch', 'quilt', 'quiltv3', None])
+        # 'quiltv3' and None need no explicit patch application here
+        if patch_system is not None and patch_system != "quiltv3":
+            if patch_system == "quilt":
+                os.environ.setdefault('QUILT_PATCHES','debian/patches')
+                rc, report = cmd(['quilt', 'push', '-a'])
+                assert (rc == 0)
+            elif patch_system == "cdbs":
+                rc, report = cmd(['./debian/rules', 'apply-patches'])
+                assert (rc == 0)
+            elif patch_system == "dpatch":
+                rc, report = cmd(['dpatch', 'apply-all'])
+                assert (rc == 0)
+
+        cmd(['chown', '-R', '%s:%s' % (builder.uid, builder.gid), build_src])
+    finally:
+        # Never leave the caller stranded in the build tree, even on failure
+        os.chdir(cdir)
+
+    return unpacked_dir
+
+def _aa_status():
+    '''Run /usr/sbin/aa-status (through sudo unless already root) and return
+    whatever cmd() returns for it.'''
+    exe = "/usr/sbin/aa-status"
+    assert (os.path.exists(exe))
+    argv = [exe]
+    if os.geteuid() != 0:
+        argv = ['sudo'] + argv
+    return cmd(argv)
+
+def is_apparmor_loaded(path):
+    '''Return True if some line of the aa-status output ends with 'path'.
+
+    Returns False when aa-status exits non-zero.
+    '''
+    rc, report = _aa_status()
+    if rc == 0:
+        for entry in report.splitlines():
+            if entry.endswith(path):
+                return True
+    return False
+
+def is_apparmor_confined(path):
+    '''Check if application is confined.
+
+    Returns True if some line of the aa-status output contains 'path'
+    followed by " (" and False otherwise, or when aa-status exits non-zero.
+    '''
+    rc, report = _aa_status()
+    if rc != 0:
+        return False
+
+    # Literal substring match: 'path' is a filesystem path, not a regex, so
+    # characters such as '.' or '+' must not be interpreted as metacharacters.
+    needle = '%s (' % path
+    for line in report.splitlines():
+        if needle in line:
+            return True
+    return False
+
+def check_apparmor(path, first_ubuntu_release, is_running=True):
+    '''Check if path is loaded and confined for everything higher than the
+    first Ubuntu release specified.
+
+    Returns an (rc, message) tuple: rc is -1 when the check was skipped,
+    0 on success and 1 when the profile is not loaded or (with is_running)
+    the application is not confined.
+
+    Usage:
+    rc, report = testlib.check_apparmor('/usr/sbin/foo', 8.04, is_running=True)
+    if rc < 0:
+        return self._skipped(report)
+
+    expected = 0
+    result = 'Got exit code %d, expected %d\n' % (rc, expected)
+    self.assertEquals(expected, rc, result + report)
+    '''
+    if manager.lsb_release["Release"] < first_ubuntu_release:
+        return (-1, "Skipped apparmor check")
+
+    if not os.path.exists('/sbin/apparmor_parser'):
+        return (-1, "Skipped (couldn't find apparmor_parser)")
+
+    if not is_apparmor_loaded(path):
+        return (1, "Profile not loaded for '%s'" % path)
+
+    # this check only makes sense if the 'path' is currently executing
+    if is_running and not is_apparmor_confined(path):
+        return (1, "'%s' is not running in enforce mode" % path)
+
+    return (0, "")
+
+def get_gcc_version(gcc, full=True):
+    '''Return the version reported by '<gcc> -v'.
+
+    A name that does not start with '/' is looked up under /usr/bin. The
+    result is 'none' when the binary does not exist and 'unknown' when the
+    output does not contain exactly one line starting with 'gcc version'.
+    With full=False only the first word of the result is returned.
+    '''
+    if not gcc.startswith('/'):
+        gcc = '/usr/bin/%s' % (gcc)
+
+    if not os.path.exists(gcc):
+        found = 'none'
+    else:
+        found = 'unknown'
+        output = cmd([gcc,'-v'])[1].strip()
+        matches = [l for l in output.splitlines() if l.startswith('gcc version')]
+        if len(matches) == 1:
+            # drop the leading 'gcc version' words
+            found = " ".join(matches[0].split()[2:])
+
+    if full:
+        return found
+    return found.split()[0]
+
+def is_kdeinit_running():
+    '''Return True if 'ps x' lists a 'kdeinit4 Running' process.
+
+    Applications that use kdeinit will spawn it if it isn't running in the
+    test. This is a problem because it does not exit, so tests can use this
+    helper to check for it. A hint is printed to stderr when it is missing.
+    '''
+    rc, report = cmd(['ps', 'x'])
+    running = 'kdeinit4 Running' in report
+    if not running:
+        print >>sys.stderr, ("kdeinit not running (you may start/stop any KDE application then run this script again)")
+    return running
+
+def get_pkgconfig_flags(libs=[]):
+    '''Return the 'pkg-config --cflags --libs' output for 'libs', split into
+    a list of words. 'libs' must be a non-empty list; a non-zero pkg-config
+    exit code is reported on stderr and trips an assertion.'''
+    assert (len(libs) > 0)
+    rc, flags = cmd(['pkg-config', '--cflags', '--libs'] + libs)
+    if rc != 0:
+        print >>sys.stderr, 'Got exit code %d, expected %d\n' % (rc, 0)
+    assert(rc == 0)
+    return flags.split()
+
+class TestDaemon:
+    '''Helper class to manage daemons consistently.
+
+    Every action returns a (success, message) tuple; message is empty on
+    success and otherwise describes the failure followed by the init script
+    output.
+    '''
+    def __init__(self, init):
+        '''Remember the path of the init script that controls the daemon'''
+        self.initscript = init
+
+    def _run(self, action, settle=0):
+        '''Call the init script with 'action', optionally sleeping 'settle'
+        seconds afterwards, and turn the outcome into (success, message)'''
+        rc, report = cmd([self.initscript, action])
+        if settle:
+            time.sleep(settle)
+
+        if rc != 0:
+            return (False, 'Got exit code %d, expected %d\n' % (rc, 0) + report)
+        if "fail" in report:
+            return (False, "Found 'fail' in report\n" + report)
+        return (True, "")
+
+    def start(self):
+        '''Start daemon (waits two seconds after the init script returns)'''
+        return self._run('start', settle=2)
+
+    def stop(self):
+        '''Stop daemon'''
+        return self._run('stop')
+
+    def reload(self):
+        '''Reload daemon via the init script's force-reload action'''
+        return self._run('force-reload')
+
+    def restart(self):
+        '''Restart daemon: stop, then start; the first failure is returned'''
+        for step in (self.stop, self.start):
+            ok, message = step()
+            if not ok:
+                return (ok, message)
+        return (True, "")
+
+    def status(self):
+        '''Check daemon status'''
+        return self._run('status')
+
+class TestlibManager(object):
+    '''Singleton class used to set up per-test-run information.
+
+    On construction it records: verbosity ('-v' on the command line), the
+    fields printed by 'lsb_release -a', the dpkg architecture, kernel version
+    details, the gcc version and the path of the libc used by /bin/ls.
+    '''
+    def __init__(self):
+        # Set glibc aborts to dump to stderr instead of the tty so test output
+        # is more sane.
+        os.environ.setdefault('LIBC_FATAL_STDERR_','1')
+
+        # check verbosity
+        self.verbosity = False
+        if (len(sys.argv) > 1 and '-v' in sys.argv[1:]):
+            self.verbosity = True
+
+        # Load LSB release file
+        self.lsb_release = dict()
+        if not os.path.exists('/usr/bin/lsb_release') and not os.path.exists('/bin/lsb_release'):
+            raise OSError("Please install 'lsb-release'")
+        for line in subprocess.Popen(['lsb_release','-a'],stdout=subprocess.PIPE,stderr=subprocess.PIPE).communicate()[0].splitlines():
+            field, value = line.split(':',1)
+            value=value.strip()
+            field=field.strip()
+            # Convert numerics; non-numeric values are kept as strings
+            try:
+                value = float(value)
+            except ValueError:
+                pass
+            self.lsb_release.setdefault(field,value)
+
+        # FIXME: hack OEM releases into known-Ubuntu versions
+        if self.lsb_release['Distributor ID'] == "HP MIE (Mobile Internet Experience)":
+            if self.lsb_release['Release'] == 1.0:
+                self.lsb_release['Distributor ID'] = "Ubuntu"
+                self.lsb_release['Release'] = 8.04
+            else:
+                raise OSError("Unknown version of HP MIE")
+
+        # FIXME: hack to assume a most-recent release if we're not
+        # running under Ubuntu.
+        if self.lsb_release['Distributor ID'] not in ["Ubuntu","Linaro"]:
+            self.lsb_release['Release'] = 10000
+        # Adjust Linaro release to pretend to be Ubuntu
+        if self.lsb_release['Distributor ID'] in ["Linaro"]:
+            self.lsb_release['Distributor ID'] = "Ubuntu"
+            self.lsb_release['Release'] -= 0.01
+
+        # Load arch
+        if not os.path.exists('/usr/bin/dpkg'):
+            machine = cmd(['uname','-m'])[1].strip()
+            if machine.endswith('86'):
+                self.dpkg_arch = 'i386'
+            elif machine.endswith('_64'):
+                self.dpkg_arch = 'amd64'
+            elif machine.startswith('arm'):
+                self.dpkg_arch = 'armel'
+            else:
+                raise ValueError("Unknown machine type '%s'" % (machine))
+        else:
+            self.dpkg_arch = cmd(['dpkg','--print-architecture'])[1].strip()
+
+        # Find kernel version
+        self.kernel_is_ubuntu = False
+        self.kernel_version_signature = None
+        self.kernel_version = cmd(["uname","-r"])[1].strip()
+        versig = '/proc/version_signature'
+        if os.path.exists(versig):
+            self.kernel_is_ubuntu = True
+            sigfile = open(versig)
+            try:
+                self.kernel_version_signature = sigfile.read().strip()
+            finally:
+                sigfile.close()
+            self.kernel_version_ubuntu = self.kernel_version
+        elif os.path.exists('/usr/bin/dpkg'):
+            # this can easily be inaccurate but is only an issue for Dapper
+            rc, out = cmd(['dpkg','-l','linux-image-%s' % (self.kernel_version)])
+            if rc == 0:
+                self.kernel_version_signature = out.strip().split('\n').pop().split()[2]
+                self.kernel_version_ubuntu = self.kernel_version_signature
+        if self.kernel_version_signature is None:
+            # Attempt to fall back to something for non-Debian-based
+            self.kernel_version_signature = self.kernel_version
+            self.kernel_version_ubuntu = self.kernel_version
+        # Build ubuntu version without hardware suffix. A signature without a
+        # second space-separated word keeps the value chosen above.
+        try:
+            self.kernel_version_ubuntu = "-".join([x for x in self.kernel_version_signature.split(' ')[1].split('-') if re.search('^[0-9]', x)])
+        except IndexError:
+            pass
+
+        # Find gcc version
+        self.gcc_version = get_gcc_version('gcc')
+
+        # Find libc
+        self.path_libc = [x.split()[2] for x in cmd(['ldd','/bin/ls'])[1].splitlines() if x.startswith('\tlibc.so.')][0]
+
+        # Report self
+        if self.verbosity:
+            kernel = self.kernel_version_ubuntu
+            if kernel != self.kernel_version_signature:
+                kernel += " (%s)" % (self.kernel_version_signature)
+            print >>sys.stdout, "Running test: '%s' distro: '%s %.2f' kernel: '%s' arch: '%s' uid: %d/%d SUDO_USER: '%s')" % ( \
+                sys.argv[0],
+                self.lsb_release['Distributor ID'],
+                self.lsb_release['Release'],
+                kernel,
+                self.dpkg_arch,
+                os.geteuid(), os.getuid(),
+                os.environ.get('SUDO_USER', ''))
+            sys.stdout.flush()
+
+        # Additional heuristics
+        #if os.environ.get('SUDO_USER', os.environ.get('USER', '')) in ['mdeslaur']:
+        #    sys.stdout.write("Replying to Marc Deslauriers in http://launchpad.net/bugs/%d: " % random.randint(600000, 980000))
+        #    sys.stdout.flush()
+        #    time.sleep(0.5)
+        #    sys.stdout.write("destroyed\n")
+        #    time.sleep(0.5)
+
+    def hello(self, msg):
+        '''Print a greeting containing 'msg' on stderr (debugging aid)'''
+        print >>sys.stderr, "Hello from %s" % (msg)
+# The central instance
+manager = TestlibManager()
+
+class TestlibCase(unittest.TestCase):
+    '''unittest.TestCase with shell, sysctl, build and kernel helpers.
+
+    Each instance copies the per-run information gathered by the module
+    level 'manager' singleton onto itself.
+    '''
+    def __init__(self, *args):
+        '''This is called for each TestCase test instance, which isn't much better
+        than SetUp.'''
+
+        unittest.TestCase.__init__(self, *args)
+
+        # Attach to and duplicate dicts from manager singleton
+        self.manager = manager
+        #self.manager.hello(repr(self) + repr(*args))
+        self.my_verbosity = self.manager.verbosity
+        self.lsb_release = self.manager.lsb_release
+        self.dpkg_arch = self.manager.dpkg_arch
+        self.kernel_version = self.manager.kernel_version
+        self.kernel_version_signature = self.manager.kernel_version_signature
+        self.kernel_version_ubuntu = self.manager.kernel_version_ubuntu
+        self.kernel_is_ubuntu = self.manager.kernel_is_ubuntu
+        self.gcc_version = self.manager.gcc_version
+        self.path_libc = self.manager.path_libc
+
+    def version_compare(self, one, two):
+        '''Compare two version strings with apt_pkg.VersionCompare'''
+        return apt_pkg.VersionCompare(one,two)
+
+    def assertFileType(self, filename, filetype):
+        '''Checks the file type of the file specified.
+
+        'filetype' is a regular expression that must match the complete
+        output of 'file -b filename'.
+        '''
+
+        (rc, report, out) = self._testlib_shell_cmd(["/usr/bin/file", "-b", filename])
+        out = out.strip()
+        expected = 0
+        # Absolutely no idea why this happens on Hardy
+        if self.lsb_release['Release'] == 8.04 and rc == 255 and len(out) > 0:
+            rc = 0
+        result = 'Got exit code %d, expected %d:\n%s\n' % (rc, expected, report)
+        self.assertEquals(expected, rc, result)
+
+        filetype = '^%s$' % (filetype)
+        result = 'File type reported by file: [%s], expected regex: [%s]\n' % (out, filetype)
+        self.assertNotEquals(None, re.search(filetype, out), result)
+
+    def yank_commonname_from_cert(self, certfile):
+        '''Extract the commonName from a given PEM.
+
+        Returns the last ':'-separated field of the line following the
+        ':commonName' line in the 'openssl asn1parse' output, or the local
+        FQDN when openssl fails or no such line exists.
+        '''
+        rc, out = cmd(['openssl','asn1parse','-in',certfile])
+        if rc == 0:
+            ready = False
+            for line in out.splitlines():
+                if ready:
+                    return line.split(':')[-1]
+                if ':commonName' in line:
+                    ready = True
+        return socket.getfqdn()
+
+    def announce(self, text):
+        '''Print "(text) " without a newline when running verbosely'''
+        if self.my_verbosity:
+            print >>sys.stdout, "(%s) " % (text),
+            sys.stdout.flush()
+
+    def make_clean(self):
+        '''Run 'make clean' and assert that it succeeds'''
+        rc, output = self.shell_cmd(['make','clean'])
+        self.assertEquals(rc, 0, output)
+
+    def get_makefile_compiler(self):
+        '''Return the value of the first 'CC = ...' assignment in ./Makefile,
+        or 'gcc' when there is no Makefile or no such assignment'''
+        # Find potential compiler name
+        compiler = 'gcc'
+        if os.path.exists('Makefile'):
+            makefile = open('Makefile')
+            try:
+                for line in makefile:
+                    if line.startswith('CC') and '=' in line:
+                        items = [x.strip() for x in line.split('=')]
+                        if items[0] == 'CC':
+                            compiler = items[1]
+                            break
+            finally:
+                makefile.close()
+        return compiler
+
+    def make_target(self, target, expected=0):
+        '''Compile a target and report output.
+
+        Asserts that 'make target' exits with 'expected' and that the
+        compiler name followed by a space appears in the output.
+        '''
+
+        compiler = self.get_makefile_compiler()
+        rc, output = self.shell_cmd(['make',target])
+        self.assertEquals(rc, expected, 'rc(%d)!=%d:\n' % (rc, expected) + output)
+        self.assertTrue('%s ' % (compiler) in output, 'Expected "%s":' % (compiler) + output)
+        return output
+
+    # call as return testlib.skipped()
+    def _skipped(self, reason=""):
+        '''Provide a visible way to indicate that a test was skipped.
+        Always returns False.'''
+        if reason != "":
+            reason = ': %s' % (reason)
+        self.announce("skipped%s" % (reason))
+        return False
+
+    def _testlib_shell_cmd(self,args,stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT):
+        '''Run 'args' through cmd() and return (rc, report, output), where
+        report is the output prefixed with the quoted command line'''
+        argstr = "'" + "', '".join(args).strip() + "'"
+        rc, out = cmd(args,stdin=stdin,stdout=stdout,stderr=stderr)
+        report = 'Command: ' + argstr + '\nOutput:\n' + out
+        return rc, report, out
+
+    def shell_cmd(self, args, stdin=None):
+        '''Run 'args' through cmd() and return its result'''
+        return cmd(args,stdin=stdin)
+
+    def assertShellExitEquals(self, expected, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
+        '''Test a shell command matches a specific exit code'''
+        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, msg + result + report)
+
+    def assertShellExitNotEquals(self, unwanted, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg=""):
+        '''Test a shell command doesn't match a specific exit code'''
+        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
+        result = 'Got (unwanted) exit code %d\n' % rc
+        self.assertNotEquals(unwanted, rc, msg + result + report)
+
+    def assertShellOutputContains(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False):
+        '''Test a shell command contains a specific output (or, with
+        invert=True, does not contain it)'''
+        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
+        result = 'Got exit code %d. Looking for text "%s"\n' % (rc, text)
+        if not invert:
+            self.assertTrue(text in out, msg + result + report)
+        else:
+            self.assertFalse(text in out, msg + result + report)
+
+    def assertShellOutputEquals(self, text, args, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, msg="", invert=False, expected=None):
+        '''Test a shell command matches a specific output (or, with
+        invert=True, differs from it); when 'expected' is given the exit
+        code is checked as well'''
+        rc, report, out = self._testlib_shell_cmd(args, stdin=stdin, stdout=stdout, stderr=stderr)
+        result = 'Got exit code %d. Looking for exact text "%s" (%s)\n' % (rc, text, " ".join(args))
+        if not invert:
+            self.assertEquals(text, out, msg + result + report)
+        else:
+            self.assertNotEquals(text, out, msg + result + report)
+        if expected is not None:
+            result = 'Got exit code %d. Expected %d (%s)\n' % (rc, expected, " ".join(args))
+            self.assertEquals(rc, expected, msg + result + report)
+
+    def _word_find(self, report, content, invert=False):
+        '''Check for a specific string: assert that 'content' occurs in
+        'report' (or, with invert=True, that it does not)'''
+        if invert:
+            warning = 'Found "%s"\n' % content
+            self.assertTrue(content not in report, warning + report)
+        else:
+            warning = 'Could not find "%s"\n' % content
+            self.assertTrue(content in report, warning + report)
+
+    def _test_sysctl_value(self, path, expected, msg=None, exists=True):
+        '''Assert that /proc/sys/<path> exists (or not, per 'exists') and,
+        when it should exist, that it holds the integer 'expected'.
+        Returns the value read, or None when exists is False.'''
+        sysctl = '/proc/sys/%s' % (path)
+        self.assertEquals(exists, os.path.exists(sysctl), sysctl)
+        value = None
+        if exists:
+            handle = open(sysctl)
+            try:
+                value = int(handle.read())
+            finally:
+                handle.close()
+            report = "%s is not %d: %d" % (sysctl, expected, value)
+            if msg:
+                report += " (%s)" % (msg)
+            self.assertEquals(value, expected, report)
+        return value
+
+    def set_sysctl_value(self, path, desired):
+        '''Write 'desired' to /proc/sys/<path> and verify it by reading back'''
+        sysctl = '/proc/sys/%s' % (path)
+        self.assertTrue(os.path.exists(sysctl),"%s does not exist" % (sysctl))
+        # Close explicitly so the write has reached the kernel before the
+        # value is read back below.
+        handle = open(sysctl,'w')
+        try:
+            handle.write(str(desired))
+        finally:
+            handle.close()
+        self._test_sysctl_value(path, desired)
+
+    def kernel_at_least(self, introduced):
+        '''Return True if kernel_version_ubuntu compares >= 'introduced' '''
+        return self.version_compare(self.kernel_version_ubuntu,
+                                    introduced) >= 0
+
+    def kernel_claims_cve_fixed(self, cve):
+        '''Return True if a line of the running kernel's Debian changelog
+        mentions 'cve' without containing "revert" or "Revert"'''
+        changelog = "/usr/share/doc/linux-image-%s/changelog.Debian.gz" % (self.kernel_version)
+        if os.path.exists(changelog):
+            log = gzip.open(changelog)
+            try:
+                for line in log:
+                    if cve in line and not "revert" in line and not "Revert" in line:
+                        return True
+            finally:
+                log.close()
+        return False
+
+class TestGroup:
+    '''Create a temporary test group and remove it again in the dtor.'''
+
+    def __init__(self, group=None, lower=False):
+        '''Create a new group.
+
+        With no 'group' given, random 7 character names are tried until an
+        unused one is found ('lower' is passed on to random_string()).
+        Raises ValueError if the requested group already exists.
+        '''
+
+        self.group = None
+        if group:
+            if group_exists(group):
+                raise ValueError('group name already exists')
+        else:
+            while(True):
+                group = random_string(7,lower=lower)
+                if not group_exists(group):
+                    break
+
+        # Keep the call outside the assert: asserts are stripped under
+        # 'python -O', which would otherwise skip groupadd altogether.
+        rc = subprocess.call(['groupadd',group])
+        assert rc == 0
+        self.group = group
+        g = grp.getgrnam(self.group)
+        self.gid = g[2]
+
+    def __del__(self):
+        '''Remove the created group.'''
+
+        # self.group stays None when creation failed; nothing to remove then
+        if self.group:
+            rc, report = cmd(['groupdel', self.group])
+            assert rc == 0
+
+class TestUser:
+    '''Create a temporary test user and remove it again in the dtor.'''
+
+    def __init__(self, login=None, home=True, group=None, uidmin=None, lower=False, shell=None):
+        '''Create a new user account with a random password.
+
+        By default, the login name is random, too, but can be explicitly
+        specified with 'login'. By default, a home directory is created, this
+        can be suppressed with 'home=False'. 'group' is passed to useradd
+        as -G, 'uidmin' as -K UID_MIN=<uidmin> and 'shell' as -s.
+
+        Raises ValueError when not running as root or when the requested
+        login already exists.
+        '''
+
+        self.login = None
+
+        if os.geteuid() != 0:
+            raise ValueError("You must be root to run this test")
+
+        if login:
+            if login_exists(login):
+                raise ValueError('login name already exists')
+        else:
+            while(True):
+                login = 't' + random_string(7,lower=lower)
+                if not login_exists(login):
+                    break
+
+        self.salt = random_string(2)
+        self.password = random_string(8,lower=lower)
+        self.crypted = crypt.crypt(self.password, self.salt)
+
+        creation = ['useradd', '-p', self.crypted]
+        if home:
+            creation += ['-m']
+        if group:
+            creation += ['-G',group]
+        if uidmin:
+            creation += ['-K','UID_MIN=%d'%uidmin]
+        if shell:
+            creation += ['-s',shell]
+        creation += [login]
+        # Keep the calls outside the asserts: asserts are stripped under
+        # 'python -O', which would otherwise skip useradd/usermod altogether.
+        rc = subprocess.call(creation)
+        assert rc == 0
+        # Set GECOS
+        rc = subprocess.call(['usermod','-c','Buddy %s' % (login),login])
+        assert rc == 0
+
+        self.login = login
+        p = pwd.getpwnam(self.login)
+        self.uid = p[2]
+        self.gid = p[3]
+        self.gecos = p[4]
+        self.home = p[5]
+        self.shell = p[6]
+
+    def __del__(self):
+        '''Remove the created user account.'''
+
+        # self.login stays None when creation failed; nothing to remove then
+        if self.login:
+            # sanity check the login name so we don't accidentally wipe too much
+            if len(self.login)>3 and not '/' in self.login:
+                subprocess.call(['rm','-rf', '/home/'+self.login, '/var/mail/'+self.login])
+            rc, report = cmd(['userdel', '-f', self.login])
+            assert rc == 0
+
+    def add_to_group(self, group):
+        '''Add user to the specified group name'''
+        # -a appends; a bare -G would replace the user's existing
+        # supplementary groups with just this one.
+        rc, report = cmd(['usermod', '-a', '-G', group, self.login])
+        if rc != 0:
+            print report
+        assert rc == 0
+
+# Timeout handler using alarm() from John P. Speno's Pythonic Avocado
+class TimeoutFunctionException(Exception):
+    """Raised by TimeoutFunction when the wrapped call exceeds its timeout"""
+class TimeoutFunction:
+    '''Callable wrapper that aborts 'function' with TimeoutFunctionException
+    when it runs for more than 'timeout' seconds (uses SIGALRM).'''
+    def __init__(self, function, timeout):
+        self.timeout = timeout
+        self.function = function
+
+    def handle_timeout(self, signum, frame):
+        '''SIGALRM handler: abort the wrapped call'''
+        raise TimeoutFunctionException()
+
+    def __call__(self, *args, **kwargs):
+        '''Call the wrapped function with the alarm armed and return its
+        result; the previous SIGALRM handler is restored afterwards.'''
+        old = signal.signal(signal.SIGALRM, self.handle_timeout)
+        signal.alarm(self.timeout)
+        try:
+            result = self.function(*args, **kwargs)
+        finally:
+            # Cancel the alarm before restoring the old handler so a late
+            # SIGALRM can never be delivered to the previous handler.
+            signal.alarm(0)
+            signal.signal(signal.SIGALRM, old)
+        return result
+
+def main():
+    '''Print a greeting and hand over to unittest.main()'''
+    print("hi")
+    unittest.main()
Added: trunk/debian/tests/testlib_httpd.py
===================================================================
--- trunk/debian/tests/testlib_httpd.py (rev 0)
+++ trunk/debian/tests/testlib_httpd.py 2013-08-04 11:00:53 UTC (rev 738)
@@ -0,0 +1,353 @@
+#!/usr/bin/python
+#
+# testlib_httpd.py quality assurance test script
+# Copyright (C) 2008-2013 Canonical Ltd.
+# Author: Jamie Strandboge <jamie at canonical.com>
+# Author: Marc Deslauriers <marc.deslauriers at canonical.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import unittest, subprocess
+import os
+import sys
+import tempfile
+import testlib
+import time
+import socket
+import shutil
+import cookielib
+import urllib2
+import re
+import base64
+
+class HttpdCommon(testlib.TestlibCase):
+    '''Common functions for tests that drive an Apache httpd through its
+    init script and the a2enmod/a2dismod/a2ensite/a2dissite tools.'''
+    def _setUp(self, clearlogs = False):
+        '''Record the paths used by the tests and (re)start httpd.
+
+        With clearlogs=True the access and error logs are removed while
+        the server is stopped.
+        '''
+        self.release = self.lsb_release['Codename']
+        self.html_page = "/var/www/test.html"
+        self.php_page = "/var/www/test.php"
+        self.cgi_page = "/usr/lib/cgi-bin/test-cgi.pl"
+        self.apache2_default = "/etc/default/apache2"
+        self.ssl_key = "/etc/ssl/private/server.key"
+        self.ssl_crt = "/etc/ssl/certs/server.crt"
+        self.ssl_site = "/etc/apache2/sites-enabled/999-testlib"
+        self.ports_file = "/etc/apache2/ports.conf"
+        self.access_log = "/var/log/apache2/access.log"
+        self.error_log = "/var/log/apache2/error.log"
+        # Subclasses may have chosen another init script already
+        if not hasattr(self, 'initscript'):
+            self._set_initscript("/etc/init.d/apache2")
+
+        # Dapper's apache2 is disabled by default
+        if self.lsb_release['Release'] == 6.06:
+            testlib.config_replace(self.apache2_default, "", append=True)
+            subprocess.call(['sed', '-i', 's/NO_START=1/NO_START=0/', self.apache2_default])
+
+        self._stop()
+        if clearlogs == True:
+            self._clearlogs()
+        self._start()
+
+    def _set_initscript(self, initscript):
+        '''Select the init script used to control httpd'''
+        self.initscript = initscript
+
+    def _tearDown(self):
+        '''Clean up after each test_* function'''
+        self._stop()
+        time.sleep(2)
+        for leftover in (self.html_page, self.php_page, self.cgi_page,
+                         self.ssl_key, self.ssl_crt, self.ssl_site):
+            if os.path.exists(leftover):
+                os.unlink(leftover)
+        self._disable_mod("ssl")
+        testlib.config_restore(self.ports_file)
+        testlib.config_restore(self.apache2_default)
+
+    def _start(self):
+        '''Start httpd'''
+        #print self.initscript,"start"
+        rc, report = testlib.cmd([self.initscript, 'start'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+        time.sleep(2)
+
+    def _stop(self):
+        '''Stop httpd'''
+        #print self.initscript,"stop"
+        rc, report = testlib.cmd([self.initscript, 'stop'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+    def _clearlogs(self):
+        '''Clear httpd logs'''
+        if os.path.exists(self.access_log):
+            os.unlink(self.access_log)
+        if os.path.exists(self.error_log):
+            os.unlink(self.error_log)
+
+    def __disable_mod(self, mod):
+        '''Run a2dismod for 'mod'; silently does nothing when the module or
+        a2dismod itself is not installed. Does not restart httpd.'''
+        if not os.path.exists(os.path.join("/etc/apache2/mods-available", mod + \
+                                           ".load")):
+            return
+        if not os.path.exists("/usr/sbin/a2dismod"):
+            return
+        rc, report = testlib.cmd(['a2dismod', mod])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+    def _disable_mod(self, mod):
+        '''Disable one module and restart httpd'''
+        self.__disable_mod(mod)
+        self._restart()
+        time.sleep(2)
+
+    def _disable_mods(self, mods):
+        '''take a list of modules to disable'''
+        for mod in mods:
+            self.__disable_mod(mod)
+        self._restart()
+        time.sleep(2)
+
+    def __enable_mod(self, mod):
+        '''Run a2enmod for 'mod' without restarting httpd'''
+        rc, report = testlib.cmd(['a2enmod', mod])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+    def _enable_mod(self, mod):
+        '''Enable one module and restart httpd'''
+        self.__enable_mod(mod)
+        # for some reason, force-reload doesn't work
+        # if self.lsb_release['Release'] >= 8.04:
+        #    self._reload()
+        # else:
+        self._restart()
+        time.sleep(2)
+
+    def _enable_mods(self, mods):
+        '''take a list of modules to enable'''
+        for mod in mods:
+            self.__enable_mod(mod)
+        # for some reason, force-reload doesn't work
+        # if self.lsb_release['Release'] >= 8.04:
+        #    self._reload()
+        # else:
+        self._restart()
+        time.sleep(2)
+
+    def _disable_site(self, sitename):
+        '''Run a2dissite for 'sitename' and restart httpd'''
+        rc, report = testlib.cmd(['a2dissite', sitename])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+        self._restart()
+        time.sleep(2)
+
+    def _enable_site(self, sitename):
+        '''Run a2ensite for 'sitename' and restart httpd'''
+        rc, report = testlib.cmd(['a2ensite', sitename])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+        # for some reason, force-reload doesn't work
+        # if self.lsb_release['Release'] >= 8.04:
+        #    self._reload()
+        #else:
+        self._restart()
+        time.sleep(2)
+
+    def _reload(self):
+        '''Reload httpd'''
+        rc, report = testlib.cmd([self.initscript, 'force-reload'])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+    def _restart(self):
+        '''Restart httpd'''
+        self._stop()
+        self._start()
+
+    def _prepare_ssl(self, srvkey, srvcert):
+        '''Prepare Apache for ssl connections: enable mod_ssl, install the
+        given key and certificate and write an SSL virtual host on port 443'''
+        self._enable_mod("ssl")
+
+        # copy instead of rename so we don't get invalid cross-device link errors
+        shutil.copy(srvkey, self.ssl_key)
+        shutil.copy(srvcert, self.ssl_crt)
+
+        if self.lsb_release['Release'] <= 7.04:
+            testlib.config_replace(self.ports_file, "Listen 443", True)
+
+        # create the conffile entry
+        # (ServerAdmin takes exactly one argument, so the address must be
+        # written with '@')
+        site_contents = '''
+NameVirtualHost *:443
+<VirtualHost *:443>
+ SSLEngine on
+ SSLOptions +StrictRequire
+ SSLCertificateFile /etc/ssl/certs/server.crt
+ SSLCertificateKeyFile /etc/ssl/private/server.key
+
+ ServerAdmin webmaster@localhost
+
+ DocumentRoot /var/www/
+ ErrorLog /var/log/apache2/error.log
+
+ # Possible values include: debug, info, notice, warn, error, crit,
+ # alert, emerg.
+ LogLevel warn
+
+ CustomLog /var/log/apache2/access.log combined
+ ServerSignature On
+</VirtualHost>
+'''
+        testlib.create_fill(self.ssl_site, site_contents)
+        self._reload()
+
+    def _test_url_proxy(self, url="http://localhost/", content="", proxy="localhost:3128"):
+        '''Fetch 'url' with elinks through 'proxy' (used for ftp, http and
+        https) and, when 'content' is given, assert it appears in the dump'''
+        rc, report = testlib.cmd(['elinks', '-verbose', '2', '-no-home', '1', '-eval', 'set protocol.ftp.proxy.host = "%s"' %(proxy), '-eval', 'set protocol.http.proxy.host = "%s"' %(proxy), '-eval', 'set protocol.https.proxy.host = "%s"' %(proxy), '-dump', url])
+        expected = 0
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+        if content != "":
+            self._word_find(report, content)
+
+    def _test_url(self, url="http://localhost/", content="", invert=False, source=False):
+        '''Fetch 'url' (raw html with source=True, an elinks dump otherwise)
+        and, when 'content' is given, assert that it is present (or absent
+        with invert=True)'''
+        if source:
+            report = self._get_page_source(url)
+        else:
+            report = self._get_page(url)
+
+        if content != "":
+            self._word_find(report, content, invert)
+
+    def _get_page_source(self, url="http://localhost/", data='', headers=None):
+        '''Fetch html source with urllib2 and return it.
+
+        'user:password@' credentials embedded in the url are sent as a
+        Basic Authorization header instead. A non-empty 'data' is passed
+        to urllib2.Request as the request body. Cookies are kept in
+        /tmp/cookies.lwp, which is reset on every call. A 503 response is
+        retried (three attempts, two seconds apart); any other HTTPError,
+        or a 503 on the last attempt, is re-raised.
+        '''
+        cookies = "/tmp/cookies.lwp"
+        testlib.create_fill(cookies, "#LWP-Cookies-2.0")
+
+        if headers is None:
+            headers = {'User-agent' : 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'}
+
+        clean_url = url
+        if re.search(r'http(|s)://.*:.*@[a-z].*', url):
+            tmp = re.sub(r'^http(|s)://', '', url)
+            username = tmp.split('@')[0].split(':')[0]
+            password = tmp.split('@')[0].split(':')[1]
+            base64_str = base64.encodestring('%s:%s' % (username, password))[:-1]
+            headers['Authorization'] = "Basic %s" % (base64_str)
+            # strip out the username and password from the url (literal
+            # replacement: the credentials may contain regex metacharacters)
+            clean_url = url.replace('%s:%s@' % (username, password), '')
+
+        cj = cookielib.LWPCookieJar(filename=cookies)
+        cj.load()
+
+        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
+        urllib2.install_opener(opener)
+
+        if data != '':
+            req = urllib2.Request(clean_url, data, headers)
+        else:
+            req = urllib2.Request(clean_url, headers=headers)
+
+        handle = None
+        tries = 0
+        while handle is None:
+            try:
+                handle = urllib2.urlopen(req)
+            except urllib2.HTTPError, e:
+                tries += 1
+                # Only "service unavailable" is worth retrying, e.g. while
+                # the server is still coming up
+                if e.code != 503 or tries >= 3:
+                    # for debugging
+                    #print >>sys.stderr, 'Error retrieving page "url=%s", "data=%s"' % (url, data)
+                    raise
+                time.sleep(2)
+
+        html = handle.read()
+        cj.save()
+
+        return html
+
+    def _get_page(self, url="http://localhost/"):
+        '''Get contents of given url as dumped by elinks'''
+        rc, report = testlib.cmd(['elinks', '-verbose', '2', '-no-home', '1', '-dump', url])
+        expected = 0
+
+        result = 'Got exit code %d, expected %d\n' % (rc, expected)
+        self.assertEquals(expected, rc, result + report)
+
+        return report
+
+    def _test_raw(self, request="", content="", host="localhost", port=80, invert = False, limit=1024):
+        '''Test the given url with a raw socket to include headers.
+
+        Sends 'request' to host:port, reads at most 'limit' bytes of the
+        reply and, when 'content' is given, asserts that it is present in
+        the reply (or absent with invert=True).
+        '''
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        try:
+            s.connect((host, port))
+            # sendall: a plain send() may transmit only part of the request
+            s.sendall(request)
+            data = s.recv(limit)
+        finally:
+            s.close()
+
+        if content != "":
+            self._word_find(data, content, invert = invert)
+
+def create_php_page(page, php_content=None):
+    '''Write a php page to 'page' and return the text identifying it.
+
+    Without php_content the page echoes "php works" and that string is
+    returned; otherwise php_content is used as the script body and is
+    returned unchanged.
+    '''
+
+    # the two return values are kept for interface compatibility with
+    # callers that do not provide php_content
+    if php_content:
+        marker = php_content
+    else:
+        marker = "php works"
+        php_content = "echo '" + marker + "'; "
+
+    testlib.create_fill(page, "<?php\n%s\n?>" % (php_content))
+    return marker
+
+def create_perl_script(page):
+    '''Write a perl CGI script that prints "perl works" as text/plain to
+    'page' (created with mode 0755) and return that string.'''
+    marker = "perl works"
+    script = '''#!/usr/bin/perl
+print "Content-Type: text/plain\\n\\n";
+print "%s\\n";
+
+''' % (marker)
+    testlib.create_fill(page, script, 0755)
+
+    return marker
+
+def create_html_page(page):
+    '''Write a minimal html page containing "html works" to 'page' and
+    return that string.'''
+    marker = "html works"
+    testlib.create_fill(page, "<html><body>%s</body></html>" % (marker))
+    return marker
More information about the Pkg-mailman-hackers
mailing list