[Python-modules-commits] [python-asgi-ipc] 01/13: importing python-asgi-ipc_1.4.0.orig.tar.gz
Michael Fladischer
fladi at moszumanska.debian.org
Fri Nov 17 12:24:55 UTC 2017
This is an automated email from the git hooks/post-receive script.
fladi pushed a commit to branch debian/master
in repository python-asgi-ipc.
commit f0375ece468f77fb25b147a6ff67b4a1dd4b8ba6
Author: Michael Fladischer <FladischerMichael at fladi.at>
Date: Tue Jul 18 09:43:10 2017 +0200
importing python-asgi-ipc_1.4.0.orig.tar.gz
---
.gitignore | 6 ++
.travis.yml | 10 +++
CHANGELOG.txt | 17 +++++
LICENSE | 27 +++++++
Makefile | 16 +++++
README.rst | 136 +++++++++++++++++++++++++++++++++++
asgi_ipc/__init__.py | 4 ++
asgi_ipc/core.py | 151 ++++++++++++++++++++++++++++++++++++++
asgi_ipc/store.py | 192 +++++++++++++++++++++++++++++++++++++++++++++++++
setup.cfg | 2 +
setup.py | 28 ++++++++
tests/__init__.py | 0
tests/test_asgi_ipc.py | 12 ++++
tox.ini | 5 ++
14 files changed, 606 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..b8593f0
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+*.egg-info
+*.pyc
+dist/
+build/
+/.tox
+__pycache__
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..a3764a8
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,10 @@
+sudo: false
+language: python
+python:
+ - "2.7"
+ - "3.4"
+ - "3.5"
+install:
+ - pip install -e .
+script:
+ - python -m unittest discover
diff --git a/CHANGELOG.txt b/CHANGELOG.txt
new file mode 100644
index 0000000..f96249d
--- /dev/null
+++ b/CHANGELOG.txt
@@ -0,0 +1,17 @@
+1.4.0 (2017-05-24)
+------------------
+
+* The message storage backend has been overhauled to lock and serialize less,
+ improving performance considerably and removing all double-receive bugs.
+ Throughput on even a low-powered laptop should be in the thousands of messages
+ per second.
+
+1.3.1 (2017-04-02)
+------------------
+
+* Error with sending to multi-process channels with the same message fixed
+
+1.3.0 (2017-04-01)
+------------------
+
+* Updated to new process-specific channel style
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..5f4f225
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) Django Software Foundation and individual contributors.
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ 1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+ 2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+ 3. Neither the name of Django nor the names of its contributors may be used
+ to endorse or promote products derived from this software without
+ specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..1a1f55e
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,16 @@
+.PHONY: release
+
+all:
+
+release:
+ifndef version
+ $(error Please supply a version)
+endif
+ @echo Releasing version $(version)
+ifeq (,$(findstring $(version),$(shell git log --oneline -1)))
+ $(error Last commit does not match version)
+endif
+ git tag $(version)
+ git push
+ git push --tags
+ python setup.py sdist bdist_wheel upload
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..10aab12
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,136 @@
+asgi_ipc
+========
+
+.. image:: https://api.travis-ci.org/django/asgi_ipc.svg
+ :target: https://travis-ci.org/django/asgi_ipc
+
+.. image:: https://img.shields.io/pypi/v/asgi_ipc.svg
+ :target: https://pypi.python.org/pypi/asgi_ipc
+
+An ASGI channel layer that uses POSIX shared memory IPC as its backing store
+(only works between processes on the same machine).
+
+IPC is still a bit of a wild west of UNIX compatibility, so if you find weird
+errors, please file an issue with details and exact system specifications. In
+particular, this module is tested and works well on Linux kernels and the Windows
+Subsystem for Linux; it also works on Mac OS but will not be able to detect
+deadlock situations due to limitations in the kernel.
+
+
+Usage
+-----
+
+You'll need to instantiate the channel layer with a path prefix to create
+IPC objects underneath; any channel layers with the same prefix will talk to
+each other as long as they're on the same machine.
+
+Example:
+
+.. code-block:: python
+
+ import asgi_ipc as asgi
+
+ channel_layer = asgi.IPCChannelLayer(
+ prefix="aeracode",
+ message_memory=200 * 1024 * 1024,
+ )
+
+ channel_layer.send("my_channel", {"text": "Hello ASGI"})
+ print(channel_layer.receive(["my_channel", ]))
+
+``prefix``
+~~~~~~~~~~
+
+Prefix to use for IPC objects under the root namespace. Defaults to ``asgi``.
+IPC layers on the same machine with the same prefix will talk to each other.
+
+``message_memory``
+~~~~~~~~~~~~~~~~~~
+
+The amount of shared memory to allocate to the channel storage, in bytes.
+Defaults to 100MB. All of your in-flight messages must fit into this,
+otherwise you'll get ``ChannelFull`` errors if the memory space is full up.
+
+ASGI messages can be a maximum of one megabyte, and are usually much smaller.
+The IPC routing metadata on top of each message is approximately 50 bytes.
+
+``group_memory``
+~~~~~~~~~~~~~~~~
+
+The amount of shared memory to allocate to the group storage, in bytes.
+Defaults to 20MB. All of your group membership data must fit into this space,
+otherwise your group memberships may fail to persist.
+
+You can fit approximately 4000 group-channel membership associations into one
+megabyte of memory.
+
+``expiry``
+~~~~~~~~~~
+
+Message expiry in seconds. Defaults to ``60``. You generally shouldn't need
+to change this, but you may want to turn it down if you have peaky traffic you
+wish to drop, or up if you have peaky traffic you want to backlog until you
+get to it.
+
+``group_expiry``
+~~~~~~~~~~~~~~~~
+
+Group expiry in seconds. Defaults to ``86400``. Interface servers will drop
+connections after this amount of time; it's recommended you reduce it for a
+healthier system that encourages disconnections.
+
+``capacity``
+~~~~~~~~~~~~
+
+Default channel capacity. Defaults to ``100``. Once a channel is at capacity,
+it will refuse more messages. How this affects different parts of the system
+varies; a HTTP server will refuse connections, for example, while Django
+sending a response will just wait until there's space.
+
+``channel_capacity``
+~~~~~~~~~~~~~~~~~~~~
+
+Per-channel capacity configuration. This lets you tweak the channel capacity
+based on the channel name, and supports both globbing and regular expressions.
+
+It should be a dict mapping channel name pattern to desired capacity; if the
+dict key is a string, it's interpreted as a glob, while if it's a compiled
+``re`` object, it's treated as a regular expression.
+
+This example sets ``http.request`` to 200, all ``http.response!`` channels
+to 10, and all ``websocket.send!`` channels to 20:
+
+.. code-block:: python
+
+ channel_capacity={
+ "http.request": 200,
+ "http.response!*": 10,
+ re.compile(r"^websocket.send\!.+"): 20,
+ }
+
+If you want to enforce a matching order, use an ``OrderedDict`` as the
+argument; channels will then be matched in the order the dict provides them.
+
+Dependencies
+------------
+
+All Channels projects currently support Python 2.7, 3.4 and 3.5.
+
+Contributing
+------------
+
+Please refer to the
+`main Channels contributing docs <https://github.com/django/channels/blob/master/CONTRIBUTING.rst>`_.
+That also contains advice on how to set up the development environment and run the tests.
+
+Maintenance and Security
+------------------------
+
+To report security issues, please contact security@djangoproject.com. For GPG
+signatures and more security process information, see
+https://docs.djangoproject.com/en/dev/internals/security/.
+
+To report bugs or request new features, please open a new GitHub issue.
+
+This repository is part of the Channels project. For the shepherd and maintenance team, please see the
+`main Channels readme <https://github.com/django/channels/blob/master/README.rst>`_.
diff --git a/asgi_ipc/__init__.py b/asgi_ipc/__init__.py
new file mode 100644
index 0000000..92cc83c
--- /dev/null
+++ b/asgi_ipc/__init__.py
@@ -0,0 +1,4 @@
+import pkg_resources
+__version__ = pkg_resources.require('asgi_ipc')[0].version
+
+from .core import IPCChannelLayer # noqa
diff --git a/asgi_ipc/core.py b/asgi_ipc/core.py
new file mode 100644
index 0000000..ba9623d
--- /dev/null
+++ b/asgi_ipc/core.py
@@ -0,0 +1,151 @@
+from __future__ import unicode_literals
+
+import msgpack
+import random
+import six
+import string
+import time
+from asgiref.base_layer import BaseChannelLayer
+
+from .store import ChannelMemoryStore, GroupMemoryStore
+
+
class IPCChannelLayer(BaseChannelLayer):
    """
    Posix IPC backed channel layer, using the posix_ipc module's shared memory
    and semaphore components.

    Messages are msgpack-encoded before being handed to the shared memory
    stores; see the store module for how those stores are laid out.

    Constructor arguments:

    * ``prefix``: name prefix for the IPC objects; two stores are created
      from it, ``<prefix>-messages`` and ``<prefix>-groups``.
    * ``expiry``: message expiry, in seconds.
    * ``group_expiry``: group membership expiry, in seconds.
    * ``capacity``: default per-channel capacity (10 unless overridden).
    * ``channel_capacity``: per-channel capacity overrides, passed through
      to the base layer.
    * ``message_memory`` / ``group_memory``: size in bytes of the shared
      memory areas backing the message store and the group store.
    """

    def __init__(self,
                 prefix="asgi",
                 expiry=60,
                 group_expiry=86400,
                 capacity=10,
                 channel_capacity=None,
                 message_memory=100*1024*1024,
                 group_memory=20*1024*1024,
                 ):
        super(IPCChannelLayer, self).__init__(
            expiry=expiry,
            group_expiry=group_expiry,
            capacity=capacity,
            channel_capacity=channel_capacity,
        )
        self.prefix = prefix
        # Table with queued messages
        self.message_store = ChannelMemoryStore("%s-messages" % prefix, message_memory)
        # Table containing all groups to flush
        self.group_store = GroupMemoryStore("%s-groups" % prefix, group_memory)

    # --------
    # ASGI API
    # --------

    extensions = ["flush", "groups"]

    def send(self, channel, message):
        """
        Queues ``message`` (a dict) on ``channel``.

        Raises ``ChannelFull`` if the channel already holds as many unexpired
        messages as its capacity allows.
        """
        # Type check
        assert isinstance(message, dict), "message is not a dict"
        assert self.valid_channel_name(channel), "channel name not valid"
        # Make sure the message does not contain reserved keys
        assert "__asgi_channel__" not in message
        # If it's a process-local channel, strip off local part and stick full
        # name in message. Work on a copy so the caller's dict is not mutated.
        if "!" in channel:
            message = dict(message)
            message['__asgi_channel__'] = channel
            channel = self.non_local_name(channel)
        # Write message into the correct message queue with a size check
        if self.message_store.length(channel) >= self.get_capacity(channel):
            raise self.ChannelFull()
        self.message_store.append(
            channel,
            msgpack.packb(message, use_bin_type=True),
            time.time() + self.expiry,
        )

    def receive(self, channels, block=False):
        """
        Returns ``(channel, message)`` for the first of ``channels`` (tried in
        random order) that has an unexpired message, or ``(None, None)`` if
        none of them do.

        ``block`` is accepted for API compatibility but is not used here; the
        call always returns immediately.
        """
        if not channels:
            return None, None
        channels = list(channels)
        assert all(
            self.valid_channel_name(channel, receive=True) for channel in channels
        ), "one or more channel names invalid"
        # Shuffle so that no channel is systematically favoured
        random.shuffle(channels)
        # Try to pop off all of the named channels
        for channel in channels:
            # See if there is an unexpired message
            try:
                message = self.message_store.pop(channel)
            except IndexError:
                continue
            message = msgpack.unpackb(message, encoding="utf8")
            # If there is a full channel name stored in the message, unpack it.
            if "__asgi_channel__" in message:
                channel = message['__asgi_channel__']
                del message['__asgi_channel__']
            return channel, message
        return None, None

    def new_channel(self, pattern):
        """
        Returns a new channel name made of ``pattern`` (which must be text
        ending in ``?``) plus 12 random ASCII letters, retrying until the name
        has no messages queued on it.
        """
        assert isinstance(pattern, six.text_type)
        # The pattern never changes between attempts, so validate it once
        assert pattern.endswith("?")
        # Keep making channel names till one isn't present.
        while True:
            random_string = "".join(random.sample(string.ascii_letters, 12))
            new_name = pattern + random_string
            if not self.message_store.length(new_name):
                return new_name

    # ----------------
    # Groups extension
    # ----------------

    def group_add(self, group, channel):
        """
        Adds the channel to the named group
        """
        assert self.valid_group_name(group), "Invalid group name"
        self.group_store.add(group, channel, time.time() + self.group_expiry)

    def group_discard(self, group, channel):
        """
        Removes the channel from the named group if it is in the group;
        does nothing otherwise (does not error)
        """
        assert self.valid_group_name(group), "Invalid group name"
        self.group_store.discard(group, channel)

    def send_group(self, group, message):
        """
        Sends a message to the entire group.
        """
        assert self.valid_group_name(group), "Invalid group name"
        for channel in self.group_channels(group):
            try:
                self.send(channel, message)
            except self.ChannelFull:
                # Deliberate best-effort: a full member channel is skipped so
                # the remaining members still get the message.
                pass

    def group_channels(self, group):
        """
        Returns the unexpired member channels of the group, dropping expired
        memberships from the store as a side effect.
        """
        return self.group_store.flush_expired(group)

    # ---------------
    # Flush extension
    # ---------------

    def flush(self):
        """
        Deletes all messages and groups.
        """
        self.message_store.flush_all()
        self.group_store.flush_all()

    def __str__(self):
        return "%s(prefix=%s)" % (self.__class__.__name__, self.prefix)
