[Python-modules-commits] [python-django-channels] 02/07: New upstream version 1.1.8.1

Michael Fladischer fladi at moszumanska.debian.org
Mon Jan 8 17:17:24 UTC 2018


This is an automated email from the git hooks/post-receive script.

fladi pushed a commit to branch debian/master
in repository python-django-channels.

commit 94a0fbea0ef4c858e24285f6328ed15e59e72e15
Author: Michael Fladischer <FladischerMichael at fladi.at>
Date:   Mon Jan 8 17:45:41 2018 +0100

    New upstream version 1.1.8.1
---
 channels/management/commands/runworker.py |   5 +
 channels/test/websocket.py                |   5 +-
 channels/worker.py                        |  22 ++-
 docs/generics.rst                         |   8 +
 docs/getting-started.rst                  |   4 +-
 patchinator.py                            | 259 ------------------------------
 tests/test_worker.py                      |  38 ++++-
 tests/test_wsclient.py                    |   4 +-
 8 files changed, 79 insertions(+), 266 deletions(-)

diff --git a/channels/management/commands/runworker.py b/channels/management/commands/runworker.py
index e8366e6..d0dd7ae 100644
--- a/channels/management/commands/runworker.py
+++ b/channels/management/commands/runworker.py
@@ -34,6 +34,10 @@ class Command(BaseCommand):
             default=1, type=int,
             help='Number of threads to execute.'
         )
+        parser.add_argument(
+            '--disable-graceful-stop', action='store_false', dest='stop_gracefully',
+            help='Disable graceful stop for a worker group (instant exit on termination signal).',
+        )
 
     def handle(self, *args, **options):
         # Get the backend to use
@@ -68,6 +72,7 @@ class Command(BaseCommand):
             self.logger.info("Using multi-threaded worker, {} thread(s).".format(self.n_threads))
             worker_cls = WorkerGroup
             worker_kwargs['n_threads'] = self.n_threads
+            worker_kwargs['stop_gracefully'] = self.options.get('stop_gracefully')
         # Run the worker
         self.logger.info("Running worker against channel layer %s", self.channel_layer)
         try:
diff --git a/channels/test/websocket.py b/channels/test/websocket.py
index 4b762ce..c260bd8 100644
--- a/channels/test/websocket.py
+++ b/channels/test/websocket.py
@@ -82,6 +82,9 @@ class WSClient(Client):
         self.channel_layer.send(to, self._get_content(content, text, path))
         self._session_cookie = False
 
+    def _list_headers(self):
+        return [[key.encode(), self.headers[key]] for key in self.headers]
+
     def _get_content(self, content={}, text=None, path='/'):
         content = copy.deepcopy(content)
         content.setdefault('reply_channel', self.reply_channel)
@@ -93,7 +96,7 @@ class WSClient(Client):
         else:
             content.setdefault('path', path)
 
-        content.setdefault('headers', self.headers)
+        content.setdefault('headers', self._list_headers())
 
         if self._ordered:
             if 'order' in content:
diff --git a/channels/worker.py b/channels/worker.py
index 1eacd67..ec1a9ad 100644
--- a/channels/worker.py
+++ b/channels/worker.py
@@ -149,6 +149,8 @@ class Worker(object):
             finally:
                 # Send consumer finished so DB conns close etc.
                 consumer_finished.send(sender=self.__class__)
+        # Reset the flag to have a correct value after finishing the run loop
+        self.in_job = False
 
 
 class WorkerGroup(Worker):
@@ -159,12 +161,21 @@ class WorkerGroup(Worker):
 
     def __init__(self, *args, **kwargs):
         n_threads = kwargs.pop('n_threads', multiprocessing.cpu_count()) - 1
+        self.stop_gracefully = kwargs.pop('stop_gracefully', False)
         super(WorkerGroup, self).__init__(*args, **kwargs)
         kwargs['signal_handlers'] = False
         self.workers = [Worker(*args, **kwargs) for ii in range(n_threads)]
 
     def sigterm_handler(self, signo, stack_frame):
-        logger.info("Shutdown signal received by WorkerGroup, terminating immediately.")
+        self.termed = True
+        for worker in self.workers:
+            worker.termed = True
+        msg_prefix = "Shutdown signal received by WorkerGroup, "
+        if self.stop_gracefully:
+            logger.info(msg_prefix + "waiting for sub-workers.")
+            self.wait_for_workers()
+        else:
+            logger.info(msg_prefix + "terminating immediately.")
         sys.exit(0)
 
     def ready(self):
@@ -182,3 +193,12 @@ class WorkerGroup(Worker):
             t.daemon = True
             t.start()
         super(WorkerGroup, self).run()
