[Python-modules-commits] [aioredis] 01/05: Import aioredis_0.2.5.orig.tar.gz

Piotr Ożarowski piotr at moszumanska.debian.org
Thu Mar 3 20:10:31 UTC 2016


This is an automated email from the git hooks/post-receive script.

piotr pushed a commit to branch master
in repository aioredis.

commit 125cad2e870eedcd58ca9393dfa6fd578da166d0
Author: Piotr Ożarowski <piotr at debian.org>
Date:   Thu Mar 3 20:55:36 2016 +0100

    Import aioredis_0.2.5.orig.tar.gz
---
 CHANGES.txt                      | 51 +++++++++++++++++++++++-
 LICENSE                          |  4 +-
 PKG-INFO                         | 61 +++++++++++++++++++++++++---
 README.rst                       |  8 +++-
 aioredis.egg-info/PKG-INFO       | 61 +++++++++++++++++++++++++---
 aioredis.egg-info/requires.txt   |  2 +-
 aioredis/__init__.py             | 11 +++---
 aioredis/commands/transaction.py |  4 +-
 aioredis/connection.py           | 85 +++++++++++++++++++++++++++++++---------
 aioredis/errors.py               |  6 ++-
 aioredis/pool.py                 | 11 ++++--
 aioredis/util.py                 | 58 +++++++++++++++++++++++++--
 setup.cfg                        |  2 +-
 13 files changed, 313 insertions(+), 51 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 7de3631..c703b17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,52 @@
 Changes
 -------
 
+0.2.5 (2016-03-02)
+^^^^^^^^^^^^^^^^^^
+
+* Close all Pub/Sub channels on connection close
+  (see `#88 <https://github.com/aio-libs/aioredis/issues/88>`_);
+
+* Add ``iter()`` method to ``aioredis.Channel`` allowing to use it
+  with ``async for``
+  (see `#89 <https://github.com/aio-libs/aioredis/issues/89>`_);
+
+* Inline code samples in docs made runnable and downloadable
+  (see `#92 <https://github.com/aio-libs/aioredis/issues/92>`_);
+
+* Python 3.5 examples converted to use ``async``/``await`` syntax
+  (see `#93 <https://github.com/aio-libs/aioredis/issues/93>`_);
+
+* Fix Multi/Exec to honor encoding parameter
+  (see `#94 <https://github.com/aio-libs/aioredis/issues/94>`_
+  and `#97 <https://github.com/aio-libs/aioredis/issues/97>`_);
+
+* Add debug message in ``create_connection``
+  (see `#90 <https://github.com/aio-libs/aioredis/issues/90>`_);
+
+* Replace ``asyncio.async`` calls with wrapper that respects asyncio version
+  (see `#101 <https://github.com/aio-libs/aioredis/issues/101>`_);
+
+* Use NODELAY option for TCP sockets
+  (see `#105 <https://github.com/aio-libs/aioredis/issues/105>`_);
+
+* New ``aioredis.ConnectionClosedError`` exception added. Raised if
+  connection to Redis server is lost
+  (see `#108 <https://github.com/aio-libs/aioredis/issues/108>`_
+  and `#109 <https://github.com/aio-libs/aioredis/issues/109>`_);
+
+* Fix RedisPool to close and drop connection in subscribe mode on release;
+
+* Fix ``aioredis.util.decode`` to recursively decode list responses;
+
+* More examples added and docs updated;
+
+* Add google groups link to README;
+
+* Bump year in LICENSE and docs;
+
+
+
 0.2.4 (2015-10-13)
 ^^^^^^^^^^^^^^^^^^
 
@@ -8,7 +54,8 @@ Changes
 
   - New scan commands API (``iscan``, ``izscan``, ``ihscan``);
 
-  - Pool made awaitable (allowing ``with await pool: ...`` constructs);
+  - Pool made awaitable (allowing ``with await pool: ...`` and ``async
+    with pool.get() as conn:`` constructs);
 
 * Fixed dropping closed connections from free pool
   (see `#83 <https://github.com/aio-libs/aioredis/issues/83>`_);