diff --git a/asgi_ipc/store.py b/asgi_ipc/store.py
new file mode 100644
index 0000000..672a36d
--- /dev/null
+++ b/asgi_ipc/store.py
@@ -0,0 +1,192 @@
+import posix_ipc
+import mmap
+import time
+import contextlib
+from six.moves import cPickle as pickle
+
+
class BaseMemoryStore(object):
    """
    Implements the base of shared memory stores, providing an mmap-ed shared
    memory area, a semaphore for controlling access to it, and common logic
    to mutate a pickled value in that memory area atomically.

    POSIX IPC Message Queues are not used as their default limits under most
    kernels are too small (8KB messages and 256 queues max); channels is a
    little... heavier than that.

    NOTE: the value is read back with pickle, so every process able to write
    to the shared memory segment must be trusted.
    """

    # The number of seconds to wait for a lock before raising an error
    TIMEOUT = 10

    # The default value's factory
    DEFAULT_FACTORY = dict

    def __init__(self, path, memory_size):
        """
        Opens (creating if needed) the semaphore and the shared memory segment
        named ``path`` and maps ``memory_size`` bytes of it.
        """
        self.memory_size = memory_size
        self.path = path
        self.semaphore = posix_ipc.Semaphore(path, flags=posix_ipc.O_CREAT, initial_value=1)
        self.memory = posix_ipc.SharedMemory(path, flags=posix_ipc.O_CREAT, size=self.memory_size)
        self.mmap = mmap.mmap(self.memory.fd, self.memory.size)
        self.deadlock_error = RuntimeError(
            "Semaphore appears to be deadlocked. Kill all channels processes "
            "and remove files named like %s in /dev/shm" % self.path
        )

    @contextlib.contextmanager
    def _locked(self):
        """
        Holds the semaphore for the duration of the block, raising the
        deadlock error if it cannot be acquired within TIMEOUT seconds.
        """
        # Get the semaphore with an emergency timeout to detect deadlock conditions
        try:
            self.semaphore.acquire(self.TIMEOUT)
        except posix_ipc.BusyError:
            raise self.deadlock_error
        try:
            yield
        finally:
            # Release semaphore
            self.semaphore.release()

    def _load(self):
        """
        Reads the stored value; the caller must hold the semaphore.
        """
        self.mmap.seek(0)
        # A never-written area starts with a NUL byte, which no pickle begins
        # with. Check for it explicitly: some Python versions report it as
        # UnpicklingError rather than EOFError.
        if self.mmap.read(1) in (b"", b"\x00"):
            return self.DEFAULT_FACTORY()
        self.mmap.seek(0)
        try:
            return pickle.load(self.mmap)
        except EOFError:
            return self.DEFAULT_FACTORY()

    def _dump(self, value):
        """
        Writes ``value`` into the memory area; the caller must hold the
        semaphore. Raises ValueError, leaving the stored value untouched, if
        the pickled value does not fit.
        """
        # Serialise first so an oversized value is rejected before any byte
        # of the existing pickle has been overwritten.
        payload = pickle.dumps(value, protocol=2)
        capacity = len(self.mmap)
        if len(payload) > capacity:
            raise ValueError(
                "Pickled value of %d bytes does not fit into the %d byte "
                "shared memory area" % (len(payload), capacity)
            )
        self.mmap.seek(0)
        self.mmap.write(payload)

    @contextlib.contextmanager
    def mutate_value(self):
        """
        Allows mutation of the value safely.

        Yields the stored value and writes it back when the block finishes.
        If the block raises, nothing is written back.
        """
        with self._locked():
            # Load the value from the shared memory segment (if populated)
            value = self._load()
            # Let the inside run
            yield value
            # Dump the value back into the shared memory segment
            self._dump(value)

    def get_value(self):
        """
        Returns the value in the store safely, but does not allow safe mutation.
        """
        with self._locked():
            return self._load()

    def flush_all(self):
        """
        Resets the store to a fresh default value.
        """
        with self._locked():
            # Just write over the mmap area
            self._dump(self.DEFAULT_FACTORY())