+        if self.stop_gracefully:
+            self.wait_for_workers()
+
+    def wait_for_workers(self):
+        while self.in_job or any(
+                self.threads[worker_id].is_alive() and
+                self.workers[worker_id].in_job
+                for worker_id in range(len(self.workers))):
+            time.sleep(0.1)
diff --git a/docs/generics.rst b/docs/generics.rst
index 17768da..5270b78 100644
--- a/docs/generics.rst
+++ b/docs/generics.rst
@@ -266,6 +266,14 @@ And if you just want to use the user from the django session, add ``http_user``:
 This will give you ``message.user``, which will be the same as ``request.user``
 would be on a regular View.
 
+And if you want to use both user and session from the django session, add ``http_user_and_session``::
+
+    class MyConsumer(WebsocketConsumer):
+
+        http_user_and_session = True
+
+This will give you ``message.user`` and ``message.http_session``.
+
 
 Applying Decorators
 -------------------
diff --git a/docs/getting-started.rst b/docs/getting-started.rst
index 77a374e..35cca58 100644
--- a/docs/getting-started.rst
+++ b/docs/getting-started.rst
@@ -445,7 +445,7 @@ Now, of course, a WebSocket solution is somewhat limited in scope without the
 ability to live with the rest of your website - in particular, we want to make
 sure we know what user we're talking to, in case we have things like private
 chat channels (we don't want a solution where clients just ask for the right
-channels, as anyone could change the code and just put in private channel names)
+channels, as anyone could change the code and just put in private channel names).
 
 It can also save you having to manually make clients ask for what they want to
 see; if I see you open a WebSocket to my "updates" endpoint, and I know which
@@ -559,7 +559,7 @@ from hosts listed in the ``ALLOWED_HOSTS`` setting::
     from channels import Channel, Group
     from channels.sessions import channel_session
     from channels.auth import channel_session_user, channel_session_user_from_http
-    from channels.security.websockets import allowed_hosts_only.
+    from channels.security.websockets import allowed_hosts_only
 
     # Connected to websocket.connect
     @allowed_hosts_only
