[Python-modules-commits] [python-motor] 01/04: Import python-motor_1.2.1.orig.tar.gz

Ondrej Novy onovy at debian.org
Sun Jan 21 13:25:00 UTC 2018


This is an automated email from the git hooks/post-receive script.

onovy pushed a commit to branch master
in repository python-motor.

commit f2e9761fa3de0eae9f5385dc6e1b0bdc2d3b312e
Author: Ondřej Nový <onovy at debian.org>
Date:   Sun Jan 21 14:20:43 2018 +0100

    Import python-motor_1.2.1.orig.tar.gz
---
 doc/changelog.rst                             |  8 +++
 doc/conf.py                                   |  2 +-
 doc/examples/tornado_change_stream_example.py | 18 +++++--
 motor/__init__.py                             |  4 +-
 motor/core.py                                 | 75 +++++++++++++++++++--------
 motor/frameworks/asyncio/__init__.py          | 42 ++++++++++++---
 motor/frameworks/tornado/__init__.py          |  5 +-
 setup.py                                      |  2 +-
 8 files changed, 119 insertions(+), 37 deletions(-)

diff --git a/doc/changelog.rst b/doc/changelog.rst
index 255f7b7..dcccfa0 100644
--- a/doc/changelog.rst
+++ b/doc/changelog.rst
@@ -3,6 +3,14 @@ Changelog
 
 .. currentmodule:: motor.motor_tornado
 
+Motor 1.2.1
+-----------
+
+An asyncio application that created a Change Stream with
+:meth:`MotorCollection.watch` and shut down while the Change Stream was open
+would print several errors. I have rewritten :meth:`MotorChangeStream.next`
+and some Motor internals to allow clean shutdown with asyncio.
+
 Motor 1.2
 ---------
 
diff --git a/doc/conf.py b/doc/conf.py
index 7119062..4088c49 100644
--- a/doc/conf.py
+++ b/doc/conf.py
@@ -31,7 +31,7 @@ master_doc = 'index'
 
 # General information about the project.
 project = u'Motor'
-copyright = u'2016 MongoDB, Inc.'
+copyright = u'2016-present MongoDB, Inc.'
 
 # The version info for the project you're documenting, acts as replacement for
 # |version| and |release|, also used in various other places throughout the
diff --git a/doc/examples/tornado_change_stream_example.py b/doc/examples/tornado_change_stream_example.py
index c595dbf..ae153f8 100644
--- a/doc/examples/tornado_change_stream_example.py
+++ b/doc/examples/tornado_change_stream_example.py
@@ -83,9 +83,15 @@ class ChangesHandler(tornado.websocket.WebSocketHandler):
         ChangesHandler.update_cache(change)
 
 
+change_stream = None
+
+
 async def watch(collection):
-    async for change in collection.watch():
-        ChangesHandler.on_change(change)
+    global change_stream
+
+    async with collection.watch() as change_stream:
+        async for change in change_stream:
+            ChangesHandler.on_change(change)
 
 
 def main():
@@ -103,7 +109,13 @@ def main():
     loop = tornado.ioloop.IOLoop.current()
     # Start watching collection for changes.
     loop.add_callback(watch, collection)
-    loop.start()
+    try:
+        loop.start()
+    except KeyboardInterrupt:
+        pass
+    finally:
+        if change_stream is not None:
+            change_stream.close()
 
 
 if __name__ == "__main__":
diff --git a/motor/__init__.py b/motor/__init__.py
index 6326e7f..6110de2 100644
--- a/motor/__init__.py
+++ b/motor/__init__.py
@@ -20,12 +20,10 @@ import pymongo
 
 from motor.motor_py3_compat import text_type
 
-version_tuple = (1, 2, 0)
+version_tuple = (1, 2, 1)
 
 
 def get_version_string():
-    if isinstance(version_tuple[-1], text_type):
-        return '.'.join(map(str, version_tuple[:-1])) + version_tuple[-1]
     return '.'.join(map(str, version_tuple))
 
 
diff --git a/motor/core.py b/motor/core.py
index 7a7b00c..eddc2df 100644
--- a/motor/core.py
+++ b/motor/core.py
@@ -462,15 +462,56 @@ class AgnosticCollection(AgnosticBaseProperties):
         Returns a :class:`~MotorChangeStream` cursor which iterates over changes
         on this collection. Introduced in MongoDB 3.6.
 
+        A change stream continues waiting indefinitely for matching change
+        events. Code like the following allows a program to cancel the change
+        stream and exit.
+
         .. code-block:: python3
 
-           async with db.collection.watch() as stream:
-               async for change in stream:
-                   print(change)
+          change_stream = None
+
+          async def watch_collection():
+              global change_stream
+
+              # Using the change stream in an "async with" block
+              # ensures it is canceled promptly if your code breaks
+              # from the loop or throws an exception.
+              async with db.collection.watch() as change_stream:
+                  async for change in stream:
+                      print(change)
+
+          # Tornado
+          from tornado.ioloop import IOLoop
 
-        Using the change stream in an "async with" block as shown above ensures
-        it is canceled promptly if your code breaks from the loop or throws an
-        exception.
+          def main():
+              loop = IOLoop.current()
+              # Start watching collection for changes.
+              loop.add_callback(watch_collection)
+              try:
+                  loop.start()
+              except KeyboardInterrupt:
+                  pass
+              finally:
+                  if change_stream is not None:
+                      change_stream.close()
+
+          # asyncio
+          from asyncio import get_event_loop
+
+          def main():
+              loop = get_event_loop()
+              task = loop.create_task(watch_collection)
+
+              try:
+                  loop.run_forever()
+              except KeyboardInterrupt:
+                  pass
+              finally:
+                  if change_stream is not None:
+                      change_stream.close()
+
+                  # Prevent "Task was destroyed but it is pending!"
+                  loop.run_until_complete(task)
 
         The :class:`~MotorChangeStream` async iterable blocks
         until the next change document is returned or an error is raised. If