+
+
class ChannelMemoryStore(BaseMemoryStore):
    """
    Implements a shared memory store that maps unicode strings to ordered
    channels of binary blobs with expiry times.
    """

    def append(self, name, item, expiry):
        """
        Adds a binary blob to the right of the channel.
        """
        with self.mutate_value() as value:
            value.setdefault(name, []).append((item, expiry))

    def pop(self, name):
        """
        Tries to pop an item off of the left of the channel, raising IndexError if
        no item was available to pop.

        Expired items ahead of the returned one are discarded, and a channel
        left with no items is removed from the store entirely.
        """
        with self.mutate_value() as value:
            # See if the channel even exists. Raising inside the block skips
            # the write-back, which is fine as nothing has changed.
            if name not in value:
                raise IndexError("Channel %s is empty" % name)
            now = time.time()
            queue = value.pop(name)
            # Go through the channel until we find a nonexpired message
            for index, (blob, expiry) in enumerate(queue):
                if expiry >= now:
                    remainder = queue[index + 1:]
                    # Only keep the channel around if something is left in it
                    if remainder:
                        value[name] = remainder
                    return blob
        # All messages are expired! Raise only now that the block has exited
        # normally, so the channel's removal has been written back.
        raise IndexError("Channel %s is empty" % name)

    def length(self, name):
        """
        Removes all expired items from the channel, and returns the
        number of messages in the channel.
        """
        with self.mutate_value() as value:
            now = time.time()
            live = [
                (item, expiry)
                for item, expiry
                in value.get(name, [])
                if expiry >= now
            ]
            if live:
                value[name] = live
            else:
                value.pop(name, None)
            return len(live)
