[Python-modules-commits] r395 - in /packages/paste: ./ branches/
branches/upstream/
branches/upstream/current/ branches/upstream/current/paste/
branches/upstream/current/paste/debug/
branches/upstream/current/paste/util/ tags/
pox-guest at users.alioth.debian.org
pox-guest at users.alioth.debian.org
Thu May 4 15:55:45 UTC 2006
Author: pox-guest
Date: Thu May 4 15:55:44 2006
New Revision: 395
URL: http://svn.debian.org/wsvn/python-modules/?sc=1&rev=395
Log:
[svn-inject] Installing original source of paste
Added:
packages/paste/
packages/paste/branches/
packages/paste/branches/upstream/
packages/paste/branches/upstream/current/
packages/paste/branches/upstream/current/paste/
packages/paste/branches/upstream/current/paste/debug/
packages/paste/branches/upstream/current/paste/debug/doctest_webapp.py (with props)
packages/paste/branches/upstream/current/paste/util/
packages/paste/branches/upstream/current/paste/util/scgiserver.py
packages/paste/tags/
Added: packages/paste/branches/upstream/current/paste/debug/doctest_webapp.py
URL: http://svn.debian.org/wsvn/python-modules/packages/paste/branches/upstream/current/paste/debug/doctest_webapp.py?rev=395&op=file
==============================================================================
--- packages/paste/branches/upstream/current/paste/debug/doctest_webapp.py (added)
+++ packages/paste/branches/upstream/current/paste/debug/doctest_webapp.py Thu May 4 15:55:44 2006
@@ -1,0 +1,433 @@
+#!/usr/bin/env python2.4
+# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
+# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
+
+"""
+These are functions for use when doctest-testing a document.
+"""
+
+try:
+ import subprocess
+except ImportError:
+ from paste.util import subprocess24 as subprocess
+import doctest
+import os
+import sys
+import shutil
+import re
+import cgi
+import rfc822
+from cStringIO import StringIO
+from paste.util import PySourceColor
+
+
+here = os.path.abspath(__file__)
+paste_parent = os.path.dirname(
+ os.path.dirname(os.path.dirname(here)))
+
+def run(command):
+ data = run_raw(command)
+ if data:
+ print data
+
+def run_raw(command):
+ """
+ Runs the string command, returns any output.
+ """
+ proc = subprocess.Popen(command, shell=True,
+ stderr=subprocess.STDOUT,
+ stdout=subprocess.PIPE, env=_make_env())
+ data = proc.stdout.read()
+ proc.wait()
+ while data.endswith('\n') or data.endswith('\r'):
+ data = data[:-1]
+ if data:
+ data = '\n'.join(
+ [l for l in data.splitlines() if l])
+ return data
+ else:
+ return ''
+
+def run_command(command, name, and_print=False):
+ output = run_raw(command)
+ data = '$ %s\n%s' % (command, output)
+ show_file('shell-command', name, description='shell transcript',
+ data=data)
+ if and_print and output:
+ print output
+
+def _make_env():
+ env = os.environ.copy()
+ env['PATH'] = (env.get('PATH', '')
+ + ':'
+ + os.path.join(paste_parent, 'scripts')
+ + ':'
+ + os.path.join(paste_parent, 'paste', '3rd-party',
+ 'sqlobject-files', 'scripts'))
+ env['PYTHONPATH'] = (env.get('PYTHONPATH', '')
+ + ':'
+ + paste_parent)
+ return env
+
+def clear_dir(dir):
+ """
+ Clears (deletes) the given directory
+ """
+ shutil.rmtree(dir, True)
+
+def ls(dir=None, recurse=False, indent=0):
+ """
+ Show a directory listing
+ """
+ dir = dir or os.getcwd()
+ fns = os.listdir(dir)
+ fns.sort()
+ for fn in fns:
+ full = os.path.join(dir, fn)
+ if os.path.isdir(full):
+ fn = fn + '/'
+ print ' '*indent + fn
+ if os.path.isdir(full) and recurse:
+ ls(dir=full, recurse=True, indent=indent+2)
+
+default_app = None
+default_url = None
+
+def set_default_app(app, url):
+ global default_app
+ global default_url
+ default_app = app
+ default_url = url
+
+def resource_filename(fn):
+ """
+ Returns the filename of the resource -- generally in the directory
+ resources/DocumentName/fn
+ """
+ return os.path.join(
+ os.path.dirname(sys.testing_document_filename),
+ 'resources',
+ os.path.splitext(os.path.basename(sys.testing_document_filename))[0],
+ fn)
+
+def show(path_info, example_name):
+ fn = resource_filename(example_name + '.html')
+ out = StringIO()
+ assert default_app is not None, (
+ "No default_app set")
+ url = default_url + path_info
+ out.write('<span class="doctest-url"><a href="%s">%s</a></span><br>\n'
+ % (url, url))
+ out.write('<div class="doctest-example">\n')
+ proc = subprocess.Popen(
+ ['paster', 'serve' '--server=console', '--no-verbose',
+ '--url=' + path_info],
+ stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ env=_make_env())
+ stdout, errors = proc.communicate()
+ stdout = StringIO(stdout)
+ headers = rfc822.Message(stdout)
+ content = stdout.read()
+ for header, value in headers.items():
+ if header.lower() == 'status' and int(value.split()[0]) == 200:
+ continue
+ if header.lower() in ('content-type', 'content-length'):
+ continue
+ if (header.lower() == 'set-cookie'
+ and value.startswith('_SID_')):
+ continue
+ out.write('<span class="doctest-header">%s: %s</span><br>\n'
+ % (header, value))
+ lines = [l for l in content.splitlines() if l.strip()]
+ for line in lines:
+ out.write(line + '\n')
+ if errors:
+ out.write('<pre class="doctest-errors">%s</pre>'
+ % errors)
+ out.write('</div>\n')
+ result = out.getvalue()
+ if not os.path.exists(fn):
+ f = open(fn, 'wb')
+ f.write(result)
+ f.close()
+ else:
+ f = open(fn, 'rb')
+ expected = f.read()
+ f.close()
+ if not html_matches(expected, result):
+ print 'Pages did not match. Expected from %s:' % fn
+ print '-'*60
+ print expected
+ print '='*60
+ print 'Actual output:'
+ print '-'*60
+ print result
+
+def html_matches(pattern, text):
+ return True
+ regex = re.escape(pattern)
+ regex = regex.replace(r'\.\.\.', '.*')
+ regex = re.sub(r'0x[0-9a-f]+', '.*', regex)
+ regex = '^%s$' % regex
+ return re.search(regex, text)
+
+def convert_docstring_string(data):
+ if data.startswith('\n'):
+ data = data[1:]
+ lines = data.splitlines()
+ new_lines = []
+ for line in lines:
+ if line.rstrip() == '.':
+ new_lines.append('')
+ else:
+ new_lines.append(line)
+ data = '\n'.join(new_lines) + '\n'
+ return data
+
+def create_file(path, version, data):
+ data = convert_docstring_string(data)
+ write_data(path, data)
+ show_file(path, version)
+
+def append_to_file(path, version, data):
+ data = convert_docstring_string(data)
+ f = open(path, 'a')
+ f.write(data)
+ f.close()
+ # I think these appends can happen so quickly (in less than a second)
+ # that the .pyc file doesn't appear to be expired, even though it
+ # is after we've made this change; so we have to get rid of the .pyc
+ # file:
+ if path.endswith('.py'):
+ pyc_file = path + 'c'
+ if os.path.exists(pyc_file):
+ os.unlink(pyc_file)
+ show_file(path, version, description='added to %s' % path,
+ data=data)
+
+def show_file(path, version, description=None, data=None):
+ ext = os.path.splitext(path)[1]
+ if data is None:
+ f = open(path, 'rb')
+ data = f.read()
+ f.close()
+ if ext == '.py':
+ html = ('<div class="source-code">%s</div>'
+ % PySourceColor.str2html(data, PySourceColor.dark))
+ else:
+ html = '<pre class="source-code">%s</pre>' % cgi.escape(data, 1)
+ html = '<span class="source-filename">%s</span><br>%s' % (
+ description or path, html)
+ write_data(resource_filename('%s.%s.gen.html' % (path, version)),
+ html)
+
+def call_source_highlight(input, format):
+ proc = subprocess.Popen(['source-highlight', '--out-format=html',
+ '--no-doc', '--css=none',
+ '--src-lang=%s' % format], shell=False,
+ stdout=subprocess.PIPE)
+ stdout, stderr = proc.communicate(input)
+ result = stdout
+ proc.wait()
+ return result
+
+
+def write_data(path, data):
+ dir = os.path.dirname(os.path.abspath(path))
+ if not os.path.exists(dir):
+ os.makedirs(dir)
+ f = open(path, 'wb')
+ f.write(data)
+ f.close()
+
+
+def change_file(path, changes):
+ f = open(os.path.abspath(path), 'rb')
+ lines = f.readlines()
+ f.close()
+ for change_type, line, text in changes:
+ if change_type == 'insert':
+ lines[line:line] = [text]
+ elif change_type == 'delete':
+ lines[line:text] = []
+ else:
+ assert 0, (
+ "Unknown change_type: %r" % change_type)
+ f = open(path, 'wb')
+ f.write(''.join(lines))
+ f.close()
+
+class LongFormDocTestParser(doctest.DocTestParser):
+
+ """
+ This parser recognizes some reST comments as commands, without
+ prompts or expected output, like:
+
+ .. run:
+
+ do_this(...
+ ...)
+ """
+
+ _EXAMPLE_RE = re.compile(r"""
+ # Source consists of a PS1 line followed by zero or more PS2 lines.
+ (?: (?P<source>
+ (?:^(?P<indent> [ ]*) >>> .*) # PS1 line
+ (?:\n [ ]* \.\.\. .*)*) # PS2 lines
+ \n?
+ # Want consists of any non-blank lines that do not start with PS1.
+ (?P<want> (?:(?![ ]*$) # Not a blank line
+ (?![ ]*>>>) # Not a line starting with PS1
+ .*$\n? # But any other line
+ )*))
+ |
+ (?: # This is for longer commands that are prefixed with a reST
+ # comment like '.. run:' (two colons makes that a directive).
+ # These commands cannot have any output.
+
+ (?:^\.\.[ ]*(?P<run>run):[ ]*\n) # Leading command/command
+ (?:[ ]*\n)? # Blank line following
+ (?P<runsource>
+ (?:(?P<runindent> [ ]+)[^ ].*$)
+ (?:\n [ ]+ .*)*)
+ )
+ |
+ (?: # This is for shell commands
+
+ (?P<shellsource>
+ (?:^(P<shellindent> [ ]*) [$] .*) # Shell line
+ (?:\n [ ]* [>] .*)*) # Continuation
+ \n?
+ # Want consists of any non-blank lines that do not start with $
+ (?P<shellwant> (?:(?![ ]*$)
+ (?![ ]*[$]$)
+ .*$\n?
+ )*))
+ """, re.MULTILINE | re.VERBOSE)
+
+ def _parse_example(self, m, name, lineno):
+ r"""
+ Given a regular expression match from `_EXAMPLE_RE` (`m`),
+ return a pair `(source, want)`, where `source` is the matched
+ example's source code (with prompts and indentation stripped);
+ and `want` is the example's expected output (with indentation
+ stripped).
+
+ `name` is the string's name, and `lineno` is the line number
+ where the example starts; both are used for error messages.
+
+ >>> def parseit(s):
+ ... p = LongFormDocTestParser()
+ ... return p._parse_example(p._EXAMPLE_RE.search(s), '<string>', 1)
+ >>> parseit('>>> 1\n1')
+ ('1', {}, '1', None)
+ >>> parseit('>>> (1\n... +1)\n2')
+ ('(1\n+1)', {}, '2', None)
+ >>> parseit('.. run:\n\n test1\n test2\n')
+ ('test1\ntest2', {}, '', None)
+ """
+ # Get the example's indentation level.
+ runner = m.group('run') or ''
+ indent = len(m.group('%sindent' % runner))
+
+ # Divide source into lines; check that they're properly
+ # indented; and then strip their indentation & prompts.
+ source_lines = m.group('%ssource' % runner).split('\n')
+ if runner:
+ self._check_prefix(source_lines[1:], ' '*indent, name, lineno)
+ else:
+ self._check_prompt_blank(source_lines, indent, name, lineno)
+ self._check_prefix(source_lines[2:], ' '*indent + '.', name, lineno)
+ if runner:
+ source = '\n'.join([sl[indent:] for sl in source_lines])
+ else:
+ source = '\n'.join([sl[indent+4:] for sl in source_lines])
+
+ if runner:
+ want = ''
+ exc_msg = None
+ else:
+ # Divide want into lines; check that it's properly indented; and
+ # then strip the indentation. Spaces before the last newline should
+ # be preserved, so plain rstrip() isn't good enough.
+ want = m.group('want')
+ want_lines = want.split('\n')
+ if len(want_lines) > 1 and re.match(r' *$', want_lines[-1]):
+ del want_lines[-1] # forget final newline & spaces after it
+ self._check_prefix(want_lines, ' '*indent, name,
+ lineno + len(source_lines))
+ want = '\n'.join([wl[indent:] for wl in want_lines])
+
+ # If `want` contains a traceback message, then extract it.
+ m = self._EXCEPTION_RE.match(want)
+ if m:
+ exc_msg = m.group('msg')
+ else:
+ exc_msg = None
+
+ # Extract options from the source.
+ options = self._find_options(source, name, lineno)
+
+ return source, options, want, exc_msg
+
+
+ def parse(self, string, name='<string>'):
+ """
+ Divide the given string into examples and intervening text,
+ and return them as a list of alternating Examples and strings.
+ Line numbers for the Examples are 0-based. The optional
+ argument `name` is a name identifying this string, and is only
+ used for error messages.
+ """
+ string = string.expandtabs()
+ # If all lines begin with the same indentation, then strip it.
+ min_indent = self._min_indent(string)
+ if min_indent > 0:
+ string = '\n'.join([l[min_indent:] for l in string.split('\n')])
+
+ output = []
+ charno, lineno = 0, 0
+ # Find all doctest examples in the string:
+ for m in self._EXAMPLE_RE.finditer(string):
+ # Add the pre-example text to `output`.
+ output.append(string[charno:m.start()])
+ # Update lineno (lines before this example)
+ lineno += string.count('\n', charno, m.start())
+ # Extract info from the regexp match.
+ (source, options, want, exc_msg) = \
+ self._parse_example(m, name, lineno)
+ # Create an Example, and add it to the list.
+ if not self._IS_BLANK_OR_COMMENT(source):
+ # @@: Erg, this is the only line I need to change...
+ output.append( doctest.Example(source, want, exc_msg,
+ lineno=lineno,
+ indent=min_indent+len(m.group('indent') or m.group('runindent')),
+ options=options) )
+ # Update lineno (lines inside this example)
+ lineno += string.count('\n', m.start(), m.end())
+ # Update charno.
+ charno = m.end()
+ # Add any remaining post-example text to `output`.
+ output.append(string[charno:])
+ return output
+
+
+
+if __name__ == '__main__':
+ if sys.argv[1:] and sys.argv[1] == 'doctest':
+ doctest.testmod()
+ sys.exit()
+ if not paste_parent in sys.path:
+ sys.path.append(paste_parent)
+ for fn in sys.argv[1:]:
+ fn = os.path.abspath(fn)
+ # @@: OK, ick; but this module gets loaded twice
+ sys.testing_document_filename = fn
+ doctest.testfile(
+ fn, module_relative=False,
+ optionflags=doctest.ELLIPSIS|doctest.REPORT_ONLY_FIRST_FAILURE,
+ parser=LongFormDocTestParser())
+ new = os.path.splitext(fn)[0] + '.html'
+ assert new != fn
+ os.system('rst2html.py %s > %s' % (fn, new))
Propchange: packages/paste/branches/upstream/current/paste/debug/doctest_webapp.py
------------------------------------------------------------------------------
svn:executable =
Added: packages/paste/branches/upstream/current/paste/util/scgiserver.py
URL: http://svn.debian.org/wsvn/python-modules/packages/paste/branches/upstream/current/paste/util/scgiserver.py?rev=395&op=file
==============================================================================
--- packages/paste/branches/upstream/current/paste/util/scgiserver.py (added)
+++ packages/paste/branches/upstream/current/paste/util/scgiserver.py Thu May 4 15:55:44 2006
@@ -1,0 +1,147 @@
+#! /usr/bin/env python
+"""
+SCGI-->WSGI application proxy, "SWAP".
+
+(Originally written by Titus Brown.)
+
+This lets an SCGI front-end like mod_scgi be used to execute WSGI
+application objects. To use it, subclass the SWAP class like so::
+
+ class TestAppHandler(swap.SWAP):
+ def __init__(self, *args, **kwargs):
+ self.prefix = '/canal'
+ self.app_obj = TestAppClass
+ swap.SWAP.__init__(self, *args, **kwargs)
+
+where 'TestAppClass' is the application object from WSGI and '/canal'
+is the prefix for what is served by the SCGI Web-server-side process.
+
+Then execute the SCGI handler "as usual" by doing something like this::
+
+ scgi_server.SCGIServer(TestAppHandler, port=4000).serve()
+
+and point mod_scgi (or whatever your SCGI front end is) at port 4000.
+
+Kudos to the WSGI folk for writing a nice PEP & the Quixote folk for
+writing a nice extensible SCGI server for Python!
+"""
+
+import sys
+import time
+import os
+from scgi import scgi_server
+
+def debug(msg):
+ timestamp = time.strftime("%Y-%m-%d %H:%M:%S",
+ time.localtime(time.time()))
+ sys.stderr.write("[%s] %s\n" % (timestamp, msg))
+
+class SWAP(scgi_server.SCGIHandler):
+ """
+ SCGI->WSGI application proxy: let an SCGI server execute WSGI
+ application objects.
+ """
+ app_obj = None
+ prefix = None
+
+ def __init__(self, *args, **kwargs):
+ assert self.app_obj, "must set app_obj"
+ assert self.prefix, "must set prefix"
+ args = (self,) + args
+ scgi_server.SCGIHandler.__init__(*args, **kwargs)
+
+ def handle_connection(self, conn):
+ """
+ Handle an individual connection.
+ """
+ input = conn.makefile("r")
+ output = conn.makefile("w")
+
+ environ = self.read_env(input)
+ environ['wsgi.input'] = input
+ environ['wsgi.errors'] = sys.stderr
+ environ['wsgi.version'] = (1,0)
+ environ['wsgi.multithread'] = False
+ environ['wsgi.multiprocess'] = True
+ environ['wsgi.run_once'] = False
+
+ # dunno how SCGI does HTTPS signalling; can't test it myself... @CTB
+ if environ.get('HTTPS','off') in ('on','1'):
+ environ['wsgi.url_scheme'] = 'https'
+ else:
+ environ['wsgi.url_scheme'] = 'http'
+
+ ## SCGI does some weird environ manglement. We need to set
+ ## SCRIPT_NAME from 'prefix' and then set PATH_INFO from
+ ## REQUEST_URI.
+
+ prefix = self.prefix
+ path = environ['REQUEST_URI'][len(prefix):].split('?', 1)[0]
+
+ environ['SCRIPT_NAME'] = prefix
+ environ['PATH_INFO'] = path
+
+ headers_set = []
+ headers_sent = []
+ chunks = []
+ def write(data):
+ chunks.append(data)
+
+ def start_response(status,response_headers,exc_info=None):
+ if exc_info:
+ try:
+ if headers_sent:
+ # Re-raise original exception if headers sent
+ raise exc_info[0], exc_info[1], exc_info[2]
+ finally:
+ exc_info = None # avoid dangling circular ref
+ elif headers_set:
+ raise AssertionError("Headers already set!")
+
+ headers_set[:] = [status,response_headers]
+ return write
+
+ ###
+
+ result = self.app_obj(environ, start_response)
+ try:
+ for data in result:
+ chunks.append(data)
+
+ # Before the first output, send the stored headers
+ if not headers_set:
+ # Error -- the app never called start_response
+ status = '500 Server Error'
+ response_headers = [('Content-type', 'text/html')]
+ chunks = ["XXX start_response never called"]
+ else:
+ status, response_headers = headers_sent[:] = headers_set
+
+ output.write('Status: %s\r\n' % status)
+ for header in response_headers:
+ output.write('%s: %s\r\n' % header)
+ output.write('\r\n')
+
+ for data in chunks:
+ output.write(data)
+ finally:
+ if hasattr(result,'close'):
+ result.close()
+
+ # SCGI backends use connection closing to signal 'fini'.
+ try:
+ input.close()
+ output.close()
+ conn.close()
+ except IOError, err:
+ debug("IOError while closing connection ignored: %s" % err)
+
+
+def serve_application (application, prefix, port):
+ class SCGIAppHandler(SWAP):
+ def __init__ (self, *args, **kwargs):
+ self.prefix = prefix
+ self.app_obj = application
+ SWAP.__init__(self, *args, **kwargs)
+
+ scgi_server.SCGIServer(SCGIAppHandler, port=port).serve()
More information about the Python-modules-commits
mailing list