[Python-modules-commits] [python-motor] 01/04: Import python-motor_1.2.1.orig.tar.gz
Ondrej Novy
onovy at debian.org
Sun Jan 21 13:25:00 UTC 2018
This is an automated email from the git hooks/post-receive script.
onovy pushed a commit to branch master
in repository python-motor.
commit f2e9761fa3de0eae9f5385dc6e1b0bdc2d3b312e
Author: Ondřej Nový <onovy at debian.org>
Date: Sun Jan 21 14:20:43 2018 +0100
Import python-motor_1.2.1.orig.tar.gz
---
doc/changelog.rst | 8 +++
doc/conf.py | 2 +-
doc/examples/tornado_change_stream_example.py | 18 +++++--
motor/__init__.py | 4 +-
motor/core.py | 75 +++++++++++++++++++--------
motor/frameworks/asyncio/__init__.py | 42 ++++++++++++---
motor/frameworks/tornado/__init__.py | 5 +-
setup.py | 2 +-
8 files changed, 119 insertions(+), 37 deletions(-)
diff --git a/doc/changelog.rst b/doc/changelog.rst
index 255f7b7..dcccfa0 100644
--- a/doc/changelog.rst
+++ b/doc/changelog.rst
@@ -3,6 +3,14 @@ Changelog
.. currentmodule:: motor.motor_tornado
+Motor 1.2.1
+-----------
+
+An asyncio application that created a Change Stream with
+:meth:`MotorCollection.watch` and shut down while the Change Stream was open
+would print several errors. I have rewritten :meth:`MotorChangeStream.next`
+and some Motor internals to allow clean shutdown with asyncio.
+
Motor 1.2
---------
diff --git a/doc/conf.py b/doc/conf.py
index 7119062..4088c49 100644
--- a/doc/conf.py
+++ b/doc/conf.py
@@ -31,7 +31,7 @@ master_doc = 'index'
# General information about the project.
project = u'Motor'
-copyright = u'2016 MongoDB, Inc.'
+copyright = u'2016-present MongoDB, Inc.'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
diff --git a/doc/examples/tornado_change_stream_example.py b/doc/examples/tornado_change_stream_example.py
index c595dbf..ae153f8 100644
--- a/doc/examples/tornado_change_stream_example.py
+++ b/doc/examples/tornado_change_stream_example.py
@@ -83,9 +83,15 @@ class ChangesHandler(tornado.websocket.WebSocketHandler):
ChangesHandler.update_cache(change)
+change_stream = None
+
+
async def watch(collection):
- async for change in collection.watch():
- ChangesHandler.on_change(change)
+ global change_stream
+
+ async with collection.watch() as change_stream:
+ async for change in change_stream:
+ ChangesHandler.on_change(change)
def main():
@@ -103,7 +109,13 @@ def main():
loop = tornado.ioloop.IOLoop.current()
# Start watching collection for changes.
loop.add_callback(watch, collection)
- loop.start()
+ try:
+ loop.start()
+ except KeyboardInterrupt:
+ pass
+ finally:
+ if change_stream is not None:
+ change_stream.close()
if __name__ == "__main__":
diff --git a/motor/__init__.py b/motor/__init__.py
index 6326e7f..6110de2 100644
--- a/motor/__init__.py
+++ b/motor/__init__.py
@@ -20,12 +20,10 @@ import pymongo
from motor.motor_py3_compat import text_type
-version_tuple = (1, 2, 0)
+version_tuple = (1, 2, 1)
def get_version_string():
- if isinstance(version_tuple[-1], text_type):
- return '.'.join(map(str, version_tuple[:-1])) + version_tuple[-1]
return '.'.join(map(str, version_tuple))
diff --git a/motor/core.py b/motor/core.py
index 7a7b00c..eddc2df 100644
--- a/motor/core.py
+++ b/motor/core.py
@@ -462,15 +462,56 @@ class AgnosticCollection(AgnosticBaseProperties):
Returns a :class:`~MotorChangeStream` cursor which iterates over changes
on this collection. Introduced in MongoDB 3.6.
+ A change stream continues waiting indefinitely for matching change
+ events. Code like the following allows a program to cancel the change
+ stream and exit.
+
.. code-block:: python3
- async with db.collection.watch() as stream:
- async for change in stream:
- print(change)
+ change_stream = None
+
+ async def watch_collection():
+ global change_stream
+
+ # Using the change stream in an "async with" block
+ # ensures it is canceled promptly if your code breaks
+ # from the loop or throws an exception.
+ async with db.collection.watch() as change_stream:
+ async for change in change_stream:
+ print(change)
+
+ # Tornado
+ from tornado.ioloop import IOLoop
- Using the change stream in an "async with" block as shown above ensures
- it is canceled promptly if your code breaks from the loop or throws an
- exception.
+ def main():
+ loop = IOLoop.current()
+ # Start watching collection for changes.
+ loop.add_callback(watch_collection)
+ try:
+ loop.start()
+ except KeyboardInterrupt:
+ pass
+ finally:
+ if change_stream is not None:
+ change_stream.close()
+
+ # asyncio
+ from asyncio import get_event_loop
+
+ def main():
+ loop = get_event_loop()
+ task = loop.create_task(watch_collection())
+
+ try:
+ loop.run_forever()
+ except KeyboardInterrupt:
+ pass
+ finally:
+ if change_stream is not None:
+ change_stream.close()
+
+ # Prevent "Task was destroyed but it is pending!"
+ loop.run_until_complete(task)
The :class:`~MotorChangeStream` async iterable blocks
until the next change document is returned or an error is raised. If
@@ -1248,22 +1289,15 @@ class AgnosticChangeStream(AgnosticBase):
'collation': collation,
'session': session}
- def _next(self, future):
- # This method is run on a thread. asyncio prohibits future.set_exception
- # with a StopIteration, so we must handle this operation differently
- # from other async methods.
+ def _next(self):
+ # This method is run on a thread.
try:
if not self.delegate:
self.delegate = self._collection.delegate.watch(**self._kwargs)
- change = self.delegate.next()
- self._framework.call_soon(self.get_io_loop(),
- future.set_result,
- change)
+ return self.delegate.next()
except StopIteration:
- future.set_exception(StopAsyncIteration())
- except Exception as exc:
- future.set_exception(exc)
+ raise StopAsyncIteration()
@coroutine_annotation(callback=False)
def next(self):
@@ -1284,9 +1318,7 @@ class AgnosticChangeStream(AgnosticBase):
"""
loop = self.get_io_loop()
- future = self._framework.get_future(loop)
- self._framework.run_on_executor(loop, self._next, future)
- return future
+ return self._framework.run_on_executor(loop, self._next)
@coroutine_annotation(callback=False)
def close(self):
@@ -1313,7 +1345,8 @@ class AgnosticChangeStream(AgnosticBase):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
- await self.close()
+ if self.delegate:
+ self.delegate.close()
"""), globals(), locals())
def get_io_loop(self):
diff --git a/motor/frameworks/asyncio/__init__.py b/motor/frameworks/asyncio/__init__.py
index 48a3aef..098f24c 100644
--- a/motor/frameworks/asyncio/__init__.py
+++ b/motor/frameworks/asyncio/__init__.py
@@ -61,12 +61,42 @@ else:
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
-def run_on_executor(loop, fn, self, *args, **kwargs):
- # Ensures the wrapped future is resolved on the main thread, though the
- # executor's future is resolved on a worker thread.
- return asyncio.futures.wrap_future(
- _EXECUTOR.submit(functools.partial(fn, self, *args, **kwargs)),
- loop=loop)
+def run_on_executor(loop, fn, *args, **kwargs):
+ # Adapted from asyncio's wrap_future and _chain_future. Ensure the wrapped
+ # future is resolved on the main thread when the executor's future is
+ # resolved on a worker thread. asyncio's wrap_future does the same, but
+ # throws an error if the loop is stopped. We want to avoid errors if a
+ # background task completes after the loop stops, e.g. ChangeStream.next()
+ # returns while the program is shutting down.
+ def _set_state():
+ if dest.cancelled():
+ return
+
+ if source.cancelled():
+ dest.cancel()
+ else:
+ exception = source.exception()
+ if exception is not None:
+ dest.set_exception(exception)
+ else:
+ result = source.result()
+ dest.set_result(result)
+
+ def _call_check_cancel(_):
+ if dest.cancelled():
+ source.cancel()
+
+ def _call_set_state(_):
+ if loop.is_closed():
+ return
+
+ loop.call_soon_threadsafe(_set_state)
+
+ source = _EXECUTOR.submit(functools.partial(fn, *args, **kwargs))
+ dest = asyncio.Future(loop=loop)
+ dest.add_done_callback(_call_check_cancel)
+ source.add_done_callback(_call_set_state)
+ return dest
_DEFAULT = object()
diff --git a/motor/frameworks/tornado/__init__.py b/motor/frameworks/tornado/__init__.py
index 7490f67..515dedb 100644
--- a/motor/frameworks/tornado/__init__.py
+++ b/motor/frameworks/tornado/__init__.py
@@ -57,11 +57,11 @@ else:
_EXECUTOR = ThreadPoolExecutor(max_workers=max_workers)
-def run_on_executor(loop, fn, self, *args, **kwargs):
+def run_on_executor(loop, fn, *args, **kwargs):
# Need a Tornado Future for "await" expressions. exec_fut is resolved on a
# worker thread, loop.add_future ensures "future" is resolved on main.
future = concurrent.Future()
- exec_fut = _EXECUTOR.submit(fn, self, *args, **kwargs)
+ exec_fut = _EXECUTOR.submit(fn, *args, **kwargs)
def copy(_):
if future.done():
@@ -75,6 +75,7 @@ def run_on_executor(loop, fn, self, *args, **kwargs):
loop.add_future(exec_fut, copy)
return future
+
_DEFAULT = object()
diff --git a/setup.py b/setup.py
index 9e04368..986fd4a 100644
--- a/setup.py
+++ b/setup.py
@@ -152,7 +152,7 @@ if sys.version_info[0] >= 3:
packages.append('motor.aiohttp')
setup(name='motor',
- version='1.2.0',
+ version='1.2.1',
packages=packages,
description=description,
long_description=long_description,
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-motor.git
More information about the Python-modules-commits
mailing list