@@ -39,7 +86,7 @@ Changes
 0.2.2 (2015-07-07)
 ^^^^^^^^^^^^^^^^^^
 
-* Decoding data with ``encoding`` paramter now takes into account
+* Decoding data with ``encoding`` parameter now takes into account
   list (array) replies
   (see `#68 <https://github.com/aio-libs/aioredis/pull/68>`_);
 
diff --git a/LICENSE b/LICENSE
index 435733f..270d29e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,6 @@
 The MIT License (MIT)
 
-Copyright (c) 2014 Alexey Popravka
+Copyright (c) 2014-2016 Alexey Popravka
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
@@ -18,4 +18,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
\ No newline at end of file
+SOFTWARE.
diff --git a/PKG-INFO b/PKG-INFO
index 7b5eb0e..ea61354 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: aioredis
-Version: 0.2.4
+Version: 0.2.5
 Summary: asyncio (PEP 3156) Redis support
 Home-page: https://github.com/aio-libs/aioredis
 Author: Alexey Popravka
@@ -15,8 +15,8 @@ Description: aioredis
            :target: https://travis-ci.org/aio-libs/aioredis
         
         
-        .. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.png?branch=master
-           :target: https://coveralls.io/r/aio-libs/aioredis?branch=master
+        .. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.svg?branch=master&service=github
+           :target: https://coveralls.io/github/aio-libs/aioredis?branch=master
         
         Features
         --------
@@ -118,6 +118,10 @@ Description: aioredis
             hiredis is preferred requirement.
             Pure-python fallback protocol parser is TBD.
         
+        Discussion list
+        ---------------
+        
+        *aio-libs* google group: https://groups.google.com/forum/#!forum/aio-libs
         
         License
         -------
@@ -134,6 +138,52 @@ Description: aioredis
         Changes
         -------
         
+        0.2.5 (2016-03-02)
+        ^^^^^^^^^^^^^^^^^^
+        
+        * Close all Pub/Sub channels on connection close
+          (see `#88 <https://github.com/aio-libs/aioredis/issues/88>`_);
+        
+        * Add ``iter()`` method to ``aioredis.Channel`` allowing to use it
+          with ``async for``
+          (see `#89 <https://github.com/aio-libs/aioredis/issues/89>`_);
+        
+        * Inline code samples in docs made runnable and downloadable
+          (see `#92 <https://github.com/aio-libs/aioredis/issues/92>`_);
+        
+        * Python 3.5 examples converted to use ``async``/``await`` syntax
+          (see `#93 <https://github.com/aio-libs/aioredis/issues/93>`_);
+        
+        * Fix Multi/Exec to honor encoding parameter
+          (see `#94 <https://github.com/aio-libs/aioredis/issues/94>`_
+          and `#97 <https://github.com/aio-libs/aioredis/issues/97>`_);
+        
+        * Add debug message in ``create_connection``
+          (see `#90 <https://github.com/aio-libs/aioredis/issues/90>`_);
+        
+        * Replace ``asyncio.async`` calls with wrapper that respects asyncio version
+          (see `#101 <https://github.com/aio-libs/aioredis/issues/101>`_);
+        
+        * Use NODELAY option for TCP sockets
+          (see `#105 <https://github.com/aio-libs/aioredis/issues/105>`_);
+        
+        * New ``aioredis.ConnectionClosedError`` exception added. Raised if
+          connection to Redis server is lost
+          (see `#108 <https://github.com/aio-libs/aioredis/issues/108>`_
+          and `#109 <https://github.com/aio-libs/aioredis/issues/109>`_);
+        
+        * Fix RedisPool to close and drop connection in subscribe mode on release;
+        
+        * Fix ``aioredis.util.decode`` to recursively decode list responses;
+        
+        * More examples added and docs updated;
+        
+        * Add google groups link to README;
+        
+        * Bump year in LICENSE and docs;
+        
+        
+        
         0.2.4 (2015-10-13)
         ^^^^^^^^^^^^^^^^^^
         