@@ -1248,22 +1289,15 @@ class AgnosticChangeStream(AgnosticBase):
                         'collation': collation,
                         'session': session}
 
-    def _next(self, future):
-        # This method is run on a thread. asyncio prohibits future.set_exception
-        # with a StopIteration, so we must handle this operation differently
-        # from other async methods.
+    def _next(self):
+        # This method is run on a thread.
         try:
             if not self.delegate:
                 self.delegate = self._collection.delegate.watch(**self._kwargs)
 
-            change = self.delegate.next()
-            self._framework.call_soon(self.get_io_loop(),
-                                      future.set_result,
-                                      change)
+            return self.delegate.next()
         except StopIteration:
-            future.set_exception(StopAsyncIteration())
-        except Exception as exc:
-            future.set_exception(exc)
+            raise StopAsyncIteration()
 
     @coroutine_annotation(callback=False)
     def next(self):
@@ -1284,9 +1318,7 @@ class AgnosticChangeStream(AgnosticBase):
 
         """
         loop = self.get_io_loop()
-        future = self._framework.get_future(loop)
-        self._framework.run_on_executor(loop, self._next, future)
-        return future
+        return self._framework.run_on_executor(loop, self._next)
 
     @coroutine_annotation(callback=False)
     def close(self):
@@ -1313,7 +1345,8 @@ class AgnosticChangeStream(AgnosticBase):
             return self
     
         async def __aexit__(self, exc_type, exc_val, exc_tb):
-            await self.close()
+            if self.delegate:
+                self.delegate.close() 
         """), globals(), locals())
 
     def get_io_loop(self):
diff --git a/motor/frameworks/asyncio/__init__.py b/motor/frameworks/asyncio/__init__.py
index 48a3aef..098f24c 100644
--- a/motor/frameworks/asyncio/__init__.py
+++ b/motor/frameworks/asyncio/__init__.py
@@ -61,12 +61,42 @@ else:
 _EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
 
 
-def run_on_executor(loop, fn, self, *args, **kwargs):
-    # Ensures the wrapped future is resolved on the main thread, though the
-    # executor's future is resolved on a worker thread.
-    return asyncio.futures.wrap_future(
-        _EXECUTOR.submit(functools.partial(fn, self, *args, **kwargs)),
-        loop=loop)
+def run_on_executor(loop, fn, *args, **kwargs):
+    # Adapted from asyncio's wrap_future and _chain_future. Ensure the wrapped
+    # future is resolved on the main thread when the executor's future is
+    # resolved on a worker thread. asyncio's wrap_future does the same, but
+    # throws an error if the loop is stopped. We want to avoid errors if a
+    # background task completes after the loop stops, e.g. ChangeStream.next()
+    # returns while the program is shutting down.
+    def _set_state():
+        if dest.cancelled():
+            return
+
+        if source.cancelled():
+            dest.cancel()
+        else:
+            exception = source.exception()
+            if exception is not None:
+                dest.set_exception(exception)
+            else:
+                result = source.result()
+                dest.set_result(result)
+
+    def _call_check_cancel(_):
+        if dest.cancelled():
+            source.cancel()
+
+    def _call_set_state(_):
+        if loop.is_closed():
+            return
+
+        loop.call_soon_threadsafe(_set_state)
+
+    source = _EXECUTOR.submit(functools.partial(fn, *args, **kwargs))
+    dest = asyncio.Future(loop=loop)
+    dest.add_done_callback(_call_check_cancel)
+    source.add_done_callback(_call_set_state)
+    return dest
 
 
 _DEFAULT = object()
diff --git a/motor/frameworks/tornado/__init__.py b/motor/frameworks/tornado/__init__.py
index 7490f67..515dedb 100644
--- a/motor/frameworks/tornado/__init__.py
+++ b/motor/frameworks/tornado/__init__.py
@@ -57,11 +57,11 @@ else:
 _EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
 
 
-def run_on_executor(loop, fn, self, *args, **kwargs):
+def run_on_executor(loop, fn, *args, **kwargs):
     # Need a Tornado Future for "await" expressions. exec_fut is resolved on a
     # worker thread, loop.add_future ensures "future" is resolved on main.
     future = concurrent.Future()
-    exec_fut = _EXECUTOR.submit(fn, self, *args, **kwargs)
+    exec_fut = _EXECUTOR.submit(fn, *args, **kwargs)
 
     def copy(_):
         if future.done():
@@ -75,6 +75,7 @@ def run_on_executor(loop, fn, self, *args, **kwargs):
     loop.add_future(exec_fut, copy)
     return future
 
+
 _DEFAULT = object()
 
 
diff --git a/setup.py b/setup.py
index 9e04368..986fd4a 100644
--- a/setup.py
+++ b/setup.py
@@ -152,7 +152,7 @@ if sys.version_info[0] >= 3:
     packages.append('motor.aiohttp')
 
 setup(name='motor',
-      version='1.2.0',
+      version='1.2.1',
       packages=packages,
       description=description,
       long_description=long_description,

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-motor.git



More information about the Python-modules-commits mailing list