[Python-modules-commits] [python-asgi-redis] 01/02: importing python-asgi-redis_1.4.3.orig.tar.gz

Michael Fladischer fladi at moszumanska.debian.org
Fri Nov 24 08:47:09 UTC 2017


This is an automated email from the git hooks/post-receive script.

fladi pushed a commit to branch debian/master
in repository python-asgi-redis.

commit 85519c6bdd8332caa7cc40630de5ea822ff0a95f
Author: Michael Fladischer <FladischerMichael at fladi.at>
Date:   Mon Nov 20 16:57:34 2017 +0100

    importing python-asgi-redis_1.4.3.orig.tar.gz
---
 .gitignore                     |   6 +
 .travis.yml                    |  22 +
 CHANGELOG.txt                  | 134 ++++++
 Dockerfile                     |  17 +
 LICENSE                        |  27 ++
 Makefile                       |  16 +
 README.rst                     | 229 ++++++++++
 asgi_redis/__init__.py         |   5 +
 asgi_redis/core.py             | 640 ++++++++++++++++++++++++++++
 asgi_redis/local.py            |  72 ++++
 asgi_redis/sentinel.py         | 178 ++++++++
 asgi_redis/twisted_utils.py    |  19 +
 docker-compose.yml             |  31 ++
 pytest.ini                     |   4 +
 setup.cfg                      |   2 +
 setup.py                       |  52 +++
 testing/redis/Dockerfile       |   5 +
 testing/redis/redis.conf       | 943 +++++++++++++++++++++++++++++++++++++++++
 testing/sentinel/Dockerfile    |   6 +
 testing/sentinel/sentinel.conf |  13 +
 tests/__init__.py              |   0
 tests/constants.py             |  10 +
 tests/test_core.py             | 125 ++++++
 tests/test_integration.py      |  52 +++
 tests/test_local.py            |  22 +
 tests/test_sentinel.py         | 176 ++++++++
 tests/test_twisted.py          |  50 +++
 tox.ini                        |   8 +
 28 files changed, 2864 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..68fc79c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+*.egg-info
+dist/
+build/
+*.pyc
+/.tox
+.DS_Store
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..45ebad4
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,22 @@
+sudo: false
+dist: trusty
+language: python
+python:
+  - "2.7"
+  - "3.4"
+  - "3.5"
+  - "3.6"
+services:
+  - redis-server
+cache:
+  directories:
+  - $HOME/.cache/pip/wheels
+install:
+  - pip install -U pip wheel setuptools
+  - pip install 'git+https://github.com/django/channels.git#egg=channels-benchmark&subdirectory=testproject'
+  - pip install -e .[tests]
+  - pip freeze
+script: py.test
+# TODO: setup test coverage tracking service.
+# after_success:
+#   - codecov
diff --git a/CHANGELOG.txt b/CHANGELOG.txt
new file mode 100644
index 0000000..40da981
--- /dev/null
+++ b/CHANGELOG.txt
@@ -0,0 +1,134 @@
+1.4.3 (2017-09-14)
+------------------
+
+* The internal class layout of the channel layers has been adjusted, but the
+  public API remains the same.
+
+* Fixed compatibility with newer releases of redis-py.
+
+
+1.4.2 (2017-06-20)
+------------------
+
+* receive() no longer blocks indefinitely, just for a while.
+
+* Built-in lua scripts have their SHA pre-set to avoid a guaranteed cache miss
+  on their first usage.
+
+
+1.4.1 (2017-06-15)
+------------------
+
+* A keyspace leak has been fixed where message body keys were not deleted after
+  receive, and instead left to expire.
+
+
+1.4.0 (2017-05-18)
+------------------
+
+* Sharded mode support is now more robust with send/receive deterministically
+  moving around the shard ring rather than picking random connections. This
+  means there is no longer a slight chance of messages being missed when there
+  are not significantly more readers on a channel than shards. Tests have
+  also been updated so they run fully on sharded mode thanks to this.
+
+* Sentinel support has been considerably improved, with connection caching
+  (via sentinel_refresh_interval), and automatic service discovery.
+
+* The Twisted backend now picks up the Redis password if one is configured.
+
+
+1.3.0 (2017-04-07)
+------------------
+
+* Change format of connection arguments to be a single dict called
+  ``connection_kwargs`` rather than individual options, as they change by
+  connection type. You will need to change your settings if you have any of
+  socket_connect_timeout, socket_timeout, socket_keepalive or
+  socket_keepalive_options set to move them into a ``connection_kwargs`` dict.
+
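That 1.3.0 change can be sketched as follows; the host name and the ``migrate`` helper are hypothetical, added here only to show the mechanical move:

```python
# Hypothetical before/after settings for the 1.3.0 connection_kwargs change.
# "redis-host" and migrate() are illustrative, not part of the library.

old_config = {                       # pre-1.3.0: individual socket options
    "hosts": [("redis-host", 6379)],
    "socket_connect_timeout": 10,
    "socket_timeout": 10,
    "socket_keepalive": True,
}

new_config = {                       # 1.3.0+: one connection_kwargs dict
    "hosts": [("redis-host", 6379)],
    "connection_kwargs": {
        "socket_connect_timeout": 10,
        "socket_timeout": 10,
        "socket_keepalive": True,
    },
}

def migrate(config):
    """Move the four socket options into a connection_kwargs dict."""
    moved = ("socket_connect_timeout", "socket_timeout",
             "socket_keepalive", "socket_keepalive_options")
    config = dict(config)
    kwargs = {k: config.pop(k) for k in moved if k in config}
    if kwargs:
        config["connection_kwargs"] = kwargs
    return config

print(migrate(old_config) == new_config)  # True
```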
+1.2.1 (2017-04-02)
+------------------
+
+* Error with sending to multi-process channels with the same message fixed
+
+1.2.0 (2017-04-01)
+------------------
+
+* Process-specific channel behaviour changed to match new spec
+* Redis Sentinel channel layer added
+
+1.1.0 (2017-03-18)
+------------------
+
+* Support for the ASGI statistics extension
+* Distribution of items over multiple servers using consistent hashing is improved
+* Handles timeout exceptions in newer redis-py library versions correctly
+* Support for configuring the socket_connect_timeout, socket_timeout, socket_keepalive and socket_keepalive_options
+  options that are passed to redis-py.
+
+1.0.0 (2016-11-05)
+------------------
+
+* Renamed "receive_many" to "receive"
+* Improved (more explicit) error handling for Redis errors/old versions
+* Bad hosts (string not list) configuration now errors explicitly
+
+0.14.1 (2016-08-24)
+-------------------
+
+* Removed unused reverse channels-to-groups mapping keys as they were not
+  cleaned up proactively and quickly filled up databases.
+
+0.14.0 (2016-07-16)
+-------------------
+
+* Implemented group_channels method.
+
+0.13.0 (2016-06-09)
+-------------------
+
+* Added local-and-remote backend option (uses asgi_ipc)
+
+0.12.0 (2016-05-25)
+-------------------
+
+* Added symmetric encryption for messages and at-rest data with key rotation.
+
+0.11.0 (2016-05-07)
+-------------------
+
+* Implement backpressure with per-channel and default capacities.
+
+0.10.0 (2016-03-27)
+-------------------
+
+* Group expiry code re-added and fixed.
+
+0.9.1 (2016-03-23)
+------------------
+
+* Remove old group expiry code that was killing groups after 60 seconds.
+
+0.9.0 (2016-03-21)
+------------------
+
+* Connections now pooled per backend shard
+* Random portion of channel names now 12 characters
+* Implements new ASGI single-response-channel pattern spec
+
+0.8.3 (2016-02-28)
+------------------
+
+* Nonblocking receive_many now uses Lua script rather than for loop.
+
+0.8.2 (2016-02-22)
+------------------
+
+* Nonblocking receive_many now works, but is inefficient
+* Python 3 fixes
+
+0.8.1 (2016-02-22)
+------------------
+
+* Fixed packaging issues
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..b7fc1d0
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,17 @@
+FROM ubuntu:16.04
+
+RUN DEBIAN_FRONTEND=noninteractive apt-get update && \
+    apt-get -yqq install \
+    git \
+    build-essential python-pip software-properties-common \
+    python-dev python3-dev \
+    libffi-dev libxml2-dev libxslt-dev libssl-dev
+
+RUN pip install -U pip && pip install tox
+
+ENV DOCKER_TEST_ENV true
+
+ADD . /src
+WORKDIR /src
+
+CMD ["tox"]
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..5f4f225
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) Django Software Foundation and individual contributors.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+    1. Redistributions of source code must retain the above copyright notice,
+       this list of conditions and the following disclaimer.
+
+    2. Redistributions in binary form must reproduce the above copyright
+       notice, this list of conditions and the following disclaimer in the
+       documentation and/or other materials provided with the distribution.
+
+    3. Neither the name of Django nor the names of its contributors may be used
+       to endorse or promote products derived from this software without
+       specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..1a1f55e
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,16 @@
+.PHONY: release
+
+all:
+
+release:
+ifndef version
+	$(error Please supply a version)
+endif
+	@echo Releasing version $(version)
+ifeq (,$(findstring $(version),$(shell git log --oneline -1)))
+	$(error Last commit does not match version)
+endif
+	git tag $(version)
+	git push
+	git push --tags
+	python setup.py sdist bdist_wheel upload
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..bc0c8fa
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,229 @@
+asgi_redis
+==========
+
+.. image:: https://api.travis-ci.org/django/asgi_redis.svg
+    :target: https://travis-ci.org/django/asgi_redis
+
+.. image:: https://img.shields.io/pypi/v/asgi_redis.svg
+    :target: https://pypi.python.org/pypi/asgi_redis
+
+An ASGI channel layer that uses Redis as its backing store, and supports
+both single-server and sharded configurations, as well as group support.
+
+
+Usage
+-----
+
+You'll need to instantiate the channel layer with at least ``hosts``,
+plus any other options you need.
+
+Example:
+
+.. code-block:: python
+
+    channel_layer = RedisChannelLayer(
+        hosts=["redis://redis:6379/4"],
+        channel_capacity={
+            "http.request": 200,
+            "http.response*": 10,
+        }
+    )
+
+``hosts``
+~~~~~~~~~
+
+The server(s) to connect to, as either URIs or ``(host, port)`` tuples. Defaults to ``[("localhost", 6379)]``. Pass multiple hosts to enable sharding, but note that changing the host list will lose some sharded data.
+
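For illustration only (the host names are invented), a single-server layer and a sharded layer might be configured like this:

```python
from asgi_redis import RedisChannelLayer

# One server, given as a URI:
channel_layer = RedisChannelLayer(
    hosts=["redis://localhost:6379/0"],
)

# Sharded across two servers, given as (host, port) tuples.
# Remember: changing this list later will lose some sharded data.
sharded_layer = RedisChannelLayer(
    hosts=[("redis-1", 6379), ("redis-2", 6379)],
)
```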
+``prefix``
+~~~~~~~~~~
+
+Prefix to add to all Redis keys. Defaults to ``asgi:``. If you're running
+two or more entirely separate channel layers through the same Redis instance,
+make sure they have different prefixes. All servers talking to the same layer
+should have the same prefix, though.
+
+``expiry``
+~~~~~~~~~~
+
+Message expiry in seconds. Defaults to ``60``. You generally shouldn't need
+to change this, but you may want to turn it down if you have peaky traffic you
+wish to drop, or up if you have peaky traffic you want to backlog until you
+get to it.
+
+``group_expiry``
+~~~~~~~~~~~~~~~~
+
+Group expiry in seconds. Defaults to ``86400``. Interface servers will drop
+connections after this amount of time; it's recommended you reduce it for a
+healthier system that encourages disconnections.
+
+``capacity``
+~~~~~~~~~~~~
+
+Default channel capacity. Defaults to ``100``. Once a channel is at capacity,
+it will refuse more messages. How this affects different parts of the system
+varies; an HTTP server will refuse connections, for example, while Django
+sending a response will just wait until there's space.
+
+``channel_capacity``
+~~~~~~~~~~~~~~~~~~~~
+
+Per-channel capacity configuration. This lets you tweak the channel capacity
+based on the channel name, and supports both globbing and regular expressions.
+
+It should be a dict mapping channel name pattern to desired capacity; if the
+dict key is a string, it's interpreted as a glob, while if it's a compiled
+``re`` object, it's treated as a regular expression.
+
+This example sets ``http.request`` to 200, all ``http.response!`` channels
+to 10, and all ``websocket.send!`` channels to 20:
+
+.. code-block:: python
+
+    channel_capacity={
+        "http.request": 200,
+        "http.response!*": 10,
+        re.compile(r"^websocket.send\!.+"): 20,
+    }
+
+If you want to enforce a matching order, use an ``OrderedDict`` as the
+argument; channels will then be matched in the order the dict provides them.
+
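As a hedged sketch of the lookup described above — ``get_capacity`` here is a hypothetical stand-in, not the library's implementation — an ``OrderedDict`` makes the first matching pattern win:

```python
import re
from collections import OrderedDict
from fnmatch import fnmatchcase

channel_capacity = OrderedDict([
    ("http.request", 200),                    # string: matched as a glob
    ("http.response!*", 10),
    (re.compile(r"^websocket\.send!.+"), 20), # compiled: matched as a regex
])

def get_capacity(channel, default=100):
    # The first pattern that matches decides the capacity.
    for pattern, capacity in channel_capacity.items():
        if isinstance(pattern, str):
            if fnmatchcase(channel, pattern):
                return capacity
        elif pattern.match(channel):
            return capacity
    return default

print(get_capacity("http.response!a1b2"))  # 10
print(get_capacity("websocket.send!x9"))   # 20
print(get_capacity("http.disconnect"))     # 100 (falls through to default)
```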
+``symmetric_encryption_keys``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Pass this to enable the optional symmetric encryption mode of the backend. To
+use it, make sure you have the ``cryptography`` package installed, or specify
+the ``cryptography`` extra when you install ``asgi_redis``::
+
+    pip install asgi_redis[cryptography]
+
+``symmetric_encryption_keys`` should be a list of strings, with each string
+being an encryption key. The first key is always used for encryption; all are
+considered for decryption, so you can rotate keys without downtime - just add
+a new key at the start and move the old one down, then remove the old one
+after the message expiry time has passed.
+
+Data is encrypted both on the wire and at rest in Redis, though we advise
+you also route your Redis connections over TLS for higher security; the Redis
+protocol is still unencrypted, and the channel and group key names could
+potentially contain metadata patterns of use to attackers.
+
+Keys **should have at least 32 bytes of entropy** - they are passed through
+the SHA256 hash function before being used as an encryption key. Any string
+will work, but the shorter the string, the easier the encryption is to break.
+
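A sketch of that derivation and of the rotation sequence; ``derive_fernet_key`` mirrors the idea (SHA-256, then a urlsafe base64 Fernet key) but is not guaranteed to be the library's exact code:

```python
import base64
import hashlib

def derive_fernet_key(passphrase):
    # Hash the configured string to 32 bytes, then encode as a Fernet key.
    digest = hashlib.sha256(passphrase.encode("utf8")).digest()
    return base64.urlsafe_b64encode(digest)  # always 44 urlsafe characters

# Rotation: keys[0] encrypts; every key in the list can decrypt.
symmetric_encryption_keys = [
    "new-key-0123456789abcdef0123456789abcdef",  # step 1: add at the front
    "old-key-fedcba9876543210fedcba9876543210",  # step 2: drop after expiry
]
fernet_keys = [derive_fernet_key(k) for k in symmetric_encryption_keys]
print(len(fernet_keys[0]))  # 44
```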
+If you're using Django, you may also wish to set this to your site's
+``SECRET_KEY`` setting via the ``CHANNEL_LAYERS`` setting:
+
+.. code-block:: python
+
+    CHANNEL_LAYERS = {
+        "default": {
+            "BACKEND": "asgi_redis.RedisChannelLayer",
+            "ROUTING": "my_project.routing.channel_routing",
+            "CONFIG": {
+                "hosts": ["redis://:password@127.0.0.1:6379/0"],
+                "symmetric_encryption_keys": [SECRET_KEY],
+            },
+        },
+    }
+
+``connection_kwargs``
+---------------------
+
+Optional extra arguments to pass to the ``redis-py`` connection class. Options
+include ``socket_connect_timeout``, ``socket_timeout``, ``socket_keepalive``,
+and ``socket_keepalive_options``. See the
+`redis-py documentation <https://redis-py.readthedocs.io/en/latest/>`_ for more.
+
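A sketch with illustrative values; note from the layer's own startup check that ``socket_timeout`` must be at least the internal 5-second BLPOP timeout:

```python
from asgi_redis import RedisChannelLayer

channel_layer = RedisChannelLayer(
    hosts=["redis://localhost:6379/0"],
    connection_kwargs={
        "socket_connect_timeout": 5,
        "socket_timeout": 10,   # must be >= the layer's blpop_timeout (5s)
        "socket_keepalive": True,
    },
)
```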
+
+Local-and-Remote Mode
+---------------------
+
+A "local and remote" mode is also supported, where the Redis channel layer
+works in conjunction with a machine-local channel layer (``asgi_ipc``) in order
+to route all normal channels over the local layer, while routing all
+single-reader and process-specific channels over the Redis layer.
+
+This allows traffic on things like ``http.request`` and ``websocket.receive``
+to stay in the local layer and not go through Redis, while still allowing Group
+send and sends to arbitrary channels terminated on other machines to work
+correctly. It will improve performance and decrease the load on your
+Redis cluster, but **it requires that all normal channels be consumed on the
+same machine**.
+
+In practice, this means you MUST run workers that consume every channel your
+application has code to handle on the same machine as your HTTP or WebSocket
+terminator. If you fail to do this, requests to that machine will get routed
+into only the local queue and hang as nothing is reading them.
+
+To use it, just use the ``asgi_redis.RedisLocalChannelLayer`` class in your
+configuration instead of ``RedisChannelLayer`` and make sure you have the
+``asgi_ipc`` package installed; no other change is needed.
+
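Assuming the Django-style settings shown earlier in this README (``my_project.routing`` is that example's placeholder), the switch is just the backend path:

```python
CHANNEL_LAYERS = {
    "default": {
        # Requires the asgi_ipc package; CONFIG options are unchanged.
        "BACKEND": "asgi_redis.RedisLocalChannelLayer",
        "ROUTING": "my_project.routing.channel_routing",
        "CONFIG": {
            "hosts": ["redis://127.0.0.1:6379/0"],
        },
    },
}
```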
+
+Sentinel Mode
+-------------
+
+"Sentinel" mode is also supported, where the Redis channel layer will connect to
+a redis sentinel cluster to find the present Redis master before writing or reading
+data.
+
+Sentinel mode supports sharding, but does not support multiple Sentinel clusters. To
+run sharding of keys across multiple Redis clusters, use a single sentinel cluster,
+but have that sentinel cluster monitor multiple "services". Then in the configuration
+for the ``RedisSentinelChannelLayer``, add a list of the service names. You can also
+leave the list of services blank, and the layer will pull all services that are
+configured on the sentinel master.
+
+Redis Sentinel mode does not support URL-style connection strings, just tuple-based ones.
+
+Configuration for Sentinel mode looks like this:
+
+.. code-block:: python
+
+    CHANNEL_LAYERS = {
+        "default": {
+            "BACKEND": "asgi_redis.RedisSentinelChannelLayer",
+            "CONFIG": {
+                "hosts": [("10.0.0.1", 26739), ("10.0.0.2", 26379), ("10.0.0.3", 26379)],
+                "services": ["shard1", "shard2", "shard3"],
+            },
+        },
+    }
+
+The "shard1", "shard2", etc entries correspond to the name of the service configured in  your
+redis `sentinel.conf` file. For example, if your `sentinel.conf` says ``sentinel monitor local 127.0.0.1 6379 1``
+then you would want to include "local" as a service in the `RedisSentinelChannelLayer` configuration.
+
+You may also pass a ``sentinel_refresh_interval`` value in the ``CONFIG``, which
+will enable caching of the Sentinel results for the specified number of seconds.
+This is recommended to reduce the need to query Sentinel every time; even a
+low value of 5 seconds will significantly reduce overhead.
+
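Extending the Sentinel example above with that option (the service name ``local`` matches the ``sentinel.conf`` example; 5 is the low end suggested in the text):

```python
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "asgi_redis.RedisSentinelChannelLayer",
        "CONFIG": {
            "hosts": [("10.0.0.1", 26379)],
            "services": ["local"],
            # Cache Sentinel master lookups for 5 seconds.
            "sentinel_refresh_interval": 5,
        },
    },
}
```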
+Dependencies
+------------
+
+Redis >= 2.6 is required for ``asgi_redis``. It supports Python 2.7, 3.4,
+3.5 and 3.6.
+
+Contributing
+------------
+
+Please refer to the
+`main Channels contributing docs <https://github.com/django/channels/blob/master/CONTRIBUTING.rst>`_.
+That also contains advice on how to set up the development environment and run the tests.
+
+Maintenance and Security
+------------------------
+
+To report security issues, please contact security at djangoproject.com. For GPG
+signatures and more security process information, see
+https://docs.djangoproject.com/en/dev/internals/security/.
+
+To report bugs or request new features, please open a new GitHub issue.
+
+This repository is part of the Channels project. For the shepherd and maintenance team, please see the
+`main Channels readme <https://github.com/django/channels/blob/master/README.rst>`_.
diff --git a/asgi_redis/__init__.py b/asgi_redis/__init__.py
new file mode 100644
index 0000000..016108d
--- /dev/null
+++ b/asgi_redis/__init__.py
@@ -0,0 +1,5 @@
+from .core import RedisChannelLayer
+from .local import RedisLocalChannelLayer
+from .sentinel import RedisSentinelChannelLayer
+
+__version__ = '1.4.3'
diff --git a/asgi_redis/core.py b/asgi_redis/core.py
new file mode 100644
index 0000000..479411e
--- /dev/null
+++ b/asgi_redis/core.py
@@ -0,0 +1,640 @@
+from __future__ import unicode_literals
+
+import base64
+import binascii
+import hashlib
+import itertools
+import msgpack
+import random
+import redis
+import six
+import string
+import time
+import uuid
+
+try:
+    import txredisapi
+except ImportError:
+    pass
+
+from asgiref.base_layer import BaseChannelLayer
+from .twisted_utils import defer
+
+
+class UnsupportedRedis(Exception):
+    pass
+
+
+class BaseRedisChannelLayer(BaseChannelLayer):
+
+    blpop_timeout = 5
+    global_statistics_expiry = 86400
+    channel_statistics_expiry = 3600
+    global_stats_key = '#global#'  # needs to be invalid as a channel name
+
+    def __init__(
+        self,
+        expiry=60,
+        hosts=None,
+        prefix="asgi:",
+        group_expiry=86400,
+        capacity=100,
+        channel_capacity=None,
+        symmetric_encryption_keys=None,
+        stats_prefix="asgi-meta:",
+        connection_kwargs=None,
+    ):
+        super(BaseRedisChannelLayer, self).__init__(
+            expiry=expiry,
+            group_expiry=group_expiry,
+            capacity=capacity,
+            channel_capacity=channel_capacity,
+        )
+
+        assert isinstance(prefix, six.text_type), "Prefix must be unicode"
+        socket_timeout = connection_kwargs and connection_kwargs.get("socket_timeout", None)
+        if socket_timeout and socket_timeout < self.blpop_timeout:
+            raise ValueError("The socket timeout must be at least %s seconds" % self.blpop_timeout)
+
+        self.prefix = prefix
+        self.stats_prefix = stats_prefix
+        # Decide on a unique client prefix to use in ! sections
+        # TODO: ensure uniqueness better, e.g. Redis keys with SETNX
+        self.client_prefix = "".join(random.choice(string.ascii_letters) for i in range(8))
+        self._setup_encryption(symmetric_encryption_keys)
+
+    ### Setup ###
+
+    def _setup_encryption(self, symmetric_encryption_keys):
+        # See if we can do encryption if they asked
+        if symmetric_encryption_keys:
+            if isinstance(symmetric_encryption_keys, six.string_types):
+                raise ValueError("symmetric_encryption_keys must be a list of possible keys")
+            try:
+                from cryptography.fernet import MultiFernet
+            except ImportError:
+                raise ValueError("Cannot run with encryption without 'cryptography' installed.")
+            sub_fernets = [self.make_fernet(key) for key in symmetric_encryption_keys]
+            self.crypter = MultiFernet(sub_fernets)
+        else:
+            self.crypter = None
+
+    def _register_scripts(self):
+        connection = self.connection(None)
+        self.chansend = connection.register_script(self.lua_chansend)
+        self.lpopmany = connection.register_script(self.lua_lpopmany)
+        self.delprefix = connection.register_script(self.lua_delprefix)
+        self.incrstatcounters = connection.register_script(self.lua_incrstatcounters)
+
+    ### ASGI API ###
+
+    extensions = ["groups", "flush", "statistics"]
+    try:
+        import txredisapi
+    except ImportError:
+        pass
+    else:
+        extensions.append("twisted")
+
+    def send(self, channel, message):
+        # Typecheck
+        assert isinstance(message, dict), "message is not a dict"
+        assert self.valid_channel_name(channel), "Channel name not valid"
+        # Make sure the message does not contain reserved keys
+        assert "__asgi_channel__" not in message
+        # If it's a process-local channel, strip off local part and stick full name in message
+        if "!" in channel:
+            message = dict(message.items())
+            message['__asgi_channel__'] = channel
+            channel = self.non_local_name(channel)
+        # Write out message into expiring key (avoids big items in list)
+        # TODO: Use extended set, drop support for older redis?
+        message_key = self.prefix + uuid.uuid4().hex
+        channel_key = self.prefix + channel
+        # Pick a connection to the right server - consistent for response
+        # channels, random for normal channels
+        if "!" in channel or "?" in channel:
+            index = self.consistent_hash(channel)
+            connection = self.connection(index)
+        else:
+            index = next(self._send_index_generator)
+            connection = self.connection(index)
+        # Use the Lua function to do the set-and-push
+        try:
+            self.chansend(
+                keys=[message_key, channel_key],
+                args=[self.serialize(message), self.expiry, self.get_capacity(channel)],
+                client=connection,
+            )
+            self._incr_statistics_counter(
+                stat_name=self.STAT_MESSAGES_COUNT,
+                channel=channel,
+                connection=connection,
+            )
+        except redis.exceptions.ResponseError as e:
+            # The Lua script handles capacity checking and sends the "full" error back
+            if e.args[0] == "full":
+                self._incr_statistics_counter(
+                    stat_name=self.STAT_CHANNEL_FULL,
+                    channel=channel,
+                    connection=connection,
+                )
+                raise self.ChannelFull
+            elif "unknown command" in e.args[0]:
+                raise UnsupportedRedis(
+                    "Redis returned an error (%s). Please ensure you're running a "
+                    "version of redis that is supported by asgi_redis." % e.args[0])
+            else:
+                # Let any other exception bubble up
+                raise
+
+    def receive(self, channels, block=False):
+        # List name get
+        indexes = self._receive_list_names(channels)
+        # Short circuit if no channels
+        if indexes is None:
+            return None, None
+        # Get a message from one of our channels
+        while True:
+            got_expired_content = False
+            # Try each index:channels pair at least once or until a result is returned
+            for index, list_names in indexes.items():
+                # Shuffle list_names to avoid the first ones starving others of workers
+                random.shuffle(list_names)
+                # Open a connection
+                connection = self.connection(index)
+                # Pop off any waiting message
+                if block:
+                    result = connection.blpop(list_names, timeout=self.blpop_timeout)
+                else:
+                    result = self.lpopmany(keys=list_names, client=connection)
+                if result:
+                    content = connection.get(result[1])
+                    connection.delete(result[1])
+                    if content is None:
+                        # If the content key expired, keep going.
+                        got_expired_content = True
+                        continue
+                    # Return the channel it's from and the message
+                    channel = result[0][len(self.prefix):].decode("utf8")
+                    message = self.deserialize(content)
+                    # If there is a full channel name stored in the message, unpack it.
+                    if "__asgi_channel__" in message:
+                        channel = message['__asgi_channel__']
+                        del message['__asgi_channel__']
+                    return channel, message
+            # If we only got expired content, try again
+            if got_expired_content:
+                continue
+            else:
+                return None, None
+
+    def _receive_list_names(self, channels):
+        """
+        Inner logic of receive; takes channels, groups by shard, and
+        returns {connection_index: list_names ...} if a query is needed or
+        None for a vacuously empty response.
+        """
+        # Short circuit if no channels
+        if not channels:
+            return None
+        # Check channel names are valid
+        channels = list(channels)
+        assert all(
+            self.valid_channel_name(channel, receive=True) for channel in channels
+        ), "One or more channel names invalid"
+        # Work out what servers to listen on for the given channels
+        indexes = {}
+        index = next(self._receive_index_generator)
+        for channel in channels:
+            if "!" in channel or "?" in channel:
+                indexes.setdefault(self.consistent_hash(channel), []).append(
+                    self.prefix + channel,
+                )
+            else:
+                indexes.setdefault(index, []).append(
+                    self.prefix + channel,
+                )
+        return indexes
+
+    def new_channel(self, pattern):
+        assert isinstance(pattern, six.text_type)
+        # Keep making channel names till one isn't present.
+        while True:
+            random_string = "".join(random.choice(string.ascii_letters) for i in range(12))
+            assert pattern.endswith("?")
+            new_name = pattern + random_string
+            # Get right connection
+            index = self.consistent_hash(new_name)
+            connection = self.connection(index)
+            # Check to see if it's in the connected Redis.
+            # This fails to stop collisions for sharding where the channel is
+            # non-single-listener, but that seems very unlikely.
+            key = self.prefix + new_name
+            if not connection.exists(key):
+                return new_name
+
+    ### ASGI Group extension ###
+
+    def group_add(self, group, channel):
+        """
+        Adds the channel to the named group for at least 'expiry'
+        seconds (expiry defaults to message expiry if not provided).
+        """
+        assert self.valid_group_name(group), "Group name not valid"
+        assert self.valid_channel_name(channel), "Channel name not valid"
+        group_key = self._group_key(group)
+        connection = self.connection(self.consistent_hash(group))
+        # Add to group sorted set with creation time as timestamp
+        connection.zadd(
+            group_key,
+            **{channel: time.time()}
+        )
+        # Set the expiration to group_expiry, since everything in
+        # the set at this point is guaranteed to expire before that
+        connection.expire(group_key, self.group_expiry)
+
+    def group_discard(self, group, channel):
+        """
+        Removes the channel from the named group if it is in the group;
+        does nothing otherwise (does not error)
+        """
+        assert self.valid_group_name(group), "Group name not valid"
+        assert self.valid_channel_name(channel), "Channel name not valid"
+        key = self._group_key(group)
+        self.connection(self.consistent_hash(group)).zrem(
+            key,
+            channel,
+        )
+
+    def group_channels(self, group):
+        """
+        Returns all channels in the group as an iterable.
+        """
+        key = self._group_key(group)
+        connection = self.connection(self.consistent_hash(group))
+        # Discard old channels based on group_expiry
+        connection.zremrangebyscore(key, 0, int(time.time()) - self.group_expiry)
+        # Return current lot
+        return [x.decode("utf8") for x in connection.zrange(
+            key,
+            0,
+            -1,
+        )]
+
+    def send_group(self, group, message):
+        """
+        Sends a message to the entire group.
+        """
+        assert self.valid_group_name(group), "Group name not valid"
+        # TODO: More efficient implementation (lua script per shard?)
+        for channel in self.group_channels(group):
+            try:
+                self.send(channel, message)
+            except self.ChannelFull:
+                pass
+
+    def _group_key(self, group):
+        return ("%s:group:%s" % (self.prefix, group)).encode("utf8")
+
+    ### Twisted extension ###
+
+    @defer.inlineCallbacks
+    def receive_twisted(self, channels):
+        """
+        Twisted-native implementation of receive.
+        """
+        # List name get
+        indexes = self._receive_list_names(channels)
+        # Short circuit if no channels
+        if indexes is None:
+            defer.returnValue((None, None))
+        # Get a message from one of our channels
+        while True:
+            got_expired_content = False
+            # Try each index:channels pair at least once or until a result is returned
+            for index, list_names in indexes.items():
+                # Shuffle list_names to avoid the first ones starving others of workers
+                random.shuffle(list_names)
+                # Get a sync connection for conn details
+                sync_connection = self.connection(index)
+                twisted_connection = yield txredisapi.ConnectionPool(
+                    host=sync_connection.connection_pool.connection_kwargs['host'],
+                    port=sync_connection.connection_pool.connection_kwargs['port'],
+                    dbid=sync_connection.connection_pool.connection_kwargs['db'],
+                    password=sync_connection.connection_pool.connection_kwargs['password'],
+                )
+                try:
+                    # Pop off any waiting message
+                    result = yield twisted_connection.blpop(list_names, timeout=self.blpop_timeout)
+                    if result:
+                        content = yield twisted_connection.get(result[1])
+                        # If the content key expired, keep going.
+                        if content is None:
+                            got_expired_content = True
+                            continue
+                        # Return the channel it's from and the message
+                        channel = result[0][len(self.prefix):]
+                        message = self.deserialize(content)
+                        # If there is a full channel name stored in the message, unpack it.
+                        if "__asgi_channel__" in message:
+                            channel = message['__asgi_channel__']
+                            del message['__asgi_channel__']
+                        defer.returnValue((channel, message))
+                finally:
+                    yield twisted_connection.disconnect()
+            # If we only got expired content, try again
+            if got_expired_content:
+                continue
+            else:
+                defer.returnValue((None, None))
+
+    ### statistics extension ###
+
+    STAT_MESSAGES_COUNT = 'messages_count'
+    STAT_MESSAGES_PENDING = 'messages_pending'
+    STAT_MESSAGES_MAX_AGE = 'messages_max_age'
+    STAT_CHANNEL_FULL = 'channel_full_count'
+
+    def _count_global_stats(self, connection_list):
+        statistics = {
+            self.STAT_MESSAGES_COUNT: 0,
+            self.STAT_CHANNEL_FULL: 0,
+        }
+        prefix = self.stats_prefix + self.global_stats_key
+        for connection in connection_list:
+            messages_count, channel_full_count = connection.mget(
+                ':'.join((prefix, self.STAT_MESSAGES_COUNT)),
+                ':'.join((prefix, self.STAT_CHANNEL_FULL)),
+            )
+            statistics[self.STAT_MESSAGES_COUNT] += int(messages_count or 0)
+            statistics[self.STAT_CHANNEL_FULL] += int(channel_full_count or 0)
+
+        return statistics
+
+    def _count_channel_stats(self, channel, connections):
+        statistics = {
+            self.STAT_MESSAGES_COUNT: 0,
+            self.STAT_MESSAGES_PENDING: 0,
+            self.STAT_MESSAGES_MAX_AGE: 0,
+            self.STAT_CHANNEL_FULL: 0,
+        }
+        prefix = self.stats_prefix + channel
+
+        channel_key = self.prefix + channel
+        for connection in connections:
+            messages_count, channel_full_count = connection.mget(
+                ':'.join((prefix, self.STAT_MESSAGES_COUNT)),
+                ':'.join((prefix, self.STAT_CHANNEL_FULL)),
+            )
+            statistics[self.STAT_MESSAGES_COUNT] += int(messages_count or 0)
+            statistics[self.STAT_CHANNEL_FULL] += int(channel_full_count or 0)
+            statistics[self.STAT_MESSAGES_PENDING] += connection.llen(channel_key)
+            oldest_message = connection.lindex(channel_key, 0)
+            if oldest_message:
+                messages_age = self.expiry - connection.ttl(oldest_message)
+                statistics[self.STAT_MESSAGES_MAX_AGE] = max(statistics[self.STAT_MESSAGES_MAX_AGE], messages_age)
+        return statistics
+
+    def _incr_statistics_counter(self, stat_name, channel, connection):
+        """ Helper function to increment counter stats in one go. """
+        self.incrstatcounters(
+            keys=[
+                "{prefix}{channel}:{stat_name}".format(
+                    prefix=self.stats_prefix,
+                    channel=channel,
+                    stat_name=stat_name,
+                ),
+                "{prefix}{global_key}:{stat_name}".format(
+                    prefix=self.stats_prefix,
+                    global_key=self.global_stats_key,
+                    stat_name=stat_name,
+                )
+            ],
+            args=[self.channel_statistics_expiry, self.global_statistics_expiry],
+            client=connection,
+        )
+
+    ### Serialization ###
+
+    def serialize(self, message):
+        """
+        Serializes message to a byte string.
+        """
+        value = msgpack.packb(message, use_bin_type=True)
+        if self.crypter:
+            value = self.crypter.encrypt(value)
+        return value
+
+    def deserialize(self, message):
+        """
+        Deserializes from a byte string.
+        """
+        if self.crypter:
+            message = self.crypter.decrypt(message, self.expiry + 10)
+        return msgpack.unpackb(message, encoding="utf8")
+
+    ### Redis Lua scripts ###
+
+    # Single-command channel send. Returns error if over capacity.
+    # Keys: message, channel_list
+    # Args: content, expiry, capacity
+    lua_chansend = """
+        if redis.call('llen', KEYS[2]) >= tonumber(ARGV[3]) then
+            return redis.error_reply("full")
+        end
+        redis.call('set', KEYS[1], ARGV[1])
+        redis.call('expire', KEYS[1], ARGV[2])
+        redis.call('rpush', KEYS[2], KEYS[1])
+        redis.call('expire', KEYS[2], ARGV[2] + 1)
+    """
+
+    # Single-command to increment counter stats.
+    # Keys: channel_stat, global_stat
+    # Args: channel_stat_expiry, global_stat_expiry
... 2066 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-asgi-redis.git



More information about the Python-modules-commits mailing list