@@ -141,7 +191,8 @@ Description: aioredis
         
           - New scan commands API (``iscan``, ``izscan``, ``ihscan``);
         
-          - Pool made awaitable (allowing ``with await pool: ...`` constructs);
+          - Pool made awaitable (allowing ``with await pool: ...`` and ``async
+            with pool.get() as conn:`` constructs);
         
         * Fixed dropping closed connections from free pool
           (see `#83 <https://github.com/aio-libs/aioredis/issues/83>`_);
@@ -172,7 +223,7 @@ Description: aioredis
         0.2.2 (2015-07-07)
         ^^^^^^^^^^^^^^^^^^
         
-        * Decoding data with ``encoding`` paramter now takes into account
+        * Decoding data with ``encoding`` parameter now takes into account
           list (array) replies
           (see `#68 <https://github.com/aio-libs/aioredis/pull/68>`_);
         
diff --git a/README.rst b/README.rst
index f448791..c1f01a2 100644
--- a/README.rst
+++ b/README.rst
@@ -7,8 +7,8 @@ asyncio (PEP 3156) Redis client library.
    :target: https://travis-ci.org/aio-libs/aioredis
 
 
-.. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.png?branch=master
-   :target: https://coveralls.io/r/aio-libs/aioredis?branch=master
+.. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.svg?branch=master&service=github
+   :target: https://coveralls.io/github/aio-libs/aioredis?branch=master
 
 Features
 --------
@@ -110,6 +110,10 @@ Requirements
     hiredis is preferred requirement.
     Pure-python fallback protocol parser is TBD.
 
+Discussion list
+---------------
+
+*aio-libs* google group: https://groups.google.com/forum/#!forum/aio-libs
 
 License
 -------
diff --git a/aioredis.egg-info/PKG-INFO b/aioredis.egg-info/PKG-INFO
index 7b5eb0e..ea61354 100644
--- a/aioredis.egg-info/PKG-INFO
+++ b/aioredis.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.1
 Name: aioredis
-Version: 0.2.4
+Version: 0.2.5
 Summary: asyncio (PEP 3156) Redis support
 Home-page: https://github.com/aio-libs/aioredis
 Author: Alexey Popravka
@@ -15,8 +15,8 @@ Description: aioredis
            :target: https://travis-ci.org/aio-libs/aioredis
         
         
-        .. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.png?branch=master
-           :target: https://coveralls.io/r/aio-libs/aioredis?branch=master
+        .. image:: https://coveralls.io/repos/aio-libs/aioredis/badge.svg?branch=master&service=github
+           :target: https://coveralls.io/github/aio-libs/aioredis?branch=master
         
         Features
         --------
@@ -118,6 +118,10 @@ Description: aioredis
             hiredis is preferred requirement.
             Pure-python fallback protocol parser is TBD.
         
+        Discussion list
+        ---------------
+        
+        *aio-libs* google group: https://groups.google.com/forum/#!forum/aio-libs
         
         License
         -------
@@ -134,6 +138,52 @@ Description: aioredis
         Changes
         -------
         
