[Python-modules-commits] [python-asgi-ipc] 01/13: importing python-asgi-ipc_1.4.0.orig.tar.gz

Michael Fladischer fladi at moszumanska.debian.org
Fri Nov 17 12:24:55 UTC 2017


This is an automated email from the git hooks/post-receive script.

fladi pushed a commit to branch debian/master
in repository python-asgi-ipc.

commit f0375ece468f77fb25b147a6ff67b4a1dd4b8ba6
Author: Michael Fladischer <FladischerMichael at fladi.at>
Date:   Tue Jul 18 09:43:10 2017 +0200

    importing python-asgi-ipc_1.4.0.orig.tar.gz
---
 .gitignore             |   6 ++
 .travis.yml            |  10 +++
 CHANGELOG.txt          |  17 +++++
 LICENSE                |  27 +++++++
 Makefile               |  16 +++++
 README.rst             | 136 +++++++++++++++++++++++++++++++++++
 asgi_ipc/__init__.py   |   4 ++
 asgi_ipc/core.py       | 151 ++++++++++++++++++++++++++++++++++++++
 asgi_ipc/store.py      | 192 +++++++++++++++++++++++++++++++++++++++++++++++++
 setup.cfg              |   2 +
 setup.py               |  28 ++++++++
 tests/__init__.py      |   0
 tests/test_asgi_ipc.py |  12 ++++
 tox.ini                |   5 ++
 14 files changed, 606 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b8593f0
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+*.egg-info
+*.pyc
+dist/
+build/
+/.tox
+__pycache__
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..a3764a8
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,10 @@
+sudo: false
+language: python
+python:
+  - "2.7"
+  - "3.4"
+  - "3.5"
+install:
+  - pip install -e .
+script:
+  - python -m unittest discover
diff --git a/CHANGELOG.txt b/CHANGELOG.txt
new file mode 100644
index 0000000..f96249d
--- /dev/null
+++ b/CHANGELOG.txt
@@ -0,0 +1,17 @@
+1.4.0 (2017-05-24)
+------------------
+
+* The message storage backend has been overhauled to lock and serialize less,
+  improving performance considerably and removing all double-receive bugs.
+  Throughput on even a low-powered laptop should be in the thousands of messages
+  per second.
+
+1.3.1 (2017-04-02)
+------------------
+
+* Error with sending to multi-process channels with the same message fixed
+
+1.3.0 (2017-04-01)
+------------------
+
+* Updated to new process-specific channel style
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..5f4f225
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) Django Software Foundation and individual contributors.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+    1. Redistributions of source code must retain the above copyright notice,
+       this list of conditions and the following disclaimer.
+
+    2. Redistributions in binary form must reproduce the above copyright
+       notice, this list of conditions and the following disclaimer in the
+       documentation and/or other materials provided with the distribution.
+
+    3. Neither the name of Django nor the names of its contributors may be used
+       to endorse or promote products derived from this software without
+       specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..1a1f55e
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,16 @@
+.PHONY: release
+
+all:
+
+release:
+ifndef version
+	$(error Please supply a version)
+endif
+	@echo Releasing version $(version)
+ifeq (,$(findstring $(version),$(shell git log --oneline -1)))
+	$(error Last commit does not match version)
+endif
+	git tag $(version)
+	git push
+	git push --tags
+	python setup.py sdist bdist_wheel upload
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..10aab12
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,136 @@
+asgi_ipc
+========
+
+.. image:: https://api.travis-ci.org/django/asgi_ipc.svg
+    :target: https://travis-ci.org/django/asgi_ipc
+
+.. image:: https://img.shields.io/pypi/v/asgi_ipc.svg
+    :target: https://pypi.python.org/pypi/asgi_ipc
+
+An ASGI channel layer that uses POSIX shared memory IPC as its backing store
+(only works between processes on the same machine).
+
+IPC is still a bit of a wild west of UNIX compatibility, so if you find weird
+errors, please file an issue with details and exact system specifications. In
+particular, this module is tested and works well on Linux kernels and the Windows
+Subsystem for Linux; it also works on Mac OS but will not be able to detect
+deadlock situations due to limitations in the kernel.
+
+
+Usage
+-----
+
+You'll need to instantiate the channel layer with a path prefix to create
+IPC objects underneath; any channel layers with the same prefix will talk to
+each other as long as they're on the same machine.
+
+Example:
+
+.. code-block:: python
+
+    import asgi_ipc as asgi
+
+    channel_layer = asgi.IPCChannelLayer(
+        prefix="aeracode",
+        message_memory=200 * 1024 * 1024,
+    )
+
+    channel_layer.send("my_channel", {"text": "Hello ASGI"})
+    print(channel_layer.receive(["my_channel", ]))
+
+``prefix``
+~~~~~~~~~~
+
+Prefix to use for IPC objects under the root namespace. Defaults to ``asgi``.
+IPC layers on the same machine with the same prefix will talk to each other.
+
+``message_memory``
+~~~~~~~~~~~~~~~~~~
+
+The amount of shared memory to allocate to the channel storage, in bytes.
+Defaults to 100MB. All of your in-flight messages must fit into this,
+otherwise you'll get ``ChannelFull`` errors once the memory space fills up.
+
+ASGI messages can be a maximum of one megabyte, and are usually much smaller.
+The IPC routing metadata on top of each message is approximately 50 bytes.
+
+``group_memory``
+~~~~~~~~~~~~~~~~
+
+The amount of shared memory to allocate to the group storage, in bytes.
+Defaults to 20MB. All of your group membership data must fit into this space,
+otherwise your group memberships may fail to persist.
+
+You can fit approximately 4000 group-channel membership associations into one
+megabyte of memory.
+
+``expiry``
+~~~~~~~~~~
+
+Message expiry in seconds. Defaults to ``60``. You generally shouldn't need
+to change this, but you may want to turn it down if you have peaky traffic you
+wish to drop, or up if you have peaky traffic you want to backlog until you
+get to it.
+
+``group_expiry``
+~~~~~~~~~~~~~~~~
+
+Group expiry in seconds. Defaults to ``86400``. Interface servers will drop
+connections after this amount of time; it's recommended you reduce it for a
+healthier system that encourages disconnections.
+
+``capacity``
+~~~~~~~~~~~~
+
+Default channel capacity. Defaults to ``100``. Once a channel is at capacity,
+it will refuse more messages. How this affects different parts of the system
+varies; an HTTP server will refuse connections, for example, while Django
+sending a response will just wait until there's space.
+
+``channel_capacity``
+~~~~~~~~~~~~~~~~~~~~
+
+Per-channel capacity configuration. This lets you tweak the channel capacity
+based on the channel name, and supports both globbing and regular expressions.
+
+It should be a dict mapping channel name pattern to desired capacity; if the
+dict key is a string, it's interpreted as a glob, while if it's a compiled
+``re`` object, it's treated as a regular expression.
+
+This example sets ``http.request`` to 200, all ``http.response!`` channels
+to 10, and all ``websocket.send!`` channels to 20:
+
+.. code-block:: python
+
+    channel_capacity={
+        "http.request": 200,
+        "http.response!*": 10,
+        re.compile(r"^websocket.send\!.+"): 20,
+    }
+
+If you want to enforce a matching order, use an ``OrderedDict`` as the
+argument; channels will then be matched in the order the dict provides them.
+
+Dependencies
+------------
+
+All Channels projects currently support Python 2.7, 3.4 and 3.5.
+
+Contributing
+------------
+
+Please refer to the
+`main Channels contributing docs <https://github.com/django/channels/blob/master/CONTRIBUTING.rst>`_.
+That also contains advice on how to set up the development environment and run the tests.
+
+Maintenance and Security
+------------------------
+
+To report security issues, please contact security at djangoproject.com. For GPG
+signatures and more security process information, see
+https://docs.djangoproject.com/en/dev/internals/security/.
+
+To report bugs or request new features, please open a new GitHub issue.
+
+This repository is part of the Channels project. For the shepherd and maintenance team, please see the
+`main Channels readme <https://github.com/django/channels/blob/master/README.rst>`_.
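
For reference, the channel_capacity option documented in the README above
accepts glob strings and compiled regular expressions as keys, and an
OrderedDict enforces matching order; a minimal sketch (the prefix and
capacities here are illustrative, not taken from the source above):

    import re
    from collections import OrderedDict

    import asgi_ipc as asgi

    channel_layer = asgi.IPCChannelLayer(
        prefix="example",
        channel_capacity=OrderedDict([
            ("http.request", 200),                     # glob-style string key
            ("http.response!*", 10),                   # glob with wildcard
            (re.compile(r"^websocket\.send!.+"), 20),  # compiled regex key
        ]),
    )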
diff --git a/asgi_ipc/__init__.py b/asgi_ipc/__init__.py
new file mode 100644
index 0000000..92cc83c
--- /dev/null
+++ b/asgi_ipc/__init__.py
@@ -0,0 +1,4 @@
+import pkg_resources
+__version__ = pkg_resources.require('asgi_ipc')[0].version
+
+from .core import IPCChannelLayer  # noqa
diff --git a/asgi_ipc/core.py b/asgi_ipc/core.py
new file mode 100644
index 0000000..ba9623d
--- /dev/null
+++ b/asgi_ipc/core.py
@@ -0,0 +1,151 @@
+from __future__ import unicode_literals
+
+import msgpack
+import random
+import six
+import string
+import time
+from asgiref.base_layer import BaseChannelLayer
+
+from .store import ChannelMemoryStore, GroupMemoryStore
+
+
+class IPCChannelLayer(BaseChannelLayer):
+    """
+    Posix IPC backed channel layer, using the posix_ipc module's shared memory
+    and semaphore components.
+
+    It uses mmap'd shared memory areas to store msgpack'd versions of the
+    data structures; see the store module for more information.
+    """
+
+    def __init__(self,
+        prefix="asgi",
+        expiry=60,
+        group_expiry=86400,
+        capacity=10,
+        channel_capacity=None,
+        message_memory=100*1024*1024,
+        group_memory=20*1024*1024,
+    ):
+        super(IPCChannelLayer, self).__init__(
+            expiry=expiry,
+            group_expiry=group_expiry,
+            capacity=capacity,
+            channel_capacity=channel_capacity,
+        )
+        self.prefix = prefix
+        # Table with queued messages
+        self.message_store = ChannelMemoryStore("%s-messages" % prefix, message_memory)
+        # Table containing all groups to flush
+        self.group_store = GroupMemoryStore("%s-groups" % prefix, group_memory)
+
+    # --------
+    # ASGI API
+    # --------
+
+    extensions = ["flush", "groups"]
+
+    def send(self, channel, message):
+        # Type check
+        assert isinstance(message, dict), "message is not a dict"
+        assert self.valid_channel_name(channel), "channel name not valid"
+        # Make sure the message does not contain reserved keys
+        assert "__asgi_channel__" not in message
+        # If it's a process-local channel, strip off local part and stick full name in message
+        if "!" in channel:
+            message = dict(message.items())
+            message['__asgi_channel__'] = channel
+            channel = self.non_local_name(channel)
+        # Write message into the correct message queue with a size check
+        channel_size = self.message_store.length(channel)
+        if channel_size >= self.get_capacity(channel):
+            raise self.ChannelFull()
+        else:
+            self.message_store.append(
+                channel,
+                msgpack.packb(message, use_bin_type=True),
+                time.time() + self.expiry,
+            )
+
+    def receive(self, channels, block=False):
+        if not channels:
+            return None, None
+        channels = list(channels)
+        assert all(
+            self.valid_channel_name(channel, receive=True) for channel in channels
+        ), "one or more channel names invalid"
+        random.shuffle(channels)
+        # Try to pop off all of the named channels
+        for channel in channels:
+            # See if there is an unexpired message
+            try:
+                message = self.message_store.pop(channel)
+            except IndexError:
+                continue
+            message = msgpack.unpackb(message, encoding="utf8")
+            # If there is a full channel name stored in the message, unpack it.
+            if "__asgi_channel__" in message:
+                channel = message['__asgi_channel__']
+                del message['__asgi_channel__']
+            return channel, message
+        return None, None
+
+    def new_channel(self, pattern):
+        assert isinstance(pattern, six.text_type)
+        # Keep making channel names till one isn't present.
+        while True:
+            random_string = "".join(random.sample(string.ascii_letters, 12))
+            assert pattern.endswith("?")
+            new_name = pattern + random_string
+            if not self.message_store.length(new_name):
+                return new_name
+            else:
+                continue
+
+    # ----------------
+    # Groups extension
+    # ----------------
+
+    def group_add(self, group, channel):
+        """
+        Adds the channel to the named group
+        """
+        assert self.valid_group_name(group), "Invalid group name"
+        self.group_store.add(group, channel, time.time() + self.group_expiry)
+
+    def group_discard(self, group, channel):
+        """
+        Removes the channel from the named group if it is in the group;
+        does nothing otherwise (does not error)
+        """
+        assert self.valid_group_name(group), "Invalid group name"
+        self.group_store.discard(group, channel)
+
+    def send_group(self, group, message):
+        """
+        Sends a message to the entire group.
+        """
+        assert self.valid_group_name(group), "Invalid group name"
+        for channel in self.group_channels(group):
+            try:
+                self.send(channel, message)
+            except self.ChannelFull:
+                pass
+
+    def group_channels(self, group):
+        return self.group_store.flush_expired(group)
+
+    # ---------------
+    # Flush extension
+    # ---------------
+
+    def flush(self):
+        """
+        Deletes all messages and groups.
+        """
+        self.message_store.flush_all()
+        self.group_store.flush_all()
+
+    def __str__(self):
+        return "%s(prefix=%s)" % (self.__class__.__name__, self.prefix)
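
A minimal sketch of the groups and flush extensions implemented in core.py
above (the channel and group names are illustrative only):

    import asgi_ipc as asgi

    channel_layer = asgi.IPCChannelLayer(prefix="example")

    # Membership is stored with an expiry of time.time() + group_expiry.
    channel_layer.group_add("chat", "chat.listener-1")
    # send_group() silently skips members whose channels are full.
    channel_layer.send_group("chat", {"text": "hello"})

    channel, message = channel_layer.receive(["chat.listener-1"])
    print(channel, message)

    channel_layer.group_discard("chat", "chat.listener-1")
    # Flush extension: drop all queued messages and group memberships.
    channel_layer.flush()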
diff --git a/asgi_ipc/store.py b/asgi_ipc/store.py
new file mode 100644
index 0000000..672a36d
--- /dev/null
+++ b/asgi_ipc/store.py
@@ -0,0 +1,192 @@
+import posix_ipc
+import mmap
+import time
+import contextlib
+from six.moves import cPickle as pickle
+
+
+class BaseMemoryStore(object):
+    """
+    Implements the base of shared memory stores, providing an mmap-ed shared
+    memory area, a semaphore for controlling access to it, and common logic
+    to mutate a pickled value in that memory area atomically.
+
+    POSIX IPC Message Queues are not used as their default limits under most
+    kernels are too small (8KB messages and 256 queues max); channels is a
+    little... heavier than that.
+    """
+
+    # The number of seconds to wait for a lock before raising an error
+    TIMEOUT = 10
+
+    # The default value's factory
+    DEFAULT_FACTORY = dict
+
+    def __init__(self, path, memory_size):
+        self.memory_size = memory_size
+        self.path = path
+        self.semaphore = posix_ipc.Semaphore(path, flags=posix_ipc.O_CREAT, initial_value=1)
+        self.memory = posix_ipc.SharedMemory(path, flags=posix_ipc.O_CREAT, size=self.memory_size)
+        self.mmap = mmap.mmap(self.memory.fd, self.memory.size)
+        self.deadlock_error = RuntimeError(
+            "Semaphore appears to be deadlocked. Kill all channels processes "
+            "and remove files named like %s in /dev/shm" % self.path
+        )
+
+    @contextlib.contextmanager
+    def mutate_value(self):
+        """
+        Allows mutation of the value safely.
+        """
+        # Get the semaphore with an emergency timeout to detect deadlock conditions
+        try:
+            self.semaphore.acquire(self.TIMEOUT)
+        except posix_ipc.BusyError:
+            raise self.deadlock_error
+        try:
+            # Load the value from the shared memory segment (if populated)
+            self.mmap.seek(0)
+            try:
+                value = pickle.load(self.mmap)
+            except EOFError:
+                value = self.DEFAULT_FACTORY()
+            # Let the inside run
+            yield value
+            # Dump the value back into the shared memory segment
+            self.mmap.seek(0)
+            pickle.dump(value, self.mmap, protocol=2)
+        finally:
+            # Release semaphore
+            self.semaphore.release()
+
+    def get_value(self):
+        """
+        Returns the value in the store safely, but does not allow safe mutation.
+        """
+        # Get the semaphore with an emergency timeout to detect deadlock conditions
+        try:
+            self.semaphore.acquire(self.TIMEOUT)
+        except posix_ipc.BusyError:
+            raise self.deadlock_error
+        try:
+            # Load the value from the shared memory segment (if populated)
+            self.mmap.seek(0)
+            try:
+                value = pickle.load(self.mmap)
+            except EOFError:
+                value = self.DEFAULT_FACTORY()
+            return value
+        finally:
+            # Release semaphore
+            self.semaphore.release()
+
+    def flush_all(self):
+        # Get the semaphore with an emergency timeout to detect deadlock conditions
+        try:
+            self.semaphore.acquire(self.TIMEOUT)
+        except posix_ipc.BusyError:
+            raise self.deadlock_error
+        try:
+            # Just write over the mmap area
+            self.mmap.seek(0)
+            pickle.dump(self.DEFAULT_FACTORY(), self.mmap, protocol=2)
+        finally:
+            # Release semaphore
+            self.semaphore.release()
+
+
+class ChannelMemoryStore(BaseMemoryStore):
+    """
+    Implements a shared memory store that maps unicode strings to ordered
+    channels of binary blobs with expiry times.
+    """
+
+    def append(self, name, item, expiry):
+        """
+        Adds a binary blob to the right of the channel.
+        """
+        with self.mutate_value() as value:
+            value.setdefault(name, []).append((item, expiry))
+
+    def pop(self, name):
+        """
+        Tries to pop an item off of the left of the channel, raising IndexError if
+        no item was available to pop.
+        """
+        with self.mutate_value() as value:
+            # See if the channel even exists
+            if name not in value:
+                raise IndexError("Channel %s is empty" % name)
+            # Go through the channel until we find a nonexpired message
+            for i, item_and_expiry in enumerate(value[name]):
+                if item_and_expiry[1] >= time.time():
+                    value[name] = value[name][i + 1:]
+                    return item_and_expiry[0]
+            # All messages are expired!
+            del value[name]
+            raise IndexError("Channel %s is empty" % name)
+
+    def length(self, name):
+        """
+        Removes all expired items from the channel, and returns the
+        number of messages in the channel.
+        """
+        with self.mutate_value() as value:
+            new_contents = [
+                (item, expiry)
+                for item, expiry
+                in value.get(name, [])
+                if expiry >= time.time()
+            ]
+            if new_contents:
+                value[name] = new_contents
+                return len(new_contents)
+            else:
+                if name in value:
+                    del value[name]
+                return 0
+
+
+class GroupMemoryStore(BaseMemoryStore):
+    """
+    Implements a shared memory store that maps unicode strings to sets
+    of (unicode string, expiry time).
+    """
+
+    def add(self, name, item, expiry):
+        """
+        Adds a group member with an expiry time. If it already exists,
+        update the expiry time.
+        """
+        with self.mutate_value() as value:
+            value.setdefault(name, {})[item] = expiry
+
+    def discard(self, name, item):
+        """
+        Removes a group member if it exists. If not, silently returns.
+        """
+        with self.mutate_value() as value:
+            if name in value and item in value[name]:
+                del value[name][item]
+
+    def flush(self, name):
+        """
+        Removes all members from the group
+        """
+        with self.mutate_value() as value:
+            if name in value:
+                del value[name]
+
+    def flush_expired(self, name):
+        """
+        Removes all members from the group who have expired, and returns the
+        new list of members.
+        """
+        with self.mutate_value() as value:
+            value[name] = {
+                item: expiry
+                for item, expiry in value[name].items()
+                if expiry >= time.time()
+            }
+        return value[name].keys()
+
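The stores above are normally driven by IPCChannelLayer, but as an
illustration of the append/pop/length API they expose (the path and sizes
here are examples only):

    import time

    from asgi_ipc.store import ChannelMemoryStore

    store = ChannelMemoryStore("example-messages", 10 * 1024 * 1024)

    # append() takes a binary blob plus an absolute expiry timestamp.
    store.append("my_channel", b"payload", time.time() + 60)
    print(store.length("my_channel"))  # 1; expired items are pruned as a side effect
    print(store.pop("my_channel"))     # b'payload'; raises IndexError once empty

    # Overwrite the shared-memory segment with an empty dict.
    store.flush_all()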
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..3c6e79c
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[bdist_wheel]
+universal=1
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..f2cdd6f
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,28 @@
+import os
+from setuptools import setup
+
+__version__ = '1.4.0'
+
+# We use the README as the long_description
+readme_path = os.path.join(os.path.dirname(__file__), "README.rst")
+
+
+setup(
+    name='asgi_ipc',
+    version=__version__,
+    url='http://github.com/django/asgi_ipc/',
+    author='Django Software Foundation',
+    author_email='foundation at djangoproject.com',
+    description='Posix IPC-backed ASGI channel layer implementation',
+    long_description=open(readme_path).read(),
+    license='BSD',
+    zip_safe=False,
+    packages=["asgi_ipc"],
+    include_package_data=True,
+    install_requires=[
+        'six',
+        'posix_ipc>=1.0.0',
+        'msgpack-python',
+        'asgiref~=1.1.2',
+    ]
+)
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_asgi_ipc.py b/tests/test_asgi_ipc.py
new file mode 100644
index 0000000..1aae1dc
--- /dev/null
+++ b/tests/test_asgi_ipc.py
@@ -0,0 +1,12 @@
+from __future__ import unicode_literals
+
+from asgi_ipc import IPCChannelLayer
+from asgiref.conformance import ConformanceTestCase
+
+
+# Default conformance tests
+class IPCLayerTests(ConformanceTestCase):
+
+    channel_layer = IPCChannelLayer(expiry=1, group_expiry=2, capacity=5)
+    expiry_delay = 1.1
+    capacity_limit = 5
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..0e59285
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,5 @@
+[tox]
+envlist = py27,py34,py35,py36
+
+[testenv]
+commands = python -m unittest discover

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-asgi-ipc.git


