[Python-modules-commits] [python-asgiref] 01/03: importing python-asgiref_1.1.2.orig.tar.gz
Michael Fladischer
fladi at moszumanska.debian.org
Mon Jul 17 12:40:01 UTC 2017
This is an automated email from the git hooks/post-receive script.
fladi pushed a commit to branch master
in repository python-asgiref.
commit e038655ea5362f28380f495b539e6a6f7ebc945c
Author: Michael Fladischer <FladischerMichael at fladi.at>
Date: Mon Jul 17 13:18:56 2017 +0200
importing python-asgiref_1.1.2.orig.tar.gz
---
.gitignore | 7 +
.travis.yml | 17 +++
CHANGELOG.txt | 29 ++++
LICENSE | 27 ++++
Makefile | 16 +++
README.rst | 64 +++++++++
asgiref/__init__.py | 1 +
asgiref/base_layer.py | 141 +++++++++++++++++++
asgiref/conformance.py | 346 +++++++++++++++++++++++++++++++++++++++++++++++
asgiref/inmemory.py | 180 ++++++++++++++++++++++++
asgiref/wsgi.py | 104 ++++++++++++++
setup.cfg | 2 +
setup.py | 25 ++++
tests/__init__.py | 0
tests/test_base_layer.py | 93 +++++++++++++
tests/test_inmemory.py | 25 ++++
tests/test_wsgi.py | 68 ++++++++++
tox.ini | 5 +
18 files changed, 1150 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100755
index 0000000..9a904ec
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+*.egg-info
+dist/
+build/
+__pycache__/
+*.pyc
+.tox/
+*~
diff --git a/.travis.yml b/.travis.yml
new file mode 100755
index 0000000..1d0847f
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,17 @@
+# Faster container-based infrastructure
+sudo: false
+
+language: python
+
+# Python versions supported by channel's supported Django versions
+python:
+ - "2.7"
+ - "3.4"
+ - "3.5"
+
+install:
+ - pip install -e .
+ # Useful for debugging
+ - pip freeze
+
+script: python -m unittest discover
diff --git a/CHANGELOG.txt b/CHANGELOG.txt
new file mode 100755
index 0000000..c6e570e
--- /dev/null
+++ b/CHANGELOG.txt
@@ -0,0 +1,29 @@
+1.1.2 (2017-05-16)
+------------------
+
+* Conformance test suite now allows for retries and tests send_group's behaviour with capacity
+* valid_channel_names now has a receive parameter
+
+1.1.1 (2017-04-02)
+------------------
+
+* Error with sending to multi-process channels with the same message fixed
+
+1.1.0 (2017-04-01)
+------------------
+
+* Process-specific channel behaviour has been changed, and the base layer
+ and conformance suites updated to match.
+
+1.0.1 (2017-03-19)
+------------------
+
+* Improved channel and group name validation
+* Test rearrangements and improvements
+
+1.0.0 (2016-04-11)
+------------------
+
+* `receive_many` is now `receive`
+* In-memory layer deepcopies messages so they cannot be mutated post-send
+* Better errors for bad channel/group names
diff --git a/LICENSE b/LICENSE
new file mode 100755
index 0000000..5f4f225
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) Django Software Foundation and individual contributors.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ 1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+ 2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+ 3. Neither the name of Django nor the names of its contributors may be used
+ to endorse or promote products derived from this software without
+ specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Makefile b/Makefile
new file mode 100755
index 0000000..1a1f55e
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,16 @@
+.PHONY: release
+
+all:
+
+release:
+ifndef version
+ $(error Please supply a version)
+endif
+ @echo Releasing version $(version)
+ifeq (,$(findstring $(version),$(shell git log --oneline -1)))
+ $(error Last commit does not match version)
+endif
+ git tag $(version)
+ git push
+ git push --tags
+ python setup.py sdist bdist_wheel upload
diff --git a/README.rst b/README.rst
new file mode 100755
index 0000000..d3af269
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,64 @@
+asgiref
+=======
+
+.. image:: https://api.travis-ci.org/django/asgiref.svg
+ :target: https://travis-ci.org/django/asgiref
+
+.. image:: https://img.shields.io/pypi/v/asgiref.svg
+ :target: https://pypi.python.org/pypi/asgiref
+
+Contains various reference ASGI implementations, including:
+
+* A base channel layer, ``asgiref.base_layer``
+* An in-memory channel layer, ``asgiref.inmemory``
+* WSGI-to-ASGI and ASGI-to-WSGI adapters, in ``asgiref.wsgi``
+
+
+Base Channel Layer
+------------------
+
+Provides an optional template for building ASGI channel layers, with the two
+required exception classes provided and all API functions stubbed out.
+
+Also comes with logic for doing per-channel capacities using channel names and
+globbing; use ``self.get_capacity`` and pass the arguments through to the base
+``__init__`` if you want to use it.
+
+
+In-memory Channel Layer
+-----------------------
+
+Simply instantiate ``asgiref.inmemory.ChannelLayer``, or use the pre-made
+``asgiref.inmemory.channel_layer`` for easy use. Implements the ``group``
+extension, and is designed to support running multiple ASGI programs
+in separate threads within one process (the channel layer is threadsafe).
+
+
+WSGI-ASGI Adapters
+------------------
+
+These are not yet complete and should not be used.
+
+Dependencies
+------------
+
+All Channels projects currently support Python 2.7, 3.4 and 3.5.
+
+Contributing
+------------
+
+Please refer to the
+`main Channels contributing docs <https://github.com/django/channels/blob/master/CONTRIBUTING.rst>`_.
+That also contains advice on how to set up the development environment and run the tests.
+
+Maintenance and Security
+------------------------
+
+To report security issues, please contact security at djangoproject.com. For GPG
+signatures and more security process information, see
+https://docs.djangoproject.com/en/dev/internals/security/.
+
+To report bugs or request new features, please open a new GitHub issue.
+
+This repository is part of the Channels project. For the shepherd and maintenance team, please see the
+`main Channels readme <https://github.com/django/channels/blob/master/README.rst>`_.
diff --git a/asgiref/__init__.py b/asgiref/__init__.py
new file mode 100755
index 0000000..72f26f5
--- /dev/null
+++ b/asgiref/__init__.py
@@ -0,0 +1 @@
+__version__ = "1.1.2"
diff --git a/asgiref/base_layer.py b/asgiref/base_layer.py
new file mode 100755
index 0000000..ef1b8b2
--- /dev/null
+++ b/asgiref/base_layer.py
@@ -0,0 +1,141 @@
+from __future__ import unicode_literals
+
+import fnmatch
+import re
+import six
+import warnings
+
+
+class BaseChannelLayer(object):
+ """
+ Base channel layer object; outlines the core API and contains some handy
+ reusable code bits. You don't have to inherit from this, but it might
+ save you time.
+ """
+
+ def __init__(self, expiry=60, group_expiry=86400, capacity=100, channel_capacity=None):
+ self.expiry = expiry
+ self.capacity = capacity
+ self.group_expiry = group_expiry
+ self.channel_capacity = self.compile_capacities(channel_capacity or {})
+
+ ### ASGI API ###
+
+ extensions = []
+
+ class ChannelFull(Exception):
+ pass
+
+ class MessageTooLarge(Exception):
+ pass
+
+ def send(self, channel, message):
+ raise NotImplementedError()
+
+ def receive(self, channels, block=False):
+ raise NotImplementedError()
+
+ def receive_many(self, channels, block=False):
+ """
+ receive_many is deprecated, but this is provided for backwards compatibility.
+ """
+ warnings.warn("receive_many is deprecated; please use receive", DeprecationWarning)
+ return self.receive(channels, block)
+
+ def new_channel(self, pattern):
+ raise NotImplementedError()
+
+ ### ASGI Group API ###
+
+ def group_add(self, group, channel):
+ raise NotImplementedError()
+
+ def group_discard(self, group, channel):
+ raise NotImplementedError()
+
+ def group_channels(self, group):
+ raise NotImplementedError()
+
+ def send_group(self, group, message):
+ raise NotImplementedError()
+
+ ### ASGI Flush API ###
+
+ def flush(self):
+ raise NotImplementedError()
+
+ ### Capacity utility functions
+
+ def compile_capacities(self, channel_capacity):
+ """
+ Takes an input channel_capacity dict and returns the compiled list
+ of regexes that get_capacity will look for as self.channel_capacity.
+ """
+ result = []
+ for pattern, value in channel_capacity.items():
+ # If they passed in a precompiled regex, leave it, else interpret
+ # it as a glob.
+ if hasattr(pattern, "match"):
+ result.append((pattern, value))
+ else:
+ result.append((re.compile(fnmatch.translate(pattern)), value))
+ return result
+
+ def get_capacity(self, channel):
+ """
+ Gets the correct capacity for the given channel; either the default,
+ or a matching result from channel_capacity. Returns the first matching
+ result; if you want to control the order of matches, use an ordered dict
+ as input.
+ """
+ for pattern, capacity in self.channel_capacity:
+ if pattern.match(channel):
+ return capacity
+ return self.capacity
+
+ def match_type_and_length(self, name):
+ if (len(name) < 100) and (isinstance(name, six.text_type)):
+ return True
+ return False
+
+ ### Name validation functions
+
+ channel_name_regex = re.compile(r"^[a-zA-Z\d\-_.]+((\?|\!)[\d\w\-_.]*)?$")
+ group_name_regex = re.compile(r"^[a-zA-Z\d\-_.]+$")
+ invalid_name_error = "{} name must be a valid unicode string containing only ASCII alphanumerics, hyphens, underscores, or periods."
+
+ def valid_channel_name(self, name, receive=False):
+ if self.match_type_and_length(name):
+ if bool(self.channel_name_regex.match(name)):
+ # Check cases for special channels
+ if "?" in name and name.endswith("?"):
+ raise TypeError("Single-reader channel names must not end with ?")
+ elif "!" in name and not name.endswith("!") and receive:
+ raise TypeError("Process-local channel names in receive() must end at the !")
+ return True
+ raise TypeError("Channel name must be a valid unicode string containing only ASCII alphanumerics, hyphens, or periods, not '{}'.".format(name))
+
+ def valid_group_name(self, name):
+ if self.match_type_and_length(name):
+ if bool(self.group_name_regex.match(name)):
+ return True
+ raise TypeError("Group name must be a valid unicode string containing only ASCII alphanumerics, hyphens, or periods.")
+
+ def valid_channel_names(self, names, receive=False):
+ _non_empty_list = True if names else False
+ _names_type = isinstance(names, list)
+ assert _non_empty_list and _names_type, "names must be a non-empty list"
+
+ assert all(self.valid_channel_name(channel, receive=receive) for channel in names)
+ return True
+
+ def non_local_name(self, name):
+ """
+ Given a channel name, returns the "non-local" part. If the channel name
+ is a process-specific channel (contains !) this means the part up to
+ and including the !; if it is anything else, this means the full name.
+ """
+ if "!" in name:
+ return name[:name.find("!")+1]
+ else:
+ return name
diff --git a/asgiref/conformance.py b/asgiref/conformance.py
new file mode 100755
index 0000000..acb76a3
--- /dev/null
+++ b/asgiref/conformance.py
@@ -0,0 +1,346 @@
+"""
+ASGI spec conformance test suite.
+
+Calling the functions with an ASGI channel layer instance will return you a
+single TestCase instance that checks for conformity on that instance.
+
+You MUST also pass along an expiry value to the sets of tests, to allow the
+suite to wait correctly for expiry. It's suggested you configure the layer
+for 1-second expiry during tests, and use a 1.1 second expiry delay.
+
+The channel layers should be empty to start with, and discarded after use,
+as they'll be full of test data. If a layer supports the "flush" extension,
+it'll be flushed before every test.
+"""
+
+from __future__ import unicode_literals
+import six
+import time
+import unittest
+
+
+class ConformanceTestCase(unittest.TestCase):
+ """
+ Tests that core ASGI functionality is maintained.
+ """
+
+ channel_layer = None
+ expiry_delay = None
+ capacity_limit = None
+ receive_tries = 1
+
+ def receive(self, channels):
+ """
+ Allows tests to automatically call channel_layer.receive() more than once.
+ This is necessary for testing ChannelLayer implementations that do not guarantee a response will
+ be returned on every receive() call, even when there are messages on the channel. This would be
+ the case, for example, for a channel layer designed for a multi-worker environment with multiple
+ backing hosts, that checks a different host on each call.
+ """
+ for _ in range(self.receive_tries):
+ channel, message = self.channel_layer.receive(channels)
+ if channel is not None:
+ return channel, message
+ return None, None
+
+ @classmethod
+ def setUpClass(cls):
+ # Don't let this actual class run, it's abstract
+ if cls is ConformanceTestCase:
+ raise unittest.SkipTest("Skipping base class tests")
+
+ def setUp(self):
+ if self.channel_layer is None:
+ raise ValueError("You must define 'channel_layer' when subclassing the conformance tests.")
+ if self.expiry_delay is None:
+ raise ValueError("You must define 'expiry_delay' when subclassing the conformance tests.")
+ if "flush" in self.channel_layer.extensions:
+ self.channel_layer.flush()
+
+ def skip_if_no_extension(self, extension):
+ """
+ Handy function for skipping things without an extension.
+ We can't use the decorators, as we need access to self.
+ """
+ if extension not in self.channel_layer.extensions:
+ raise unittest.SkipTest("No %s extension" % extension)
+
+ def test_send_recv(self):
+ """
+ Tests that channels can send and receive messages right.
+ """
+ self.channel_layer.send("sr_test", {"value": "blue"})
+ self.channel_layer.send("sr_test", {"value": "green"})
+ self.channel_layer.send("sr_test", {"value": "yellow"})
+ self.channel_layer.send("sr_test2", {"value": "red"})
+ # Receive from the first channel three times
+ response_messages = []
+ for i in range(3):
+ channel, message = self.receive(["sr_test"])
+ response_messages.append(message)
+ self.assertEqual(channel, "sr_test")
+ for response in response_messages:
+ self.assertTrue("value" in response)
+ # Check that all messages were returned; order is not guaranteed
+ self.assertEqual(set([r["value"] for r in response_messages]), set(["blue", "green", "yellow"]))
+ # And the other channel with multi select
+ channel, message = self.receive(["sr_test", "sr_test2"])
+ self.assertEqual(channel, "sr_test2")
+ self.assertEqual(message, {"value": "red"})
+
+ def test_single_process_receive(self):
+ """
+ Tests that single-process receive gets anything with the right prefix.
+ """
+ self.channel_layer.send("spr_test!a", {"best": "ponies"})
+ channel, message = self.receive(["spr_test!"])
+ self.assertEqual(channel, "spr_test!a")
+ self.assertEqual(message, {"best": "ponies"})
+ self.channel_layer.send("spr_test!b", {"best": "pangolins"})
+ channel, message = self.receive(["spr_test!"])
+ self.assertEqual(channel, "spr_test!b")
+ self.assertEqual(message, {"best": "pangolins"})
+
+ def test_single_process_receive_error(self):
+ """
+ Tests that single-process receive isn't allowed with a local part.
+ """
+ with self.assertRaises(Exception):
+ self.receive(["spr_test!c"])
+
+ def test_message_expiry(self):
+ """
+ Tests that messages expire correctly.
+ """
+ self.channel_layer.send("me_test", {"value": "blue"})
+ time.sleep(self.expiry_delay)
+ channel, message = self.receive(["me_test"])
+ self.assertIs(channel, None)
+ self.assertIs(message, None)
+
+ def test_new_channel_single_reader(self):
+ """
+ Tests that new single-reader channel names are made correctly.
+ """
+ pattern = "test.foo?"
+ name1 = self.channel_layer.new_channel(pattern)
+ self.assertFalse(name1.endswith("?"))
+ self.assertTrue("?" in name1)
+ self.assertEqual(name1.find("?"), name1.rfind("?"))
+ self.assertIsInstance(name1, six.text_type)
+ # Send a message and make sure new_channel on second pass changes
+ self.channel_layer.send(name1, {"value": "blue"})
+ name2 = self.channel_layer.new_channel(pattern)
+ # Make sure we can consume off of that new channel
+ channel, message = self.receive([name1, name2])
+ self.assertEqual(channel, name1)
+ self.assertEqual(message, {"value": "blue"})
+
+ def test_new_channel_failures(self):
+ """
+ Tests that we don't allow bad new channel names.
+ """
+ with self.assertRaises(Exception):
+ self.channel_layer.new_channel("test!")
+ with self.assertRaises(Exception):
+ self.channel_layer.new_channel("test.foo")
+
+ def test_strings(self):
+ """
+ Ensures byte strings and unicode strings both make it through
+ serialization properly.
+ """
+ # Message. Double-nested to ensure serializers are recursing properly.
+ message = {
+ "values": {
+ # UTF-8 sequence for british pound, but we want it not interpreted into that.
+ "utf-bytes": b"\xc2\xa3",
+ # Actual unicode for british pound, should come back as 1 char
+ "unicode": "\u00a3",
+ # Emoji, in case someone is using 3-byte-wide unicode storage
+ "emoji": "\u1F612",
+ # Random control characters and null
+ "control": b"\x01\x00\x03\x21",
+ }
+ }
+ # Send it and receive it
+ self.channel_layer.send("str_test", message)
+ _, received = self.receive(["str_test"])
+ # Compare
+ self.assertIsInstance(received["values"]["utf-bytes"], six.binary_type)
+ self.assertIsInstance(received["values"]["unicode"], six.text_type)
+ self.assertIsInstance(received["values"]["emoji"], six.text_type)
+ self.assertIsInstance(received["values"]["control"], six.binary_type)
+ self.assertEqual(received["values"]["utf-bytes"], message["values"]["utf-bytes"])
+ self.assertEqual(received["values"]["unicode"], message["values"]["unicode"])
+ self.assertEqual(received["values"]["emoji"], message["values"]["emoji"])
+ self.assertEqual(received["values"]["control"], message["values"]["control"])
+
+ def test_groups(self):
+ """
+ Tests that basic group addition and send works
+ """
+ self.skip_if_no_extension("groups")
+ # Make a group and send to it
+ self.channel_layer.group_add("tgroup", "tg_test")
+ self.channel_layer.group_add("tgroup", "tg_test2")
+ self.channel_layer.group_add("tgroup", "tg_test3")
+ self.channel_layer.group_discard("tgroup", "tg_test3")
+ self.channel_layer.send_group("tgroup", {"value": "orange"})
+ # Receive from the two channels in the group and ensure the messages arrive
+ channel, message = self.receive(["tg_test"])
+ self.assertEqual(channel, "tg_test")
+ self.assertEqual(message, {"value": "orange"})
+ channel, message = self.receive(["tg_test2"])
+ self.assertEqual(channel, "tg_test2")
+ self.assertEqual(message, {"value": "orange"})
+ # Make sure another channel does not get a message
+ channel, message = self.receive(["tg_test3"])
+ self.assertIs(channel, None)
+ self.assertIs(message, None)
+
+ def test_groups_process(self):
+ """
+ Tests that group membership and sending works with process-specific channels.
+ """
+ self.skip_if_no_extension("groups")
+ # Make a group and send to it
+ self.channel_layer.group_add("tgroup", "tgp!test")
+ self.channel_layer.group_add("tgroup", "tgp!test2")
+ self.channel_layer.group_add("tgroup", "tgp!test3")
+ self.channel_layer.group_discard("tgroup", "tgp!test2")
+ self.channel_layer.send_group("tgroup", {"value": "orange"})
+ # Receive from the two channels in the group and ensure the messages arrive
+ channel, message = self.receive(["tgp!"])
+ self.assertIn(channel, ["tgp!test", "tgp!test3"])
+ self.assertEqual(message, {"value": "orange"})
+ channel, message = self.receive(["tgp!"])
+ self.assertIn(channel, ["tgp!test", "tgp!test3"])
+ self.assertEqual(message, {"value": "orange"})
+ # Make sure another channel does not get a message
+ channel, message = self.receive(["tgp!"])
+ self.assertIs(channel, None)
+ self.assertIs(message, None)
+
+ def test_group_channels(self):
+ """
+ Tests that group membership check works
+ """
+ self.skip_if_no_extension("groups")
+ # Make a group
+ self.channel_layer.group_add("tgroup", "tg_test")
+ self.channel_layer.group_add("tgroup", "tg_test2")
+ self.channel_layer.group_add("tgroup", "tg_test3")
+ # Check group members
+ self.assertEqual(
+ set(self.channel_layer.group_channels("tgroup")),
+ {"tg_test", "tg_test2", "tg_test3"},
+ )
+ # Discard from group
+ self.channel_layer.group_discard("tgroup", "tg_test3")
+ self.assertEqual(
+ set(self.channel_layer.group_channels("tgroup")),
+ {"tg_test", "tg_test2"},
+ )
+
+ def test_flush(self):
+ """
+ Tests that messages go away after a flush.
+ """
+ self.skip_if_no_extension("flush")
+ # Send something to flush
+ self.channel_layer.send("fl_test", {"value": "blue"})
+ self.channel_layer.flush()
+ channel, message = self.receive(["fl_test"])
+ self.assertIs(channel, None)
+ self.assertIs(message, None)
+
+ def test_flush_groups(self):
+ """
+ Tests that groups go away after a flush.
+ """
+ self.skip_if_no_extension("groups")
+ self.skip_if_no_extension("flush")
+ # Add things to a group and send to it
+ self.channel_layer.group_add("tfg_group", "tfg_test")
+ self.channel_layer.send_group("tfg_group", {"value": "blue"})
+ self.channel_layer.flush()
+ channel, message = self.receive(["tfg_test"])
+ self.assertIs(channel, None)
+ self.assertIs(message, None)
+
+ def test_group_expiry(self):
+ """
+ Tests that group expiry is provided, and test it if it's less than
+ 20 seconds.
+ """
+ self.skip_if_no_extension("groups")
+ # Check group expiry is provided, and see if we can continue
+ expiry = getattr(self.channel_layer, "group_expiry", None)
+ if expiry is None:
+ self.fail("group_expiry is not defined")
+ if expiry > 20:
+ raise unittest.SkipTest("Expiry too long for test")
+ # Add things to a group
+ self.channel_layer.group_add("tge_group", "tge_test")
+ # Wait group expiry plus one
+ time.sleep(expiry + 1)
+ # Ensure message never arrives
+ self.channel_layer.send_group("tge_group", {"value": "blue"})
+ channel, message = self.receive(["tge_test"])
+ self.assertIs(channel, None)
+ self.assertIs(message, None)
+
+ def test_capacity(self):
+ """
+ Tests that the capacity limiter on send() raises ChannelFull
+ after the right number of messages. Only runs if capacity_limit is set.
+ """
+ if self.capacity_limit is None:
+ raise unittest.SkipTest("No test capacity specified")
+ for _ in range(self.capacity_limit):
+ self.channel_layer.send("cap_test", {"hey": "there"})
+ with self.assertRaises(self.channel_layer.ChannelFull):
+ self.channel_layer.send("cap_test", {"hey": "there"})
+
+ def test_capacity_process(self):
+ """
+ Tests that the capacity limiter works on process-specific channels overall
+ """
+ if self.capacity_limit is None or self.capacity_limit < 2:
+ raise unittest.SkipTest("Test capacity is unspecified or too low")
+ for i in range(self.capacity_limit):
+ self.channel_layer.send("capp!%s" % i, {"hey": "there"})
+ with self.assertRaises(self.channel_layer.ChannelFull):
+ self.channel_layer.send("capp!final", {"hey": "there"})
+
+ def test_capacity_group(self):
+ """
+ Tests that the capacity limiter on send_group() never raises
+ ChannelFull.
+ """
+ self.skip_if_no_extension("groups")
+ self.channel_layer.group_add("tcg_group", "tcg_test")
+ if self.capacity_limit is None:
+ raise unittest.SkipTest("No test capacity specified")
+ for _ in range(self.capacity_limit + 1):
+ self.channel_layer.send_group("tcg_group", {"hey": "there"})
+
+ def test_exceptions(self):
+ """
+ Tests that the two exception classes exist on the channel layer
+ """
+ self.assertTrue(hasattr(self.channel_layer, "MessageTooLarge"))
+ self.assertTrue(hasattr(self.channel_layer, "ChannelFull"))
+
+ def test_message_alteration_after_send(self):
+ """
+ Tests that a message can be altered after it was sent through a channel
+ without affecting the object inside the queue.
+ """
+ message = {'value': [1, 2, 3]}
+ self.channel_layer.send('channel', message)
+ message['value'][0] = 'new value'
+
+ _, message = self.receive(['channel'])
+ self.assertEqual(message, {'value': [1, 2, 3]})
diff --git a/asgiref/inmemory.py b/asgiref/inmemory.py
new file mode 100755
index 0000000..589dcde
--- /dev/null
+++ b/asgiref/inmemory.py
@@ -0,0 +1,180 @@
+from __future__ import unicode_literals
+
+from copy import deepcopy
+import random
+import six
+import string
+import time
+import threading
+from collections import deque
+
+from .base_layer import BaseChannelLayer
+
+
+class ChannelLayer(BaseChannelLayer):
+ """
+ In memory channel layer object; a single one is instantiated as
+ "channel_layer" for easy shared use. Only allows global capacity config.
+ """
+
+ def __init__(self, expiry=60, group_expiry=86400, capacity=10, channel_capacity=None):
+ super(ChannelLayer, self).__init__(
+ expiry=expiry,
+ group_expiry=group_expiry,
+ capacity=capacity,
+ channel_capacity=channel_capacity,
+ )
+ self.thread_lock = threading.Lock()
+ # Storage for state
+ self._channels = {}
+ self._groups = {}
+
+ ### ASGI API ###
+
+ extensions = ["groups", "flush"]
+
+ def send(self, channel, message):
+ # Make sure the message is a dict at least (no deep inspection)
+ assert isinstance(message, dict), "Message is not a dict"
+ # Channel name should be text
+ assert self.valid_channel_name(channel), "Channel name not valid"
+ # Make sure the message does not contain reserved keys
+ assert "__asgi_channel__" not in message
+ # If it's a process-local channel, strip off local part and stick full name in message
+ if "!" in channel:
+ message = dict(message.items())
+ message['__asgi_channel__'] = channel
+ channel = self.non_local_name(channel)
+ # Add it to a deque for the appropriate channel name
+ with self.thread_lock:
+ queue = self._channels.setdefault(channel, deque())
+ if len(queue) >= self.capacity:
+ raise self.ChannelFull(channel)
+ queue.append((
+ time.time() + self.expiry,
+ deepcopy(message),
+ ))
+
+ def receive(self, channels, block=False):
+ # Check channel names
+ assert all(
+ self.valid_channel_name(channel, receive=True) for channel in channels
+ ), "One or more channel names invalid"
+ # Trim any local parts off process-local channel names
+ channels = [
+ self.non_local_name(name)
+ for name in channels
+ ]
+ # Shuffle channel names to ensure approximate global ordering
+ channels = list(channels)
+ random.shuffle(channels)
+ # Expire old messages
+ self._clean_expired()
+ # Go through channels and see if a message is available:
+ with self.thread_lock:
+ for channel in channels:
+ if self._channels.get(channel, None):
+ _, message = self._channels[channel].popleft()
+ # If there is a full channel name stored in the message, unpack it.
+ if "__asgi_channel__" in message:
+ channel = message['__asgi_channel__']
+ del message['__asgi_channel__']
+ return channel, message
+ # No message available
+ return None, None
+
+ def new_channel(self, pattern):
+ assert isinstance(pattern, six.text_type)
+ assert pattern.endswith("?"), "New channel pattern must end with ?"
+ # Keep making channel names till one isn't present.
+ while True:
+ random_string = "".join(random.choice(string.ascii_letters) for i in range(8))
+ new_name = pattern + random_string
+ # Basic check for existence
+ if new_name not in self._channels:
+ return new_name
+
+ ### ASGI Group API ###
+
+ def group_add(self, group, channel):
+ # Both should be text and valid
+ assert self.valid_channel_name(channel), "Invalid channel name"
+ assert self.valid_group_name(group), "Invalid group name"
+ # Add to group dict
+ with self.thread_lock:
+ self._groups.setdefault(group, {})
+ self._groups[group][channel] = time.time()
+
+ def group_discard(self, group, channel):
+ # Both should be text and valid
+ assert self.valid_channel_name(channel), "Invalid channel name"
+ assert self.valid_group_name(group), "Invalid group name"
+ # Remove from group set
+ with self.thread_lock:
+ if group in self._groups:
+ if channel in self._groups[group]:
+ del self._groups[group][channel]
+ if not self._groups[group]:
+ del self._groups[group]
+
+ def group_channels(self, group):
+ return self._groups.get(group, set())
+
+ def send_group(self, group, message):
+ # Check types
+ assert isinstance(message, dict), "Message is not a dict"
+ assert self.valid_group_name(group), "Invalid group name"
+ # Run clean
+ self._clean_expired()
+ # Send to each channel
+ for channel in self._groups.get(group, set()):
+ try:
+ self.send(channel, message)
+ except self.ChannelFull:
+ pass
+
+ ### ASGI Flush API ###
+
+ def flush(self):
+ self._channels = {}
+ self._groups = {}
+
+ ### Expire cleanup ###
+
+ def _clean_expired(self):
+ """
+ Goes through all messages and groups and removes those that are expired.
+ Any channel with an expired message is removed from all groups.
+ """
+ with self.thread_lock:
+ # Channel cleanup
+ for channel, queue in list(self._channels.items()):
+ remove = False
+ # See if it's expired
+ while queue and queue[0][0] < time.time():
+ queue.popleft()
+ remove = True
+ # Any removal prompts group discard
+ if remove:
+ self._remove_from_groups(channel)
+ # Is the channel now empty and needs deleting?
+ if not queue:
+ del self._channels[channel]
+ # Group cleanup
+ for group, channels in list(self._groups.items()):
+ for channel, added in list(channels.items()):
+ if added < (time.time() - self.group_expiry):
+ del self._groups[group][channel]
+ if not self._groups[group]:
+ del self._groups[group]
+
+ def _remove_from_groups(self, channel):
+ """
+ Removes a channel from all groups. Used when a message on it expires.
+ """
+ for channels in self._groups.values():
+ if channel in channels:
+ del channels[channel]
+
+# Global single instance for easy use
+channel_layer = ChannelLayer()
diff --git a/asgiref/wsgi.py b/asgiref/wsgi.py
new file mode 100755
index 0000000..c96edf5
--- /dev/null
+++ b/asgiref/wsgi.py
@@ -0,0 +1,104 @@
+from __future__ import unicode_literals
+
+import six
+
+
+STATUS_MAP = {
+ 200: b"OK",
+ 201: b"Created",
+ 204: b"No Content",
+ 400: b"Bad Request",
+ 401: b"Unauthorized",
+ 403: b"Forbidden",
+ 404: b"Not Found",
+ 500: b"Internal Server Error",
+}
+
+
+class WsgiToAsgiAdapter(object):
+ """
+ Class which acts as a WSGI application and translates the requests onto
+ HTTP requests on an ASGI backend.
+
+ Fully synchronous, so you'll likely want to run a lot of threads/processes;
+ the main application logic isn't in memory, so the footprint should be
+ lower.
+ """
+
+ def __init__(self, channel_layer):
+ self.channel_layer = channel_layer
+
+ def get_reply_channel(self):
+ """
+ Overrideable for tests, where you want a fixed reply channel.
+ """
+ return self.channel_layer.new_channel("http.response!")
+
+ def __call__(self, environ, start_response):
+ # Translate the environ into an ASGI-style request
+ request = self.translate_request(environ)
+ # Run it through ASGI
+ reply_channel = self.get_reply_channel()
+ request["reply_channel"] = reply_channel
+ self.channel_layer.send("http.request", request)
+ # Translate the first response message
+ response = self.get_message(reply_channel)
+ status = (
+ six.text_type(response["status"]).encode("latin1") +
+ b" " +
+ response.get(
+ "status_text",
+ STATUS_MAP.get(response["status"], b"Unknown Status")
+ )
+ )
+ start_response(
+ status,
+ [(n.encode("latin1"), v) for n, v in response.get("headers", [])],
+ )
+ yield response.get("content", b"")
+ while response.get("more_content", False):
+ response = self.get_message(reply_channel)
+ yield response.get("content", b"")
+
+ def translate_request(self, environ):
+ """
+ Turns a WSGI environ into an ASGI http.request message
+ """
+ request = {}
+ # HTTP version
+ if "SERVER_PROTOCOL" in environ and "HTTP/" in environ["SERVER_PROTOCOL"]:
+ request["http_version"] = environ["SERVER_PROTOCOL"][5:]
+ else:
+ # Lowest possible feature set
+ request["http_version"] = "1.0"
+ # Method
+ request["method"] = environ["REQUEST_METHOD"].upper()
+ # Query string
+ request["query_string"] = environ.get("QUERY_STRING", "").encode("latin1")
+ # Path and root path
+ request["root_path"] = environ.get("SCRIPT_NAME", "").encode("latin1")
+ request["path"] = environ.get("PATH_INFO", "").encode("latin1")
... 281 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-asgiref.git
More information about the Python-modules-commits
mailing list