+        0.2.5 (2016-03-02)
+        ^^^^^^^^^^^^^^^^^^
+        
+        * Close all Pub/Sub channels on connection close
+          (see `#88 <https://github.com/aio-libs/aioredis/issues/88>`_);
+        
+        * Add ``iter()`` method to ``aioredis.Channel`` allowing to use it
+          with ``async for``
+          (see `#89 <https://github.com/aio-libs/aioredis/issues/89>`_);
+        
+        * Inline code samples in docs made runnable and downloadable
+          (see `#92 <https://github.com/aio-libs/aioredis/issues/92>`_);
+        
+        * Python 3.5 examples converted to use ``async``/``await`` syntax
+          (see `#93 <https://github.com/aio-libs/aioredis/issues/93>`_);
+        
+        * Fix Multi/Exec to honor encoding parameter
+          (see `#94 <https://github.com/aio-libs/aioredis/issues/94>`_
+          and `#97 <https://github.com/aio-libs/aioredis/issues/97>`_);
+        
+        * Add debug message in ``create_connection``
+          (see `#90 <https://github.com/aio-libs/aioredis/issues/90>`_);
+        
+        * Replace ``asyncio.async`` calls with wrapper that respects asyncio version
+          (see `#101 <https://github.com/aio-libs/aioredis/issues/101>`_);
+        
+        * Use NODELAY option for TCP sockets
+          (see `#105 <https://github.com/aio-libs/aioredis/issues/105>`_);
+        
+        * New ``aioredis.ConnectionClosedError`` exception added. Raised if
+          connection to Redis server is lost
+          (see `#108 <https://github.com/aio-libs/aioredis/issues/108>`_
+          and `#109 <https://github.com/aio-libs/aioredis/issues/109>`_);
+        
+        * Fix RedisPool to close and drop connection in subscribe mode on release;
+        
+        * Fix ``aioredis.util.decode`` to recursively decode list responses;
+        
+        * More examples added and docs updated;
+        
+        * Add google groups link to README;
+        
+        * Bump year in LICENSE and docs;
+        
+        
+        
         0.2.4 (2015-10-13)
         ^^^^^^^^^^^^^^^^^^
         
@@ -141,7 +191,8 @@ Description: aioredis
         
           - New scan commands API (``iscan``, ``izscan``, ``ihscan``);
         
-          - Pool made awaitable (allowing ``with await pool: ...`` constructs);
+          - Pool made awaitable (allowing ``with await pool: ...`` and ``async
+            with pool.get() as conn:`` constructs);
         
         * Fixed dropping closed connections from free pool
           (see `#83 <https://github.com/aio-libs/aioredis/issues/83>`_);
@@ -172,7 +223,7 @@ Description: aioredis
         0.2.2 (2015-07-07)
         ^^^^^^^^^^^^^^^^^^
         
-        * Decoding data with ``encoding`` paramter now takes into account
+        * Decoding data with ``encoding`` parameter now takes into account
           list (array) replies
           (see `#68 <https://github.com/aio-libs/aioredis/pull/68>`_);
         
diff --git a/aioredis.egg-info/requires.txt b/aioredis.egg-info/requires.txt
index 01e7805..ebc4fdc 100644
--- a/aioredis.egg-info/requires.txt
+++ b/aioredis.egg-info/requires.txt
@@ -1 +1 @@
-hiredis
+hiredis
\ No newline at end of file
diff --git a/aioredis/__init__.py b/aioredis/__init__.py
index c825638..b7e7d09 100644
--- a/aioredis/__init__.py
+++ b/aioredis/__init__.py
@@ -3,19 +3,20 @@ from .commands import Redis, create_redis, create_reconnecting_redis
 from .pool import RedisPool, create_pool
 from .util import Channel
 from .errors import (
-    RedisError,
+    ConnectionClosedError,
+    MultiExecError,
+    PipelineError,
     ProtocolError,
+    RedisError,
     ReplyError,
-    PipelineError,
-    MultiExecError,
     )
 
 
-__version__ = '0.2.4'
+__version__ = '0.2.5'
 
 # make pyflakes happy
 (create_connection, RedisConnection,
  create_redis, create_reconnecting_redis, Redis,
  create_pool, RedisPool, Channel,
  RedisError, ProtocolError, ReplyError,
- PipelineError, MultiExecError)
+ PipelineError, MultiExecError, ConnectionClosedError)
diff --git a/aioredis/commands/transaction.py b/aioredis/commands/transaction.py
index 4e78b6e..505b36b 100644
--- a/aioredis/commands/transaction.py
+++ b/aioredis/commands/transaction.py
@@ -2,7 +2,7 @@ import asyncio
 import functools
 
 from ..errors import RedisError, PipelineError, MultiExecError
-from ..util import wait_ok
+from ..util import wait_ok, async_task
 
 
 class TransactionsCommandsMixin:
@@ -133,7 +133,7 @@ class Pipeline:
             @functools.wraps(attr)
             def wrapper(*args, **kw):
                 try:
