[Python-modules-commits] [paste] 01/01: remove .orig files
Piotr Ożarowski
piotr at moszumanska.debian.org
Thu Mar 10 22:37:58 UTC 2016
This is an automated email from the git hooks/post-receive script.
piotr pushed a commit to branch master
in repository paste.
commit 3ed9f58b7347be18c989220f59cb74745fce68d7
Author: Piotr Ożarowski <piotr at debian.org>
Date: Thu Mar 10 23:10:31 2016 +0100
remove .orig files
---
debian/changelog | 2 +-
paste/httpserver.py.orig | 1431 -------------------------------------------
paste/urlmap.py.orig | 263 --------
paste/util/template.py.orig | 762 -----------------------
4 files changed, 1 insertion(+), 2457 deletions(-)
diff --git a/debian/changelog b/debian/changelog
index ce919b6..354bc11 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,4 +1,4 @@
-paste (2.0.3+dfsg-1) UNRELEASED; urgency=medium
+paste (2.0.3+dfsg-1) unstable; urgency=medium
* New upstream release
* Removed 0001-Py3k-fixes.patch (applied upstream)
diff --git a/paste/httpserver.py.orig b/paste/httpserver.py.orig
deleted file mode 100755
index 81e7c16..0000000
--- a/paste/httpserver.py.orig
+++ /dev/null
@@ -1,1431 +0,0 @@
-# (c) 2005 Ian Bicking and contributors; written for Paste (http://pythonpaste.org)
-# Licensed under the MIT license: http://www.opensource.org/licenses/mit-license.php
-# (c) 2005 Clark C. Evans
-# This module is part of the Python Paste Project and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-# This code was written with funding by http://prometheusresearch.com
-"""
-WSGI HTTP Server
-
-This is a minimalistic WSGI server using Python's built-in BaseHTTPServer;
-if pyOpenSSL is installed, it also provides SSL capabilities.
-"""
-
-# @@: add in protection against HTTP/1.0 clients who claim to
-# be 1.1 but do not send a Content-Length
-
-# @@: add support for chunked encoding, this is not a 1.1 server
-# till this is completed.
-
-from __future__ import print_function
-import atexit
-import traceback
-import socket, sys, threading
-import posixpath
-import six
-import time
-import os
-from itertools import count
-from six.moves import _thread
-from six.moves import queue
-from six.moves.BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
-from six.moves.socketserver import ThreadingMixIn
-from six.moves.urllib.parse import unquote, urlsplit
-from paste.util import converters
-import logging
-try:
- from paste.util import killthread
-except ImportError:
- # Not available, probably no ctypes
- killthread = None
-
-__all__ = ['WSGIHandlerMixin', 'WSGIServer', 'WSGIHandler', 'serve']
-__version__ = "0.5"
-
-
-def _get_headers(headers, k):
- """
- Private function for abstracting differences in getting HTTP request
- headers on Python 2 vs. Python 3
- """
-
- if hasattr(headers, 'get_all'):
- return headers.get_all(k) # Python 3 - email.message.Message
- else:
- return headers.getheaders(k) # Python 2 - mimetools.Message
-
-
-class ContinueHook(object):
- """
- When a client request includes a 'Expect: 100-continue' header, then
- it is the responsibility of the server to send 100 Continue when it
- is ready for the content body. This allows authentication, access
- levels, and other exceptions to be detected *before* bandwith is
- spent on the request body.
-
- This is a rfile wrapper that implements this functionality by
- sending 100 Continue to the client immediately after the user
- requests the content via a read() operation on the rfile stream.
- After this response is sent, it becomes a pass-through object.
- """
-
- def __init__(self, rfile, write):
- self._ContinueFile_rfile = rfile
- self._ContinueFile_write = write
- for attr in ('close', 'closed', 'fileno', 'flush',
- 'mode', 'bufsize', 'softspace'):
- if hasattr(rfile, attr):
- setattr(self, attr, getattr(rfile, attr))
- for attr in ('read', 'readline', 'readlines'):
- if hasattr(rfile, attr):
- setattr(self, attr, getattr(self, '_ContinueFile_' + attr))
-
- def _ContinueFile_send(self):
- self._ContinueFile_write("HTTP/1.1 100 Continue\r\n\r\n")
- rfile = self._ContinueFile_rfile
- for attr in ('read', 'readline', 'readlines'):
- if hasattr(rfile, attr):
- setattr(self, attr, getattr(rfile, attr))
-
- def _ContinueFile_read(self, size=-1):
- self._ContinueFile_send()
- return self._ContinueFile_rfile.read(size)
-
- def _ContinueFile_readline(self, size=-1):
- self._ContinueFile_send()
- return self._ContinueFile_rfile.readline(size)
-
- def _ContinueFile_readlines(self, sizehint=0):
- self._ContinueFile_send()
- return self._ContinueFile_rfile.readlines(sizehint)
-
-class WSGIHandlerMixin:
- """
- WSGI mix-in for HTTPRequestHandler
-
- This class is a mix-in to provide WSGI functionality to any
- HTTPRequestHandler derivative (as provided in Python's BaseHTTPServer).
- This assumes a ``wsgi_application`` handler on ``self.server``.
- """
- lookup_addresses = True
-
- def log_request(self, *args, **kwargs):
- """ disable success request logging
-
- Logging transactions should not be part of a WSGI server,
- if you want logging; look at paste.translogger
- """
- pass
-
- def log_message(self, *args, **kwargs):
- """ disable error message logging
-
- Logging transactions should not be part of a WSGI server,
- if you want logging; look at paste.translogger
- """
- pass
-
- def version_string(self):
- """ behavior that BaseHTTPServer should have had """
- if not self.sys_version:
- return self.server_version
- else:
- return self.server_version + ' ' + self.sys_version
-
- def wsgi_write_chunk(self, chunk):
- """
- Write a chunk of the output stream; send headers if they
- have not already been sent.
- """
- if not self.wsgi_headers_sent and not self.wsgi_curr_headers:
- raise RuntimeError(
- "Content returned before start_response called")
- if not self.wsgi_headers_sent:
- self.wsgi_headers_sent = True
- (status, headers) = self.wsgi_curr_headers
- code, message = status.split(" ", 1)
- self.send_response(int(code), message)
- #
- # HTTP/1.1 compliance; either send Content-Length or
- # signal that the connection is being closed.
- #
- send_close = True
- for (k, v) in headers:
- lk = k.lower()
- if 'content-length' == lk:
- send_close = False
- if 'connection' == lk:
- if 'close' == v.lower():
- self.close_connection = 1
- send_close = False
- self.send_header(k, v)
- if send_close:
- self.close_connection = 1
- self.send_header('Connection', 'close')
-
- self.end_headers()
- self.wfile.write(chunk)
-
- def wsgi_start_response(self, status, response_headers, exc_info=None):
- if exc_info:
- try:
- if self.wsgi_headers_sent:
- six.reraise(exc_info[0], exc_info[1], exc_info[2])
- else:
- # In this case, we're going to assume that the
- # higher-level code is currently handling the
- # issue and returning a resonable response.
- # self.log_error(repr(exc_info))
- pass
- finally:
- exc_info = None
- elif self.wsgi_curr_headers:
- assert 0, "Attempt to set headers a second time w/o an exc_info"
- self.wsgi_curr_headers = (status, response_headers)
- return self.wsgi_write_chunk
-
- def wsgi_setup(self, environ=None):
- """
- Setup the member variables used by this WSGI mixin, including
- the ``environ`` and status member variables.
-
- After the basic environment is created; the optional ``environ``
- argument can be used to override any settings.
- """
-
- dummy_url = 'http://dummy%s' % (self.path,)
- (scheme, netloc, path, query, fragment) = urlsplit(dummy_url)
- path = unquote(path)
- endslash = path.endswith('/')
- path = posixpath.normpath(path)
- if endslash and path != '/':
- # Put the slash back...
- path += '/'
- (server_name, server_port) = self.server.server_address[:2]
-
- rfile = self.rfile
- # We can put in the protection to keep from over-reading the
- # file
- try:
- content_length = int(self.headers.get('Content-Length', '0'))
- except ValueError:
- content_length = 0
- if '100-continue' == self.headers.get('Expect','').lower():
- rfile = LimitedLengthFile(ContinueHook(rfile, self.wfile.write), content_length)
- else:
- if not hasattr(self.connection, 'get_context'):
- # @@: LimitedLengthFile is currently broken in connection
- # with SSL (sporatic errors that are diffcult to trace, but
- # ones that go away when you don't use LimitedLengthFile)
- rfile = LimitedLengthFile(rfile, content_length)
-
- remote_address = self.client_address[0]
- self.wsgi_environ = {
- 'wsgi.version': (1,0)
- ,'wsgi.url_scheme': 'http'
- ,'wsgi.input': rfile
- ,'wsgi.errors': sys.stderr
- ,'wsgi.multithread': True
- ,'wsgi.multiprocess': False
- ,'wsgi.run_once': False
- # CGI variables required by PEP-333
- ,'REQUEST_METHOD': self.command
- ,'SCRIPT_NAME': '' # application is root of server
- ,'PATH_INFO': path
- ,'QUERY_STRING': query
- ,'CONTENT_TYPE': self.headers.get('Content-Type', '')
- ,'CONTENT_LENGTH': self.headers.get('Content-Length', '0')
- ,'SERVER_NAME': server_name
- ,'SERVER_PORT': str(server_port)
- ,'SERVER_PROTOCOL': self.request_version
- # CGI not required by PEP-333
- ,'REMOTE_ADDR': remote_address
- }
- if scheme:
- self.wsgi_environ['paste.httpserver.proxy.scheme'] = scheme
- if netloc:
- self.wsgi_environ['paste.httpserver.proxy.host'] = netloc
-
- if self.lookup_addresses:
- # @@: make lookup_addreses actually work, at this point
- # it has been address_string() is overriden down in
- # file and hence is a noop
- if remote_address.startswith("192.168.") \
- or remote_address.startswith("10.") \
- or remote_address.startswith("172.16."):
- pass
- else:
- address_string = None # self.address_string()
- if address_string:
- self.wsgi_environ['REMOTE_HOST'] = address_string
-
- if hasattr(self.server, 'thread_pool'):
- # Now that we know what the request was for, we should
- # tell the thread pool what its worker is working on
- self.server.thread_pool.worker_tracker[_thread.get_ident()][1] = self.wsgi_environ
- self.wsgi_environ['paste.httpserver.thread_pool'] = self.server.thread_pool
-
- for k, v in self.headers.items():
- key = 'HTTP_' + k.replace("-","_").upper()
- if key in ('HTTP_CONTENT_TYPE','HTTP_CONTENT_LENGTH'):
- continue
- self.wsgi_environ[key] = ','.join(_get_headers(self.headers, k))
-
- if hasattr(self.connection,'get_context'):
- self.wsgi_environ['wsgi.url_scheme'] = 'https'
- # @@: extract other SSL parameters from pyOpenSSL at...
- # http://www.modssl.org/docs/2.8/ssl_reference.html#ToC25
-
- if environ:
- assert isinstance(environ, dict)
- self.wsgi_environ.update(environ)
- if 'on' == environ.get('HTTPS'):
- self.wsgi_environ['wsgi.url_scheme'] = 'https'
-
- self.wsgi_curr_headers = None
- self.wsgi_headers_sent = False
-
- def wsgi_connection_drop(self, exce, environ=None):
- """
- Override this if you're interested in socket exceptions, such
- as when the user clicks 'Cancel' during a file download.
- """
- pass
-
- def wsgi_execute(self, environ=None):
- """
- Invoke the server's ``wsgi_application``.
- """
-
- self.wsgi_setup(environ)
-
- try:
- result = self.server.wsgi_application(self.wsgi_environ,
- self.wsgi_start_response)
- try:
- for chunk in result:
- self.wsgi_write_chunk(chunk)
- if not self.wsgi_headers_sent:
- self.wsgi_write_chunk('')
- finally:
- if hasattr(result,'close'):
- result.close()
- result = None
- except socket.error as exce:
- self.wsgi_connection_drop(exce, environ)
- return
- except:
- if not self.wsgi_headers_sent:
- error_msg = "Internal Server Error\n"
- self.wsgi_curr_headers = (
- '500 Internal Server Error',
- [('Content-type', 'text/plain'),
- ('Content-length', str(len(error_msg)))])
- self.wsgi_write_chunk("Internal Server Error\n")
- raise
-
-#
-# SSL Functionality
-#
-# This implementation was motivated by Sebastien Martini's SSL example
-# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/442473
-#
-try:
- from OpenSSL import SSL, tsafe
- SocketErrors = (socket.error, SSL.ZeroReturnError, SSL.SysCallError)
-except ImportError:
- # Do not require pyOpenSSL to be installed, but disable SSL
- # functionality in that case.
- SSL = None
- SocketErrors = (socket.error,)
- class SecureHTTPServer(HTTPServer):
- def __init__(self, server_address, RequestHandlerClass,
- ssl_context=None, request_queue_size=None):
- assert not ssl_context, "pyOpenSSL not installed"
- HTTPServer.__init__(self, server_address, RequestHandlerClass)
- if request_queue_size:
- self.socket.listen(request_queue_size)
-else:
-
- class _ConnFixer(object):
- """ wraps a socket connection so it implements makefile """
- def __init__(self, conn):
- self.__conn = conn
- def makefile(self, mode, bufsize):
- return socket._fileobject(self.__conn, mode, bufsize)
- def __getattr__(self, attrib):
- return getattr(self.__conn, attrib)
-
- class SecureHTTPServer(HTTPServer):
- """
- Provides SSL server functionality on top of the BaseHTTPServer
- by overriding _private_ members of Python's standard
- distribution. The interface for this instance only changes by
- adding a an optional ssl_context attribute to the constructor:
-
- cntx = SSL.Context(SSL.SSLv23_METHOD)
- cntx.use_privatekey_file("host.pem")
- cntx.use_certificate_file("host.pem")
-
- """
-
- def __init__(self, server_address, RequestHandlerClass,
- ssl_context=None, request_queue_size=None):
- # This overrides the implementation of __init__ in python's
- # SocketServer.TCPServer (which BaseHTTPServer.HTTPServer
- # does not override, thankfully).
- HTTPServer.__init__(self, server_address, RequestHandlerClass)
- self.socket = socket.socket(self.address_family,
- self.socket_type)
- self.ssl_context = ssl_context
- if ssl_context:
- class TSafeConnection(tsafe.Connection):
- def settimeout(self, *args):
- self._lock.acquire()
- try:
- return self._ssl_conn.settimeout(*args)
- finally:
- self._lock.release()
- def gettimeout(self):
- self._lock.acquire()
- try:
- return self._ssl_conn.gettimeout()
- finally:
- self._lock.release()
- self.socket = TSafeConnection(ssl_context, self.socket)
- self.server_bind()
- if request_queue_size:
- self.socket.listen(request_queue_size)
- self.server_activate()
-
- def get_request(self):
- # The default SSL request object does not seem to have a
- # ``makefile(mode, bufsize)`` method as expected by
- # Socketserver.StreamRequestHandler.
- (conn, info) = self.socket.accept()
- if self.ssl_context:
- conn = _ConnFixer(conn)
- return (conn, info)
-
- def _auto_ssl_context():
- import OpenSSL, random
- pkey = OpenSSL.crypto.PKey()
- pkey.generate_key(OpenSSL.crypto.TYPE_RSA, 768)
-
- cert = OpenSSL.crypto.X509()
-
- cert.set_serial_number(random.randint(0, sys.maxint))
- cert.gmtime_adj_notBefore(0)
- cert.gmtime_adj_notAfter(60 * 60 * 24 * 365)
- cert.get_subject().CN = '*'
- cert.get_subject().O = 'Dummy Certificate'
- cert.get_issuer().CN = 'Untrusted Authority'
- cert.get_issuer().O = 'Self-Signed'
- cert.set_pubkey(pkey)
- cert.sign(pkey, 'md5')
-
- ctx = SSL.Context(SSL.SSLv23_METHOD)
- ctx.use_privatekey(pkey)
- ctx.use_certificate(cert)
-
- return ctx
-
-class WSGIHandler(WSGIHandlerMixin, BaseHTTPRequestHandler):
- """
- A WSGI handler that overrides POST, GET and HEAD to delegate
- requests to the server's ``wsgi_application``.
- """
- server_version = 'PasteWSGIServer/' + __version__
-
- def handle_one_request(self):
- """Handle a single HTTP request.
-
- You normally don't need to override this method; see the class
- __doc__ string for information on how to handle specific HTTP
- commands such as GET and POST.
-
- """
- self.raw_requestline = self.rfile.readline()
- if not self.raw_requestline:
- self.close_connection = 1
- return
- if not self.parse_request(): # An error code has been sent, just exit
- return
- self.wsgi_execute()
-
- def handle(self):
- # don't bother logging disconnects while handling a request
- try:
- BaseHTTPRequestHandler.handle(self)
- except SocketErrors as exce:
- self.wsgi_connection_drop(exce)
-
- def address_string(self):
- """Return the client address formatted for logging.
-
- This is overridden so that no hostname lookup is done.
- """
- return ''
-
-class LimitedLengthFile(object):
- def __init__(self, file, length):
- self.file = file
- self.length = length
- self._consumed = 0
- if hasattr(self.file, 'seek'):
- self.seek = self._seek
-
- def __repr__(self):
- base_repr = repr(self.file)
- return base_repr[:-1] + ' length=%s>' % self.length
-
- def read(self, length=None):
- left = self.length - self._consumed
- if length is None:
- length = left
- else:
- length = min(length, left)
- # next two lines are hnecessary only if read(0) blocks
- if not left:
- return ''
- data = self.file.read(length)
- self._consumed += len(data)
- return data
-
- def readline(self, *args):
- max_read = self.length - self._consumed
- if len(args):
- max_read = min(args[0], max_read)
- data = self.file.readline(max_read)
- self._consumed += len(data)
- return data
-
- def readlines(self, hint=None):
- data = self.file.readlines(hint)
- for chunk in data:
- self._consumed += len(chunk)
- return data
-
- def __iter__(self):
- return self
-
- def next(self):
- if self.length - self._consumed <= 0:
- raise StopIteration
- return self.readline()
-
- ## Optional methods ##
-
- def _seek(self, place):
- self.file.seek(place)
- self._consumed = place
-
- def tell(self):
- if hasattr(self.file, 'tell'):
- return self.file.tell()
- else:
- return self._consumed
-
-class ThreadPool(object):
- """
- Generic thread pool with a queue of callables to consume.
-
- Keeps a notion of the status of its worker threads:
-
- idle: worker thread with nothing to do
-
- busy: worker thread doing its job
-
- hung: worker thread that's been doing a job for too long
-
- dying: a hung thread that has been killed, but hasn't died quite
- yet.
-
- zombie: what was a worker thread that we've tried to kill but
- isn't dead yet.
-
- At any time you can call track_threads, to get a dictionary with
- these keys and lists of thread_ids that fall in that status. All
- keys will be present, even if they point to emty lists.
-
- hung threads are threads that have been busy more than
- hung_thread_limit seconds. Hung threads are killed when they live
- longer than kill_thread_limit seconds. A thread is then
- considered dying for dying_limit seconds, if it is still alive
- after that it is considered a zombie.
-
- When there are no idle workers and a request comes in, another
- worker *may* be spawned. If there are less than spawn_if_under
- threads in the busy state, another thread will be spawned. So if
- the limit is 5, and there are 4 hung threads and 6 busy threads,
- no thread will be spawned.
-
- When there are more than max_zombie_threads_before_die zombie
- threads, a SystemExit exception will be raised, stopping the
- server. Use 0 or None to never raise this exception. Zombie
- threads *should* get cleaned up, but killing threads is no
- necessarily reliable. This is turned off by default, since it is
- only a good idea if you've deployed the server with some process
- watching from above (something similar to daemontools or zdaemon).
-
- Each worker thread only processes ``max_requests`` tasks before it
- dies and replaces itself with a new worker thread.
- """
-
-
- SHUTDOWN = object()
-
- def __init__(
- self, nworkers, name="ThreadPool", daemon=False,
- max_requests=100, # threads are killed after this many requests
- hung_thread_limit=30, # when a thread is marked "hung"
- kill_thread_limit=1800, # when you kill that hung thread
- dying_limit=300, # seconds that a kill should take to go into effect (longer than this and the thread is a "zombie")
- spawn_if_under=5, # spawn if there's too many hung threads
- max_zombie_threads_before_die=0, # when to give up on the process
- hung_check_period=100, # every 100 requests check for hung workers
- logger=None, # Place to log messages to
- error_email=None, # Person(s) to notify if serious problem occurs
- ):
- """
- Create thread pool with `nworkers` worker threads.
- """
- self.nworkers = nworkers
- self.max_requests = max_requests
- self.name = name
- self.queue = queue.Queue()
- self.workers = []
- self.daemon = daemon
- if logger is None:
- logger = logging.getLogger('paste.httpserver.ThreadPool')
- if isinstance(logger, six.string_types):
- logger = logging.getLogger(logger)
- self.logger = logger
- self.error_email = error_email
- self._worker_count = count()
-
- assert (not kill_thread_limit
- or kill_thread_limit >= hung_thread_limit), (
- "kill_thread_limit (%s) should be higher than hung_thread_limit (%s)"
- % (kill_thread_limit, hung_thread_limit))
- if not killthread:
- kill_thread_limit = 0
- self.logger.info(
- "Cannot use kill_thread_limit as ctypes/killthread is not available")
- self.kill_thread_limit = kill_thread_limit
- self.dying_limit = dying_limit
- self.hung_thread_limit = hung_thread_limit
- assert spawn_if_under <= nworkers, (
- "spawn_if_under (%s) should be less than nworkers (%s)"
- % (spawn_if_under, nworkers))
- self.spawn_if_under = spawn_if_under
- self.max_zombie_threads_before_die = max_zombie_threads_before_die
- self.hung_check_period = hung_check_period
- self.requests_since_last_hung_check = 0
- # Used to keep track of what worker is doing what:
- self.worker_tracker = {}
- # Used to keep track of the workers not doing anything:
- self.idle_workers = []
- # Used to keep track of threads that have been killed, but maybe aren't dead yet:
- self.dying_threads = {}
- # This is used to track when we last had to add idle workers;
- # we shouldn't cull extra workers until some time has passed
- # (hung_thread_limit) since workers were added:
- self._last_added_new_idle_workers = 0
- if not daemon:
- atexit.register(self.shutdown)
- for i in range(self.nworkers):
- self.add_worker_thread(message='Initial worker pool')
-
- def add_task(self, task):
- """
- Add a task to the queue
- """
- self.logger.debug('Added task (%i tasks queued)', self.queue.qsize())
- if self.hung_check_period:
- self.requests_since_last_hung_check += 1
- if self.requests_since_last_hung_check > self.hung_check_period:
- self.requests_since_last_hung_check = 0
- self.kill_hung_threads()
- if not self.idle_workers and self.spawn_if_under:
- # spawn_if_under can come into effect...
- busy = 0
- now = time.time()
- self.logger.debug('No idle workers for task; checking if we need to make more workers')
- for worker in self.workers:
- if not hasattr(worker, 'thread_id'):
- # Not initialized
- continue
- time_started, info = self.worker_tracker.get(worker.thread_id,
- (None, None))
- if time_started is not None:
- if now - time_started < self.hung_thread_limit:
- busy += 1
- if busy < self.spawn_if_under:
- self.logger.info(
- 'No idle tasks, and only %s busy tasks; adding %s more '
- 'workers', busy, self.spawn_if_under-busy)
- self._last_added_new_idle_workers = time.time()
- for i in range(self.spawn_if_under - busy):
- self.add_worker_thread(message='Response to lack of idle workers')
- else:
- self.logger.debug(
- 'No extra workers needed (%s busy workers)',
- busy)
- if (len(self.workers) > self.nworkers
- and len(self.idle_workers) > 3
- and time.time()-self._last_added_new_idle_workers > self.hung_thread_limit):
- # We've spawned worers in the past, but they aren't needed
- # anymore; kill off some
- self.logger.info(
- 'Culling %s extra workers (%s idle workers present)',
- len(self.workers)-self.nworkers, len(self.idle_workers))
- self.logger.debug(
- 'Idle workers: %s', self.idle_workers)
- for i in range(len(self.workers) - self.nworkers):
- self.queue.put(self.SHUTDOWN)
- self.queue.put(task)
-
- def track_threads(self):
- """
- Return a dict summarizing the threads in the pool (as
- described in the ThreadPool docstring).
- """
- result = dict(idle=[], busy=[], hung=[], dying=[], zombie=[])
- now = time.time()
- for worker in self.workers:
- if not hasattr(worker, 'thread_id'):
- # The worker hasn't fully started up, we should just
- # ignore it
- continue
- time_started, info = self.worker_tracker.get(worker.thread_id,
- (None, None))
- if time_started is not None:
- if now - time_started > self.hung_thread_limit:
- result['hung'].append(worker)
- else:
- result['busy'].append(worker)
- else:
- result['idle'].append(worker)
- for thread_id, (time_killed, worker) in self.dying_threads.items():
- if not self.thread_exists(thread_id):
- # Cull dying threads that are actually dead and gone
- self.logger.info('Killed thread %s no longer around',
- thread_id)
- try:
- del self.dying_threads[thread_id]
- except KeyError:
- pass
- continue
- if now - time_killed > self.dying_limit:
- result['zombie'].append(worker)
- else:
- result['dying'].append(worker)
- return result
-
- def kill_worker(self, thread_id):
- """
- Removes the worker with the given thread_id from the pool, and
- replaces it with a new worker thread.
-
- This should only be done for mis-behaving workers.
- """
- if killthread is None:
- raise RuntimeError(
- "Cannot kill worker; killthread/ctypes not available")
- thread_obj = threading._active.get(thread_id)
- killthread.async_raise(thread_id, SystemExit)
- try:
- del self.worker_tracker[thread_id]
- except KeyError:
- pass
- self.logger.info('Killing thread %s', thread_id)
- if thread_obj in self.workers:
- self.workers.remove(thread_obj)
- self.dying_threads[thread_id] = (time.time(), thread_obj)
- self.add_worker_thread(message='Replacement for killed thread %s' % thread_id)
-
- def thread_exists(self, thread_id):
- """
- Returns true if a thread with this id is still running
- """
- return thread_id in threading._active
-
- def add_worker_thread(self, *args, **kwargs):
- index = six.next(self._worker_count)
- worker = threading.Thread(target=self.worker_thread_callback,
- args=args, kwargs=kwargs,
- name=("worker %d" % index))
- worker.setDaemon(self.daemon)
- worker.start()
-
- def kill_hung_threads(self):
- """
- Tries to kill any hung threads
- """
- if not self.kill_thread_limit:
- # No killing should occur
- return
- now = time.time()
- max_time = 0
- total_time = 0
- idle_workers = 0
- starting_workers = 0
- working_workers = 0
- killed_workers = 0
- for worker in self.workers:
- if not hasattr(worker, 'thread_id'):
- # Not setup yet
- starting_workers += 1
- continue
- time_started, info = self.worker_tracker.get(worker.thread_id,
- (None, None))
- if time_started is None:
- # Must be idle
- idle_workers += 1
- continue
- working_workers += 1
- max_time = max(max_time, now-time_started)
- total_time += now-time_started
- if now - time_started > self.kill_thread_limit:
- self.logger.warning(
- 'Thread %s hung (working on task for %i seconds)',
- worker.thread_id, now - time_started)
- try:
- import pprint
- info_desc = pprint.pformat(info)
- except:
- out = six.StringIO()
- traceback.print_exc(file=out)
- info_desc = 'Error:\n%s' % out.getvalue()
- self.notify_problem(
- "Killing worker thread (id=%(thread_id)s) because it has been \n"
- "working on task for %(time)s seconds (limit is %(limit)s)\n"
- "Info on task:\n"
- "%(info)s"
- % dict(thread_id=worker.thread_id,
- time=now - time_started,
- limit=self.kill_thread_limit,
- info=info_desc))
- self.kill_worker(worker.thread_id)
- killed_workers += 1
- if working_workers:
- ave_time = float(total_time) / working_workers
- ave_time = '%.2fsec' % ave_time
- else:
- ave_time = 'N/A'
- self.logger.info(
- "kill_hung_threads status: %s threads (%s working, %s idle, %s starting) "
- "ave time %s, max time %.2fsec, killed %s workers"
- % (idle_workers + starting_workers + working_workers,
- working_workers, idle_workers, starting_workers,
- ave_time, max_time, killed_workers))
- self.check_max_zombies()
-
- def check_max_zombies(self):
- """
- Check if we've reached max_zombie_threads_before_die; if so
- then kill the entire process.
- """
- if not self.max_zombie_threads_before_die:
- return
- found = []
- now = time.time()
- for thread_id, (time_killed, worker) in self.dying_threads.items():
- if not self.thread_exists(thread_id):
- # Cull dying threads that are actually dead and gone
- try:
- del self.dying_threads[thread_id]
- except KeyError:
- pass
- continue
- if now - time_killed > self.dying_limit:
- found.append(thread_id)
- if found:
- self.logger.info('Found %s zombie threads', found)
- if len(found) > self.max_zombie_threads_before_die:
- self.logger.fatal(
- 'Exiting process because %s zombie threads is more than %s limit',
- len(found), self.max_zombie_threads_before_die)
- self.notify_problem(
- "Exiting process because %(found)s zombie threads "
- "(more than limit of %(limit)s)\n"
- "Bad threads (ids):\n"
- " %(ids)s\n"
- % dict(found=len(found),
- limit=self.max_zombie_threads_before_die,
- ids="\n ".join(map(str, found))),
- subject="Process restart (too many zombie threads)")
- self.shutdown(10)
- print('Shutting down', threading.currentThread())
- raise ServerExit(3)
-
- def worker_thread_callback(self, message=None):
- """
- Worker thread should call this method to get and process queued
- callables.
- """
- thread_obj = threading.currentThread()
- thread_id = thread_obj.thread_id = _thread.get_ident()
- self.workers.append(thread_obj)
- self.idle_workers.append(thread_id)
- requests_processed = 0
- add_replacement_worker = False
- self.logger.debug('Started new worker %s: %s', thread_id, message)
- try:
- while True:
- if self.max_requests and self.max_requests < requests_processed:
- # Replace this thread then die
- self.logger.debug('Thread %s processed %i requests (limit %s); stopping thread'
- % (thread_id, requests_processed, self.max_requests))
- add_replacement_worker = True
- break
- runnable = self.queue.get()
- if runnable is ThreadPool.SHUTDOWN:
- self.logger.debug('Worker %s asked to SHUTDOWN', thread_id)
- break
- try:
- self.idle_workers.remove(thread_id)
- except ValueError:
- pass
- self.worker_tracker[thread_id] = [time.time(), None]
- requests_processed += 1
- try:
- try:
- runnable()
- except:
- # We are later going to call sys.exc_clear(),
- # removing all remnants of any exception, so
- # we should log it now. But ideally no
- # exception should reach this level
- print('Unexpected exception in worker %r' % runnable,
- file=sys.stderr)
- traceback.print_exc()
- if thread_id in self.dying_threads:
- # That last exception was intended to kill me
- break
- finally:
- try:
- del self.worker_tracker[thread_id]
- except KeyError:
- pass
- if six.PY2:
- sys.exc_clear()
- self.idle_workers.append(thread_id)
- finally:
- try:
- del self.worker_tracker[thread_id]
- except KeyError:
- pass
- try:
- self.idle_workers.remove(thread_id)
- except ValueError:
- pass
- try:
- self.workers.remove(thread_obj)
- except ValueError:
- pass
- try:
- del self.dying_threads[thread_id]
- except KeyError:
- pass
- if add_replacement_worker:
- self.add_worker_thread(message='Voluntary replacement for thread %s' % thread_id)
-
- def shutdown(self, force_quit_timeout=0):
- """
- Shutdown the queue (after finishing any pending requests).
- """
- self.logger.info('Shutting down threadpool')
- # Add a shutdown request for every worker
- for i in range(len(self.workers)):
- self.queue.put(ThreadPool.SHUTDOWN)
- # Wait for each thread to terminate
- hung_workers = []
- for worker in self.workers:
- worker.join(0.5)
- if worker.isAlive():
- hung_workers.append(worker)
- zombies = []
- for thread_id in self.dying_threads:
- if self.thread_exists(thread_id):
- zombies.append(thread_id)
- if hung_workers or zombies:
- self.logger.info("%s workers didn't stop properly, and %s zombies",
- len(hung_workers), len(zombies))
- if hung_workers:
- for worker in hung_workers:
- self.kill_worker(worker.thread_id)
- self.logger.info('Workers killed forcefully')
- if force_quit_timeout:
- timed_out = False
- need_force_quit = bool(zombies)
- for worker in self.workers:
- if not timed_out and worker.isAlive():
- timed_out = True
- worker.join(force_quit_timeout)
- if worker.isAlive():
- print("Worker %s won't die" % worker)
- need_force_quit = True
- if need_force_quit:
- import atexit
- # Remove the threading atexit callback
... 1496 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/paste.git
More information about the Python-modules-commits
mailing list