+
+
class GroupMemoryStore(BaseMemoryStore):
    """
    Implements a shared memory store that maps unicode strings (group names)
    to dicts of member name -> expiry time.
    """

    def add(self, name, item, expiry):
        """
        Adds a group member with an expiry time. If it already exists,
        update the expiry time.
        """
        with self.mutate_value() as value:
            value.setdefault(name, {})[item] = expiry

    def discard(self, name, item):
        """
        Removes a group member if it exists. If not, silently returns.
        A group left without members is removed from the store.
        """
        with self.mutate_value() as value:
            members = value.get(name)
            if members and item in members:
                del members[item]
                if not members:
                    del value[name]

    def flush(self, name):
        """
        Removes all members from the group
        """
        with self.mutate_value() as value:
            value.pop(name, None)

    def flush_expired(self, name):
        """
        Removes all members from the group who have expired, and returns the
        new list of members. A group that does not exist has no members, so
        an empty list is returned for it.
        """
        with self.mutate_value() as value:
            now = time.time()
            live = {
                item: expiry
                for item, expiry in value.get(name, {}).items()
                if expiry >= now
            }
            if live:
                value[name] = live
            else:
                # Don't create or keep an entry for a memberless group
                value.pop(name, None)
            # A real list, so the result is the same type on Python 2 and 3
            return list(live)
