[Python-modules-commits] [python-django-channels] 02/07: New upstream version 1.1.8.1
Michael Fladischer
fladi at moszumanska.debian.org
Mon Jan 8 17:17:24 UTC 2018
This is an automated email from the git hooks/post-receive script.
fladi pushed a commit to branch debian/master
in repository python-django-channels.
commit 94a0fbea0ef4c858e24285f6328ed15e59e72e15
Author: Michael Fladischer <FladischerMichael at fladi.at>
Date: Mon Jan 8 17:45:41 2018 +0100
New upstream version 1.1.8.1
---
channels/management/commands/runworker.py | 5 +
channels/test/websocket.py | 5 +-
channels/worker.py | 22 ++-
docs/generics.rst | 8 +
docs/getting-started.rst | 4 +-
patchinator.py | 259 ------------------------------
tests/test_worker.py | 38 ++++-
tests/test_wsclient.py | 4 +-
8 files changed, 79 insertions(+), 266 deletions(-)
diff --git a/channels/management/commands/runworker.py b/channels/management/commands/runworker.py
index e8366e6..d0dd7ae 100644
--- a/channels/management/commands/runworker.py
+++ b/channels/management/commands/runworker.py
@@ -34,6 +34,10 @@ class Command(BaseCommand):
default=1, type=int,
help='Number of threads to execute.'
)
+ parser.add_argument(
+ '--disable-graceful-stop', action='store_false', dest='stop_gracefully',
+ help='Disable graceful stop for a worker group (instant exit on termination signal).',
+ )
def handle(self, *args, **options):
# Get the backend to use
@@ -68,6 +72,7 @@ class Command(BaseCommand):
self.logger.info("Using multi-threaded worker, {} thread(s).".format(self.n_threads))
worker_cls = WorkerGroup
worker_kwargs['n_threads'] = self.n_threads
+ worker_kwargs['stop_gracefully'] = self.options.get('stop_gracefully')
# Run the worker
self.logger.info("Running worker against channel layer %s", self.channel_layer)
try:
diff --git a/channels/test/websocket.py b/channels/test/websocket.py
index 4b762ce..c260bd8 100644
--- a/channels/test/websocket.py
+++ b/channels/test/websocket.py
@@ -82,6 +82,9 @@ class WSClient(Client):
self.channel_layer.send(to, self._get_content(content, text, path))
self._session_cookie = False
+ def _list_headers(self):
+ return [[key.encode(), self.headers[key]] for key in self.headers]
+
def _get_content(self, content={}, text=None, path='/'):
content = copy.deepcopy(content)
content.setdefault('reply_channel', self.reply_channel)
@@ -93,7 +96,7 @@ class WSClient(Client):
else:
content.setdefault('path', path)
- content.setdefault('headers', self.headers)
+ content.setdefault('headers', self._list_headers())
if self._ordered:
if 'order' in content:
diff --git a/channels/worker.py b/channels/worker.py
index 1eacd67..ec1a9ad 100644
--- a/channels/worker.py
+++ b/channels/worker.py
@@ -149,6 +149,8 @@ class Worker(object):
finally:
# Send consumer finished so DB conns close etc.
consumer_finished.send(sender=self.__class__)
+ # Reset the flag to have a correct value after finishing the run loop
+ self.in_job = False
class WorkerGroup(Worker):
@@ -159,12 +161,21 @@ class WorkerGroup(Worker):
def __init__(self, *args, **kwargs):
n_threads = kwargs.pop('n_threads', multiprocessing.cpu_count()) - 1
+ self.stop_gracefully = kwargs.pop('stop_gracefully', False)
super(WorkerGroup, self).__init__(*args, **kwargs)
kwargs['signal_handlers'] = False
self.workers = [Worker(*args, **kwargs) for ii in range(n_threads)]
def sigterm_handler(self, signo, stack_frame):
- logger.info("Shutdown signal received by WorkerGroup, terminating immediately.")
+ self.termed = True
+ for worker in self.workers:
+ worker.termed = True
+ msg_prefix = "Shutdown signal received by WorkerGroup, "
+ if self.stop_gracefully:
+ logger.info(msg_prefix + "waiting for sub-workers.")
+ self.wait_for_workers()
+ else:
+ logger.info(msg_prefix + "terminating immediately.")
sys.exit(0)
def ready(self):
@@ -182,3 +193,12 @@ class WorkerGroup(Worker):
t.daemon = True
t.start()
super(WorkerGroup, self).run()
+ if self.stop_gracefully:
+ self.wait_for_workers()
+
+ def wait_for_workers(self):
+ while self.in_job or any(
+ self.threads[worker_id].is_alive() and
+ self.workers[worker_id].in_job
+ for worker_id in range(len(self.workers))):
+ time.sleep(0.1)
diff --git a/docs/generics.rst b/docs/generics.rst
index 17768da..5270b78 100644
--- a/docs/generics.rst
+++ b/docs/generics.rst
@@ -266,6 +266,14 @@ And if you just want to use the user from the django session, add ``http_user``:
This will give you ``message.user``, which will be the same as ``request.user``
would be on a regular View.
+And if you want to use both user and session from the django session, add ``http_user_and_session``::
+
+ class MyConsumer(WebsocketConsumer):
+
+ http_user_and_session = True
+
+This will give you ``message.user`` and ``message.http_session``.
+
Applying Decorators
-------------------
diff --git a/docs/getting-started.rst b/docs/getting-started.rst
index 77a374e..35cca58 100644
--- a/docs/getting-started.rst
+++ b/docs/getting-started.rst
@@ -445,7 +445,7 @@ Now, of course, a WebSocket solution is somewhat limited in scope without the
ability to live with the rest of your website - in particular, we want to make
sure we know what user we're talking to, in case we have things like private
chat channels (we don't want a solution where clients just ask for the right
-channels, as anyone could change the code and just put in private channel names)
+channels, as anyone could change the code and just put in private channel names).
It can also save you having to manually make clients ask for what they want to
see; if I see you open a WebSocket to my "updates" endpoint, and I know which
@@ -559,7 +559,7 @@ from hosts listed in the ``ALLOWED_HOSTS`` setting::
from channels import Channel, Group
from channels.sessions import channel_session
from channels.auth import channel_session_user, channel_session_user_from_http
- from channels.security.websockets import allowed_hosts_only.
+ from channels.security.websockets import allowed_hosts_only
# Connected to websocket.connect
@allowed_hosts_only
diff --git a/patchinator.py b/patchinator.py
deleted file mode 100644
index 661a659..0000000
--- a/patchinator.py
+++ /dev/null
@@ -1,259 +0,0 @@
-#!/usr/bin/python
-"""
-Script that automatically generates a Django patch from the Channels codebase
-based on some simple rules and string replacements.
-
-Once Channels lands in Django, will be reversed to instead generate this
-third-party app from the now-canonical Django source.
-"""
-
-import re
-import os.path
-import sys
-
-from isort import SortImports
-
-
-# Transforms: Turn one content string into another
-
-class Replacement(object):
- """
- Represents a string replacement in a file; uses a regular expression to
- substitute strings in the file.
- """
-
- def __init__(self, match, sub, regex=True):
- self.match = match
- self.sub = sub
- self.regex = regex
-
- def __call__(self, value):
- if self.regex:
- return re.sub(self.match, self.sub, value)
- else:
- return value.replace(self.match, self.sub)
-
-
-class Insert(object):
- """
- Inserts a string before/after another in a file, one time only, with multiline match.
- """
-
- def __init__(self, match, to_insert, after=False):
- self.match = match
- self.to_insert = to_insert
- self.after = after
-
- def __call__(self, value):
- match = re.search(self.match, value, flags=re.MULTILINE)
- if not match:
- raise ValueError("Could not find match %s" % self.match)
- if self.after:
- return value[:match.end()] + self.to_insert + value[match.end():]
- else:
- return value[:match.start()] + self.to_insert + value[match.start():]
-
-
-class Isort(object):
- """
- Runs isort on the file
- """
-
- def __call__(self, value):
- return SortImports(file_contents=value).output
-
-
-# Operations: Copy or patch files
-
-class FileMap(object):
- """
- Represents a file map from the source to the destination, with
- optional extra regex transforms.
- """
-
- def __init__(self, source_path, dest_path, transforms, makedirs=True):
- self.source_path = source_path
- self.dest_path = dest_path
- self.transforms = transforms
- self.makedirs = makedirs
-
- def run(self, source_dir, dest_dir):
- print("COPY: %s -> %s" % (self.source_path, self.dest_path))
- # Open and read in source file
- source = os.path.join(source_dir, self.source_path)
- with open(source, "r") as fh:
- content = fh.read()
- # Run transforms
- for transform in self.transforms:
- content = transform(content)
- # Save new file
- dest = os.path.join(dest_dir, self.dest_path)
- if self.makedirs:
- if not os.path.isdir(os.path.dirname(dest)):
- os.makedirs(os.path.dirname(dest))
- with open(dest, "w") as fh:
- fh.write(content)
-
-
-class NewFile(object):
- """
- Writes a file to the destination, either blank or with some content from
- a string.
- """
-
- def __init__(self, dest_path, content=""):
- self.dest_path = dest_path
- self.content = content
-
- def run(self, source_dir, dest_dir):
- print("NEW: %s" % (self.dest_path, ))
- # Save new file
- dest = os.path.join(dest_dir, self.dest_path)
- with open(dest, "w") as fh:
- fh.write(self.content)
-
-
-# Main class and config
-
-
-global_transforms = [
- Replacement(r"import channels.([a-zA-Z0-9_\.]+)$", r"import django.channels.\1 as channels"),
- Replacement(r"from channels import", r"from django.channels import"),
- Replacement(r"from channels.([a-zA-Z0-9_\.]+) import", r"from django.channels.\1 import"),
- Replacement(r"from .handler import", r"from django.core.handlers.asgi import"),
- Replacement(r"from django.channels.test import", r"from django.test.channels import"),
- Replacement(r"from django.channels.handler import", r"from django.core.handlers.asgi import"),
- Replacement(r"tests.test_routing", r"channels_tests.test_routing"),
- Replacement(r"django.core.urlresolvers", r"django.urls"),
-]
-
-python_transforms = global_transforms + [
- Isort(),
-]
-
-docs_transforms = global_transforms + [
- Replacement(r"<concepts>`", r"</topics/channels/concepts>`"),
- Replacement(r":doc:`concepts`", r":doc:`/topics/channels/concepts`"),
- Replacement(r":doc:`deploying`", r":doc:`/topics/channels/deploying`"),
- Replacement(r":doc:`scaling`", r":doc:`/topics/channels/scaling`"),
- Replacement(r":doc:`getting-started`", r":doc:`/intro/channels`"),
- Replacement(r"<backends>`", r"</ref/channels/backends>`"),
- Replacement(r":doc:`backends`", r":doc:`/ref/channels/backends`"),
- Replacement(r":doc:`([\w\d\s]+) <asgi>`", r"`\1 <https://channels.readthedocs.io/en/latest/asgi.html>`_"),
- Replacement(r"\n\(.*installation>`\)\n", r""),
- Replacement(r":doc:`installed Channels correctly <installation>`", r"added the channel layer setting"),
- Replacement(r"Channels", r"channels"),
- Replacement(r"Started with channels", r"Started with Channels"),
- Replacement(r"Running with channels", r"Running with Channels"),
- Replacement(r"channels consumers", r"channel consumers"),
- Replacement(r"channels' design", r"The channels design"),
- Replacement(r"channels is being released", r"Channels is being released"),
- Replacement(r"channels is", r"channels are"),
- Replacement(r"channels provides a", r"Channels provides a"),
- Replacement(r"channels can use", r"Channels can use"),
- Replacement(r"channels Concepts", r"Channels Concepts"),
- Replacement(r"channels works", r"channels work"),
-]
-
-
-class Patchinator(object):
-
- operations = [
- FileMap(
- "channels/asgi.py", "django/channels/asgi.py", python_transforms + [
- Replacement("if django.VERSION[1] > 9:\n django.setup(set_prefix=False)\n else:\n django.setup()", "django.setup(set_prefix=False)", regex=False), # NOQA
- ],
- ),
- FileMap(
- "channels/auth.py", "django/channels/auth.py", python_transforms,
- ),
- FileMap(
- "channels/channel.py", "django/channels/channel.py", python_transforms,
- ),
- FileMap(
- "channels/exceptions.py", "django/channels/exceptions.py", python_transforms,
- ),
- FileMap(
- "channels/handler.py", "django/core/handlers/asgi.py", python_transforms,
- ),
- FileMap(
- "channels/routing.py", "django/channels/routing.py", python_transforms,
- ),
- FileMap(
- "channels/message.py", "django/channels/message.py", python_transforms,
- ),
- FileMap(
- "channels/sessions.py", "django/channels/sessions.py", python_transforms,
- ),
- FileMap(
- "channels/staticfiles.py", "django/contrib/staticfiles/consumers.py", python_transforms,
- ),
- FileMap(
- "channels/utils.py", "django/channels/utils.py", python_transforms,
- ),
- FileMap(
- "channels/worker.py", "django/channels/worker.py", python_transforms,
- ),
- FileMap(
- "channels/management/commands/runworker.py",
- "django/core/management/commands/runworker.py",
- python_transforms,
- ),
- # Tests
- FileMap(
- "channels/test/base.py", "django/test/channels.py", python_transforms,
- ),
- NewFile(
- "tests/channels_tests/__init__.py",
- ),
- FileMap(
- "tests/test_handler.py", "tests/channels_tests/test_handler.py", python_transforms,
- ),
- FileMap(
- "tests/test_routing.py", "tests/channels_tests/test_routing.py", python_transforms,
- ),
- FileMap(
- "tests/test_request.py", "tests/channels_tests/test_request.py", python_transforms,
- ),
- FileMap(
- "tests/test_sessions.py", "tests/channels_tests/test_sessions.py", python_transforms,
- ),
- # Docs
- FileMap(
- "docs/backends.rst", "docs/ref/channels/backends.txt", docs_transforms,
- ),
- FileMap(
- "docs/concepts.rst", "docs/topics/channels/concepts.txt", docs_transforms,
- ),
- FileMap(
- "docs/deploying.rst", "docs/topics/channels/deploying.txt", docs_transforms,
- ),
- FileMap(
- "docs/getting-started.rst", "docs/intro/channels.txt", docs_transforms,
- ),
- FileMap(
- "docs/reference.rst", "docs/ref/channels/api.txt", docs_transforms,
- ),
- FileMap(
- "docs/testing.rst", "docs/topics/channels/testing.txt", docs_transforms,
- ),
- FileMap(
- "docs/cross-compat.rst", "docs/topics/channels/cross-compat.txt", docs_transforms,
- ),
- ]
-
- def __init__(self, source, destination):
- self.source = os.path.abspath(source)
- self.destination = os.path.abspath(destination)
-
- def run(self):
- print("Patchinator running.\n Source: %s\n Destination: %s" % (self.source, self.destination))
- for operation in self.operations:
- operation.run(self.source, self.destination)
-
-
-if __name__ == '__main__':
- try:
- Patchinator(os.path.dirname(__file__), sys.argv[1]).run()
- except IndexError:
- print("Supply the target Django directory on the command line")
diff --git a/tests/test_worker.py b/tests/test_worker.py
index a8975fd..745336f 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -1,10 +1,13 @@
from __future__ import unicode_literals
+import time
+import threading
+
from channels import DEFAULT_CHANNEL_LAYER, Channel, route
from channels.asgi import channel_layers
from channels.exceptions import ConsumeLater
from channels.test import ChannelTestCase
-from channels.worker import Worker
+from channels.worker import Worker, WorkerGroup
try:
from unittest import mock
@@ -93,3 +96,36 @@ class WorkerTests(ChannelTestCase):
worker.run()
self.assertEqual(consumer.call_count, 1)
self.assertEqual(channel_layer.send.call_count, 0)
+
+
+class WorkerGroupTests(ChannelTestCase):
+
+ def test_graceful_stop(self):
+
+ callback_is_running = threading.Event()
+
+ def callback(channel, message):
+ callback_is_running.set()
+ # emulate some delay to validate graceful stop
+ time.sleep(0.1)
+ callback_is_running.clear()
+
+ consumer = mock.Mock()
+ Channel('test').send({'test': 'test'}, immediately=True)
+ channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER]
+ channel_layer.router.add_route(route('test', consumer))
+ old_send = channel_layer.send
+ channel_layer.send = mock.Mock(side_effect=old_send) # proxy 'send' for counting
+ worker_group = WorkerGroup(channel_layer, n_threads=3,
+ signal_handlers=False, stop_gracefully=True,
+ callback=callback)
+ worker_group_t = threading.Thread(target=worker_group.run)
+ worker_group_t.daemon = True
+ worker_group_t.start()
+ # wait when a worker starts the callback and terminate the worker group
+ callback_is_running.wait()
+ self.assertRaises(SystemExit, worker_group.sigterm_handler, None, None)
+ self.assertFalse(callback_is_running.is_set())
+
+ self.assertEqual(consumer.call_count, 1)
+ self.assertEqual(channel_layer.send.call_count, 0)
diff --git a/tests/test_wsclient.py b/tests/test_wsclient.py
index a8924ca..0b94621 100644
--- a/tests/test_wsclient.py
+++ b/tests/test_wsclient.py
@@ -52,8 +52,8 @@ class WSClientTests(ChannelTestCase):
self.assertEqual(content['path'], '/')
self.assertTrue('headers' in content)
- self.assertTrue('cookie' in content['headers'])
- self.assertTrue(b'sessionid' in content['headers']['cookie'])
+ self.assertIn(b'cookie', [x[0] for x in content['headers']])
+ self.assertIn(b'sessionid', [x[1] for x in content['headers'] if x[0] == b'cookie'][0])
def test_ordering_in_content(self):
client = WSClient(ordered=True)
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-django-channels.git
More information about the Python-modules-commits
mailing list