[Python-modules-commits] [python-asgiref] 01/03: importing python-asgiref_1.1.2.orig.tar.gz

Michael Fladischer fladi at moszumanska.debian.org
Mon Jul 17 12:40:01 UTC 2017


This is an automated email from the git hooks/post-receive script.

fladi pushed a commit to branch master
in repository python-asgiref.

commit e038655ea5362f28380f495b539e6a6f7ebc945c
Author: Michael Fladischer <FladischerMichael at fladi.at>
Date:   Mon Jul 17 13:18:56 2017 +0200

    importing python-asgiref_1.1.2.orig.tar.gz
---
 .gitignore               |   7 +
 .travis.yml              |  17 +++
 CHANGELOG.txt            |  29 ++++
 LICENSE                  |  27 ++++
 Makefile                 |  16 +++
 README.rst               |  64 +++++++++
 asgiref/__init__.py      |   1 +
 asgiref/base_layer.py    | 141 +++++++++++++++++++
 asgiref/conformance.py   | 346 +++++++++++++++++++++++++++++++++++++++++++++++
 asgiref/inmemory.py      | 180 ++++++++++++++++++++++++
 asgiref/wsgi.py          | 104 ++++++++++++++
 setup.cfg                |   2 +
 setup.py                 |  25 ++++
 tests/__init__.py        |   0
 tests/test_base_layer.py |  93 +++++++++++++
 tests/test_inmemory.py   |  25 ++++
 tests/test_wsgi.py       |  68 ++++++++++
 tox.ini                  |   5 +
 18 files changed, 1150 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100755
index 0000000..9a904ec
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+*.egg-info
+dist/
+build/
+__pycache__/
+*.pyc
+.tox/
+*~
diff --git a/.travis.yml b/.travis.yml
new file mode 100755
index 0000000..1d0847f
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,17 @@
+# Faster container-based infrastructure
+sudo: false
+
+language: python
+
+# Python versions supported by Channels' supported Django versions
+python:
+  - "2.7"
+  - "3.4"
+  - "3.5"
+
+install:
+  - pip install -e .
+  # Useful for debugging
+  - pip freeze
+
+script: python -m unittest discover
diff --git a/CHANGELOG.txt b/CHANGELOG.txt
new file mode 100755
index 0000000..c6e570e
--- /dev/null
+++ b/CHANGELOG.txt
@@ -0,0 +1,29 @@
+1.1.2 (2017-05-16)
+------------------
+
+* Conformance test suite now allows for retries and tests group_send's behaviour with capacity
+* valid_channel_names now has a receive parameter
+
+1.1.1 (2017-04-02)
+------------------
+
+* Error with sending to multi-process channels with the same message fixed
+
+1.1.0 (2017-04-01)
+------------------
+
+* Process-specific channel behaviour has been changed, and the base layer
+  and conformance suites updated to match.
+
+1.0.1 (2017-03-19)
+------------------
+
+* Improved channel and group name validation
+* Test rearrangements and improvements
+
+1.0.0 (2016-04-11)
+------------------
+
+* `receive_many` is now `receive`
+* In-memory layer deepcopies messages so they cannot be mutated post-send
+* Better errors for bad channel/group names
diff --git a/LICENSE b/LICENSE
new file mode 100755
index 0000000..5f4f225
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) Django Software Foundation and individual contributors.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+    1. Redistributions of source code must retain the above copyright notice,
+       this list of conditions and the following disclaimer.
+
+    2. Redistributions in binary form must reproduce the above copyright
+       notice, this list of conditions and the following disclaimer in the
+       documentation and/or other materials provided with the distribution.
+
+    3. Neither the name of Django nor the names of its contributors may be used
+       to endorse or promote products derived from this software without
+       specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Makefile b/Makefile
new file mode 100755
index 0000000..1a1f55e
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,16 @@
+.PHONY: release
+
+all:
+
+release:
+ifndef version
+	$(error Please supply a version)
+endif
+	@echo Releasing version $(version)
+ifeq (,$(findstring $(version),$(shell git log --oneline -1)))
+	$(error Last commit does not match version)
+endif
+	git tag $(version)
+	git push
+	git push --tags
+	python setup.py sdist bdist_wheel upload
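
A usage note on the release target above: it expects the version to be supplied as a
make variable and aborts if the last commit message does not contain it, so a typical
invocation looks like this (the version number is purely illustrative):

    make release version=1.1.2
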
diff --git a/README.rst b/README.rst
new file mode 100755
index 0000000..d3af269
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,64 @@
+asgiref
+=======
+
+.. image:: https://api.travis-ci.org/django/asgiref.svg
+    :target: https://travis-ci.org/django/asgiref
+
+.. image:: https://img.shields.io/pypi/v/asgiref.svg
+    :target: https://pypi.python.org/pypi/asgiref
+
+Contains various reference ASGI implementations, including:
+
+* A base channel layer, ``asgiref.base_layer``
+* An in-memory channel layer, ``asgiref.inmemory``
+* WSGI-to-ASGI and ASGI-to-WSGI adapters, in ``asgiref.wsgi``
+
+
+Base Channel Layer
+------------------
+
+Provides an optional template to build ASGI channel layers from, with the two
+exceptions you need already provided and all API functions stubbed out.
+
+Also comes with logic for doing per-channel capacities using channel names and
+globbing; use ``self.get_capacity`` and pass the arguments through to the base
+``__init__`` if you want to use it.
+
+
+In-memory Channel Layer
+-----------------------
+
+Simply instantiate ``asgiref.inmemory.ChannelLayer``, or use the pre-made
+``asgiref.inmemory.channel_layer`` for easy use. Implements the ``groups``
+extension, and is designed to support running multiple ASGI programs
+in separate threads within one process (the channel layer is threadsafe).
+
+
+WSGI-ASGI Adapters
+------------------
+
+These are not yet complete and should not be used.
+
+Dependencies
+------------
+
+All Channels projects currently support Python 2.7, 3.4 and 3.5.
+
+Contributing
+------------
+
+Please refer to the
+`main Channels contributing docs <https://github.com/django/channels/blob/master/CONTRIBUTING.rst>`_.
+That also contains advice on how to set up the development environment and run the tests.
+
+Maintenance and Security
+------------------------
+
+To report security issues, please contact security@djangoproject.com. For GPG
+signatures and more security process information, see
+https://docs.djangoproject.com/en/dev/internals/security/.
+
+To report bugs or request new features, please open a new GitHub issue.
+
+This repository is part of the Channels project. For the shepherd and maintenance team, please see the
+`main Channels readme <https://github.com/django/channels/blob/master/README.rst>`_.
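
To make the "Base Channel Layer" section of the README above concrete, here is a minimal
sketch (not taken from the tarball; the class and channel names are invented) of a layer
that passes the capacity arguments through to the base __init__ and consults
self.get_capacity:

    from asgiref.base_layer import BaseChannelLayer

    class DictChannelLayer(BaseChannelLayer):
        def __init__(self, **kwargs):
            # Hand expiry/capacity/channel_capacity straight to the base class.
            super(DictChannelLayer, self).__init__(**kwargs)
            self._queues = {}

        def send(self, channel, message):
            queue = self._queues.setdefault(channel, [])
            # get_capacity() resolves per-channel globs, falling back to the default.
            if len(queue) >= self.get_capacity(channel):
                raise self.ChannelFull(channel)
            queue.append(message)

    # "http.response!*" is a glob; the base class compiles it to a regex.
    layer = DictChannelLayer(capacity=100, channel_capacity={"http.response!*": 10})
    assert layer.get_capacity("http.response!abc") == 10
    assert layer.get_capacity("http.request") == 100
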
diff --git a/asgiref/__init__.py b/asgiref/__init__.py
new file mode 100755
index 0000000..72f26f5
--- /dev/null
+++ b/asgiref/__init__.py
@@ -0,0 +1 @@
+__version__ = "1.1.2"
diff --git a/asgiref/base_layer.py b/asgiref/base_layer.py
new file mode 100755
index 0000000..ef1b8b2
--- /dev/null
+++ b/asgiref/base_layer.py
@@ -0,0 +1,141 @@
+from __future__ import unicode_literals
+
+import fnmatch
+import re
+import six
+import warnings
+
+
+class BaseChannelLayer(object):
+    """
+    Base channel layer object; outlines the core API and contains some handy
+    reusable code bits. You don't have to inherit from this, but it might
+    save you time.
+    """
+
+    def __init__(self, expiry=60, group_expiry=86400, capacity=100, channel_capacity=None):
+        self.expiry = expiry
+        self.capacity = capacity
+        self.group_expiry = group_expiry
+        self.channel_capacity = self.compile_capacities(channel_capacity or {})
+
+    ### ASGI API ###
+
+    extensions = []
+
+    class ChannelFull(Exception):
+        pass
+
+    class MessageTooLarge(Exception):
+        pass
+
+    def send(self, channel, message):
+        raise NotImplementedError()
+
+    def receive(self, channels, block=False):
+        raise NotImplementedError()
+
+    def receive_many(self, channels, block=False):
+        """
+        receive_many is deprecated, but this is provided for backwards compatibility.
+        """
+        warnings.warn("receive_many is deprecated; please use receive", DeprecationWarning)
+        return self.receive(channels, block)
+
+    def new_channel(self, pattern):
+        raise NotImplementedError()
+
+    ### ASGI Group API ###
+
+    def group_add(self, group, channel):
+        raise NotImplementedError()
+
+    def group_discard(self, group, channel):
+        raise NotImplementedError()
+
+    def group_channels(self, group):
+        raise NotImplementedError()
+
+    def send_group(self, group, message):
+        raise NotImplementedError()
+
+    ### ASGI Flush API ###
+
+    def flush(self):
+        raise NotImplementedError()
+
+    ### Capacity utility functions
+
+    def compile_capacities(self, channel_capacity):
+        """
+        Takes an input channel_capacity dict and returns the compiled list
+        of regexes that get_capacity will look for as self.channel_capacity.
+        """
+        result = []
+        for pattern, value in channel_capacity.items():
+            # If they passed in a precompiled regex, leave it, else interpret
+            # it as a glob.
+            if hasattr(pattern, "match"):
+                result.append((pattern, value))
+            else:
+                result.append((re.compile(fnmatch.translate(pattern)), value))
+        return result
+
+    def get_capacity(self, channel):
+        """
+        Gets the correct capacity for the given channel; either the default,
+        or a matching result from channel_capacity. Returns the first matching
+        result; if you want to control the order of matches, use an ordered dict
+        as input.
+        """
+        for pattern, capacity in self.channel_capacity:
+            if pattern.match(channel):
+                return capacity
+        return self.capacity
+
+    def match_type_and_length(self, name):
+        if (len(name) < 100) and (isinstance(name, six.text_type)):
+            return True
+        return False
+
+    ### Name validation functions
+
+    channel_name_regex = re.compile(r"^[a-zA-Z\d\-_.]+((\?|\!)[\d\w\-_.]*)?$")
+    group_name_regex = re.compile(r"^[a-zA-Z\d\-_.]+$")
+    invalid_name_error = "{} name must be a valid unicode string containing only ASCII alphanumerics, hyphens, underscores, or periods."
+
+    def valid_channel_name(self, name, receive=False):
+        if self.match_type_and_length(name):
+            if bool(self.channel_name_regex.match(name)):
+                # Check cases for special channels
+                if "?" in name and name.endswith("?"):
+                    raise TypeError("Single-reader channel names must not end with ?")
+                elif "!" in name and not name.endswith("!") and receive:
+                    raise TypeError("Process-local channel names in receive() must end at the !")
+                return True
+        raise TypeError("Channel name must be a valid unicode string containing only ASCII alphanumerics, hyphens, or periods, not '{}'.".format(name))
+
+    def valid_group_name(self, name):
+        if self.match_type_and_length(name):
+            if bool(self.group_name_regex.match(name)):
+                return True
+        raise TypeError("Group name must be a valid unicode string containing only ASCII alphanumerics, hyphens, or periods.")
+
+    def valid_channel_names(self, names, receive=False):
+        _non_empty_list = True if names else False
+        _names_type = isinstance(names, list)
+        assert _non_empty_list and _names_type, "names must be a non-empty list"
+
+        assert all(self.valid_channel_name(channel, receive=receive) for channel in names)
+        return True
+
+    def non_local_name(self, name):
+        """
+        Given a channel name, returns the "non-local" part. If the channel name
+        is a process-specific channel (contains !) this means the part up to
+        and including the !; if it is anything else, this means the full name.
+        """
+        if "!" in name:
+            return name[:name.find("!")+1]
+        else:
+            return name
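
To illustrate the name-validation helpers defined in base_layer.py above, a short sketch
(the channel names are invented for the example):

    from __future__ import unicode_literals

    from asgiref.base_layer import BaseChannelLayer

    layer = BaseChannelLayer()

    layer.valid_channel_name("http.request")             # True
    layer.valid_channel_name("http.response!aBcDeFgH")   # True
    layer.non_local_name("http.response!aBcDeFgH")       # "http.response!"
    layer.non_local_name("http.request")                 # "http.request"

    # When receiving, process-specific channels must be named by their non-local
    # part only, so the full name raises TypeError once receive=True:
    layer.valid_channel_name("http.response!aBcDeFgH", receive=True)
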
diff --git a/asgiref/conformance.py b/asgiref/conformance.py
new file mode 100755
index 0000000..acb76a3
--- /dev/null
+++ b/asgiref/conformance.py
@@ -0,0 +1,346 @@
+"""
+ASGI spec conformance test suite.
+
+Subclass ConformanceTestCase with an ASGI channel layer instance as its
+channel_layer attribute to get a TestCase that checks that instance for conformity.
+
+You MUST also set an expiry_delay attribute on the subclass, to allow the
+suite to wait correctly for expiry. It's suggested you configure the layer
+for 1-second expiry during tests, and use a 1.1 second expiry delay.
+
+The channel layers should be empty to start with, and discarded after use,
+as they'll be full of test data. If a layer supports the "flush" extension,
+it'll be flushed before every test.
+"""
+
+from __future__ import unicode_literals
+import six
+import time
+import unittest
+
+
+class ConformanceTestCase(unittest.TestCase):
+    """
+    Tests that core ASGI functionality is maintained.
+    """
+
+    channel_layer = None
+    expiry_delay = None
+    capacity_limit = None
+    receive_tries = 1
+
+    def receive(self, channels):
+        """
+        Allows tests to automatically call channel_layer.receive() more than once.
+        This is necessary for testing ChannelLayer implementations that do not guarantee a response
+        will be returned on every receive() call, even when there are messages on the channel. This
+        would be the case, for example, for a channel layer designed for a multi-worker environment
+        with multiple backing hosts, which checks a different host on each call.
+        """
+        for _ in range(self.receive_tries):
+            channel, message = self.channel_layer.receive(channels)
+            if channel is not None:
+                return channel, message
+        return None, None
+
+    @classmethod
+    def setUpClass(cls):
+        # Don't let this actual class run, it's abstract
+        if cls is ConformanceTestCase:
+            raise unittest.SkipTest("Skipping base class tests")
+
+    def setUp(self):
+        if self.channel_layer is None:
+            raise ValueError("You must define 'channel_layer' when subclassing the conformance tests.")
+        if self.expiry_delay is None:
+            raise ValueError("You must define 'expiry_delay' when subclassing the conformance tests.")
+        if "flush" in self.channel_layer.extensions:
+            self.channel_layer.flush()
+
+    def skip_if_no_extension(self, extension):
+        """
+        Handy function for skipping things without an extension.
+        We can't use the decorators, as we need access to self.
+        """
+        if extension not in self.channel_layer.extensions:
+            raise unittest.SkipTest("No %s extension" % extension)
+
+    def test_send_recv(self):
+        """
+        Tests that channels can send and receive messages right.
+        """
+        self.channel_layer.send("sr_test", {"value": "blue"})
+        self.channel_layer.send("sr_test", {"value": "green"})
+        self.channel_layer.send("sr_test", {"value": "yellow"})
+        self.channel_layer.send("sr_test2", {"value": "red"})
+        # Receive from the first channel three times
+        response_messages = []
+        for i in range(3):
+            channel, message = self.receive(["sr_test"])
+            response_messages.append(message)
+            self.assertEqual(channel, "sr_test")
+        for response in response_messages:
+            self.assertTrue("value" in response)
+        # Check that all messages were returned; order is not guaranteed
+        self.assertEqual(set([r["value"] for r in response_messages]), set(["blue", "green", "yellow"]))
+        # And the other channel with multi select
+        channel, message = self.receive(["sr_test", "sr_test2"])
+        self.assertEqual(channel, "sr_test2")
+        self.assertEqual(message, {"value": "red"})
+
+    def test_single_process_receive(self):
+        """
+        Tests that single-process receive gets anything with the right prefix.
+        """
+        self.channel_layer.send("spr_test!a", {"best": "ponies"})
+        channel, message = self.receive(["spr_test!"])
+        self.assertEqual(channel, "spr_test!a")
+        self.assertEqual(message, {"best": "ponies"})
+        self.channel_layer.send("spr_test!b", {"best": "pangolins"})
+        channel, message = self.receive(["spr_test!"])
+        self.assertEqual(channel, "spr_test!b")
+        self.assertEqual(message, {"best": "pangolins"})
+
+    def test_single_process_receive_error(self):
+        """
+        Tests that single-process receive isn't allowed with a local part.
+        """
+        with self.assertRaises(Exception):
+            self.receive(["spr_test!c"])
+
+    def test_message_expiry(self):
+        """
+        Tests that messages expire correctly.
+        """
+        self.channel_layer.send("me_test", {"value": "blue"})
+        time.sleep(self.expiry_delay)
+        channel, message = self.receive(["me_test"])
+        self.assertIs(channel, None)
+        self.assertIs(message, None)
+
+    def test_new_channel_single_reader(self):
+        """
+        Tests that new single-reader channel names are made correctly.
+        """
+        pattern = "test.foo?"
+        name1 = self.channel_layer.new_channel(pattern)
+        self.assertFalse(name1.endswith("?"))
+        self.assertTrue("?" in name1)
+        self.assertEqual(name1.find("?"), name1.rfind("?"))
+        self.assertIsInstance(name1, six.text_type)
+        # Send a message and make sure new_channel on second pass changes
+        self.channel_layer.send(name1, {"value": "blue"})
+        name2 = self.channel_layer.new_channel(pattern)
+        # Make sure we can consume off of that new channel
+        channel, message = self.receive([name1, name2])
+        self.assertEqual(channel, name1)
+        self.assertEqual(message, {"value": "blue"})
+
+    def test_new_channel_failures(self):
+        """
+        Tests that we don't allow bad new channel names.
+        """
+        with self.assertRaises(Exception):
+            self.channel_layer.new_channel("test!")
+        with self.assertRaises(Exception):
+            self.channel_layer.new_channel("test.foo")
+
+    def test_strings(self):
+        """
+        Ensures byte strings and unicode strings both make it through
+        serialization properly.
+        """
+        # Message. Double-nested to ensure serializers are recursing properly.
+        message = {
+            "values": {
+                # UTF-8 byte sequence for the British pound sign; should stay as bytes, not be decoded.
+                "utf-bytes": b"\xc2\xa3",
+                # Actual unicode for the British pound sign, should come back as 1 char
+                "unicode": "\u00a3",
+                # Emoji (astral-plane codepoint), in case someone is using 3-byte-wide unicode storage
+                "emoji": "\U0001F612",
+                # Random control characters and null
+                "control": b"\x01\x00\x03\x21",
+            }
+        }
+        # Send it and receive it
+        self.channel_layer.send("str_test", message)
+        _, received = self.receive(["str_test"])
+        # Compare
+        self.assertIsInstance(received["values"]["utf-bytes"], six.binary_type)
+        self.assertIsInstance(received["values"]["unicode"], six.text_type)
+        self.assertIsInstance(received["values"]["emoji"], six.text_type)
+        self.assertIsInstance(received["values"]["control"], six.binary_type)
+        self.assertEqual(received["values"]["utf-bytes"], message["values"]["utf-bytes"])
+        self.assertEqual(received["values"]["unicode"], message["values"]["unicode"])
+        self.assertEqual(received["values"]["emoji"], message["values"]["emoji"])
+        self.assertEqual(received["values"]["control"], message["values"]["control"])
+
+    def test_groups(self):
+        """
+        Tests that basic group addition and send works
+        """
+        self.skip_if_no_extension("groups")
+        # Make a group and send to it
+        self.channel_layer.group_add("tgroup", "tg_test")
+        self.channel_layer.group_add("tgroup", "tg_test2")
+        self.channel_layer.group_add("tgroup", "tg_test3")
+        self.channel_layer.group_discard("tgroup", "tg_test3")
+        self.channel_layer.send_group("tgroup", {"value": "orange"})
+        # Receive from the two channels in the group and ensure messages
+        channel, message = self.receive(["tg_test"])
+        self.assertEqual(channel, "tg_test")
+        self.assertEqual(message, {"value": "orange"})
+        channel, message = self.receive(["tg_test2"])
+        self.assertEqual(channel, "tg_test2")
+        self.assertEqual(message, {"value": "orange"})
+        # Make sure another channel does not get a message
+        channel, message = self.receive(["tg_test3"])
+        self.assertIs(channel, None)
+        self.assertIs(message, None)
+
+    def test_groups_process(self):
+        """
+        Tests that group membership and sending works with process-specific channels.
+        """
+        self.skip_if_no_extension("groups")
+        # Make a group and send to it
+        self.channel_layer.group_add("tgroup", "tgp!test")
+        self.channel_layer.group_add("tgroup", "tgp!test2")
+        self.channel_layer.group_add("tgroup", "tgp!test3")
+        self.channel_layer.group_discard("tgroup", "tgp!test2")
+        self.channel_layer.send_group("tgroup", {"value": "orange"})
+        # Receive from the two channels in the group and ensure messages
+        channel, message = self.receive(["tgp!"])
+        self.assertIn(channel, ["tgp!test", "tgp!test3"])
+        self.assertEqual(message, {"value": "orange"})
+        channel, message = self.receive(["tgp!"])
+        self.assertIn(channel, ["tgp!test", "tgp!test3"])
+        self.assertEqual(message, {"value": "orange"})
+        # Make sure another channel does not get a message
+        channel, message = self.receive(["tgp!"])
+        self.assertIs(channel, None)
+        self.assertIs(message, None)
+
+    def test_group_channels(self):
+        """
+        Tests that group membership check works
+        """
+        self.skip_if_no_extension("groups")
+        # Make a group
+        self.channel_layer.group_add("tgroup", "tg_test")
+        self.channel_layer.group_add("tgroup", "tg_test2")
+        self.channel_layer.group_add("tgroup", "tg_test3")
+        # Check group members
+        self.assertEqual(
+            set(self.channel_layer.group_channels("tgroup")),
+            {"tg_test", "tg_test2", "tg_test3"},
+        )
+        # Discard from group
+        self.channel_layer.group_discard("tgroup", "tg_test3")
+        self.assertEqual(
+            set(self.channel_layer.group_channels("tgroup")),
+            {"tg_test", "tg_test2"},
+        )
+
+    def test_flush(self):
+        """
+        Tests that messages go away after a flush.
+        """
+        self.skip_if_no_extension("flush")
+        # Send something to flush
+        self.channel_layer.send("fl_test", {"value": "blue"})
+        self.channel_layer.flush()
+        channel, message = self.receive(["fl_test"])
+        self.assertIs(channel, None)
+        self.assertIs(message, None)
+
+    def test_flush_groups(self):
+        """
+        Tests that groups go away after a flush.
+        """
+        self.skip_if_no_extension("groups")
+        self.skip_if_no_extension("flush")
+        # Add things to a group and send to it
+        self.channel_layer.group_add("tfg_group", "tfg_test")
+        self.channel_layer.send_group("tfg_group", {"value": "blue"})
+        self.channel_layer.flush()
+        channel, message = self.receive(["tfg_test"])
+        self.assertIs(channel, None)
+        self.assertIs(message, None)
+
+    def test_group_expiry(self):
+        """
+        Tests that group expiry is provided, and tests it if it's less than
+        20 seconds.
+        """
+        self.skip_if_no_extension("groups")
+        # Check group expiry is provided, and see if we can continue
+        expiry = getattr(self.channel_layer, "group_expiry", None)
+        if expiry is None:
+            self.fail("group_expiry is not defined")
+        if expiry > 20:
+            raise unittest.SkipTest("Expiry too long for test")
+        # Add things to a group
+        self.channel_layer.group_add("tge_group", "tge_test")
+        # Wait group expiry plus one
+        time.sleep(expiry + 1)
+        # Ensure message never arrives
+        self.channel_layer.send_group("tge_group", {"value": "blue"})
+        channel, message = self.receive(["tge_test"])
+        self.assertIs(channel, None)
+        self.assertIs(message, None)
+
+    def test_capacity(self):
+        """
+        Tests that the capacity limiter on send() raises ChannelFull
+        after the right number of messages. Only runs if capacity_limit is set.
+        """
+        if self.capacity_limit is None:
+            raise unittest.SkipTest("No test capacity specified")
+        for _ in range(self.capacity_limit):
+            self.channel_layer.send("cap_test", {"hey": "there"})
+        with self.assertRaises(self.channel_layer.ChannelFull):
+            self.channel_layer.send("cap_test", {"hey": "there"})
+
+    def test_capacity_process(self):
+        """
+        Tests that the capacity limiter works on process-specific channels overall
+        """
+        if self.capacity_limit is None or self.capacity_limit < 2:
+            raise unittest.SkipTest("Test capacity is unspecified or too low")
+        for i in range(self.capacity_limit):
+            self.channel_layer.send("capp!%s" % i, {"hey": "there"})
+        with self.assertRaises(self.channel_layer.ChannelFull):
+            self.channel_layer.send("capp!final", {"hey": "there"})
+
+    def test_capacity_group(self):
+        """
+        Tests that the capacity limiter on send_group() never raises
+        ChannelFull.
+        """
+        self.skip_if_no_extension("groups")
+        self.channel_layer.group_add("tcg_group", "tcg_test")
+        if self.capacity_limit is None:
+            raise unittest.SkipTest("No test capacity specified")
+        for _ in range(self.capacity_limit + 1):
+            self.channel_layer.send_group("tcg_group", {"hey": "there"})
+
+    def test_exceptions(self):
+        """
+        Tests that the two exception classes exist on the channel layer
+        """
+        self.assertTrue(hasattr(self.channel_layer, "MessageTooLarge"))
+        self.assertTrue(hasattr(self.channel_layer, "ChannelFull"))
+
+    def test_message_alteration_after_send(self):
+        """
+        Tests that a message can be altered after it is sent through a channel
+        without affecting the object inside the queue.
+        """
+        message = {'value': [1, 2, 3]}
+        self.channel_layer.send('channel', message)
+        message['value'][0] = 'new value'
+
+        _, message = self.receive(['channel'])
+        self.assertEqual(message, {'value': [1, 2, 3]})
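
A minimal sketch of how a layer's test module can use the suite above, with the class
attributes that setUp() requires; the values follow the docstring's one-second-expiry
advice and are otherwise illustrative:

    import unittest

    from asgiref.conformance import ConformanceTestCase
    from asgiref.inmemory import ChannelLayer

    class InMemoryConformanceTests(ConformanceTestCase):
        channel_layer = ChannelLayer(expiry=1, group_expiry=2, capacity=5)
        expiry_delay = 1.1
        capacity_limit = 5
        # receive_tries can be raised for layers that may need several receive() calls.

    if __name__ == "__main__":
        unittest.main()
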
diff --git a/asgiref/inmemory.py b/asgiref/inmemory.py
new file mode 100755
index 0000000..589dcde
--- /dev/null
+++ b/asgiref/inmemory.py
@@ -0,0 +1,180 @@
+from __future__ import unicode_literals
+
+from copy import deepcopy
+import random
+import six
+import string
+import time
+import threading
+from collections import deque
+
+from .base_layer import BaseChannelLayer
+
+
+class ChannelLayer(BaseChannelLayer):
+    """
+    In-memory channel layer object; a single one is instantiated as
+    "channel_layer" for easy shared use. Only allows global capacity config.
+    """
+
+    def __init__(self, expiry=60, group_expiry=86400, capacity=10, channel_capacity=None):
+        super(ChannelLayer, self).__init__(
+            expiry=expiry,
+            group_expiry=group_expiry,
+            capacity=capacity,
+            channel_capacity=channel_capacity,
+        )
+        self.thread_lock = threading.Lock()
+        # Storage for state
+        self._channels = {}
+        self._groups = {}
+
+    ### ASGI API ###
+
+    extensions = ["groups", "flush"]
+
+    def send(self, channel, message):
+        # Make sure the message is a dict at least (no deep inspection)
+        assert isinstance(message, dict), "Message is not a dict"
+        # Channel name should be text
+        assert self.valid_channel_name(channel), "Channel name not valid"
+        # Make sure the message does not contain reserved keys
+        assert "__asgi_channel__" not in message
+        # If it's a process-local channel, strip off local part and stick full name in message
+        if "!" in channel:
+            message = dict(message.items())
+            message['__asgi_channel__'] = channel
+            channel = self.non_local_name(channel)
+        # Add it to a deque for the appropriate channel name
+        with self.thread_lock:
+            queue = self._channels.setdefault(channel, deque())
+            if len(queue) >= self.capacity:
+                raise self.ChannelFull(channel)
+            queue.append((
+                time.time() + self.expiry,
+                deepcopy(message),
+            ))
+
+    def receive(self, channels, block=False):
+        # Check channel names
+        assert all(
+            self.valid_channel_name(channel, receive=True) for channel in channels
+        ), "One or more channel names invalid"
+        # Trim any local parts off process-local channel names
+        channels = [
+            self.non_local_name(name)
+            for name in channels
+        ]
+        # Shuffle channel names to ensure approximate global ordering
+        channels = list(channels)
+        random.shuffle(channels)
+        # Expire old messages
+        self._clean_expired()
+        # Go through channels and see if a message is available:
+        with self.thread_lock:
+            for channel in channels:
+                if self._channels.get(channel, None):
+                    _, message = self._channels[channel].popleft()
+                    # If there is a full channel name stored in the message, unpack it.
+                    if "__asgi_channel__" in message:
+                        channel = message['__asgi_channel__']
+                        del message['__asgi_channel__']
+                    return channel, message
+        # No message available
+        return None, None
+
+    def new_channel(self, pattern):
+        assert isinstance(pattern, six.text_type)
+        assert pattern.endswith("?"), "New channel pattern must end with ?"
+        # Keep making channel names till one isn't present.
+        while True:
+            random_string = "".join(random.choice(string.ascii_letters) for i in range(8))
+            new_name = pattern + random_string
+            # Basic check for existence
+            if new_name not in self._channels:
+                return new_name
+
+    ### ASGI Group API ###
+
+    def group_add(self, group, channel):
+        # Both should be text and valid
+        assert self.valid_channel_name(channel), "Invalid channel name"
+        assert self.valid_group_name(group), "Invalid group name"
+        # Add to group dict
+        with self.thread_lock:
+            self._groups.setdefault(group, {})
+            self._groups[group][channel] = time.time()
+
+    def group_discard(self, group, channel):
+        # Both should be text and valid
+        assert self.valid_channel_name(channel), "Invalid channel name"
+        assert self.valid_group_name(group), "Invalid group name"
+        # Remove from group set
+        with self.thread_lock:
+            if group in self._groups:
+                if channel in self._groups[group]:
+                    del self._groups[group][channel]
+                if not self._groups[group]:
+                    del self._groups[group]
+
+    def group_channels(self, group):
+        return self._groups.get(group, set())
+
+    def send_group(self, group, message):
+        # Check types
+        assert isinstance(message, dict), "Message is not a dict"
+        assert self.valid_group_name(group), "Invalid group name"
+        # Run clean
+        self._clean_expired()
+        # Send to each channel
+        for channel in self._groups.get(group, set()):
+            try:
+                self.send(channel, message)
+            except self.ChannelFull:
+                pass
+
+    ### ASGI Flush API ###
+
+    def flush(self):
+        self._channels = {}
+        self._groups = {}
+
+    ### Expire cleanup ###
+
+    def _clean_expired(self):
+        """
+        Goes through all messages and groups and removes those that are expired.
+        Any channel with an expired message is removed from all groups.
+        """
+        with self.thread_lock:
+            # Channel cleanup
+            for channel, queue in list(self._channels.items()):
+                remove = False
+                # See if it's expired
+                while queue and queue[0][0] < time.time():
+                    queue.popleft()
+                    remove = True
+                # Any removal prompts group discard
+                if remove:
+                    self._remove_from_groups(channel)
+                # Is the channel now empty and needs deleting?
+                if not queue:
+                    del self._channels[channel]
+            # Group cleanup
+            for group, channels in list(self._groups.items()):
+                for channel, added in list(channels.items()):
+                    if added < (time.time() - self.group_expiry):
+                        del self._groups[group][channel]
+                        if not self._groups[group]:
+                            del self._groups[group]
+
+    def _remove_from_groups(self, channel):
+        """
+        Removes a channel from all groups. Used when a message on it expires.
+        """
+        for channels in self._groups.values():
+            if channel in channels:
+                del channels[channel]
+
+# Global single instance for easy use
+channel_layer = ChannelLayer()
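
For a feel of the in-memory layer above, a small self-contained sketch using the shared
module-level instance (the channel and group names are invented):

    from __future__ import unicode_literals

    from asgiref.inmemory import channel_layer

    # Plain send/receive on a named channel.
    channel_layer.send("chat.incoming", {"text": "hello"})
    channel, message = channel_layer.receive(["chat.incoming"])
    assert (channel, message) == ("chat.incoming", {"text": "hello"})

    # Group fan-out via the "groups" extension; full channels are skipped silently.
    channel_layer.group_add("room-1", "chat.user-a")
    channel_layer.group_add("room-1", "chat.user-b")
    channel_layer.send_group("room-1", {"text": "hi all"})
    print(channel_layer.receive(["chat.user-a", "chat.user-b"]))
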
diff --git a/asgiref/wsgi.py b/asgiref/wsgi.py
new file mode 100755
index 0000000..c96edf5
--- /dev/null
+++ b/asgiref/wsgi.py
@@ -0,0 +1,104 @@
+from __future__ import unicode_literals
+
+import six
+
+
+STATUS_MAP = {
+    200: b"OK",
+    201: b"Created",
+    204: b"No Content",
+    400: b"Bad Request",
+    401: b"Unauthorized",
+    403: b"Forbidden",
+    404: b"Not Found",
+    500: b"Internal Server Error",
+}
+
+
+class WsgiToAsgiAdapter(object):
+    """
+    Class which acts as a WSGI application and translates incoming requests
+    into HTTP requests on an ASGI backend.
+
+    Fully synchronous, so you'll likely want to run a lot of threads/processes;
+    the main application logic isn't in memory, so the footprint should be
+    lower.
+    """
+
+    def __init__(self, channel_layer):
+        self.channel_layer = channel_layer
+
+    def get_reply_channel(self):
+        """
+        Overridable for tests, where you want a fixed reply channel.
+        """
+        return self.channel_layer.new_channel("http.response!")
+
+    def __call__(self, environ, start_response):
+        # Translate the environ into an ASGI-style request
+        request = self.translate_request(environ)
+        # Run it through ASGI
+        reply_channel = self.get_reply_channel()
+        request["reply_channel"] = reply_channel
+        self.channel_layer.send("http.request", request)
+        # Translate the first response message
+        response = self.get_message(reply_channel)
+        status = (
+            six.text_type(response["status"]).encode("latin1") +
+            b" " +
+            response.get(
+                "status_text",
+                STATUS_MAP.get(response["status"], b"Unknown Status")
+            )
+        )
+        start_response(
+            status,
+            [(n.encode("latin1"), v) for n, v in response.get("headers", [])],
+        )
+        yield response.get("content", b"")
+        while response.get("more_content", False):
+            response = self.get_message(reply_channel)
+            yield response.get("content", b"")
+
+    def translate_request(self, environ):
+        """
+        Turns a WSGI environ into an ASGI http.request message
+        """
+        request = {}
+        # HTTP version
+        if "SERVER_PROTOCOL" in environ and "HTTP/" in environ["SERVER_PROTOCOL"]:
+            request["http_version"] = environ["SERVER_PROTOCOL"][5:]
+        else:
+            # Lowest possible feature set
+            request["http_version"] = "1.0"
+        # Method
+        request["method"] = environ["REQUEST_METHOD"].upper()
+        # Query string
+        request["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")
+        # Path and root path
+        request["root_path"] =  environ.get("SCRIPT_NAME", "").encode("latin1")
+        request["path"] = environ.get("PATH_INFO", "").encode("latin1")
... 281 lines suppressed ...
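
As orientation for the (truncated) wsgi.py above, and bearing in mind the README's note
that the adapters are not yet complete, the adapter is constructed as an ordinary WSGI
application object; a sketch, with the channel layer choice purely illustrative:

    from asgiref.inmemory import channel_layer
    from asgiref.wsgi import WsgiToAsgiAdapter

    # Hand the adapter any channel layer; the resulting object is a WSGI application
    # that a WSGI server can host. A separate ASGI worker must consume "http.request"
    # and reply on the reply channel for responses to come back.
    application = WsgiToAsgiAdapter(channel_layer)
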

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-asgiref.git


