[Python-modules-commits] [aioredis] 01/05: Import aioredis_0.2.5.orig.tar.gz
Piotr Ożarowski
piotr at moszumanska.debian.org
Thu Mar 3 20:10:31 UTC 2016
This is an automated email from the git hooks/post-receive script.
piotr pushed a commit to branch master
in repository aioredis.
commit 125cad2e870eedcd58ca9393dfa6fd578da166d0
Author: Piotr Ożarowski <piotr at debian.org>
Date: Thu Mar 3 20:55:36 2016 +0100
Import aioredis_0.2.5.orig.tar.gz
---
CHANGES.txt | 51 +++++++++++++++++++++++-
LICENSE | 4 +-
PKG-INFO | 61 +++++++++++++++++++++++++---
README.rst | 8 +++-
aioredis.egg-info/PKG-INFO | 61 +++++++++++++++++++++++++---
aioredis.egg-info/requires.txt | 2 +-
aioredis/__init__.py | 11 +++---
aioredis/commands/transaction.py | 4 +-
aioredis/connection.py | 85 +++++++++++++++++++++++++++++++---------
aioredis/errors.py | 6 ++-
aioredis/pool.py | 11 ++++--
aioredis/util.py | 58 +++++++++++++++++++++++++--
setup.cfg | 2 +-
13 files changed, 313 insertions(+), 51 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 7de3631..c703b17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,52 @@
Changes
-------
+0.2.5 (2016-03-02)
+^^^^^^^^^^^^^^^^^^
+
+* Close all Pub/Sub channels on connection close
+ (see `#88 <https://github.com/aio-libs/aioredis/issues/88>`_);
+
+* Add ``iter()`` method to ``aioredis.Channel`` allowing to use it
+ with ``async for``
+ (see `#89 <https://github.com/aio-libs/aioredis/issues/89>`_);
+
+* Inline code samples in docs made runnable and downloadable
+ (see `#92 <https://github.com/aio-libs/aioredis/issues/92>`_);
+
+* Python 3.5 examples converted to use ``async``/``await`` syntax
+ (see `#93 <https://github.com/aio-libs/aioredis/issues/93>`_);
+
+* Fix Multi/Exec to honor encoding parameter
+ (see `#94 <https://github.com/aio-libs/aioredis/issues/94>`_
+ and `#97 <https://github.com/aio-libs/aioredis/issues/97>`_);
+
+* Add debug message in ``create_connection``
+ (see `#90 <https://github.com/aio-libs/aioredis/issues/90>`_);
+
+* Replace ``asyncio.async`` calls with wrapper that respects asyncio version
+ (see `#101 <https://github.com/aio-libs/aioredis/issues/101>`_);
+
+* Use NODELAY option for TCP sockets
+ (see `#105 <https://github.com/aio-libs/aioredis/issues/105>`_);
+
+* New ``aioredis.ConnectionClosedError`` exception added. Raised if
+ connection to Redis server is lost
+ (see `#108 <https://github.com/aio-libs/aioredis/issues/108>`_
+ and `#109 <https://github.com/aio-libs/aioredis/issues/109>`_);
+
+* Fix RedisPool to close and drop connection in subscribe mode on release;
+
+* Fix ``aioredis.util.decode`` to recursively decode list responses;
+
+* More examples added and docs updated;
+
+* Add google groups link to README;
+
+* Bump year in LICENSE and docs;
+
+
+
0.2.4 (2015-10-13)
^^^^^^^^^^^^^^^^^^
@@ -8,7 +54,8 @@ Changes
- New scan commands API (``iscan``, ``izscan``, ``ihscan``);
- - Pool made awaitable (allowing ``with await pool: ...`` constructs);
+ - Pool made awaitable (allowing ``with await pool: ...`` and ``async
+ with pool.get() as conn:`` constructs);
* Fixed dropping closed connections from free pool
(see `#83 <https://github.com/aio-libs/aioredis/issues/83>`_);
@@ -39,7 +86,7 @@ Changes
0.2.2 (2015-07-07)
^^^^^^^^^^^^^^^^^^
-* Decoding data with ``encoding`` paramter now takes into account
+* Decoding data with ``encoding`` parameter now takes into account
list (array) replies
(see `#68 <https://github.com/aio-libs/aioredis/pull/68>`_);
diff --git a/LICENSE b/LICENSE
index 435733f..270d29e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,6 @@
The MIT License (MIT)
-Copyright (c) 2014 Alexey Popravka
+Copyright (c) 2014-2016 Alexey Popravka
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@@ -18,4 +18,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
\ No newline at end of file
+SOFTWARE.
diff --git a/PKG-INFO b/PKG-INFO
index 7b5eb0e..ea61354 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: aioredis
-Version: 0.2.4
+Version: 0.2.5
Summary: asyncio (PEP 3156) Redis support
Home-page: https://github.com/aio-libs/aioredis
Author: Alexey Popravka
@@ -15,8 +15,8 @@ Description: aioredis
:target: https://travis-ci.org/aio-libs/aioredis
- .. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.png?branch=master
- :target: https://coveralls.io/r/aio-libs/aioredis?branch=master
+ .. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.svg?branch=master&service=github
+ :target: https://coveralls.io/github/aio-libs/aioredis?branch=master
Features
--------
@@ -118,6 +118,10 @@ Description: aioredis
hiredis is preferred requirement.
Pure-python fallback protocol parser is TBD.
+ Discussion list
+ ---------------
+
+ *aio-libs* google group: https://groups.google.com/forum/#!forum/aio-libs
License
-------
@@ -134,6 +138,52 @@ Description: aioredis
Changes
-------
+ 0.2.5 (2016-03-02)
+ ^^^^^^^^^^^^^^^^^^
+
+ * Close all Pub/Sub channels on connection close
+ (see `#88 <https://github.com/aio-libs/aioredis/issues/88>`_);
+
+ * Add ``iter()`` method to ``aioredis.Channel`` allowing to use it
+ with ``async for``
+ (see `#89 <https://github.com/aio-libs/aioredis/issues/89>`_);
+
+ * Inline code samples in docs made runnable and downloadable
+ (see `#92 <https://github.com/aio-libs/aioredis/issues/92>`_);
+
+ * Python 3.5 examples converted to use ``async``/``await`` syntax
+ (see `#93 <https://github.com/aio-libs/aioredis/issues/93>`_);
+
+ * Fix Multi/Exec to honor encoding parameter
+ (see `#94 <https://github.com/aio-libs/aioredis/issues/94>`_
+ and `#97 <https://github.com/aio-libs/aioredis/issues/97>`_);
+
+ * Add debug message in ``create_connection``
+ (see `#90 <https://github.com/aio-libs/aioredis/issues/90>`_);
+
+ * Replace ``asyncio.async`` calls with wrapper that respects asyncio version
+ (see `#101 <https://github.com/aio-libs/aioredis/issues/101>`_);
+
+ * Use NODELAY option for TCP sockets
+ (see `#105 <https://github.com/aio-libs/aioredis/issues/105>`_);
+
+ * New ``aioredis.ConnectionClosedError`` exception added. Raised if
+ connection to Redis server is lost
+ (see `#108 <https://github.com/aio-libs/aioredis/issues/108>`_
+ and `#109 <https://github.com/aio-libs/aioredis/issues/109>`_);
+
+ * Fix RedisPool to close and drop connection in subscribe mode on release;
+
+ * Fix ``aioredis.util.decode`` to recursively decode list responses;
+
+ * More examples added and docs updated;
+
+ * Add google groups link to README;
+
+ * Bump year in LICENSE and docs;
+
+
+
0.2.4 (2015-10-13)
^^^^^^^^^^^^^^^^^^
@@ -141,7 +191,8 @@ Description: aioredis
- New scan commands API (``iscan``, ``izscan``, ``ihscan``);
- - Pool made awaitable (allowing ``with await pool: ...`` constructs);
+ - Pool made awaitable (allowing ``with await pool: ...`` and ``async
+ with pool.get() as conn:`` constructs);
* Fixed dropping closed connections from free pool
(see `#83 <https://github.com/aio-libs/aioredis/issues/83>`_);
@@ -172,7 +223,7 @@ Description: aioredis
0.2.2 (2015-07-07)
^^^^^^^^^^^^^^^^^^
- * Decoding data with ``encoding`` paramter now takes into account
+ * Decoding data with ``encoding`` parameter now takes into account
list (array) replies
(see `#68 <https://github.com/aio-libs/aioredis/pull/68>`_);
diff --git a/README.rst b/README.rst
index f448791..c1f01a2 100644
--- a/README.rst
+++ b/README.rst
@@ -7,8 +7,8 @@ asyncio (PEP 3156) Redis client library.
:target: https://travis-ci.org/aio-libs/aioredis
-.. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.png?branch=master
- :target: https://coveralls.io/r/aio-libs/aioredis?branch=master
+.. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.svg?branch=master&service=github
+ :target: https://coveralls.io/github/aio-libs/aioredis?branch=master
Features
--------
@@ -110,6 +110,10 @@ Requirements
hiredis is preferred requirement.
Pure-python fallback protocol parser is TBD.
+Discussion list
+---------------
+
+*aio-libs* google group: https://groups.google.com/forum/#!forum/aio-libs
License
-------
diff --git a/aioredis.egg-info/PKG-INFO b/aioredis.egg-info/PKG-INFO
index 7b5eb0e..ea61354 100644
--- a/aioredis.egg-info/PKG-INFO
+++ b/aioredis.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: aioredis
-Version: 0.2.4
+Version: 0.2.5
Summary: asyncio (PEP 3156) Redis support
Home-page: https://github.com/aio-libs/aioredis
Author: Alexey Popravka
@@ -15,8 +15,8 @@ Description: aioredis
:target: https://travis-ci.org/aio-libs/aioredis
- .. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.png?branch=master
- :target: https://coveralls.io/r/aio-libs/aioredis?branch=master
+ .. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.svg?branch=master&service=github
+ :target: https://coveralls.io/github/aio-libs/aioredis?branch=master
Features
--------
@@ -118,6 +118,10 @@ Description: aioredis
hiredis is preferred requirement.
Pure-python fallback protocol parser is TBD.
+ Discussion list
+ ---------------
+
+ *aio-libs* google group: https://groups.google.com/forum/#!forum/aio-libs
License
-------
@@ -134,6 +138,52 @@ Description: aioredis
Changes
-------
+ 0.2.5 (2016-03-02)
+ ^^^^^^^^^^^^^^^^^^
+
+ * Close all Pub/Sub channels on connection close
+ (see `#88 <https://github.com/aio-libs/aioredis/issues/88>`_);
+
+ * Add ``iter()`` method to ``aioredis.Channel`` allowing to use it
+ with ``async for``
+ (see `#89 <https://github.com/aio-libs/aioredis/issues/89>`_);
+
+ * Inline code samples in docs made runnable and downloadable
+ (see `#92 <https://github.com/aio-libs/aioredis/issues/92>`_);
+
+ * Python 3.5 examples converted to use ``async``/``await`` syntax
+ (see `#93 <https://github.com/aio-libs/aioredis/issues/93>`_);
+
+ * Fix Multi/Exec to honor encoding parameter
+ (see `#94 <https://github.com/aio-libs/aioredis/issues/94>`_
+ and `#97 <https://github.com/aio-libs/aioredis/issues/97>`_);
+
+ * Add debug message in ``create_connection``
+ (see `#90 <https://github.com/aio-libs/aioredis/issues/90>`_);
+
+ * Replace ``asyncio.async`` calls with wrapper that respects asyncio version
+ (see `#101 <https://github.com/aio-libs/aioredis/issues/101>`_);
+
+ * Use NODELAY option for TCP sockets
+ (see `#105 <https://github.com/aio-libs/aioredis/issues/105>`_);
+
+ * New ``aioredis.ConnectionClosedError`` exception added. Raised if
+ connection to Redis server is lost
+ (see `#108 <https://github.com/aio-libs/aioredis/issues/108>`_
+ and `#109 <https://github.com/aio-libs/aioredis/issues/109>`_);
+
+ * Fix RedisPool to close and drop connection in subscribe mode on release;
+
+ * Fix ``aioredis.util.decode`` to recursively decode list responses;
+
+ * More examples added and docs updated;
+
+ * Add google groups link to README;
+
+ * Bump year in LICENSE and docs;
+
+
+
0.2.4 (2015-10-13)
^^^^^^^^^^^^^^^^^^
@@ -141,7 +191,8 @@ Description: aioredis
- New scan commands API (``iscan``, ``izscan``, ``ihscan``);
- - Pool made awaitable (allowing ``with await pool: ...`` constructs);
+ - Pool made awaitable (allowing ``with await pool: ...`` and ``async
+ with pool.get() as conn:`` constructs);
* Fixed dropping closed connections from free pool
(see `#83 <https://github.com/aio-libs/aioredis/issues/83>`_);
@@ -172,7 +223,7 @@ Description: aioredis
0.2.2 (2015-07-07)
^^^^^^^^^^^^^^^^^^
- * Decoding data with ``encoding`` paramter now takes into account
+ * Decoding data with ``encoding`` parameter now takes into account
list (array) replies
(see `#68 <https://github.com/aio-libs/aioredis/pull/68>`_);
diff --git a/aioredis.egg-info/requires.txt b/aioredis.egg-info/requires.txt
index 01e7805..ebc4fdc 100644
--- a/aioredis.egg-info/requires.txt
+++ b/aioredis.egg-info/requires.txt
@@ -1 +1 @@
-hiredis
+hiredis
\ No newline at end of file
diff --git a/aioredis/__init__.py b/aioredis/__init__.py
index c825638..b7e7d09 100644
--- a/aioredis/__init__.py
+++ b/aioredis/__init__.py
@@ -3,19 +3,20 @@ from .commands import Redis, create_redis, create_reconnecting_redis
from .pool import RedisPool, create_pool
from .util import Channel
from .errors import (
- RedisError,
+ ConnectionClosedError,
+ MultiExecError,
+ PipelineError,
ProtocolError,
+ RedisError,
ReplyError,
- PipelineError,
- MultiExecError,
)
-__version__ = '0.2.4'
+__version__ = '0.2.5'
# make pyflakes happy
(create_connection, RedisConnection,
create_redis, create_reconnecting_redis, Redis,
create_pool, RedisPool, Channel,
RedisError, ProtocolError, ReplyError,
- PipelineError, MultiExecError)
+ PipelineError, MultiExecError, ConnectionClosedError)
diff --git a/aioredis/commands/transaction.py b/aioredis/commands/transaction.py
index 4e78b6e..505b36b 100644
--- a/aioredis/commands/transaction.py
+++ b/aioredis/commands/transaction.py
@@ -2,7 +2,7 @@ import asyncio
import functools
from ..errors import RedisError, PipelineError, MultiExecError
-from ..util import wait_ok
+from ..util import wait_ok, async_task
class TransactionsCommandsMixin:
@@ -133,7 +133,7 @@ class Pipeline:
@functools.wraps(attr)
def wrapper(*args, **kw):
try:
- task = asyncio.async(attr(*args, **kw), loop=self._loop)
+ task = async_task(attr(*args, **kw), loop=self._loop)
except Exception as exc:
task = asyncio.Future(loop=self._loop)
task.set_exception(exc)
diff --git a/aioredis/connection.py b/aioredis/connection.py
index f89b439..05bc266 100644
--- a/aioredis/connection.py
+++ b/aioredis/connection.py
@@ -1,6 +1,7 @@
import types
import asyncio
import hiredis
+import socket
from functools import partial
from collections import deque
@@ -10,8 +11,14 @@ from .util import (
coerced_keys_dict,
Channel,
decode,
+ async_task
+ )
+from .errors import (
+ ConnectionClosedError,
+ RedisError,
+ ProtocolError,
+ ReplyError
)
-from .errors import RedisError, ProtocolError, ReplyError
from .log import logger
@@ -49,9 +56,14 @@ def create_connection(address, *, db=None, password=None,
if isinstance(address, (list, tuple)):
host, port = address
+ logger.debug("Creating tcp connection to %r", address)
reader, writer = yield from asyncio.open_connection(
host, port, loop=loop)
+ sock = writer.transport.get_extra_info('socket')
+ if sock is not None:
+ sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
else:
+ logger.debug("Creating unix connection to %r", address)
reader, writer = yield from asyncio.open_unix_connection(
address, loop=loop)
conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)
@@ -80,14 +92,14 @@ class RedisConnection:
self._waiters = deque()
self._parser = hiredis.Reader(protocolError=ProtocolError,
replyError=ReplyError)
- self._reader_task = asyncio.Task(self._read_data(), loop=self._loop)
+ self._reader_task = async_task(self._read_data(), loop=self._loop)
self._db = 0
self._closing = False
self._closed = False
self._close_waiter = asyncio.Future(loop=self._loop)
self._reader_task.add_done_callback(self._close_waiter.set_result)
- self._in_transaction = False
- self._transaction_error = None
+ self._in_transaction = None
+ self._transaction_error = None # XXX: never used?
self._in_pubsub = 0
self._pubsub_channels = coerced_keys_dict()
self._pubsub_patterns = coerced_keys_dict()
@@ -118,7 +130,7 @@ class RedisConnection:
# so connection must be closed
self._closing = True
self._loop.call_soon(self._do_close, exc)
- if self._in_transaction:
+ if self._in_transaction is not None:
self._transaction_error = exc
return
else:
@@ -141,7 +153,7 @@ class RedisConnection:
return
if isinstance(obj, RedisError):
waiter.set_exception(obj)
- if self._in_transaction:
+ if self._in_transaction is not None:
self._transaction_error = obj
else:
if encoding is not None:
@@ -150,9 +162,15 @@ class RedisConnection:
except Exception as exc:
waiter.set_exception(exc)
return
- waiter.set_result(obj)
if cb is not None:
- cb(obj)
+ try:
+ obj = cb(obj)
+ except Exception as exc:
+ waiter.set_exception(exc)
+ return
+ waiter.set_result(obj)
+ if self._in_transaction is not None:
+ self._in_transaction.append((encoding, cb))
def _process_pubsub(self, obj, *, _process_waiters=True):
"""Processes pubsub messages."""
@@ -195,8 +213,8 @@ class RedisConnection:
* ProtocolError when response can not be decoded meaning connection
is broken.
"""
- assert self._reader and not self._reader.at_eof(), (
- "Connection closed or corrupted")
+ if self._reader is None or self._reader.at_eof():
+ raise ConnectionClosedError("Connection closed or corrupted")
if command is None:
raise TypeError("command must not be None")
if None in set(args):
@@ -213,8 +231,10 @@ class RedisConnection:
cb = partial(self._set_db, args=args)
elif command in ('MULTI', b'MULTI'):
cb = self._start_transaction
- elif command in ('EXEC', b'EXEC', 'DISCARD', b'DISCARD'):
- cb = self._end_transaction
+ elif command in ('EXEC', b'EXEC'):
+ cb = partial(self._end_transaction, discard=False)
+ elif command in ('DISCARD', b'DISCARD'):
+ cb = partial(self._end_transaction, discard=True)
else:
cb = None
if encoding is _NOTSET:
@@ -267,7 +287,14 @@ class RedisConnection:
waiter.cancel()
else:
waiter.set_exception(exc)
- # TODO: close all subscribed channels
+ while self._pubsub_channels:
+ _, ch = self._pubsub_channels.popitem()
+ logger.debug("Closing pubsub channel %r", ch)
+ ch.close()
+ while self._pubsub_patterns:
+ _, ch = self._pubsub_patterns.popitem()
+ logger.debug("Closing pubsub pattern %r", ch)
+ ch.close()
@property
def closed(self):
@@ -305,27 +332,47 @@ class RedisConnection:
def _set_db(self, ok, args):
assert ok in {b'OK', 'OK'}, ok
self._db = args[0]
+ return ok
def _start_transaction(self, ok):
- assert not self._in_transaction
- self._in_transaction = True
+ assert self._in_transaction is None
+ self._in_transaction = deque()
self._transaction_error = None
+ return ok
- def _end_transaction(self, ok):
- assert self._in_transaction
- self._in_transaction = False
+ def _end_transaction(self, obj, discard):
+ assert self._in_transaction is not None
self._transaction_error = None
+ recall, self._in_transaction = self._in_transaction, None
+ recall.popleft() # ignore first (it's _start_transaction)
+ if discard:
+ return obj
+ assert len(obj) == len(recall), (obj, recall)
+ res = []
+ for o, (encoding, cb) in zip(obj, recall):
+ if not isinstance(o, RedisError):
+ try:
+ if encoding:
+ o = decode(o, encoding)
+ if cb:
+ o = cb(o)
+ except Exception as err:
+ res.append(err)
+ continue
+ res.append(o)
+ return res
def _update_pubsub(self, obj):
*head, subscriptions = obj
self._in_pubsub, was_in_pubsub = subscriptions, self._in_pubsub
if not was_in_pubsub:
self._process_pubsub(obj, _process_waiters=False)
+ return obj
@property
def in_transaction(self):
"""Set to True when MULTI command was issued."""
- return self._in_transaction
+ return self._in_transaction is not None
@property
def in_pubsub(self):
diff --git a/aioredis/errors.py b/aioredis/errors.py
index c540572..ae2043e 100644
--- a/aioredis/errors.py
+++ b/aioredis/errors.py
@@ -4,6 +4,7 @@ __all__ = [
'ReplyError',
'PipelineError',
'MultiExecError',
+ 'ConnectionClosedError',
]
@@ -34,4 +35,7 @@ class ChannelClosedError(RedisError):
"""Raised when Pub/Sub channel is unsubscribed and messages queue is empty.
"""
-# TODO: add ConnectionClosed exception.
+
+class ConnectionClosedError(RedisError):
+ """Raised if connection to server was closed.
+ """
diff --git a/aioredis/pool.py b/aioredis/pool.py
index 0ff360f..75315f0 100644
--- a/aioredis/pool.py
+++ b/aioredis/pool.py
@@ -4,6 +4,7 @@ import sys
from .commands import create_redis, Redis
from .log import logger
+from .util import async_task
PY_35 = sys.version_info >= (3, 5)
@@ -136,8 +137,12 @@ class RedisPool:
self._used.remove(conn)
if not conn.closed:
if conn.in_transaction:
- logger.warning("Connection %r in transaction, closing it.",
- conn)
+ logger.warning(
+ "Connection %r is in transaction, closing it.", conn)
+ conn.close()
+ elif conn.in_pubsub:
+ logger.warning(
+ "Connection %r is in subscribe mode, closing it.", conn)
conn.close()
elif conn.db == self.db:
if self.maxsize and self.freesize < self.maxsize:
@@ -148,7 +153,7 @@ class RedisPool:
else:
conn.close()
# FIXME: check event loop is not closed
- asyncio.async(self._wakeup(), loop=self._loop)
+ async_task(self._wakeup(), loop=self._loop)
def _drop_closed(self):
for i in range(self.freesize):
diff --git a/aioredis/util.py b/aioredis/util.py
index a77a3ea..308affc 100644
--- a/aioredis/util.py
+++ b/aioredis/util.py
@@ -49,8 +49,7 @@ def decode(obj, encoding):
if isinstance(obj, bytes):
return obj.decode(encoding)
elif isinstance(obj, list):
- return [o.decode(encoding) if isinstance(o, bytes) else o
- for o in obj]
+ return [decode(o, encoding) for o in obj]
return obj
@@ -90,7 +89,7 @@ class Channel:
Can be used with ``while``:
>>> ch = conn.pubsub_channels['chan:1']
- >>> while ch.is_active():
+ >>> while ch.is_active:
... msg = yield from ch.get() # may stuck for a long time
"""
@@ -104,9 +103,18 @@ class Channel:
"""
assert decoder is None or callable(decoder), decoder
if not self.is_active:
+ if self._queue.qsize() == 1:
+ msg = self._queue.get_nowait()
+ assert msg is None, msg
+ return
raise ChannelClosedError()
msg = yield from self._queue.get()
if msg is None:
+ # TODO: maybe we need an explicit marker for "end of stream"
+ # currently, returning None may overlap with
+ # possible return value from `decoder`
+ # so the user would have to check `ch.is_active`
+ # to determine if it's EoS or payload
return
if self._is_pattern:
dest_channel, msg = msg
@@ -123,6 +131,18 @@ class Channel:
"""Shortcut to get JSON messages."""
return (yield from self.get(encoding=encoding, decoder=json.loads))
+ if PY_35:
+ def iter(self, *, encoding=None, decoder=None):
+ """Same as get method but it's a native coroutine.
+
+ Usage example:
+
+ >>> async for msg in ch.iter():
+ ... print(msg)
+ """
+ return _ChannelIter(self, encoding=encoding,
+ decoder=decoder)
+
@asyncio.coroutine
def wait_message(self):
"""Waits for message to become available in channel.
@@ -147,6 +167,10 @@ class Channel:
self._queue.put_nowait(data)
if self._waiter is not None:
fut, self._waiter = self._waiter, None
+ if fut.done():
+ assert fut.cancelled(), (
+ "Waiting future is in wrong state", self, fut)
+ return
fut.set_result(None)
def close(self):
@@ -235,3 +259,31 @@ if PY_35:
else:
ret = self._ret.pop(0)
return ret
+
+ class _ChannelIter:
+
+ __slots__ = ('_ch', '_args', '_kw')
+
+ def __init__(self, ch, *args, **kw):
+ self._ch = ch
+ self._args = args
+ self._kw = kw
+
+ @asyncio.coroutine
+ def __aiter__(self):
+ return self
+
+ @asyncio.coroutine
+ def __anext__(self):
+ if not self._ch.is_active:
+ raise StopAsyncIteration # noqa
+ msg = yield from self._ch.get(*self._args, **self._kw)
+ if msg is None:
+ raise StopAsyncIteration # noqa
+ return msg
+
+
+if hasattr(asyncio, 'ensure_future'):
+ async_task = asyncio.ensure_future
+else:
+ async_task = asyncio.async
diff --git a/setup.cfg b/setup.cfg
index ebbec92..6bc2ff3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,5 @@
[egg_info]
+tag_date = 0
tag_build =
tag_svn_revision = 0
-tag_date = 0
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/aioredis.git
More information about the Python-modules-commits
mailing list