[Secure-testing-commits] r12984 - lib/python
Florian Weimer
fw at alioth.debian.org
Sun Oct 11 12:02:21 UTC 2009
Author: fw
Date: 2009-10-11 12:02:02 +0000 (Sun, 11 Oct 2009)
New Revision: 12984
Modified:
lib/python/web_support.py
Log:
lib/python/web_support.py (WebServiceHttp): implement HTTP invocation
Introduces flatten_later helper methods in Result objects.
Modified: lib/python/web_support.py
===================================================================
--- lib/python/web_support.py 2009-10-11 10:23:56 UTC (rev 12983)
+++ lib/python/web_support.py 2009-10-11 12:02:02 UTC (rev 12984)
@@ -26,6 +26,9 @@
import traceback
import types
import urllib
+import threading
+import SocketServer
+import BaseHTTPServer
class ServinvokeError(Exception):
pass
@@ -556,6 +559,13 @@
def flatten(self, write):
pass
+ def flatten_later(self):
+ """Flattens this result.
+
+ Returns a closure which sends the result using a
+ BaseHTTPRequestHandler object passed as argument."""
+ pass
+
class RedirectResult(Result):
"""Permanently redirects the browser to a new URL."""
def __init__(self, url, permanent=True):
@@ -568,6 +578,13 @@
def flatten(self, write):
write("Status: %d\nLocation: %s\n\n" % (self.status, self.url))
+ def flatten_later(self):
+ def later(req):
+ req.send_response(self.status)
+ req.send_header('Location', self.url)
+ req.end_headers()
+ return later
+
class HTMLResult(Result):
"""An object of this class combines a status code with HTML contents."""
def __init__(self, contents, status=200, doctype=''):
@@ -584,6 +601,19 @@
write("Content-Type: text/html\n\n%s\n" % self.doctype)
self.contents.flatten(write)
+ def flatten_later(self):
+ buf = cStringIO.StringIO()
+ buf.write(self.doctype)
+ buf.write('\n')
+ self.contents.flatten(buf.write)
+ buf = buf.getvalue()
+ def later(req):
+ req.send_response(self.status)
+ req.send_header('Content-Type', 'text/html; charset=UTF-8')
+ req.end_headers()
+ req.wfile.write(buf)
+ return later
+
class BinaryResult(Result):
"""An object of this class combines a status code with HTML contents."""
def __init__(self, contents, status=200,
@@ -599,6 +629,13 @@
write("Content-Type: %s\n\n" % self.mimetype)
write(self.contents)
+ def flatten_later(self):
+ def later(req):
+ req.send_response(self.status)
+ req.send_header('Content-Type', self.mimetype)
+ req.wfile.write(self.contents)
+ return later
+
class WebServiceBase:
def __init__(self):
self.router = PathRouter()
@@ -679,6 +716,80 @@
assert isinstance(r, Result), `r`
r.flatten(result.write)
+class ThreadingHTTPServer(BaseHTTPServer.HTTPServer,
+ SocketServer.ThreadingMixIn):
+ pass
+
+RE_BASE_URL = re.compile(r'^http://([^/]+)(.*)')
+
+class WebServiceHTTP(WebServiceBase):
+ def __init__(self, socket_name):
+ WebServiceBase.__init__(self)
+ (base_url, address, port) = socket_name
+ self.lock = threading.Lock()
+
+ self.__parse_base_url(base_url)
+
+ service_self = self
+ class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_GET(self):
+ (method, path, remaining, params) = self.route()
+ if path is None:
+ return
+
+ url = URLFactory(service_self.server_name,
+ service_self.script_name,
+ path, params)
+
+ service_self.lock.acquire()
+ try:
+ service_self.pre_dispatch()
+ r = method(remaining, params, url)
+ assert isinstance(r, Result), `r`
+ result = r.flatten_later()
+ finally:
+ service_self.lock.release()
+ result(self)
+
+ def __parse_path(self):
+ pos = self.path.find('?')
+ if pos < 0:
+ return (self.path, {})
+ path = self.path[:pos]
+ if path[:1] != '/':
+ path = '/' + path
+ params = cgi.parse_qs(self.path[pos + 1:])
+ return (path, params)
+
+ def route(self):
+ (path, params) = self.__parse_path()
+ prefix_len = len(service_self.script_name)
+ prefix = path[0:prefix_len]
+ result = None
+ if prefix == service_self.script_name:
+ suffix = path[prefix_len:]
+ try:
+ (method, remaining) = \
+ service_self.router.get(suffix)
+ return (method, suffix, remaining, params)
+ except InvalidPath:
+ pass
+ self.send_error(404, "page not found")
+ return (None, None, None, None)
+
+ self.server = ThreadingHTTPServer((address, port), Handler)
+
+ def run(self):
+ self.server.serve_forever()
+
+ def __parse_base_url(self, url):
+ m = RE_BASE_URL.match(url)
+ if m is None:
+ raise ValueError("invalid base URL: " + url)
+ self.server_name = m.group(1)
+ self.script_name = m.group(2)
+
+
def __test():
assert str(URL("")) == ""
assert str(URL("abc")) == "abc"
More information about the Secure-testing-commits
mailing list