-                    task = asyncio.async(attr(*args, **kw), loop=self._loop)
+                    task = async_task(attr(*args, **kw), loop=self._loop)
                 except Exception as exc:
                     task = asyncio.Future(loop=self._loop)
                     task.set_exception(exc)
diff --git a/aioredis/connection.py b/aioredis/connection.py
index f89b439..05bc266 100644
--- a/aioredis/connection.py
+++ b/aioredis/connection.py
@@ -1,6 +1,7 @@
 import types
 import asyncio
 import hiredis
+import socket
 from functools import partial
 from collections import deque
 
@@ -10,8 +11,14 @@ from .util import (
     coerced_keys_dict,
     Channel,
     decode,
+    async_task
+    )
+from .errors import (
+    ConnectionClosedError,
+    RedisError,
+    ProtocolError,
+    ReplyError
     )
-from .errors import RedisError, ProtocolError, ReplyError
 from .log import logger
 
 
@@ -49,9 +56,14 @@ def create_connection(address, *, db=None, password=None,
 
     if isinstance(address, (list, tuple)):
         host, port = address
+        logger.debug("Creating tcp connection to %r", address)
         reader, writer = yield from asyncio.open_connection(
             host, port, loop=loop)
+        sock = writer.transport.get_extra_info('socket')
+        if sock is not None:
+            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
     else:
+        logger.debug("Creating unix connection to %r", address)
         reader, writer = yield from asyncio.open_unix_connection(
             address, loop=loop)
     conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)
@@ -80,14 +92,14 @@ class RedisConnection:
         self._waiters = deque()
         self._parser = hiredis.Reader(protocolError=ProtocolError,
                                       replyError=ReplyError)
-        self._reader_task = asyncio.Task(self._read_data(), loop=self._loop)
+        self._reader_task = async_task(self._read_data(), loop=self._loop)
         self._db = 0
         self._closing = False
         self._closed = False
         self._close_waiter = asyncio.Future(loop=self._loop)
         self._reader_task.add_done_callback(self._close_waiter.set_result)
-        self._in_transaction = False
-        self._transaction_error = None
+        self._in_transaction = None
+        self._transaction_error = None  # XXX: never used?
         self._in_pubsub = 0
         self._pubsub_channels = coerced_keys_dict()
         self._pubsub_patterns = coerced_keys_dict()
@@ -118,7 +130,7 @@ class RedisConnection:
                     # so connection must be closed
                     self._closing = True
                     self._loop.call_soon(self._do_close, exc)
-                    if self._in_transaction:
+                    if self._in_transaction is not None:
                         self._transaction_error = exc
                     return
                 else:
@@ -141,7 +153,7 @@ class RedisConnection:
             return
         if isinstance(obj, RedisError):
             waiter.set_exception(obj)
-            if self._in_transaction:
+            if self._in_transaction is not None:
                 self._transaction_error = obj
         else:
             if encoding is not None:
@@ -150,9 +162,15 @@ class RedisConnection:
                 except Exception as exc:
                     waiter.set_exception(exc)
                     return
-            waiter.set_result(obj)
             if cb is not None:
-                cb(obj)
+                try:
+                    obj = cb(obj)
+                except Exception as exc:
+                    waiter.set_exception(exc)
+                    return
+            waiter.set_result(obj)
+            if self._in_transaction is not None:
+                self._in_transaction.append((encoding, cb))
 
     def _process_pubsub(self, obj, *, _process_waiters=True):
         """Processes pubsub messages."""