diff --git a/patchinator.py b/patchinator.py
deleted file mode 100644
index 661a659..0000000
--- a/patchinator.py
+++ /dev/null
@@ -1,259 +0,0 @@
-#!/usr/bin/python
-"""
-Script that automatically generates a Django patch from the Channels codebase
-based on some simple rules and string replacements.
-
-Once Channels lands in Django, will be reversed to instead generate this
-third-party app from the now-canonical Django source.
-"""
-
-import re
-import os.path
-import sys
-
-from isort import SortImports
-
-
-# Transforms: Turn one content string into another
-
-class Replacement(object):
-    """
-    Represents a string replacement in a file; uses a regular expression to
-    substitute strings in the file.
-    """
-
-    def __init__(self, match, sub, regex=True):
-        self.match = match
-        self.sub = sub
-        self.regex = regex
-
-    def __call__(self, value):
-        if self.regex:
-            return re.sub(self.match, self.sub, value)
-        else:
-            return value.replace(self.match, self.sub)
-
-
-class Insert(object):
-    """
-    Inserts a string before/after another in a file, one time only, with multiline match.
-    """
-
-    def __init__(self, match, to_insert, after=False):
-        self.match = match
-        self.to_insert = to_insert
-        self.after = after
-
-    def __call__(self, value):
-        match = re.search(self.match, value, flags=re.MULTILINE)
-        if not match:
-            raise ValueError("Could not find match %s" % self.match)
-        if self.after:
-            return value[:match.end()] + self.to_insert + value[match.end():]
-        else:
-            return value[:match.start()] + self.to_insert + value[match.start():]
-
-
-class Isort(object):
-    """
-    Runs isort on the file
-    """
-
-    def __call__(self, value):
-        return SortImports(file_contents=value).output
-
-
-# Operations: Copy or patch files
-
-class FileMap(object):
-    """
-    Represents a file map from the source to the destination, with
-    optional extra regex transforms.
-    """
-
-    def __init__(self, source_path, dest_path, transforms, makedirs=True):
-        self.source_path = source_path
-        self.dest_path = dest_path
-        self.transforms = transforms
-        self.makedirs = makedirs
-
-    def run(self, source_dir, dest_dir):
-        print("COPY: %s -> %s" % (self.source_path, self.dest_path))
-        # Open and read in source file
-        source = os.path.join(source_dir, self.source_path)
-        with open(source, "r") as fh:
-            content = fh.read()
-        # Run transforms
-        for transform in self.transforms:
-            content = transform(content)
-        # Save new file
-        dest = os.path.join(dest_dir, self.dest_path)
-        if self.makedirs:
-            if not os.path.isdir(os.path.dirname(dest)):
-                os.makedirs(os.path.dirname(dest))
-        with open(dest, "w") as fh:
-            fh.write(content)
-
-
-class NewFile(object):
-    """
-    Writes a file to the destination, either blank or with some content from
-    a string.
-    """
-
-    def __init__(self, dest_path, content=""):
-        self.dest_path = dest_path
-        self.content = content
-
-    def run(self, source_dir, dest_dir):
-        print("NEW: %s" % (self.dest_path, ))
-        # Save new file
-        dest = os.path.join(dest_dir, self.dest_path)
-        with open(dest, "w") as fh:
-            fh.write(self.content)
-
-
-# Main class and config
-
-
-global_transforms = [
-    Replacement(r"import channels.([a-zA-Z0-9_\.]+)$", r"import django.channels.\1 as channels"),
-    Replacement(r"from channels import", r"from django.channels import"),
-    Replacement(r"from channels.([a-zA-Z0-9_\.]+) import", r"from django.channels.\1 import"),
-    Replacement(r"from .handler import", r"from django.core.handlers.asgi import"),
-    Replacement(r"from django.channels.test import", r"from django.test.channels import"),
-    Replacement(r"from django.channels.handler import", r"from django.core.handlers.asgi import"),
-    Replacement(r"tests.test_routing", r"channels_tests.test_routing"),
-    Replacement(r"django.core.urlresolvers", r"django.urls"),
-]
-
-python_transforms = global_transforms + [
-    Isort(),
-]
-
-docs_transforms = global_transforms + [
-    Replacement(r"<concepts>`", r"</topics/channels/concepts>`"),
-    Replacement(r":doc:`concepts`", r":doc:`/topics/channels/concepts`"),
-    Replacement(r":doc:`deploying`", r":doc:`/topics/channels/deploying`"),
-    Replacement(r":doc:`scaling`", r":doc:`/topics/channels/scaling`"),
-    Replacement(r":doc:`getting-started`", r":doc:`/intro/channels`"),
-    Replacement(r"<backends>`", r"</ref/channels/backends>`"),
-    Replacement(r":doc:`backends`", r":doc:`/ref/channels/backends`"),
-    Replacement(r":doc:`([\w\d\s]+) <asgi>`", r"`\1 <https://channels.readthedocs.io/en/latest/asgi.html>`_"),
-    Replacement(r"\n\(.*installation>`\)\n", r""),
-    Replacement(r":doc:`installed Channels correctly <installation>`", r"added the channel layer setting"),
-    Replacement(r"Channels", r"channels"),
-    Replacement(r"Started with channels", r"Started with Channels"),
-    Replacement(r"Running with channels", r"Running with Channels"),
-    Replacement(r"channels consumers", r"channel consumers"),
-    Replacement(r"channels' design", r"The channels design"),
-    Replacement(r"channels is being released", r"Channels is being released"),
-    Replacement(r"channels is", r"channels are"),
-    Replacement(r"channels provides a", r"Channels provides a"),
-    Replacement(r"channels can use", r"Channels can use"),
-    Replacement(r"channels Concepts", r"Channels Concepts"),
-    Replacement(r"channels works", r"channels work"),
-]
-
-
-class Patchinator(object):
-
-    operations = [
-        FileMap(
-            "channels/asgi.py", "django/channels/asgi.py", python_transforms + [
-                Replacement("if django.VERSION[1] > 9:\n        django.setup(set_prefix=False)\n    else:\n        django.setup()", "django.setup(set_prefix=False)", regex=False),  # NOQA
-            ],
-        ),
-        FileMap(
-            "channels/auth.py", "django/channels/auth.py", python_transforms,
-        ),
-        FileMap(
-            "channels/channel.py", "django/channels/channel.py", python_transforms,
-        ),
-        FileMap(
-            "channels/exceptions.py", "django/channels/exceptions.py", python_transforms,
-        ),
-        FileMap(
-            "channels/handler.py", "django/core/handlers/asgi.py", python_transforms,
-        ),
-        FileMap(
-            "channels/routing.py", "django/channels/routing.py", python_transforms,
-        ),
-        FileMap(
-            "channels/message.py", "django/channels/message.py", python_transforms,
-        ),
-        FileMap(
-            "channels/sessions.py", "django/channels/sessions.py", python_transforms,
-        ),
-        FileMap(
-            "channels/staticfiles.py", "django/contrib/staticfiles/consumers.py", python_transforms,
-        ),
-        FileMap(
-            "channels/utils.py", "django/channels/utils.py", python_transforms,
-        ),
-        FileMap(
-            "channels/worker.py", "django/channels/worker.py", python_transforms,
-        ),
-        FileMap(
-            "channels/management/commands/runworker.py",
-            "django/core/management/commands/runworker.py",
-            python_transforms,
-        ),
-        # Tests
-        FileMap(
-            "channels/test/base.py", "django/test/channels.py", python_transforms,
-        ),
-        NewFile(
-            "tests/channels_tests/__init__.py",
-        ),
-        FileMap(
-            "tests/test_handler.py", "tests/channels_tests/test_handler.py", python_transforms,
-        ),
-        FileMap(
-            "tests/test_routing.py", "tests/channels_tests/test_routing.py", python_transforms,
-        ),
-        FileMap(
-            "tests/test_request.py", "tests/channels_tests/test_request.py", python_transforms,
-        ),
-        FileMap(
-            "tests/test_sessions.py", "tests/channels_tests/test_sessions.py", python_transforms,
-        ),
-        # Docs
-        FileMap(
-            "docs/backends.rst", "docs/ref/channels/backends.txt", docs_transforms,
-        ),
-        FileMap(
-            "docs/concepts.rst", "docs/topics/channels/concepts.txt", docs_transforms,
-        ),
-        FileMap(
-            "docs/deploying.rst", "docs/topics/channels/deploying.txt", docs_transforms,
-        ),
-        FileMap(
-            "docs/getting-started.rst", "docs/intro/channels.txt", docs_transforms,
-        ),
-        FileMap(
-            "docs/reference.rst", "docs/ref/channels/api.txt", docs_transforms,
-        ),
-        FileMap(
-            "docs/testing.rst", "docs/topics/channels/testing.txt", docs_transforms,
-        ),
-        FileMap(
-            "docs/cross-compat.rst", "docs/topics/channels/cross-compat.txt", docs_transforms,
-        ),
-    ]
-
-    def __init__(self, source, destination):
-        self.source = os.path.abspath(source)
-        self.destination = os.path.abspath(destination)
-
-    def run(self):
-        print("Patchinator running.\n Source: %s\n Destination: %s" % (self.source, self.destination))
-        for operation in self.operations:
-            operation.run(self.source, self.destination)
-
-
-if __name__ == '__main__':
-    try:
-        Patchinator(os.path.dirname(__file__), sys.argv[1]).run()
-    except IndexError:
-        print("Supply the target Django directory on the command line")
diff --git a/tests/test_worker.py b/tests/test_worker.py
index a8975fd..745336f 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -1,10 +1,13 @@
 from __future__ import unicode_literals
 
