[Python-modules-commits] [python-amqp] 01/07: Import python-amqp_2.0.1.orig.tar.gz
Michael Fladischer
fladi at moszumanska.debian.org
Sun Jul 30 21:27:56 UTC 2017
This is an automated email from the git hooks/post-receive script.
fladi pushed a commit to branch debian/experimental
in repository python-amqp.
commit 26aaeccb6cfbbff9d87f41375fde5e989624e370
Author: Michael Fladischer <FladischerMichael at fladi.at>
Date: Thu Jun 9 09:36:18 2016 +0200
Import python-amqp_2.0.1.orig.tar.gz
---
Changelog | 90 +++
PKG-INFO | 26 +-
README.rst | 18 +-
amqp.egg-info/PKG-INFO | 26 +-
amqp.egg-info/SOURCES.txt | 43 +-
amqp.egg-info/requires.txt | 1 +
amqp/__init__.py | 26 +-
amqp/abstract_channel.py | 95 ++-
amqp/basic_message.py | 64 +-
amqp/channel.py | 976 +++++++-------------------
amqp/connection.py | 951 ++++++++-----------------
amqp/exceptions.py | 24 +-
amqp/five.py | 183 +----
amqp/method_framing.py | 342 ++++-----
amqp/protocol.py | 2 +-
amqp/serialization.py | 878 ++++++++++++-----------
amqp/spec.py | 106 +++
amqp/tests/case.py | 85 +--
amqp/tests/test_abstract_channel.py | 125 ++++
amqp/tests/test_basic_message.py | 19 +
amqp/tests/test_channel.py | 420 ++++++++++-
amqp/tests/test_connection.py | 304 ++++++++
amqp/tests/test_exceptions.py | 22 +
amqp/tests/test_method_framing.py | 98 +++
amqp/tests/test_serialization.py | 203 ++++++
amqp/tests/test_transport.py | 367 ++++++++++
amqp/tests/test_utils.py | 80 +++
amqp/transport.py | 257 ++++---
amqp/utils.py | 126 ++--
demo/amqp_clock.py | 78 --
demo/demo_receive.py | 83 ---
demo/demo_send.py | 66 --
docs/.templates/page.html | 4 -
docs/.templates/sidebarintro.html | 4 -
docs/.templates/sidebarlogo.html | 3 -
docs/Makefile | 223 +++++-
docs/_ext/applyxrefs.py | 92 ---
docs/_ext/literals_to_xrefs.py | 173 -----
docs/{.static => _static}/.keep | 0
docs/_templates/sidebardonations.html | 10 +
docs/_theme/celery/static/celery.css_t | 401 -----------
docs/_theme/celery/theme.conf | 5 -
docs/changelog.rst | 545 +-------------
docs/conf.py | 151 +---
docs/images/celery_128.png | Bin 0 -> 26250 bytes
docs/images/favicon.ico | Bin 0 -> 3364 bytes
docs/includes/{intro.txt => introduction.txt} | 6 +-
docs/index.rst | 2 +-
docs/make.bat | 272 +++++++
docs/reference/amqp.spec.rst | 11 +
docs/reference/index.rst | 1 +
docs/templates/readme.txt | 15 +-
extra/README | 10 -
extra/generate_skeleton_0_8.py | 377 ----------
extra/release/bump_version.py | 181 -----
extra/release/sphinx-to-rst.py | 75 --
extra/update_comments_from_spec.py | 8 +-
funtests/run_all.py | 38 -
funtests/settings.py | 91 ---
funtests/test_basic_message.py | 132 ----
funtests/test_channel.py | 317 ---------
funtests/test_connection.py | 127 ----
funtests/test_exceptions.py | 47 --
funtests/test_serialization.py | 411 -----------
funtests/test_with.py | 70 --
requirements/default.txt | 1 +
requirements/docs.txt | 3 +-
requirements/pkgutils.txt | 11 +-
requirements/test-ci.txt | 2 +
requirements/test.txt | 6 +-
setup.cfg | 3 +
setup.py | 36 +-
72 files changed, 3993 insertions(+), 6055 deletions(-)
diff --git a/Changelog b/Changelog
index 0612fff..ecdd1dc 100644
--- a/Changelog
+++ b/Changelog
@@ -5,11 +5,101 @@ py-amqp is fork of amqplib used by Kombu containing additional features and impr
The previous amqplib changelog is here:
http://code.google.com/p/py-amqplib/source/browse/CHANGES
+.. _version-2.0.1:
+
+2.0.1
+=====
+:release-date: 2016-05-31 6:20 PM PDT
+:release-by: Ask Solem
+
+- Adds backward compatibility layer for the 1.4 API.
+
+ Using the connection without calling ``.connect()`` first will now work,
+ but a warning is emitted and the behavior is deprecated and will be
+ removed in version 2.2.
+
+- Fixes kombu 3.0/celery 3.1 compatibility (Issue #88).
+
+ Fix contributed by Bas ten Berge.
+
+- Fixed compatibility with Python 2.7.3 (Issue #85)
+
+ Fix contributed by Bas ten Berge.
+
+- Fixed bug where calling drain_events() with a timeout of 0 would actually
+ block until a frame is received.
+
+- Documentation moved to http://amqp.readthedocs.io (Issue #89).
+
+ See https://blog.readthedocs.com/securing-subdomains/ for the reasoning
+ behind this change.
+
+ Fix contributed by Adam Chainz.
+
+.. _version-2.0.0:
+
+2.0.0
+=====
+:release-date: 2016-05-26 1:44 PM PDT
+:release-by: Ask Solem
+
+- No longer supports Python 2.6
+
+- You must now call Connection.connect() to establish the connection.
+
+ The Connection constructor no longer has side effects, so you have
+ to explicitly call connect first.
+
+- Library rewritten to anticipate async changes.
+
+- Connection now exposes underlying socket options.
+
+ This change allows to set arbitrary TCP socket options during the creation of
+ the transport.
+
+ Those values can be set passing a dictionray where the key is the name of
+ the parameter we want to set.
+ The names of the keys are the ones reported above.
+
+ Contributed by Andrea Rosa, Dallas Marlow and Rongze Zhu.
+
+- Additional logging for heartbeats.
+
+ Contributed by Davanum Srinivas, and Dmitry Mescheryakov.
+
+- SSL: Fixes issue with remote connection hanging
+
+ Fix contributed by Adrien Guinet.
+
+- SSL: ``ssl`` dict argument now supports the ``check_hostname`` key
+ (Issue #63).
+
+ Contributed by Vic Kumar.
+
+- Contributions by:
+
+ Adrien Guinet
+ Andrea Rosa
+ Artyom Koval
+ Corey Farwell
+ Craig Jellick
+ Dallas Marlow
+ Davanum Srinivas
+ Federico Ficarelli
+ Jared Lewis
+ Rémy Greinhofer
+ Rongze Zhu
+ Yury Selivanov
+ Vic Kumar
+ Vladimir Bolshakov
+ :github_user:`lezeroq`
+
.. _version-1.4.9:
1.4.9
=====
:release-date: 2016-01-08 5:50 PM PST
+:release-by: Ask Solem
- Fixes compatibility with Linux/OS X instances where the ``ctypes`` module
does not exist.
diff --git a/PKG-INFO b/PKG-INFO
index 53452ee..e8d81aa 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: amqp
-Version: 1.4.9
+Version: 2.0.1
Summary: Low-level AMQP client for Python (fork of amqplib)
Home-page: http://github.com/celery/py-amqp
Author: Ask Solem
@@ -10,8 +10,10 @@ Description: ===================================================================
Python AMQP 0.9.1 client library
=====================================================================
- :Version: 1.4.9
- :Web: http://amqp.readthedocs.org/
+ |build-status| |coverage| |bitdeli|
+
+ :Version: 2.0.0rc2
+ :Web: https://amqp.readthedocs.io/
:Download: http://pypi.python.org/pypi/amqp/
:Source: http://github.com/celery/py-amqp/
:Keywords: amqp, rabbitmq
@@ -27,7 +29,7 @@ Description: ===================================================================
.. _amqplib: http://pypi.python.org/pypi/amqplib
.. _Celery: http://celeryproject.org/
- .. _kombu: http://kombu.readthedocs.org/
+ .. _kombu: https://kombu.readthedocs.io/
.. _librabbitmq: http://pypi.python.org/pypi/librabbitmq
Differences from `amqplib`_
@@ -107,22 +109,26 @@ Description: ===================================================================
http://www.rabbitmq.com/devtools.html#python-dev
- .. image:: https://d2weczhvl823v0.cloudfront.net/celery/celery/trend.png
+ .. |build-status| image:: https://secure.travis-ci.org/celery/py-amqp.png?branch=master
+ :alt: Build status
+ :target: https://travis-ci.org/celery/py-amqp
+
+ .. |coverage| image:: https://codecov.io/github/celery/py-amqp/coverage.svg?branch=master
+ :target: https://codecov.io/github/celery/py-amqp?branch=master
+
+ .. |bitdeli| image:: https://d2weczhvl823v0.cloudfront.net/celery/py-amqp/trend.png
:alt: Bitdeli badge
:target: https://bitdeli.com/free
+
Platform: any
Classifier: Development Status :: 5 - Production/Stable
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
-Classifier: Programming Language :: Python :: 2.6
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
-Classifier: Programming Language :: Python :: 3.0
-Classifier: Programming Language :: Python :: 3.1
-Classifier: Programming Language :: Python :: 3.2
Classifier: Programming Language :: Python :: 3.3
+Classifier: Programming Language :: Python :: 3.4
Classifier: License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)
Classifier: Intended Audience :: Developers
-Classifier: License :: OSI Approved :: BSD License
Classifier: Operating System :: OS Independent
diff --git a/README.rst b/README.rst
index 653c4ce..0a89e29 100644
--- a/README.rst
+++ b/README.rst
@@ -2,8 +2,10 @@
Python AMQP 0.9.1 client library
=====================================================================
-:Version: 1.4.9
-:Web: http://amqp.readthedocs.org/
+|build-status| |coverage| |bitdeli|
+
+:Version: 2.0.0rc2
+:Web: https://amqp.readthedocs.io/
:Download: http://pypi.python.org/pypi/amqp/
:Source: http://github.com/celery/py-amqp/
:Keywords: amqp, rabbitmq
@@ -19,7 +21,7 @@ This library should be API compatible with `librabbitmq`_.
.. _amqplib: http://pypi.python.org/pypi/amqplib
.. _Celery: http://celeryproject.org/
-.. _kombu: http://kombu.readthedocs.org/
+.. _kombu: https://kombu.readthedocs.io/
.. _librabbitmq: http://pypi.python.org/pypi/librabbitmq
Differences from `amqplib`_
@@ -99,6 +101,14 @@ Further
http://www.rabbitmq.com/devtools.html#python-dev
-.. image:: https://d2weczhvl823v0.cloudfront.net/celery/celery/trend.png
+.. |build-status| image:: https://secure.travis-ci.org/celery/py-amqp.png?branch=master
+ :alt: Build status
+ :target: https://travis-ci.org/celery/py-amqp
+
+.. |coverage| image:: https://codecov.io/github/celery/py-amqp/coverage.svg?branch=master
+ :target: https://codecov.io/github/celery/py-amqp?branch=master
+
+.. |bitdeli| image:: https://d2weczhvl823v0.cloudfront.net/celery/py-amqp/trend.png
:alt: Bitdeli badge
:target: https://bitdeli.com/free
+
diff --git a/amqp.egg-info/PKG-INFO b/amqp.egg-info/PKG-INFO
index 53452ee..e8d81aa 100644
--- a/amqp.egg-info/PKG-INFO
+++ b/amqp.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: amqp
-Version: 1.4.9
+Version: 2.0.1
Summary: Low-level AMQP client for Python (fork of amqplib)
Home-page: http://github.com/celery/py-amqp
Author: Ask Solem
@@ -10,8 +10,10 @@ Description: ===================================================================
Python AMQP 0.9.1 client library
=====================================================================
- :Version: 1.4.9
- :Web: http://amqp.readthedocs.org/
+ |build-status| |coverage| |bitdeli|
+
+ :Version: 2.0.0rc2
+ :Web: https://amqp.readthedocs.io/
:Download: http://pypi.python.org/pypi/amqp/
:Source: http://github.com/celery/py-amqp/
:Keywords: amqp, rabbitmq
@@ -27,7 +29,7 @@ Description: ===================================================================
.. _amqplib: http://pypi.python.org/pypi/amqplib
.. _Celery: http://celeryproject.org/
- .. _kombu: http://kombu.readthedocs.org/
+ .. _kombu: https://kombu.readthedocs.io/
.. _librabbitmq: http://pypi.python.org/pypi/librabbitmq
Differences from `amqplib`_
@@ -107,22 +109,26 @@ Description: ===================================================================
http://www.rabbitmq.com/devtools.html#python-dev
- .. image:: https://d2weczhvl823v0.cloudfront.net/celery/celery/trend.png
+ .. |build-status| image:: https://secure.travis-ci.org/celery/py-amqp.png?branch=master
+ :alt: Build status
+ :target: https://travis-ci.org/celery/py-amqp
+
+ .. |coverage| image:: https://codecov.io/github/celery/py-amqp/coverage.svg?branch=master
+ :target: https://codecov.io/github/celery/py-amqp?branch=master
+
+ .. |bitdeli| image:: https://d2weczhvl823v0.cloudfront.net/celery/py-amqp/trend.png
:alt: Bitdeli badge
:target: https://bitdeli.com/free
+
Platform: any
Classifier: Development Status :: 5 - Production/Stable
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
-Classifier: Programming Language :: Python :: 2.6
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
-Classifier: Programming Language :: Python :: 3.0
-Classifier: Programming Language :: Python :: 3.1
-Classifier: Programming Language :: Python :: 3.2
Classifier: Programming Language :: Python :: 3.3
+Classifier: Programming Language :: Python :: 3.4
Classifier: License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)
Classifier: Intended Audience :: Developers
-Classifier: License :: OSI Approved :: BSD License
Classifier: Operating System :: OS Independent
diff --git a/amqp.egg-info/SOURCES.txt b/amqp.egg-info/SOURCES.txt
index ef83e64..4c3283c 100644
--- a/amqp.egg-info/SOURCES.txt
+++ b/amqp.egg-info/SOURCES.txt
@@ -14,32 +14,36 @@ amqp/five.py
amqp/method_framing.py
amqp/protocol.py
amqp/serialization.py
+amqp/spec.py
amqp/transport.py
amqp/utils.py
amqp.egg-info/PKG-INFO
amqp.egg-info/SOURCES.txt
amqp.egg-info/dependency_links.txt
amqp.egg-info/not-zip-safe
+amqp.egg-info/requires.txt
amqp.egg-info/top_level.txt
amqp/tests/__init__.py
amqp/tests/case.py
+amqp/tests/test_abstract_channel.py
+amqp/tests/test_basic_message.py
amqp/tests/test_channel.py
-demo/amqp_clock.py
-demo/demo_receive.py
-demo/demo_send.py
+amqp/tests/test_connection.py
+amqp/tests/test_exceptions.py
+amqp/tests/test_method_framing.py
+amqp/tests/test_serialization.py
+amqp/tests/test_transport.py
+amqp/tests/test_utils.py
docs/Makefile
docs/changelog.rst
docs/conf.py
docs/index.rst
-docs/.static/.keep
-docs/.templates/page.html
-docs/.templates/sidebarintro.html
-docs/.templates/sidebarlogo.html
-docs/_ext/applyxrefs.py
-docs/_ext/literals_to_xrefs.py
-docs/_theme/celery/theme.conf
-docs/_theme/celery/static/celery.css_t
-docs/includes/intro.txt
+docs/make.bat
+docs/_static/.keep
+docs/_templates/sidebardonations.html
+docs/images/celery_128.png
+docs/images/favicon.ico
+docs/includes/introduction.txt
docs/reference/amqp.abstract_channel.rst
docs/reference/amqp.basic_message.rst
docs/reference/amqp.channel.rst
@@ -49,23 +53,14 @@ docs/reference/amqp.five.rst
docs/reference/amqp.method_framing.rst
docs/reference/amqp.protocol.rst
docs/reference/amqp.serialization.rst
+docs/reference/amqp.spec.rst
docs/reference/amqp.transport.rst
docs/reference/amqp.utils.rst
docs/reference/index.rst
docs/templates/readme.txt
-extra/README
-extra/generate_skeleton_0_8.py
extra/update_comments_from_spec.py
-extra/release/bump_version.py
-extra/release/sphinx-to-rst.py
-funtests/run_all.py
-funtests/settings.py
-funtests/test_basic_message.py
-funtests/test_channel.py
-funtests/test_connection.py
-funtests/test_exceptions.py
-funtests/test_serialization.py
-funtests/test_with.py
+requirements/default.txt
requirements/docs.txt
requirements/pkgutils.txt
+requirements/test-ci.txt
requirements/test.txt
\ No newline at end of file
diff --git a/amqp.egg-info/requires.txt b/amqp.egg-info/requires.txt
new file mode 100644
index 0000000..fdc96a1
--- /dev/null
+++ b/amqp.egg-info/requires.txt
@@ -0,0 +1 @@
+vine>=1.1.0
diff --git a/amqp/__init__.py b/amqp/__init__.py
index 9c39c43..b2b849b 100644
--- a/amqp/__init__.py
+++ b/amqp/__init__.py
@@ -14,10 +14,17 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-from __future__ import absolute_import
+from __future__ import absolute_import, unicode_literals
-VERSION = (1, 4, 9)
-__version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])
+from collections import namedtuple
+
+version_info_t = namedtuple(
+ 'version_info_t', ('major', 'minor', 'micro', 'releaselevel', 'serial'),
+)
+
+VERSION = version_info = version_info_t(2, 0, 1, '', '')
+
+__version__ = '{0.major}.{0.minor}.{0.micro}{0.releaselevel}'.format(VERSION)
__author__ = 'Barry Pederson'
__maintainer__ = 'Ask Solem'
__contact__ = 'pyamqp at celeryproject.org'
@@ -61,10 +68,19 @@ from .exceptions import ( # noqa
error_for_code,
__all__ as _all_exceptions,
)
-from .utils import promise # noqa
+from .utils import promise # noqa
+
+# Enable celery 3.1.23 to import the package instead of breaking on an
+# unknown symbol
+__all_externals__ = [
+ 'promise',
+]
__all__ = [
'Connection',
'Channel',
'Message',
-] + _all_exceptions
+]
+__all__ += _all_exceptions
+
+__all__ += __all_externals__
diff --git a/amqp/abstract_channel.py b/amqp/abstract_channel.py
index 62fca89..da60769 100644
--- a/amqp/abstract_channel.py
+++ b/amqp/abstract_channel.py
@@ -14,10 +14,13 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-from __future__ import absolute_import
+from __future__ import absolute_import, unicode_literals
+
+from vine import ensure_promise, promise
from .exceptions import AMQPNotImplementedError, RecoverableConnectionError
-from .serialization import AMQPWriter
+from .five import bytes_if_py2
+from .serialization import dumps, loads
__all__ = ['AbstractChannel']
@@ -36,6 +39,10 @@ class AbstractChannel(object):
connection.channels[channel_id] = self
self.method_queue = [] # Higher level queue for methods
self.auto_decode = False
+ self._pending = {}
+ self._callbacks = {}
+
+ self._setup_listeners()
def __enter__(self):
return self
@@ -43,32 +50,57 @@ class AbstractChannel(object):
def __exit__(self, *exc_info):
self.close()
- def _send_method(self, method_sig, args=bytes(), content=None):
- """Send a method for our channel."""
+ def send_method(self, sig,
+ format=None, args=None, content=None,
+ wait=None, callback=None, returns_tuple=False):
+ p = promise()
conn = self.connection
if conn is None:
raise RecoverableConnectionError('connection already closed')
+ args = dumps(format, args) if format else bytes_if_py2('')
+ try:
+ conn.frame_writer.send((1, self.channel_id, sig, args, content))
+ except StopIteration:
+ raise RecoverableConnectionError('connection already closed')
- if isinstance(args, AMQPWriter):
- args = args.getvalue()
-
- conn.method_writer.write_method(
- self.channel_id, method_sig, args, content,
- )
+ # TODO temp: callback should be after write_method ... ;)
+ if callback:
+ p.then(callback)
+ p()
+ if wait:
+ return self.wait(wait, returns_tuple=returns_tuple)
+ return p
def close(self):
"""Close this Channel or Connection"""
raise NotImplementedError('Must be overriden in subclass')
- def wait(self, allowed_methods=None, timeout=None):
- """Wait for a method that matches our allowed_methods parameter (the
- default value of None means match any method), and dispatch to it."""
- method_sig, args, content = self.connection._wait_method(
- self.channel_id, allowed_methods, timeout)
+ def wait(self, method, callback=None, timeout=None, returns_tuple=False):
+ p = ensure_promise(callback)
+ pending = self._pending
+ prev_p = []
+ if not isinstance(method, list):
+ method = [method]
- return self.dispatch_method(method_sig, args, content)
+ for m in method:
+ prev_p.append(pending.get(m))
+ pending[m] = p
- def dispatch_method(self, method_sig, args, content):
+ try:
+ while not p.ready:
+ self.connection.drain_events(timeout=timeout)
+
+ if p.value:
+ args, kwargs = p.value
+ return args if returns_tuple else (args and args[0])
+ finally:
+ for i, m in enumerate(method):
+ if prev_p[i] is not None:
+ pending[m] = prev_p[i]
+ else:
+ pending.pop(m, None)
+
+ def dispatch_method(self, method_sig, payload, content):
if content and \
self.auto_decode and \
hasattr(content, 'content_encoding'):
@@ -78,16 +110,35 @@ class AbstractChannel(object):
pass
try:
- amqp_method = self._METHOD_MAP[method_sig]
+ amqp_method = self._METHODS[method_sig]
except KeyError:
raise AMQPNotImplementedError(
'Unknown AMQP method {0!r}'.format(method_sig))
- if content is None:
- return amqp_method(self, args)
+ try:
+ listeners = [self._callbacks[method_sig]]
+ except KeyError:
+ listeners = None
+ try:
+ one_shot = self._pending.pop(method_sig)
+ except KeyError:
+ if not listeners:
+ return
else:
- return amqp_method(self, args, content)
+ if listeners is None:
+ listeners = [one_shot]
+ else:
+ listeners.append(one_shot)
+
+ args = []
+ if amqp_method.args:
+ args, _ = loads(amqp_method.args, payload, 4)
+ if amqp_method.content:
+ args.append(content)
+
+ for listener in listeners:
+ listener(*args)
#: Placeholder, the concrete implementations will have to
#: supply their own versions of _METHOD_MAP
- _METHOD_MAP = {}
+ _METHODS = {}
diff --git a/amqp/basic_message.py b/amqp/basic_message.py
index 192ede9..56b9792 100644
--- a/amqp/basic_message.py
+++ b/amqp/basic_message.py
@@ -14,8 +14,18 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-from __future__ import absolute_import
-
+from __future__ import absolute_import, unicode_literals
+
+# Intended to fix #85: ImportError: cannot import name spec
+# Encountered on python 2.7.3
+# "The submodules often need to refer to each other. For example, the
+# surround [sic] module might use the echo module. In fact, such
+# references are so common that the import statement first looks in
+# the containing package before looking in the standard module search
+# path."
+# Source:
+# http://stackoverflow.com/a/14216937/4982251
+from .spec import Basic
from .serialization import GenericContent
__all__ = ['Message']
@@ -23,27 +33,31 @@ __all__ = ['Message']
class Message(GenericContent):
"""A Message for use with the Channnel.basic_* methods."""
+ CLASS_ID = Basic.CLASS_ID
#: Instances of this class have these attributes, which
#: are passed back and forth as message properties between
#: client and server
PROPERTIES = [
- ('content_type', 'shortstr'),
- ('content_encoding', 'shortstr'),
- ('application_headers', 'table'),
- ('delivery_mode', 'octet'),
- ('priority', 'octet'),
- ('correlation_id', 'shortstr'),
- ('reply_to', 'shortstr'),
- ('expiration', 'shortstr'),
- ('message_id', 'shortstr'),
- ('timestamp', 'timestamp'),
- ('type', 'shortstr'),
- ('user_id', 'shortstr'),
- ('app_id', 'shortstr'),
- ('cluster_id', 'shortstr')
+ ('content_type', 's'),
+ ('content_encoding', 's'),
+ ('application_headers', 'F'),
+ ('delivery_mode', 'o'),
+ ('priority', 'o'),
+ ('correlation_id', 's'),
+ ('reply_to', 's'),
+ ('expiration', 's'),
+ ('message_id', 's'),
+ ('timestamp', 'L'),
+ ('type', 's'),
+ ('user_id', 's'),
+ ('app_id', 's'),
+ ('cluster_id', 's')
]
+ #: set by basic_consume/basic_get
+ delivery_info = None
+
def __init__(self, body='', children=None, channel=None, **properties):
"""Expected arg types
@@ -109,16 +123,10 @@ class Message(GenericContent):
self.body = body
self.channel = channel
- def __eq__(self, other):
- """Check if the properties and bodies of this Message and another
- Message are the same.
+ @property
+ def headers(self):
+ return self.properties.get('application_headers')
- Received messages may contain a 'delivery_info' attribute,
- which isn't compared.
-
- """
- try:
- return (super(Message, self).__eq__(other) and
- self.body == other.body)
- except AttributeError:
- return NotImplemented
+ @property
+ def delivery_tag(self):
+ return self.delivery_info.get('delivery_tag')
diff --git a/amqp/channel.py b/amqp/channel.py
index ff6a4ae..8060646 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -14,20 +14,24 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
-from __future__ import absolute_import
+from __future__ import absolute_import, unicode_literals
import logging
+import socket
from collections import defaultdict
from warnings import warn
+from vine import ensure_promise
+
+from . import spec
from .abstract_channel import AbstractChannel
from .exceptions import (
- ChannelError, ConsumerCancelled, NotConfirmed, error_for_code,
+ ChannelError, ConsumerCancelled,
+ RecoverableChannelError, RecoverableConnectionError, error_for_code,
)
from .five import Queue
-from .protocol import basic_return_t, queue_declare_ok_t
-from .serialization import AMQPWriter
+from .protocol import queue_declare_ok_t
__all__ = ['Channel']
@@ -38,6 +42,11 @@ The auto_delete flag for exchanges has been deprecated and will be removed
from py-amqp v1.5.0.\
"""
+REJECTED_MESSAGE_WITHOUT_CALLBACK = """\
+Rejecting message with delivery tag %r for reason of having no callbacks.
+consumer_tag=%r exchange=%r routing_key=%r.\
+"""
+
class VDeprecationWarning(DeprecationWarning):
pass
@@ -61,8 +70,40 @@ class Channel(AbstractChannel):
/ S:CLOSE C:CLOSE-OK
"""
-
- def __init__(self, connection, channel_id=None, auto_decode=True):
+ _METHODS = set([
+ spec.method(spec.Channel.Close, 'BsBB'),
+ spec.method(spec.Channel.CloseOk),
+ spec.method(spec.Channel.Flow, 'b'),
+ spec.method(spec.Channel.FlowOk, 'b'),
+ spec.method(spec.Channel.OpenOk),
+ spec.method(spec.Exchange.DeclareOk),
+ spec.method(spec.Exchange.DeleteOk),
+ spec.method(spec.Exchange.BindOk),
+ spec.method(spec.Exchange.UnbindOk),
+ spec.method(spec.Queue.BindOk),
+ spec.method(spec.Queue.UnbindOk),
+ spec.method(spec.Queue.DeclareOk, 'sll'),
+ spec.method(spec.Queue.DeleteOk, 'l'),
+ spec.method(spec.Queue.PurgeOk, 'l'),
+ spec.method(spec.Basic.Cancel, 's'),
+ spec.method(spec.Basic.CancelOk, 's'),
+ spec.method(spec.Basic.ConsumeOk, 's'),
+ spec.method(spec.Basic.Deliver, 'sLbss', content=True),
+ spec.method(spec.Basic.GetEmpty, 's'),
+ spec.method(spec.Basic.GetOk, 'Lbssl', content=True),
+ spec.method(spec.Basic.QosOk),
+ spec.method(spec.Basic.RecoverOk),
+ spec.method(spec.Basic.Return, 'Bsss', content=True),
+ spec.method(spec.Tx.CommitOk),
+ spec.method(spec.Tx.RollbackOk),
+ spec.method(spec.Tx.SelectOk),
+ spec.method(spec.Confirm.SelectOk),
+ spec.method(spec.Basic.Ack, 'Lb'),
+ ])
+ _METHODS = {m.method_sig: m for m in _METHODS}
+
+ def __init__(self, connection,
+ channel_id=None, auto_decode=True, on_open=None):
"""Create a channel bound to a connection and using the specified
numeric channel_id, and open on the server.
@@ -79,7 +120,7 @@ class Channel(AbstractChannel):
else:
channel_id = connection._get_free_channel_id()
- AMQP_LOGGER.debug('using channel_id: %d', channel_id)
+ AMQP_LOGGER.debug('using channel_id: %s', channel_id)
super(Channel, self).__init__(connection, channel_id)
@@ -92,18 +133,34 @@ class Channel(AbstractChannel):
self.events = defaultdict(set)
self.no_ack_consumers = set()
+ self.on_open = ensure_promise(on_open)
+
# set first time basic_publish_confirm is called
# and publisher confirms are enabled for this channel.
self._confirm_selected = False
if self.connection.confirm_publish:
self.basic_publish = self.basic_publish_confirm
- self._x_open()
-
- def _do_close(self):
+ def then(self, on_success, on_error=None):
+ return self.on_open.then(on_success, on_error)
+
+ def _setup_listeners(self):
+ self._callbacks.update({
+ spec.Channel.Close: self._on_close,
+ spec.Channel.CloseOk: self._on_close_ok,
+ spec.Channel.Flow: self._on_flow,
+ spec.Channel.OpenOk: self._on_open_ok,
+ spec.Basic.Cancel: self._on_basic_cancel,
+ spec.Basic.CancelOk: self._on_basic_cancel_ok,
+ spec.Basic.Deliver: self._on_basic_deliver,
+ spec.Basic.Return: self._on_basic_return,
+ spec.Basic.Ack: self._on_basic_ack,
+ })
+
+ def collect(self):
"""Tear down this object, after we've agreed to close
with the server."""
- AMQP_LOGGER.debug('Closed channel #%d', self.channel_id)
+ AMQP_LOGGER.debug('Closed channel #%s', self.channel_id)
self.is_open = False
channel_id, self.channel_id = self.channel_id, None
connection, self.connection = self.connection, None
@@ -117,9 +174,10 @@ class Channel(AbstractChannel):
def _do_revive(self):
self.is_open = False
- self._x_open()
+ self.open()
- def close(self, reply_code=0, reply_text='', method_sig=(0, 0)):
+ def close(self, reply_code=0, reply_text='', method_sig=(0, 0),
+ argsig='BsBB'):
"""Request a channel close
This method indicates that the sender wants to close the
@@ -167,23 +225,23 @@ class Channel(AbstractChannel):
"""
try:
- if not self.is_open or self.connection is None:
+ is_closed = (
+ not self.is_open or
+ self.connection is None or
+ self.connection.channels is None
+ )
+ if is_closed:
return
- args = AMQPWriter()
- args.write_short(reply_code)
- args.write_shortstr(reply_text)
- args.write_short(method_sig[0]) # class_id
- args.write_short(method_sig[1]) # method_id
- self._send_method((20, 40), args)
- return self.wait(allowed_methods=[
- (20, 40), # Channel.close
- (20, 41), # Channel.close_ok
- ])
+ return self.send_method(
+ spec.Channel.Close, argsig,
+ (reply_code, reply_text, method_sig[0], method_sig[1]),
+ wait=spec.Channel.CloseOk,
+ )
finally:
self.connection = None
- def _close(self, args):
+ def _on_close(self, reply_code, reply_text, class_id, method_id):
"""Request a channel close
This method indicates that the sender wants to close the
@@ -231,19 +289,13 @@ class Channel(AbstractChannel):
"""
- reply_code = args.read_short()
- reply_text = args.read_shortstr()
- class_id = args.read_short()
- method_id = args.read_short()
-
- self._send_method((20, 41))
+ self.send_method(spec.Channel.CloseOk)
self._do_revive()
-
raise error_for_code(
reply_code, reply_text, (class_id, method_id), ChannelError,
)
- def _close_ok(self, args):
+ def _on_close_ok(self):
"""Confirm a channel close
This method confirms a Channel.Close method and tells the
@@ -257,7 +309,7 @@ class Channel(AbstractChannel):
the error.
"""
- self._do_close()
+ self.collect()
def flow(self, active):
"""Enable/disable flow from peer
@@ -305,14 +357,11 @@ class Channel(AbstractChannel):
False, the peer stops sending content frames.
"""
- args = AMQPWriter()
- args.write_bit(active)
- self._send_method((20, 20), args)
- return self.wait(allowed_methods=[
- (20, 21), # Channel.flow_ok
- ])
-
- def _flow(self, args):
+ return self.send_method(
+ spec.Channel.Flow, 'b', (active,), wait=spec.Channel.FlowOk,
+ )
+
+ def _on_flow(self, active):
"""Enable/disable flow from peer
This method asks the peer to pause or restart the flow of
@@ -358,7 +407,7 @@ class Channel(AbstractChannel):
False, the peer stops sending content frames.
"""
- self.active = args.read_bit()
+ self.active = active
self._x_flow_ok(self.active)
def _x_flow_ok(self, active):
@@ -377,29 +426,9 @@ class Channel(AbstractChannel):
to send content frames; False means it will not.
"""
- args = AMQPWriter()
- args.write_bit(active)
- self._send_method((20, 21), args)
-
- def _flow_ok(self, args):
- """Confirm a flow method
-
- Confirms to the peer that a flow command was received and
- processed.
-
- PARAMETERS:
- active: boolean
+ return self.send_method(spec.Channel.FlowOk, 'b', (active,))
- current flow setting
... 10775 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-amqp.git
More information about the Python-modules-commits
mailing list