@@ -195,8 +213,8 @@ class RedisConnection:
         * ProtocolError when response can not be decoded meaning connection
           is broken.
         """
-        assert self._reader and not self._reader.at_eof(), (
-            "Connection closed or corrupted")
+        if self._reader is None or self._reader.at_eof():
+            raise ConnectionClosedError("Connection closed or corrupted")
         if command is None:
             raise TypeError("command must not be None")
         if None in set(args):
@@ -213,8 +231,10 @@ class RedisConnection:
             cb = partial(self._set_db, args=args)
         elif command in ('MULTI', b'MULTI'):
             cb = self._start_transaction
-        elif command in ('EXEC', b'EXEC', 'DISCARD', b'DISCARD'):
-            cb = self._end_transaction
+        elif command in ('EXEC', b'EXEC'):
+            cb = partial(self._end_transaction, discard=False)
+        elif command in ('DISCARD', b'DISCARD'):
+            cb = partial(self._end_transaction, discard=True)
         else:
             cb = None
         if encoding is _NOTSET:
@@ -267,7 +287,14 @@ class RedisConnection:
                 waiter.cancel()
             else:
                 waiter.set_exception(exc)
-        # TODO: close all subscribed channels
+        while self._pubsub_channels:
+            _, ch = self._pubsub_channels.popitem()
+            logger.debug("Closing pubsub channel %r", ch)
+            ch.close()
+        while self._pubsub_patterns:
+            _, ch = self._pubsub_patterns.popitem()
+            logger.debug("Closing pubsub pattern %r", ch)
+            ch.close()
 
     @property
     def closed(self):
@@ -305,27 +332,47 @@ class RedisConnection:
     def _set_db(self, ok, args):
         assert ok in {b'OK', 'OK'}, ok
         self._db = args[0]
+        return ok
 
     def _start_transaction(self, ok):
-        assert not self._in_transaction
-        self._in_transaction = True
+        assert self._in_transaction is None
+        self._in_transaction = deque()
         self._transaction_error = None
+        return ok
 
-    def _end_transaction(self, ok):
-        assert self._in_transaction
-        self._in_transaction = False
+    def _end_transaction(self, obj, discard):
+        assert self._in_transaction is not None
         self._transaction_error = None
+        recall, self._in_transaction = self._in_transaction, None
+        recall.popleft()  # ignore first (its _start_transaction)
+        if discard:
+            return obj
+        assert len(obj) == len(recall), (obj, recall)
+        res = []
+        for o, (encoding, cb) in zip(obj, recall):
+            if not isinstance(o, RedisError):
+                try:
+                    if encoding:
+                        o = decode(o, encoding)
+                    if cb:
+                        o = cb(o)
+                except Exception as err:
+                    res.append(err)
+                    continue
+            res.append(o)
+        return res
 
     def _update_pubsub(self, obj):
         *head, subscriptions = obj
         self._in_pubsub, was_in_pubsub = subscriptions, self._in_pubsub
         if not was_in_pubsub:
             self._process_pubsub(obj, _process_waiters=False)
+        return obj
 
     @property
     def in_transaction(self):
         """Set to True when MULTI command was issued."""
-        return self._in_transaction
+        return self._in_transaction is not None
 
     @property
     def in_pubsub(self):
diff --git a/aioredis/errors.py b/aioredis/errors.py
index c540572..ae2043e 100644
--- a/aioredis/errors.py
+++ b/aioredis/errors.py
@@ -4,6 +4,7 @@ __all__ = [
     'ReplyError',
     'PipelineError',
     'MultiExecError',
+    'ConnectionClosedError',
     ]
 
 
@@ -34,4 +35,7 @@ class ChannelClosedError(RedisError):
     """Raised when Pub/Sub channel is unsubscribed and messages queue is empty.
     """
 
-# TODO: add ConnectionClosed exception.
+
+class ConnectionClosedError(RedisError):
+    """Raised if connection to server was closed.
+    """
diff --git a/aioredis/pool.py b/aioredis/pool.py
index 0ff360f..75315f0 100644
--- a/aioredis/pool.py
+++ b/aioredis/pool.py
@@ -4,6 +4,7 @@ import sys
 
 from .commands import create_redis, Redis
 from .log import logger
+from .util import async_task
 
 
 PY_35 = sys.version_info >= (3, 5)
@@ -136,8 +137,12 @@ class RedisPool:
         self._used.remove(conn)
         if not conn.closed:
             if conn.in_transaction:
-                logger.warning("Connection %r in transaction, closing it.",
-                               conn)
+                logger.warning(
+                    "Connection %r is in transaction, closing it.", conn)
+                conn.close()
+            elif conn.in_pubsub:
+                logger.warning(
+                    "Connection %r is in subscribe mode, closing it.", conn)
                 conn.close()
             elif conn.db == self.db:
                 if self.maxsize and self.freesize < self.maxsize:
@@ -148,7 +153,7 @@ class RedisPool:
             else:
                 conn.close()
         # FIXME: check event loop is not closed
-        asyncio.async(self._wakeup(), loop=self._loop)
+        async_task(self._wakeup(), loop=self._loop)
 
     def _drop_closed(self):
         for i in range(self.freesize):
diff --git a/aioredis/util.py b/aioredis/util.py
index a77a3ea..308affc 100644
--- a/aioredis/util.py
+++ b/aioredis/util.py
@@ -49,8 +49,7 @@ def decode(obj, encoding):
     if isinstance(obj, bytes):
         return obj.decode(encoding)
     elif isinstance(obj, list):
-        return [o.decode(encoding) if isinstance(o, bytes) else o
-                for o in obj]
+        return [decode(o, encoding) for o in obj]
     return obj
 
 
@@ -90,7 +89,7 @@ class Channel:
         Can be used with ``while``:
 
         >>> ch = conn.pubsub_channels['chan:1']
-        >>> while ch.is_active():
+        >>> while ch.is_active:
         ...     msg = yield from ch.get()   # may stuck for a long time
 
         """
