[Python-modules-commits] [python-asgi-redis] 01/02: importing python-asgi-redis_1.4.3.orig.tar.gz
Michael Fladischer
fladi at moszumanska.debian.org
Fri Nov 24 08:47:09 UTC 2017
This is an automated email from the git hooks/post-receive script.
fladi pushed a commit to branch debian/master
in repository python-asgi-redis.
commit 85519c6bdd8332caa7cc40630de5ea822ff0a95f
Author: Michael Fladischer <FladischerMichael at fladi.at>
Date: Mon Nov 20 16:57:34 2017 +0100
importing python-asgi-redis_1.4.3.orig.tar.gz
---
.gitignore | 6 +
.travis.yml | 22 +
CHANGELOG.txt | 134 ++++++
Dockerfile | 17 +
LICENSE | 27 ++
Makefile | 16 +
README.rst | 229 ++++++++++
asgi_redis/__init__.py | 5 +
asgi_redis/core.py | 640 ++++++++++++++++++++++++++++
asgi_redis/local.py | 72 ++++
asgi_redis/sentinel.py | 178 ++++++++
asgi_redis/twisted_utils.py | 19 +
docker-compose.yml | 31 ++
pytest.ini | 4 +
setup.cfg | 2 +
setup.py | 52 +++
testing/redis/Dockerfile | 5 +
testing/redis/redis.conf | 943 +++++++++++++++++++++++++++++++++++++++++
testing/sentinel/Dockerfile | 6 +
testing/sentinel/sentinel.conf | 13 +
tests/__init__.py | 0
tests/constants.py | 10 +
tests/test_core.py | 125 ++++++
tests/test_integration.py | 52 +++
tests/test_local.py | 22 +
tests/test_sentinel.py | 176 ++++++++
tests/test_twisted.py | 50 +++
tox.ini | 8 +
28 files changed, 2864 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..68fc79c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+*.egg-info
+dist/
+build/
+*.pyc
+/.tox
+.DS_Store
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..45ebad4
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,22 @@
+sudo: false
+dist: trusty
+language: python
+python:
+ - "2.7"
+ - "3.4"
+ - "3.5"
+ - "3.6"
+services:
+ - redis-server
+cache:
+ directories:
+ - $HOME/.cache/pip/wheels
+install:
+ - pip install -U pip wheel setuptools
+ - pip install 'git+https://github.com/django/channels.git#egg=channels-benchmark&subdirectory=testproject'
+ - pip install -e .[tests]
+ - pip freeze
+script: py.test
+# TODO: setup test coverage tracking service.
+# after_success:
+# - codecov
diff --git a/CHANGELOG.txt b/CHANGELOG.txt
new file mode 100644
index 0000000..40da981
--- /dev/null
+++ b/CHANGELOG.txt
@@ -0,0 +1,134 @@
+1.4.3 (2017-09-14)
+------------------
+
+* The internal class layout of the channel layers has been adjusted, but the
+ public API remains the same.
+
+* Fixed compatibility with newer releases of redis-py.
+
+
+1.4.2 (2017-06-20)
+------------------
+
+* receive() no longer blocks indefinitely, just for a while.
+
+* Built-in lua scripts have their SHA pre-set to avoid a guaranteed cache miss
+ on their first usage.
+
+
+1.4.1 (2017-06-15)
+------------------
+
+* A keyspace leak has been fixed where message body keys were not deleted after
+ receive, and instead left to expire.
+
+
+1.4.0 (2017-05-18)
+------------------
+
+* Sharded mode support is now more robust with send/receive deterministically
+ moving around the shard ring rather than picking random connections. This
+ means there is no longer a slight chance of messages being missed when there
+ are not significantly more readers on a channel than shards. Tests have
+ also been updated so they run fully on sharded mode thanks to this.
+
+* Sentinel support has been considerably improved, with connection caching
+ (via sentinel_refresh_interval), and automatic service discovery.
+
+* The Twisted backend now picks up the Redis password if one is configured.
+
+
+1.3.0 (2017-04-07)
+------------------
+
+* Change format of connection arguments to be a single dict called
+ ``connection_kwargs`` rather than individual options, as they change by
+ connection type. You will need to change your settings if you have any of
+ socket_connect_timeout, socket_timeout, socket_keepalive or
+ socket_keepalive_options set to move them into a ``connection_kwargs`` dict.
+
+1.2.1 (2017-04-02)
+------------------
+
+* Error with sending to multi-process channels with the same message fixed
+
+1.2.0 (2017-04-01)
+------------------
+
+* Process-specific channel behaviour changed to match new spec
+* Redis Sentinel channel layer added
+
+1.1.0 (2017-03-18)
+------------------
+
+* Support for the ASGI statistics extension
+* Distribution of items over multiple servers using consistent hashing is improved
+* Handles timeout exceptions in newer redis-py library versions correctly
+* Support for configuring the socket_connect_timeout, socket_timeout, socket_keepalive and socket_keepalive_options
+ options that are passed to redis-py.
+
+1.0.0 (2016-11-05)
+------------------
+
+* Renamed "receive_many" to "receive"
+* Improved (more explicit) error handling for Redis errors/old versions
+* Bad hosts (string not list) configuration now errors explicitly
+
+0.14.1 (2016-08-24)
+-------------------
+
+* Removed unused reverse channels-to-groups mapping keys as they were not
+ cleaned up proactively and quickly filled up databases.
+
+0.14.0 (2016-07-16)
+-------------------
+
+* Implemented group_channels method.
+
+0.13.0 (2016-06-09)
+-------------------
+
+* Added local-and-remote backend option (uses asgi_ipc)
+
+0.12.0 (2016-05-25)
+-------------------
+
+* Added symmetric encryption for messages and at-rest data with key rotation.
+
+0.11.0 (2016-05-07)
+-------------------
+
+* Implement backpressure with per-channel and default capacities.
+
+0.10.0 (2016-03-27)
+-------------------
+
+* Group expiry code re-added and fixed.
+
+0.9.1 (2016-03-23)
+------------------
+
+* Remove old group expiry code that was killing groups after 60 seconds.
+
+0.9.0 (2016-03-21)
+------------------
+
+* Connections now pooled per backend shard
+* Random portion of channel names now 12 characters
+* Implements new ASGI single-response-channel pattern spec
+
+0.8.3 (2016-02-28)
+------------------
+
+* Nonblocking receive_many now uses Lua script rather than for loop.
+
+0.8.2 (2016-02-22)
+------------------
+
+* Nonblocking receive_many now works, but is inefficient
+* Python 3 fixes
+
+0.8.1 (2016-02-22)
+------------------
+
+* Fixed packaging issues
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..b7fc1d0
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,17 @@
+FROM ubuntu:16.04
+
+RUN DEBIAN_FRONTEND=noninteractive apt-get update && \
+ apt-get -yqq install \
+ git \
+ build-essential python-pip software-properties-common \
+ python-dev python3-dev \
+ libffi-dev libxml2-dev libxslt-dev libssl-dev
+
+RUN pip install -U pip && pip install tox
+
+ENV DOCKER_TEST_ENV true
+
+ADD . /src
+WORKDIR /src
+
+CMD ["tox"]
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..5f4f225
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) Django Software Foundation and individual contributors.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ 1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+ 2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+ 3. Neither the name of Django nor the names of its contributors may be used
+ to endorse or promote products derived from this software without
+ specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..1a1f55e
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,16 @@
+.PHONY: release
+
+all:
+
+release:
+ifndef version
+ $(error Please supply a version)
+endif
+ @echo Releasing version $(version)
+ifeq (,$(findstring $(version),$(shell git log --oneline -1)))
+ $(error Last commit does not match version)
+endif
+ git tag $(version)
+ git push
+ git push --tags
+ python setup.py sdist bdist_wheel upload
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..bc0c8fa
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,229 @@
+asgi_redis
+==========
+
+.. image:: https://api.travis-ci.org/django/asgi_redis.svg
+ :target: https://travis-ci.org/django/asgi_redis
+
+.. image:: https://img.shields.io/pypi/v/asgi_redis.svg
+ :target: https://pypi.python.org/pypi/asgi_redis
+
+An ASGI channel layer that uses Redis as its backing store, and supports
+both single-server and sharded configurations, as well as group support.
+
+
+Usage
+-----
+
+You'll need to instantiate the channel layer with at least ``hosts``,
+and other options if you need them.
+
+Example:
+
+.. code-block:: python
+
+ channel_layer = RedisChannelLayer(
+ host="redis",
+ db=4,
+ channel_capacity={
+ "http.request": 200,
+ "http.response*": 10,
+ }
+ )
+
+``hosts``
+~~~~~~~~~
+
+The server(s) to connect to, as either URIs or ``(host, port)`` tuples. Defaults to ``['localhost', 6379]``. Pass multiple hosts to enable sharding, but note that changing the host list will lose some sharded data.
+
+``prefix``
+~~~~~~~~~~
+
+Prefix to add to all Redis keys. Defaults to ``asgi:``. If you're running
+two or more entirely separate channel layers through the same Redis instance,
+make sure they have different prefixes. All servers talking to the same layer
+should have the same prefix, though.
+
+``expiry``
+~~~~~~~~~~
+
+Message expiry in seconds. Defaults to ``60``. You generally shouldn't need
+to change this, but you may want to turn it down if you have peaky traffic you
+wish to drop, or up if you have peaky traffic you want to backlog until you
+get to it.
+
+``group_expiry``
+~~~~~~~~~~~~~~~~
+
+Group expiry in seconds. Defaults to ``86400``. Interface servers will drop
+connections after this amount of time; it's recommended you reduce it for a
+healthier system that encourages disconnections.
+
+``capacity``
+~~~~~~~~~~~~
+
+Default channel capacity. Defaults to ``100``. Once a channel is at capacity,
+it will refuse more messages. How this affects different parts of the system
+varies; a HTTP server will refuse connections, for example, while Django
+sending a response will just wait until there's space.
+
+``channel_capacity``
+~~~~~~~~~~~~~~~~~~~~
+
+Per-channel capacity configuration. This lets you tweak the channel capacity
+based on the channel name, and supports both globbing and regular expressions.
+
+It should be a dict mapping channel name pattern to desired capacity; if the
+dict key is a string, it's interpreted as a glob, while if it's a compiled
+``re`` object, it's treated as a regular expression.
+
+This example sets ``http.request`` to 200, all ``http.response!`` channels
+to 10, and all ``websocket.send!`` channels to 20:
+
+.. code-block:: python
+
+ channel_capacity={
+ "http.request": 200,
+ "http.response!*": 10,
+ re.compile(r"^websocket.send\!.+"): 20,
+ }
+
+If you want to enforce a matching order, use an ``OrderedDict`` as the
+argument; channels will then be matched in the order the dict provides them.
+
+``symmetric_encryption_keys``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Pass this to enable the optional symmetric encryption mode of the backend. To
+use it, make sure you have the ``cryptography`` package installed, or specify
+the ``cryptography`` extra when you install ``asgi_redis``::
+
+ pip install asgi_redis[cryptography]
+
+``symmetric_encryption_keys`` should be a list of strings, with each string
+being an encryption key. The first key is always used for encryption; all are
+considered for decryption, so you can rotate keys without downtime - just add
+a new key at the start and move the old one down, then remove the old one
+after the message expiry time has passed.
+
+Data is encrypted both on the wire and at rest in Redis, though we advise
+you also route your Redis connections over TLS for higher security; the Redis
+protocol is still unencrypted, and the channel and group key names could
+potentially contain metadata patterns of use to attackers.
+
+Keys **should have at least 32 bytes of entropy** - they are passed through
+the SHA256 hash function before being used as an encryption key. Any string
+will work, but the shorter the string, the easier the encryption is to break.
+
+If you're using Django, you may also wish to set this to your site's
+``SECRET_KEY`` setting via the ``CHANNEL_LAYERS`` setting:
+
+.. code-block:: python
+
+ CHANNEL_LAYERS = {
+ "default": {
+ "BACKEND": "asgi_redis.RedisChannelLayer",
+ "ROUTING": "my_project.routing.channel_routing",
+ "CONFIG": {
+ "hosts": ["redis://:password@127.0.0.1:6379/0"],
+ "symmetric_encryption_keys": [SECRET_KEY],
+ },
+ },
+ }
+
+``connection_kwargs``
+~~~~~~~~~~~~~~~~~~~~~
+
+Optional extra arguments to pass to the ``redis-py`` connection class. Options
+include ``socket_connect_timeout``, ``socket_timeout``, ``socket_keepalive``,
+and ``socket_keepalive_options``. See the
+`redis-py documentation <https://redis-py.readthedocs.io/en/latest/>`_ for more.
+
+
+Local-and-Remote Mode
+---------------------
+
+A "local and remote" mode is also supported, where the Redis channel layer
+works in conjunction with a machine-local channel layer (``asgi_ipc``) in order
+to route all normal channels over the local layer, while routing all
+single-reader and process-specific channels over the Redis layer.
+
+This allows traffic on things like ``http.request`` and ``websocket.receive``
+to stay in the local layer and not go through Redis, while still allowing Group
+send and sends to arbitrary channels terminated on other machines to work
+correctly. It will improve performance and decrease the load on your
+Redis cluster, but **it requires all normal channels are consumed on the
+same machine**.
+
+In practice, this means you MUST run workers that consume every channel your
+application has code to handle on the same machine as your HTTP or WebSocket
+terminator. If you fail to do this, requests to that machine will get routed
+into only the local queue and hang as nothing is reading them.
+
+To use it, just use the ``asgi_redis.RedisLocalChannelLayer`` class in your
+configuration instead of ``RedisChannelLayer`` and make sure you have the
+``asgi_ipc`` package installed; no other change is needed.
+
+
+Sentinel Mode
+-------------
+
+"Sentinel" mode is also supported, where the Redis channel layer will connect to
+a redis sentinel cluster to find the present Redis master before writing or reading
+data.
+
+Sentinel mode supports sharding, but does not support multiple Sentinel clusters. To
+run sharding of keys across multiple Redis clusters, use a single sentinel cluster,
+but have that sentinel cluster monitor multiple "services". Then in the configuration
+for the RedisSentinelChannelLayer, add a list of the service names. You can also
+leave the list of services blank, and the layer will pull all services that are
+configured on the sentinel master.
+
+Redis Sentinel mode does not support URL-style connection strings, just tuple-based ones.
+
+Configuration for Sentinel mode looks like this:
+
+.. code-block:: python
+
+ CHANNEL_LAYERS = {
+ "default": {
+ "BACKEND": "asgi_redis.RedisSentinelChannelLayer",
+ "CONFIG": {
+ "hosts": [("10.0.0.1", 26379), ("10.0.0.2", 26379), ("10.0.0.3", 26379)],
+ "services": ["shard1", "shard2", "shard3"],
+ },
+ },
+ }
+
+The "shard1", "shard2", etc entries correspond to the name of the service configured in your
+redis `sentinel.conf` file. For example, if your `sentinel.conf` says ``sentinel monitor local 127.0.0.1 6379 1``
+then you would want to include "local" as a service in the `RedisSentinelChannelLayer` configuration.
+
+You may also pass a ``sentinel_refresh_interval`` value in the ``CONFIG``, which
+will enable caching of the Sentinel results for the specified number of seconds.
+This is recommended to reduce the need to query Sentinel every time; even a
+low value of 5 seconds will significantly reduce overhead.
+
+Dependencies
+------------
+
+Redis >= 2.6 is required for `asgi_redis`. It supports Python 2.7, 3.4,
+3.5 and 3.6.
+
+Contributing
+------------
+
+Please refer to the
+`main Channels contributing docs <https://github.com/django/channels/blob/master/CONTRIBUTING.rst>`_.
+That also contains advice on how to set up the development environment and run the tests.
+
+Maintenance and Security
+------------------------
+
+To report security issues, please contact security at djangoproject.com. For GPG
+signatures and more security process information, see
+https://docs.djangoproject.com/en/dev/internals/security/.
+
+To report bugs or request new features, please open a new GitHub issue.
+
+This repository is part of the Channels project. For the shepherd and maintenance team, please see the
+`main Channels readme <https://github.com/django/channels/blob/master/README.rst>`_.
diff --git a/asgi_redis/__init__.py b/asgi_redis/__init__.py
new file mode 100644
index 0000000..016108d
--- /dev/null
+++ b/asgi_redis/__init__.py
@@ -0,0 +1,5 @@
+from .core import RedisChannelLayer
+from .local import RedisLocalChannelLayer
+from .sentinel import RedisSentinelChannelLayer
+
+__version__ = '1.4.3'
diff --git a/asgi_redis/core.py b/asgi_redis/core.py
new file mode 100644
index 0000000..479411e
--- /dev/null
+++ b/asgi_redis/core.py
@@ -0,0 +1,640 @@
+from __future__ import unicode_literals
+
+import base64
+import binascii
+import hashlib
+import itertools
+import msgpack
+import random
+import redis
+import six
+import string
+import time
+import uuid
+
+try:
+ import txredisapi
+except ImportError:
+ pass
+
+from asgiref.base_layer import BaseChannelLayer
+from .twisted_utils import defer
+
+
+class UnsupportedRedis(Exception):
+ pass
+
+
+class BaseRedisChannelLayer(BaseChannelLayer):
+
+ blpop_timeout = 5
+ global_statistics_expiry = 86400
+ channel_statistics_expiry = 3600
+ global_stats_key = '#global#' # needs to be invalid as a channel name
+
+ def __init__(
+ self,
+ expiry=60,
+ hosts=None,
+ prefix="asgi:",
+ group_expiry=86400,
+ capacity=100,
+ channel_capacity=None,
+ symmetric_encryption_keys=None,
+ stats_prefix="asgi-meta:",
+ connection_kwargs=None,
+ ):
+ super(BaseRedisChannelLayer, self).__init__(
+ expiry=expiry,
+ group_expiry=group_expiry,
+ capacity=capacity,
+ channel_capacity=channel_capacity,
+ )
+
+ assert isinstance(prefix, six.text_type), "Prefix must be unicode"
+ socket_timeout = connection_kwargs and connection_kwargs.get("socket_timeout", None)
+ if socket_timeout and socket_timeout < self.blpop_timeout:
+ raise ValueError("The socket timeout must be at least %s seconds" % self.blpop_timeout)
+
+ self.prefix = prefix
+ self.stats_prefix = stats_prefix
+ # Decide on a unique client prefix to use in ! sections
+ # TODO: ensure uniqueness better, e.g. Redis keys with SETNX
+ self.client_prefix = "".join(random.choice(string.ascii_letters) for i in range(8))
+ self._setup_encryption(symmetric_encryption_keys)
+
+ ### Setup ###
+
+ def _setup_encryption(self, symmetric_encryption_keys):
+ # See if we can do encryption if they asked
+ if symmetric_encryption_keys:
+ if isinstance(symmetric_encryption_keys, six.string_types):
+ raise ValueError("symmetric_encryption_keys must be a list of possible keys")
+ try:
+ from cryptography.fernet import MultiFernet
+ except ImportError:
+ raise ValueError("Cannot run with encryption without 'cryptography' installed.")
+ sub_fernets = [self.make_fernet(key) for key in symmetric_encryption_keys]
+ self.crypter = MultiFernet(sub_fernets)
+ else:
+ self.crypter = None
+
+ def _register_scripts(self):
+ connection = self.connection(None)
+ self.chansend = connection.register_script(self.lua_chansend)
+ self.lpopmany = connection.register_script(self.lua_lpopmany)
+ self.delprefix = connection.register_script(self.lua_delprefix)
+ self.incrstatcounters = connection.register_script(self.lua_incrstatcounters)
+
+ ### ASGI API ###
+
+ extensions = ["groups", "flush", "statistics"]
+ try:
+ import txredisapi
+ except ImportError:
+ pass
+ else:
+ extensions.append("twisted")
+
+ def send(self, channel, message):
+ # Typecheck
+ assert isinstance(message, dict), "message is not a dict"
+ assert self.valid_channel_name(channel), "Channel name not valid"
+ # Make sure the message does not contain reserved keys
+ assert "__asgi_channel__" not in message
+ # If it's a process-local channel, strip off local part and stick full name in message
+ if "!" in channel:
+ message = dict(message.items())
+ message['__asgi_channel__'] = channel
+ channel = self.non_local_name(channel)
+ # Write out message into expiring key (avoids big items in list)
+ # TODO: Use extended set, drop support for older redis?
+ message_key = self.prefix + uuid.uuid4().hex
+ channel_key = self.prefix + channel
+ # Pick a connection to the right server - consistent for response
+ # channels, random for normal channels
+ if "!" in channel or "?" in channel:
+ index = self.consistent_hash(channel)
+ connection = self.connection(index)
+ else:
+ index = next(self._send_index_generator)
+ connection = self.connection(index)
+ # Use the Lua function to do the set-and-push
+ try:
+ self.chansend(
+ keys=[message_key, channel_key],
+ args=[self.serialize(message), self.expiry, self.get_capacity(channel)],
+ client=connection,
+ )
+ self._incr_statistics_counter(
+ stat_name=self.STAT_MESSAGES_COUNT,
+ channel=channel,
+ connection=connection,
+ )
+ except redis.exceptions.ResponseError as e:
+ # The Lua script handles capacity checking and sends the "full" error back
+ if e.args[0] == "full":
+ self._incr_statistics_counter(
+ stat_name=self.STAT_CHANNEL_FULL,
+ channel=channel,
+ connection=connection,
+ )
+ raise self.ChannelFull
+ elif "unknown command" in e.args[0]:
+ raise UnsupportedRedis(
+ "Redis returned an error (%s). Please ensure you're running a "
+ " version of redis that is supported by asgi_redis." % e.args[0])
+ else:
+ # Let any other exception bubble up
+ raise
+
+ def receive(self, channels, block=False):
+ # List name get
+ indexes = self._receive_list_names(channels)
+ # Short circuit if no channels
+ if indexes is None:
+ return None, None
+ # Get a message from one of our channels
+ while True:
+ got_expired_content = False
+ # Try each index:channels pair at least once or until a result is returned
+ for index, list_names in indexes.items():
+ # Shuffle list_names to avoid the first ones starving others of workers
+ random.shuffle(list_names)
+ # Open a connection
+ connection = self.connection(index)
+ # Pop off any waiting message
+ if block:
+ result = connection.blpop(list_names, timeout=self.blpop_timeout)
+ else:
+ result = self.lpopmany(keys=list_names, client=connection)
+ if result:
+ content = connection.get(result[1])
+ connection.delete(result[1])
+ if content is None:
+ # If the content key expired, keep going.
+ got_expired_content = True
+ continue
+ # Return the channel it's from and the message
+ channel = result[0][len(self.prefix):].decode("utf8")
+ message = self.deserialize(content)
+ # If there is a full channel name stored in the message, unpack it.
+ if "__asgi_channel__" in message:
+ channel = message['__asgi_channel__']
+ del message['__asgi_channel__']
+ return channel, message
+ # If we only got expired content, try again
+ if got_expired_content:
+ continue
+ else:
+ return None, None
+
+ def _receive_list_names(self, channels):
+ """
+ Inner logic of receive; takes channels, groups by shard, and
+ returns {connection_index: list_names ...} if a query is needed or
+ None for a vacuously empty response.
+ """
+ # Short circuit if no channels
+ if not channels:
+ return None
+ # Check channel names are valid
+ channels = list(channels)
+ assert all(
+ self.valid_channel_name(channel, receive=True) for channel in channels
+ ), "One or more channel names invalid"
+ # Work out what servers to listen on for the given channels
+ indexes = {}
+ index = next(self._receive_index_generator)
+ for channel in channels:
+ if "!" in channel or "?" in channel:
+ indexes.setdefault(self.consistent_hash(channel), []).append(
+ self.prefix + channel,
+ )
+ else:
+ indexes.setdefault(index, []).append(
+ self.prefix + channel,
+ )
+ return indexes
+
+ def new_channel(self, pattern):
+ assert isinstance(pattern, six.text_type)
+ # Keep making channel names till one isn't present.
+ while True:
+ random_string = "".join(random.choice(string.ascii_letters) for i in range(12))
+ assert pattern.endswith("?")
+ new_name = pattern + random_string
+ # Get right connection
+ index = self.consistent_hash(new_name)
+ connection = self.connection(index)
+ # Check to see if it's in the connected Redis.
+ # This fails to stop collisions for sharding where the channel is
+ # non-single-listener, but that seems very unlikely.
+ key = self.prefix + new_name
+ if not connection.exists(key):
+ return new_name
+
+ ### ASGI Group extension ###
+
+ def group_add(self, group, channel):
+ """
+ Adds the channel to the named group for at least 'expiry'
+ seconds (expiry defaults to message expiry if not provided).
+ """
+ assert self.valid_group_name(group), "Group name not valid"
+ assert self.valid_channel_name(channel), "Channel name not valid"
+ group_key = self._group_key(group)
+ connection = self.connection(self.consistent_hash(group))
+ # Add to group sorted set with creation time as timestamp
+ connection.zadd(
+ group_key,
+ **{channel: time.time()}
+ )
+ # Set both expiration to be group_expiry, since everything in
+ # it at this point is guaranteed to expire before that
+ connection.expire(group_key, self.group_expiry)
+
+ def group_discard(self, group, channel):
+ """
+ Removes the channel from the named group if it is in the group;
+ does nothing otherwise (does not error)
+ """
+ assert self.valid_group_name(group), "Group name not valid"
+ assert self.valid_channel_name(channel), "Channel name not valid"
+ key = self._group_key(group)
+ self.connection(self.consistent_hash(group)).zrem(
+ key,
+ channel,
+ )
+
+ def group_channels(self, group):
+ """
+ Returns all channels in the group as an iterable.
+ """
+ key = self._group_key(group)
+ connection = self.connection(self.consistent_hash(group))
+ # Discard old channels based on group_expiry
+ connection.zremrangebyscore(key, 0, int(time.time()) - self.group_expiry)
+ # Return current lot
+ return [x.decode("utf8") for x in connection.zrange(
+ key,
+ 0,
+ -1,
+ )]
+
+ def send_group(self, group, message):
+ """
+ Sends a message to the entire group.
+ """
+ assert self.valid_group_name(group), "Group name not valid"
+ # TODO: More efficient implementation (lua script per shard?)
+ for channel in self.group_channels(group):
+ try:
+ self.send(channel, message)
+ except self.ChannelFull:
+ pass
+
+ def _group_key(self, group):
+ return ("%s:group:%s" % (self.prefix, group)).encode("utf8")
+
+ ### Twisted extension ###
+
+ @defer.inlineCallbacks
+ def receive_twisted(self, channels):
+ """
+ Twisted-native implementation of receive.
+ """
+ # List name get
+ indexes = self._receive_list_names(channels)
+ # Short circuit if no channels
+ if indexes is None:
+ defer.returnValue((None, None))
+ # Get a message from one of our channels
+ while True:
+ got_expired_content = False
+ # Try each index:channels pair at least once or until a result is returned
+ for index, list_names in indexes.items():
+ # Shuffle list_names to avoid the first ones starving others of workers
+ random.shuffle(list_names)
+ # Get a sync connection for conn details
+ sync_connection = self.connection(index)
+ twisted_connection = yield txredisapi.ConnectionPool(
+ host=sync_connection.connection_pool.connection_kwargs['host'],
+ port=sync_connection.connection_pool.connection_kwargs['port'],
+ dbid=sync_connection.connection_pool.connection_kwargs['db'],
+ password=sync_connection.connection_pool.connection_kwargs['password'],
+ )
+ try:
+ # Pop off any waiting message
+ result = yield twisted_connection.blpop(list_names, timeout=self.blpop_timeout)
+ if result:
+ content = yield twisted_connection.get(result[1])
+ # If the content key expired, keep going.
+ if content is None:
+ got_expired_content = True
+ continue
+ # Return the channel it's from and the message
+ channel = result[0][len(self.prefix):]
+ message = self.deserialize(content)
+ # If there is a full channel name stored in the message, unpack it.
+ if "__asgi_channel__" in message:
+ channel = message['__asgi_channel__']
+ del message['__asgi_channel__']
+ defer.returnValue((channel, message))
+ finally:
+ yield twisted_connection.disconnect()
+ # If we only got expired content, try again
+ if got_expired_content:
+ continue
+ else:
+ defer.returnValue((None, None))
+
+ ### statistics extension ###
+
+ STAT_MESSAGES_COUNT = 'messages_count'
+ STAT_MESSAGES_PENDING = 'messages_pending'
+ STAT_MESSAGES_MAX_AGE = 'messages_max_age'
+ STAT_CHANNEL_FULL = 'channel_full_count'
+
+ def _count_global_stats(self, connection_list):
+ statistics = {
+ self.STAT_MESSAGES_COUNT: 0,
+ self.STAT_CHANNEL_FULL: 0,
+ }
+ prefix = self.stats_prefix + self.global_stats_key
+ for connection in connection_list:
+ messages_count, channel_full_count = connection.mget(
+ ':'.join((prefix, self.STAT_MESSAGES_COUNT)),
+ ':'.join((prefix, self.STAT_CHANNEL_FULL)),
+ )
+ statistics[self.STAT_MESSAGES_COUNT] += int(messages_count or 0)
+ statistics[self.STAT_CHANNEL_FULL] += int(channel_full_count or 0)
+
+ return statistics
+
+ def _count_channel_stats(self, channel, connections):
+ statistics = {
+ self.STAT_MESSAGES_COUNT: 0,
+ self.STAT_MESSAGES_PENDING: 0,
+ self.STAT_MESSAGES_MAX_AGE: 0,
+ self.STAT_CHANNEL_FULL: 0,
+ }
+ prefix = self.stats_prefix + channel
+
+ channel_key = self.prefix + channel
+ for connection in connections:
+ messages_count, channel_full_count = connection.mget(
+ ':'.join((prefix, self.STAT_MESSAGES_COUNT)),
+ ':'.join((prefix, self.STAT_CHANNEL_FULL)),
+ )
+ statistics[self.STAT_MESSAGES_COUNT] += int(messages_count or 0)
+ statistics[self.STAT_CHANNEL_FULL] += int(channel_full_count or 0)
+ statistics[self.STAT_MESSAGES_PENDING] += connection.llen(channel_key)
+ oldest_message = connection.lindex(channel_key, 0)
+ if oldest_message:
+ messages_age = self.expiry - connection.ttl(oldest_message)
+ statistics[self.STAT_MESSAGES_MAX_AGE] = max(statistics[self.STAT_MESSAGES_MAX_AGE], messages_age)
+ return statistics
+
+ def _incr_statistics_counter(self, stat_name, channel, connection):
+ """ helper function to intrement counter stats in one go """
+ self.incrstatcounters(
+ keys=[
+ "{prefix}{channel}:{stat_name}".format(
+ prefix=self.stats_prefix,
+ channel=channel,
+ stat_name=stat_name,
+ ),
+ "{prefix}{global_key}:{stat_name}".format(
+ prefix=self.stats_prefix,
+ global_key=self.global_stats_key,
+ stat_name=stat_name,
+ )
+ ],
+ args=[self.channel_statistics_expiry, self.global_statistics_expiry],
+ client=connection,
+ )
+
+ ### Serialization ###
+
+ def serialize(self, message):
+ """
+ Serializes message to a byte string.
+ """
+ value = msgpack.packb(message, use_bin_type=True)
+ if self.crypter:
+ value = self.crypter.encrypt(value)
+ return value
+
+ def deserialize(self, message):
+ """
+ Deserializes from a byte string.
+ """
+ if self.crypter:
+ message = self.crypter.decrypt(message, self.expiry + 10)
+ return msgpack.unpackb(message, encoding="utf8")
+
+ ### Redis Lua scripts ###
+
+ # Single-command channel send. Returns error if over capacity.
+ # Keys: message, channel_list
+ # Args: content, expiry, capacity
+ lua_chansend = """
+ if redis.call('llen', KEYS[2]) >= tonumber(ARGV[3]) then
+ return redis.error_reply("full")
+ end
+ redis.call('set', KEYS[1], ARGV[1])
+ redis.call('expire', KEYS[1], ARGV[2])
+ redis.call('rpush', KEYS[2], KEYS[1])
+ redis.call('expire', KEYS[2], ARGV[2] + 1)
+ """
+
+ # Single-command to increment counter stats.
+ # Keys: channel_stat, global_stat
+ # Args: channel_stat_expiry, global_stat_expiry
... 2066 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-asgi-redis.git
More information about the Python-modules-commits
mailing list