+import time
+import threading
+
 from channels import DEFAULT_CHANNEL_LAYER, Channel, route
 from channels.asgi import channel_layers
 from channels.exceptions import ConsumeLater
 from channels.test import ChannelTestCase
-from channels.worker import Worker
+from channels.worker import Worker, WorkerGroup
 
 try:
     from unittest import mock
@@ -93,3 +96,36 @@ class WorkerTests(ChannelTestCase):
         worker.run()
         self.assertEqual(consumer.call_count, 1)
         self.assertEqual(channel_layer.send.call_count, 0)
+
+
+class WorkerGroupTests(ChannelTestCase):
+
+    def test_graceful_stop(self):
+
+        callback_is_running = threading.Event()
+
+        def callback(channel, message):
+            callback_is_running.set()
+            # emulate some delay to validate graceful stop
+            time.sleep(0.1)
+            callback_is_running.clear()
+
+        consumer = mock.Mock()
+        Channel('test').send({'test': 'test'}, immediately=True)
+        channel_layer = channel_layers[DEFAULT_CHANNEL_LAYER]
+        channel_layer.router.add_route(route('test', consumer))
+        old_send = channel_layer.send
+        channel_layer.send = mock.Mock(side_effect=old_send)  # proxy 'send' for counting
+        worker_group = WorkerGroup(channel_layer, n_threads=3,
+                                   signal_handlers=False, stop_gracefully=True,
+                                   callback=callback)
+        worker_group_t = threading.Thread(target=worker_group.run)
+        worker_group_t.daemon = True
+        worker_group_t.start()
+        # wait when a worker starts the callback and terminate the worker group
+        callback_is_running.wait()
+        self.assertRaises(SystemExit, worker_group.sigterm_handler, None, None)
+        self.assertFalse(callback_is_running.is_set())
+
+        self.assertEqual(consumer.call_count, 1)
+        self.assertEqual(channel_layer.send.call_count, 0)
diff --git a/tests/test_wsclient.py b/tests/test_wsclient.py
index a8924ca..0b94621 100644
--- a/tests/test_wsclient.py
+++ b/tests/test_wsclient.py
@@ -52,8 +52,8 @@ class WSClientTests(ChannelTestCase):
         self.assertEqual(content['path'], '/')
 
         self.assertTrue('headers' in content)
-        self.assertTrue('cookie' in content['headers'])
-        self.assertTrue(b'sessionid' in content['headers']['cookie'])
+        self.assertIn(b'cookie', [x[0] for x in content['headers']])
+        self.assertIn(b'sessionid', [x[1] for x in content['headers'] if x[0] == b'cookie'][0])
 
     def test_ordering_in_content(self):
         client = WSClient(ordered=True)

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-django-channels.git



More information about the Python-modules-commits mailing list