@@ -104,9 +103,18 @@ class Channel:
         """
         assert decoder is None or callable(decoder), decoder
         if not self.is_active:
+            if self._queue.qsize() == 1:
+                msg = self._queue.get_nowait()
+                assert msg is None, msg
+                return
             raise ChannelClosedError()
         msg = yield from self._queue.get()
         if msg is None:
+            # TODO: maybe we need an explicit marker for "end of stream"
+            #       currently, returning None may overlap with
+            #       possible return value from `decoder`
+            #       so the user would have to check `ch.is_active`
+            #       to determine if its EoS or payload
             return
         if self._is_pattern:
             dest_channel, msg = msg
@@ -123,6 +131,18 @@ class Channel:
         """Shortcut to get JSON messages."""
         return (yield from self.get(encoding=encoding, decoder=json.loads))
 
+    if PY_35:
+        def iter(self, *, encoding=None, decoder=None):
+            """Same as get method but its native coroutine.
+
+            Usage example:
+
+            >>> async for msg in ch.iter():
+            ...     print(msg)
+            """
+            return _ChannelIter(self, encoding=encoding,
+                                decoder=decoder)
+
     @asyncio.coroutine
     def wait_message(self):
         """Waits for message to become available in channel.
@@ -147,6 +167,10 @@ class Channel:
         self._queue.put_nowait(data)
         if self._waiter is not None:
             fut, self._waiter = self._waiter, None
+            if fut.done():
+                assert fut.cancelled(), (
+                    "Waiting future is in wrong state", self, fut)
+                return
             fut.set_result(None)
 
     def close(self):
@@ -235,3 +259,31 @@ if PY_35:
             else:
                 ret = self._ret.pop(0)
                 return ret
+
+    class _ChannelIter:
+
+        __slots__ = ('_ch', '_args', '_kw')
+
+        def __init__(self, ch, *args, **kw):
+            self._ch = ch
+            self._args = args
+            self._kw = kw
+
+        @asyncio.coroutine
+        def __aiter__(self):
+            return self
+
+        @asyncio.coroutine
+        def __anext__(self):
+            if not self._ch.is_active:
+                raise StopAsyncIteration    # noqa
+            msg = yield from self._ch.get(*self._args, **self._kw)
+            if msg is None:
+                raise StopAsyncIteration    # noqa
+            return msg
+
+
+if hasattr(asyncio, 'ensure_future'):
+    async_task = asyncio.ensure_future
+else:
+    async_task = asyncio.async
diff --git a/setup.cfg b/setup.cfg
index ebbec92..6bc2ff3 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,5 @@
 [egg_info]
+tag_date = 0
 tag_build = 
 tag_svn_revision = 0
-tag_date = 0
 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/aioredis.git



More information about the Python-modules-commits mailing list