+
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..3c6e79c
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,2 @@
+[bdist_wheel]
+universal=1
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..f2cdd6f
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,28 @@
+import os
+from setuptools import setup
+
__version__ = '1.4.0'

# We use the README as the long_description
readme_path = os.path.join(os.path.dirname(__file__), "README.rst")
# Read it inside a with block so the file handle is closed straight away
with open(readme_path) as readme_file:
    long_description = readme_file.read()


setup(
    name='asgi_ipc',
    version=__version__,
    url='http://github.com/django/asgi_ipc/',
    author='Django Software Foundation',
    author_email='foundation@djangoproject.com',
    description='Posix IPC-backed ASGI channel layer implementation',
    long_description=long_description,
    license='BSD',
    zip_safe=False,
    packages=["asgi_ipc"],
    include_package_data=True,
    install_requires=[
        'six',
        'posix_ipc>=1.0.0',
        'msgpack-python',
        'asgiref~=1.1.2',
    ]
)
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_asgi_ipc.py b/tests/test_asgi_ipc.py
new file mode 100644
index 0000000..1aae1dc
--- /dev/null
+++ b/tests/test_asgi_ipc.py
@@ -0,0 +1,12 @@
+from __future__ import unicode_literals
+
+from asgi_ipc import IPCChannelLayer
+from asgiref.conformance import ConformanceTestCase
+
+
+# Default conformance tests
class IPCLayerTests(ConformanceTestCase):
    """
    Conformance test case configured with an IPC channel layer.
    """

    capacity_limit = 5
    expiry_delay = 1.1
    # Layer under test: 1s message expiry, 2s group expiry, capacity of 5
    channel_layer = IPCChannelLayer(
        expiry=1,
        group_expiry=2,
        capacity=5,
    )
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..0e59285
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,5 @@
+[tox]
+envlist = py27,py34,py35,py36
+
+[testenv]
+commands = python -m unittest discover
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-asgi-ipc.git
More information about the Python-modules-commits
mailing list