[Python-modules-commits] [python-hbmqtt] 01/01: Import python-hbmqtt_0.9.orig.tar.gz

Stein Magnus Jodal jodal at moszumanska.debian.org
Wed Jul 26 11:21:33 UTC 2017


This is an automated email from the git hooks/post-receive script.

jodal pushed a commit to branch upstream
in repository python-hbmqtt.

commit da5faab0b6d7197934f241864deef23388cf2248
Author: Stein Magnus Jodal <jodal at debian.org>
Date:   Wed Jul 26 10:40:29 2017 +0200

    Import python-hbmqtt_0.9.orig.tar.gz
---
 MANIFEST.in                            |   2 +
 PKG-INFO                               |  21 +
 hbmqtt.egg-info/PKG-INFO               |  21 +
 hbmqtt.egg-info/SOURCES.txt            |  71 ++++
 hbmqtt.egg-info/dependency_links.txt   |   1 +
 hbmqtt.egg-info/entry_points.txt       |  19 +
 hbmqtt.egg-info/requires.txt           |   5 +
 hbmqtt.egg-info/top_level.txt          |   3 +
 hbmqtt/__init__.py                     |   5 +
 hbmqtt/adapters.py                     | 217 ++++++++++
 hbmqtt/broker.py                       | 754 +++++++++++++++++++++++++++++++++
 hbmqtt/client.py                       | 518 ++++++++++++++++++++++
 hbmqtt/codecs.py                       | 120 ++++++
 hbmqtt/errors.py                       |  31 ++
 hbmqtt/mqtt/__init__.py                |  44 ++
 hbmqtt/mqtt/connack.py                 |  84 ++++
 hbmqtt/mqtt/connect.py                 | 328 ++++++++++++++
 hbmqtt/mqtt/constants.py               |   7 +
 hbmqtt/mqtt/disconnect.py              |  21 +
 hbmqtt/mqtt/packet.py                  | 240 +++++++++++
 hbmqtt/mqtt/pingreq.py                 |  21 +
 hbmqtt/mqtt/pingresp.py                |  25 ++
 hbmqtt/mqtt/protocol/__init__.py       |   0
 hbmqtt/mqtt/protocol/broker_handler.py | 185 ++++++++
 hbmqtt/mqtt/protocol/client_handler.py | 181 ++++++++
 hbmqtt/mqtt/protocol/handler.py        | 567 +++++++++++++++++++++++++
 hbmqtt/mqtt/puback.py                  |  35 ++
 hbmqtt/mqtt/pubcomp.py                 |  35 ++
 hbmqtt/mqtt/publish.py                 | 158 +++++++
 hbmqtt/mqtt/pubrec.py                  |  35 ++
 hbmqtt/mqtt/pubrel.py                  |  34 ++
 hbmqtt/mqtt/suback.py                  |  65 +++
 hbmqtt/mqtt/subscribe.py               |  63 +++
 hbmqtt/mqtt/unsuback.py                |  27 ++
 hbmqtt/mqtt/unsubscribe.py             |  57 +++
 hbmqtt/plugins/__init__.py             |   4 +
 hbmqtt/plugins/authentication.py       |  91 ++++
 hbmqtt/plugins/logging.py              |  46 ++
 hbmqtt/plugins/manager.py              | 210 +++++++++
 hbmqtt/plugins/persistence.py          |  65 +++
 hbmqtt/plugins/sys/__init__.py         |   0
 hbmqtt/plugins/sys/broker.py           | 167 ++++++++
 hbmqtt/session.py                      | 171 ++++++++
 hbmqtt/utils.py                        |  48 +++
 hbmqtt/version.py                      |  54 +++
 license.txt                            |  21 +
 scripts/__init__.py                    |   0
 scripts/broker_script.py               |  81 ++++
 scripts/default_broker.yaml            |  10 +
 scripts/default_client.yaml            |   7 +
 scripts/pub_script.py                  | 168 ++++++++
 scripts/sub_script.py                  | 144 +++++++
 setup.cfg                              |   8 +
 setup.py                               |  61 +++
 tests/mqtt/__init__.py                 |   1 +
 tests/mqtt/protocol/__init__.py        |   1 +
 tests/mqtt/protocol/test_handler.py    | 460 ++++++++++++++++++++
 tests/mqtt/test_connect.py             | 123 ++++++
 tests/mqtt/test_packet.py              |  50 +++
 tests/mqtt/test_puback.py              |  25 ++
 tests/mqtt/test_pubcomp.py             |  25 ++
 tests/mqtt/test_publish.py             | 121 ++++++
 tests/mqtt/test_pubrec.py              |  25 ++
 tests/mqtt/test_pubrel.py              |  25 ++
 tests/mqtt/test_suback.py              |  35 ++
 tests/mqtt/test_subscribe.py           |  37 ++
 tests/mqtt/test_unsuback.py            |  26 ++
 tests/mqtt/test_unsubscribe.py         |  28 ++
 tests/plugins/__init__.py              |   1 +
 tests/plugins/test_authentication.py   | 111 +++++
 tests/plugins/test_manager.py          | 100 +++++
 tests/plugins/test_persistence.py      |  59 +++
 72 files changed, 6609 insertions(+)

diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..1640d6e
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,2 @@
+recursive-include scripts *.yaml
+include license.txt
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..4adc081
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,21 @@
+Metadata-Version: 1.1
+Name: hbmqtt
+Version: 0.9
+Summary: MQTT client/brocker using Python 3.4 asyncio library
+Home-page: https://github.com/beerfactory/hbmqtt
+Author: Nicolas Jouanin
+Author-email: nico at beerfactory.org
+License: MIT
+Description: UNKNOWN
+Platform: all
+Classifier: Development Status :: 3 - Alpha
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: MIT License
+Classifier: Operating System :: POSIX
+Classifier: Operating System :: MacOS
+Classifier: Operating System :: Microsoft :: Windows
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Programming Language :: Python :: 3.5
+Classifier: Programming Language :: Python :: 3.6
+Classifier: Topic :: Communications
+Classifier: Topic :: Internet
diff --git a/hbmqtt.egg-info/PKG-INFO b/hbmqtt.egg-info/PKG-INFO
new file mode 100644
index 0000000..4adc081
--- /dev/null
+++ b/hbmqtt.egg-info/PKG-INFO
@@ -0,0 +1,21 @@
+Metadata-Version: 1.1
+Name: hbmqtt
+Version: 0.9
+Summary: MQTT client/brocker using Python 3.4 asyncio library
+Home-page: https://github.com/beerfactory/hbmqtt
+Author: Nicolas Jouanin
+Author-email: nico at beerfactory.org
+License: MIT
+Description: UNKNOWN
+Platform: all
+Classifier: Development Status :: 3 - Alpha
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: MIT License
+Classifier: Operating System :: POSIX
+Classifier: Operating System :: MacOS
+Classifier: Operating System :: Microsoft :: Windows
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Programming Language :: Python :: 3.5
+Classifier: Programming Language :: Python :: 3.6
+Classifier: Topic :: Communications
+Classifier: Topic :: Internet
diff --git a/hbmqtt.egg-info/SOURCES.txt b/hbmqtt.egg-info/SOURCES.txt
new file mode 100644
index 0000000..f713a5e
--- /dev/null
+++ b/hbmqtt.egg-info/SOURCES.txt
@@ -0,0 +1,71 @@
+MANIFEST.in
+license.txt
+setup.cfg
+setup.py
+hbmqtt/__init__.py
+hbmqtt/adapters.py
+hbmqtt/broker.py
+hbmqtt/client.py
+hbmqtt/codecs.py
+hbmqtt/errors.py
+hbmqtt/session.py
+hbmqtt/utils.py
+hbmqtt/version.py
+hbmqtt.egg-info/PKG-INFO
+hbmqtt.egg-info/SOURCES.txt
+hbmqtt.egg-info/dependency_links.txt
+hbmqtt.egg-info/entry_points.txt
+hbmqtt.egg-info/requires.txt
+hbmqtt.egg-info/top_level.txt
+hbmqtt/mqtt/__init__.py
+hbmqtt/mqtt/connack.py
+hbmqtt/mqtt/connect.py
+hbmqtt/mqtt/constants.py
+hbmqtt/mqtt/disconnect.py
+hbmqtt/mqtt/packet.py
+hbmqtt/mqtt/pingreq.py
+hbmqtt/mqtt/pingresp.py
+hbmqtt/mqtt/puback.py
+hbmqtt/mqtt/pubcomp.py
+hbmqtt/mqtt/publish.py
+hbmqtt/mqtt/pubrec.py
+hbmqtt/mqtt/pubrel.py
+hbmqtt/mqtt/suback.py
+hbmqtt/mqtt/subscribe.py
+hbmqtt/mqtt/unsuback.py
+hbmqtt/mqtt/unsubscribe.py
+hbmqtt/mqtt/protocol/__init__.py
+hbmqtt/mqtt/protocol/broker_handler.py
+hbmqtt/mqtt/protocol/client_handler.py
+hbmqtt/mqtt/protocol/handler.py
+hbmqtt/plugins/__init__.py
+hbmqtt/plugins/authentication.py
+hbmqtt/plugins/logging.py
+hbmqtt/plugins/manager.py
+hbmqtt/plugins/persistence.py
+hbmqtt/plugins/sys/__init__.py
+hbmqtt/plugins/sys/broker.py
+scripts/__init__.py
+scripts/broker_script.py
+scripts/default_broker.yaml
+scripts/default_client.yaml
+scripts/pub_script.py
+scripts/sub_script.py
+tests/mqtt/__init__.py
+tests/mqtt/test_connect.py
+tests/mqtt/test_packet.py
+tests/mqtt/test_puback.py
+tests/mqtt/test_pubcomp.py
+tests/mqtt/test_publish.py
+tests/mqtt/test_pubrec.py
+tests/mqtt/test_pubrel.py
+tests/mqtt/test_suback.py
+tests/mqtt/test_subscribe.py
+tests/mqtt/test_unsuback.py
+tests/mqtt/test_unsubscribe.py
+tests/mqtt/protocol/__init__.py
+tests/mqtt/protocol/test_handler.py
+tests/plugins/__init__.py
+tests/plugins/test_authentication.py
+tests/plugins/test_manager.py
+tests/plugins/test_persistence.py
\ No newline at end of file
diff --git a/hbmqtt.egg-info/dependency_links.txt b/hbmqtt.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/hbmqtt.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/hbmqtt.egg-info/entry_points.txt b/hbmqtt.egg-info/entry_points.txt
new file mode 100644
index 0000000..eb496b3
--- /dev/null
+++ b/hbmqtt.egg-info/entry_points.txt
@@ -0,0 +1,19 @@
+[console_scripts]
+hbmqtt = scripts.broker_script:main
+hbmqtt_pub = scripts.pub_script:main
+hbmqtt_sub = scripts.sub_script:main
+
+[hbmqtt.broker.plugins]
+auth_anonymous = hbmqtt.plugins.authentication:AnonymousAuthPlugin
+auth_file = hbmqtt.plugins.authentication:FileAuthPlugin
+broker_sys = hbmqtt.plugins.sys.broker:BrokerSysPlugin
+packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin
+
+[hbmqtt.client.plugins]
+packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin
+
+[hbmqtt.test.plugins]
+event_plugin = tests.plugins.test_manager:EventTestPlugin
+packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin
+test_plugin = tests.plugins.test_manager:TestPlugin
+
diff --git a/hbmqtt.egg-info/requires.txt b/hbmqtt.egg-info/requires.txt
new file mode 100644
index 0000000..0ade1d9
--- /dev/null
+++ b/hbmqtt.egg-info/requires.txt
@@ -0,0 +1,5 @@
+transitions==0.2.5
+websockets
+passlib
+docopt
+pyyaml
diff --git a/hbmqtt.egg-info/top_level.txt b/hbmqtt.egg-info/top_level.txt
new file mode 100644
index 0000000..16eab0d
--- /dev/null
+++ b/hbmqtt.egg-info/top_level.txt
@@ -0,0 +1,3 @@
+hbmqtt
+scripts
+tests
diff --git a/hbmqtt/__init__.py b/hbmqtt/__init__.py
new file mode 100644
index 0000000..f53cbc0
--- /dev/null
+++ b/hbmqtt/__init__.py
@@ -0,0 +1,5 @@
+# Copyright (c) 2015 Nicolas JOUANIN
+#
+# See the file license.txt for copying permission.
+
+VERSION = (0, 9, 0, 'final', 0)
diff --git a/hbmqtt/adapters.py b/hbmqtt/adapters.py
new file mode 100644
index 0000000..090a518
--- /dev/null
+++ b/hbmqtt/adapters.py
@@ -0,0 +1,217 @@
+# Copyright (c) 2015 Nicolas JOUANIN
+#
+# See the file license.txt for copying permission.
+import asyncio
+import io
+from websockets.protocol import WebSocketCommonProtocol
+from websockets.exceptions import ConnectionClosed
+from asyncio import StreamReader, StreamWriter
+import logging
+
+
+class ReaderAdapter:
+    """
+    Base class for all network protocol reader adapters.
+
+    Reader adapters are used to adapt read operations on the network depending on the protocol used
+    """
+
+    @asyncio.coroutine
+    def read(self, n=-1) -> bytes:
+        """
+        Read up to n bytes. If n is not provided, or set to -1, read until EOF and return all read bytes.
+        If the EOF was received and the internal buffer is empty, return an empty bytes object.
+        :return: packet read as bytes data
+        """
+
+    def feed_eof(self):
+        """
+        Acknowledge EOF
+        """
+
+
+class WriterAdapter:
+    """
+    Base class for all network protocol writer adapters.
+
+    Writer adapters are used to adapt write operations on the network depending on the protocol used
+    """
+
+    def write(self, data):
+        """
+        write some data to the protocol layer
+        """
+
+    @asyncio.coroutine
+    def drain(self):
+        """
+        Give the write buffer of the underlying transport a chance to be flushed.
+        """
+
+    def get_peer_info(self):
+        """
+        Return peer socket info (remote address and remote port) as a tuple
+        """
+
+    @asyncio.coroutine
+    def close(self):
+        """
+        Close the protocol connection
+        """
+
+
+class WebSocketsReader(ReaderAdapter):
+    """
+    WebSockets API reader adapter
+    This adapter relies on WebSocketCommonProtocol to read from a WebSocket.
+    """
+    def __init__(self, protocol: WebSocketCommonProtocol):
+        self._protocol = protocol
+        self._stream = io.BytesIO(b'')
+
+    @asyncio.coroutine
+    def read(self, n=-1) -> bytes:
+        yield from self._feed_buffer(n)
+        data = self._stream.read(n)
+        return data
+
+    @asyncio.coroutine
+    def _feed_buffer(self, n=1):
+        """
+        Feed the data buffer by reading a Websocket message.
+        :param n: if given, feed buffer until it contains at least n bytes
+        """
+        buffer = bytearray(self._stream.read())
+        while len(buffer) < n:
+            try:
+                message = yield from self._protocol.recv()
+            except ConnectionClosed:
+                message = None
+            if message is None:
+                break
+            if not isinstance(message, bytes):
+                raise TypeError("message must be bytes")
+            buffer.extend(message)
+        self._stream = io.BytesIO(buffer)
+
+
+class WebSocketsWriter(WriterAdapter):
+    """
+    WebSockets API writer adapter
+    This adapter relies on WebSocketCommonProtocol to write to a WebSocket.
+    """
+    def __init__(self, protocol: WebSocketCommonProtocol):
+        self._protocol = protocol
+        self._stream = io.BytesIO(b'')
+
+    def write(self, data):
+        """
+        write some data to the protocol layer
+        """
+        self._stream.write(data)
+
+    @asyncio.coroutine
+    def drain(self):
+        """
+        Give the write buffer of the underlying transport a chance to be flushed.
+        """
+        data = self._stream.getvalue()
+        if len(data):
+            yield from self._protocol.send(data)
+        self._stream = io.BytesIO(b'')
+
+    def get_peer_info(self):
+        extra_info = self._protocol.writer.get_extra_info('peername')
+        return extra_info[0], extra_info[1]
+
+    @asyncio.coroutine
+    def close(self):
+        yield from self._protocol.close()
+
+
+class StreamReaderAdapter(ReaderAdapter):
+    """
+    Asyncio Streams API protocol adapter
+    This adapter relies on StreamReader to read from a TCP socket.
+    Because API is very close, this class is trivial
+    """
+    def __init__(self, reader: StreamReader):
+        self._reader = reader
+
+    @asyncio.coroutine
+    def read(self, n=-1) -> bytes:
+        return (yield from self._reader.read(n))
+
+    def feed_eof(self):
+        return self._reader.feed_eof()
+
+
+class StreamWriterAdapter(WriterAdapter):
+    """
+    Asyncio Streams API protocol adapter
+    This adapter relies on StreamWriter to write to a TCP socket.
+    Because API is very close, this class is trivial
+    """
+    def __init__(self, writer: StreamWriter):
+        self.logger = logging.getLogger(__name__)
+        self._writer = writer
+
+    def write(self, data):
+        self._writer.write(data)
+
+    @asyncio.coroutine
+    def drain(self):
+        yield from self._writer.drain()
+
+    def get_peer_info(self):
+        extra_info = self._writer.get_extra_info('peername')
+        return extra_info[0], extra_info[1]
+
+    @asyncio.coroutine
+    def close(self):
+        yield from self._writer.drain()
+        if self._writer.can_write_eof():
+            self._writer.write_eof()
+        self._writer.close()
+
+
+class BufferReader(ReaderAdapter):
+    """
+    Byte Buffer reader adapter
+    This adapter simply adapts reading from a byte buffer.
+    """
+    def __init__(self, buffer: bytes):
+        self._stream = io.BytesIO(buffer)
+
+    @asyncio.coroutine
+    def read(self, n=-1) -> bytes:
+        return self._stream.read(n)
+
+
+class BufferWriter(WriterAdapter):
+    """
+    ByteBuffer writer adapter
+    This adapter simply adapts writing to a byte buffer
+    """
+    def __init__(self, buffer=b''):
+        self._stream = io.BytesIO(buffer)
+
+    def write(self, data):
+        """
+        write some data to the protocol layer
+        """
+        self._stream.write(data)
+
+    @asyncio.coroutine
+    def drain(self):
+        pass
+
+    def get_buffer(self):
+        return self._stream.getvalue()
+
+    def get_peer_info(self):
+        return "BufferWriter", 0
+
+    @asyncio.coroutine
+    def close(self):
+        self._stream.close()
diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py
new file mode 100644
index 0000000..d2d4311
--- /dev/null
+++ b/hbmqtt/broker.py
@@ -0,0 +1,754 @@
+# Copyright (c) 2015 Nicolas JOUANIN
+#
+# See the file license.txt for copying permission.
+import logging
+import ssl
+import websockets
+import asyncio
+import sys
+import re
+from asyncio import Queue, CancelledError
+if sys.version_info < (3, 5):
+    from asyncio import async as ensure_future
+else:
+    from asyncio import ensure_future
+from collections import deque
+
+from functools import partial
+from transitions import Machine, MachineError
+from hbmqtt.session import Session
+from hbmqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
+from hbmqtt.errors import HBMQTTException, MQTTException
+from hbmqtt.utils import format_client_message, gen_client_id
+from hbmqtt.adapters import (
+    StreamReaderAdapter,
+    StreamWriterAdapter,
+    ReaderAdapter,
+    WriterAdapter,
+    WebSocketsReader,
+    WebSocketsWriter)
+from .plugins.manager import PluginManager, BaseContext
+
+_defaults = {
+    'timeout-disconnect-delay': 2,
+    'auth': {
+        'allow-anonymous': True,
+        'password-file': None
+    },
+}
+
+EVENT_BROKER_PRE_START = 'broker_pre_start'
+EVENT_BROKER_POST_START = 'broker_post_start'
+EVENT_BROKER_PRE_SHUTDOWN = 'broker_pre_shutdown'
+EVENT_BROKER_POST_SHUTDOWN = 'broker_post_shutdown'
+EVENT_BROKER_CLIENT_CONNECTED = 'broker_client_connected'
+EVENT_BROKER_CLIENT_DISCONNECTED = 'broker_client_disconnected'
+EVENT_BROKER_CLIENT_SUBSCRIBED = 'broker_client_subscribed'
+EVENT_BROKER_CLIENT_UNSUBSCRIBED = 'broker_client_unsubscribed'
+EVENT_BROKER_MESSAGE_RECEIVED = 'broker_message_received'
+
+
+class BrokerException(BaseException):
+    pass
+
+
+class RetainedApplicationMessage:
+    def __init__(self, source_session, topic, data, qos=None):
+        self.source_session = source_session
+        self.topic = topic
+        self.data = data
+        self.qos = qos
+
+
+class Server:
+    def __init__(self, listener_name, server_instance, max_connections=-1, loop=None):
+        self.logger = logging.getLogger(__name__)
+        self.instance = server_instance
+        self.conn_count = 0
+        self.listener_name = listener_name
+        if loop is not None:
+            self._loop = loop
+        else:
+            self._loop = asyncio.get_event_loop()
+
+        self.max_connections = max_connections
+        if self.max_connections > 0:
+            self.semaphore = asyncio.Semaphore(self.max_connections, loop=self._loop)
+        else:
+            self.semaphore = None
+
+    @asyncio.coroutine
+    def acquire_connection(self):
+        if self.semaphore:
+            yield from self.semaphore.acquire()
+        self.conn_count += 1
+        if self.max_connections > 0:
+            self.logger.info("Listener '%s': %d/%d connections acquired" %
+                              (self.listener_name, self.conn_count, self.max_connections))
+        else:
+            self.logger.info("Listener '%s': %d connections acquired" %
+                              (self.listener_name, self.conn_count))
+
+    def release_connection(self):
+        if self.semaphore:
+            self.semaphore.release()
+        self.conn_count -= 1
+        if self.max_connections > 0:
+            self.logger.info("Listener '%s': %d/%d connections acquired" %
+                              (self.listener_name, self.conn_count, self.max_connections))
+        else:
+            self.logger.info("Listener '%s': %d connections acquired" %
+                              (self.listener_name, self.conn_count))
+
+    @asyncio.coroutine
+    def close_instance(self):
+        if self.instance:
+            self.instance.close()
+            yield from self.instance.wait_closed()
+
+
+class BrokerContext(BaseContext):
+    """
+    BrokerContext is used as the context passed to plugins interacting with the broker.
+    It acts as an adapter to broker services for plugins developed for the HBMQTT broker
+    """
+    def __init__(self, broker):
+        super().__init__()
+        self.config = None
+        self._broker_instance = broker
+
+    @asyncio.coroutine
+    def broadcast_message(self, topic, data, qos=None):
+        yield from self._broker_instance.internal_message_broadcast(topic, data, qos)
+
+    def retain_message(self, topic_name, data, qos=None):
+        self._broker_instance.retain_message(None, topic_name, data, qos)
+
+    @property
+    def sessions(self):
+        for k, session in self._broker_instance._sessions.items():
+            yield session[0]
+
+    @property
+    def retained_messages(self):
+        return self._broker_instance._retained_messages
+
+    @property
+    def subscriptions(self):
+        return self._broker_instance._subscriptions
+
+
+class Broker:
+    """
+    MQTT 3.1.1 compliant broker implementation
+
+    :param config: Example Yaml config
+    :param loop: asyncio loop to use. Defaults to ``asyncio.get_event_loop()`` if none is given
+    :param plugin_namespace: Plugin namespace to use when loading plugin entry_points. Defaults to ``hbmqtt.broker.plugins``
+
+    """
+    states = ['new', 'starting', 'started', 'not_started', 'stopping', 'stopped', 'not_stopped', 'stopped']
+
+    def __init__(self, config=None, loop=None, plugin_namespace=None):
+        self.logger = logging.getLogger(__name__)
+        self.config = _defaults
+        if config is not None:
+            self.config.update(config)
+        self._build_listeners_config(self.config)
+
+        if loop is not None:
+            self._loop = loop
+        else:
+            self._loop = asyncio.get_event_loop()
+
+        self._servers = dict()
+        self._init_states()
+        self._sessions = dict()
+        self._subscriptions = dict()
+        self._retained_messages = dict()
+        self._broadcast_queue = asyncio.Queue(loop=self._loop)
+
+        self._broadcast_task = None
+
+        # Init plugins manager
+        context = BrokerContext(self)
+        context.config = self.config
+        if plugin_namespace:
+            namespace = plugin_namespace
+        else:
+            namespace = 'hbmqtt.broker.plugins'
+        self.plugins_manager = PluginManager(namespace, context, self._loop)
+
+    def _build_listeners_config(self, broker_config):
+        self.listeners_config = dict()
+        try:
+            listeners_config = broker_config['listeners']
+            defaults = listeners_config['default']
+            for listener in listeners_config:
+                config = dict(defaults)
+                config.update(listeners_config[listener])
+                self.listeners_config[listener] = config
+        except KeyError as ke:
+            raise BrokerException("Listener config not found invalid: %s" % ke)
+
+    def _init_states(self):
+        self.transitions = Machine(states=Broker.states, initial='new')
+        self.transitions.add_transition(trigger='start', source='new', dest='starting')
+        self.transitions.add_transition(trigger='starting_fail', source='starting', dest='not_started')
+        self.transitions.add_transition(trigger='starting_success', source='starting', dest='started')
+        self.transitions.add_transition(trigger='shutdown', source='started', dest='stopping')
+        self.transitions.add_transition(trigger='stopping_success', source='stopping', dest='stopped')
+        self.transitions.add_transition(trigger='stopping_failure', source='stopping', dest='not_stopped')
+        self.transitions.add_transition(trigger='start', source='stopped', dest='starting')
+
+    @asyncio.coroutine
+    def start(self):
+        """
+            Start the broker to serve with the given configuration
+
+            Start method opens network sockets and will start listening for incoming connections.
+
+            This method is a *coroutine*.
+        """
+        try:
+            self._sessions = dict()
+            self._subscriptions = dict()
+            self._retained_messages = dict()
+            self.transitions.start()
+            self.logger.debug("Broker starting")
+        except MachineError as me:
+            self.logger.warn("[WARN-0001] Invalid method call at this moment: %s" % me)
+            raise BrokerException("Broker instance can't be started: %s" % me)
+
+        yield from self.plugins_manager.fire_event(EVENT_BROKER_PRE_START)
+        try:
+            # Start network listeners
+            for listener_name in self.listeners_config:
+                listener = self.listeners_config[listener_name]
+
+                if 'bind' not in listener:
+                    self.logger.debug("Listener configuration '%s' is not bound" % listener_name)
+                else:
+                    # Max connections
+                    try:
+                        max_connections = listener['max_connections']
+                    except KeyError:
+                        max_connections = -1
+
+                    # SSL Context
+                    sc = None
+
+                    # accept string "on" / "off" or boolean
+                    ssl_active = listener.get('ssl', False)
+                    if isinstance(ssl_active, str):
+                        ssl_active = ssl_active.upper() == 'ON'
+
+                    if ssl_active:
+                        try:
+                            sc = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
+                            sc.load_cert_chain(listener['certfile'], listener['keyfile'])
+                            sc.verify_mode = ssl.CERT_OPTIONAL
+                        except KeyError as ke:
+                            raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke)
+                        except FileNotFoundError as fnfe:
+                            raise BrokerException("Can't read cert files '%s' or '%s' : %s" %
+                                                  (listener['certfile'], listener['keyfile'], fnfe))
+
+                    address, s_port = listener['bind'].split(':')
+                    port = 0
+                    try:
+                        port = int(s_port)
+                    except ValueError as ve:
+                        raise BrokerException("Invalid port value in bind value: %s" % listener['bind'])
+
+                    if listener['type'] == 'tcp':
+                        cb_partial = partial(self.stream_connected, listener_name=listener_name)
+                        instance = yield from asyncio.start_server(cb_partial,
+                                                                   address,
+                                                                   port,
+                                                                   ssl=sc,
+                                                                   loop=self._loop)
+                        self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
+                    elif listener['type'] == 'ws':
+                        cb_partial = partial(self.ws_connected, listener_name=listener_name)
+                        instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop,
+                                                               subprotocols=['mqtt'])
+                        self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
+
+                    self.logger.info("Listener '%s' bind to %s (max_connections=%d)" %
+                                     (listener_name, listener['bind'], max_connections))
+
+            self.transitions.starting_success()
+            yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_START)
+
+            #Start broadcast loop
+            self._broadcast_task = ensure_future(self._broadcast_loop(), loop=self._loop)
+
+            self.logger.debug("Broker started")
+        except Exception as e:
+            self.logger.error("Broker startup failed: %s" % e)
+            self.transitions.starting_fail()
+            raise BrokerException("Broker instance can't be started: %s" % e)
+
+    @asyncio.coroutine
+    def shutdown(self):
+        """
+            Stop broker instance.
+
+            Closes all connected session, stop listening on network socket and free resources.
+        """
+        try:
+            self._sessions = dict()
+            self._subscriptions = dict()
+            self._retained_messages = dict()
+            self.transitions.shutdown()
+        except MachineError as me:
+            self.logger.debug("Invalid method call at this moment: %s" % me)
+            raise BrokerException("Broker instance can't be stopped: %s" % me)
+
+        # Fire broker_pre_shutdown event to plugins
+        yield from self.plugins_manager.fire_event(EVENT_BROKER_PRE_SHUTDOWN)
+
+        # Stop broadcast loop
+        if self._broadcast_task:
+            self._broadcast_task.cancel()
+        if self._broadcast_queue.qsize() > 0:
+            self.logger.warning("%d messages not broadcasted" % self._broadcast_queue.qsize())
+
+        for listener_name in self._servers:
+            server = self._servers[listener_name]
+            yield from server.close_instance()
+        self.logger.debug("Broker closing")
+        self.logger.info("Broker closed")
+        yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_SHUTDOWN)
+        self.transitions.stopping_success()
+
+    @asyncio.coroutine
+    def internal_message_broadcast(self, topic, data, qos=None):
+        return (yield from self._broadcast_message(None, topic, data))
+
+    @asyncio.coroutine
+    def ws_connected(self, websocket, uri, listener_name):
+        yield from self.client_connected(listener_name, WebSocketsReader(websocket), WebSocketsWriter(websocket))
+
+    @asyncio.coroutine
+    def stream_connected(self, reader, writer, listener_name):
+        yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
+
+    @asyncio.coroutine
+    def client_connected(self, listener_name, reader: ReaderAdapter, writer: WriterAdapter):
+        # Wait for connection available on listener
+        server = self._servers.get(listener_name, None)
+        if not server:
+            raise BrokerException("Invalid listener name '%s'" % listener_name)
+        yield from server.acquire_connection()
+
+        remote_address, remote_port = writer.get_peer_info()
+        self.logger.info("Connection from %s:%d on listener '%s'" % (remote_address, remote_port, listener_name))
+
+        # Wait for first packet and expect a CONNECT
+        try:
+            handler, client_session = yield from BrokerProtocolHandler.init_from_connect(reader, writer, self.plugins_manager, loop=self._loop)
+        except HBMQTTException as exc:
+            self.logger.warn("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" %
+                             (format_client_message(address=remote_address, port=remote_port), exc))
+            #yield from writer.close()
+            self.logger.debug("Connection closed")
+            return
+        except MQTTException as me:
+            self.logger.error('Invalid connection from %s : %s' %
+                              (format_client_message(address=remote_address, port=remote_port), me))
+            yield from writer.close()
+            self.logger.debug("Connection closed")
+            return
+
+        if client_session.clean_session:
+            # Delete existing session and create a new one
+            if client_session.client_id is not None:
+                self.delete_session(client_session.client_id)
+            else:
+                client_session.client_id = gen_client_id()
+            client_session.parent = 0
+        else:
+            # Get session from cache
+            if client_session.client_id in self._sessions:
+                self.logger.debug("Found old session %s" % repr(self._sessions[client_session.client_id]))
+                (client_session,h) = self._sessions[client_session.client_id]
+                client_session.parent = 1
+            else:
+                client_session.parent = 0
+        if client_session.keep_alive > 0:
+            client_session.keep_alive += self.config['timeout-disconnect-delay']
+        self.logger.debug("Keep-alive timeout=%d" % client_session.keep_alive)
+
+        handler.attach(client_session, reader, writer)
+        self._sessions[client_session.client_id] = (client_session, handler)
+
+        authenticated = yield from self.authenticate(client_session, self.listeners_config[listener_name])
+        if not authenticated:
+            yield from writer.close()
+            return
+
+        while True:
+            try:
+                client_session.transitions.connect()
+                break
+            except MachineError:
+                self.logger.warning("Client %s is reconnecting too quickly, make it wait" % client_session.client_id)
+                # Wait a bit may be client is reconnecting too fast
+                yield from asyncio.sleep(1, loop=self._loop)
+        yield from handler.mqtt_connack_authorize(authenticated)
+
+        yield from self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_CONNECTED, client_id=client_session.client_id)
+
+        self.logger.debug("%s Start messages handling" % client_session.client_id)
+        yield from handler.start()
+        self.logger.debug("Retained messages queue size: %d" % client_session.retained_messages.qsize())
+        yield from self.publish_session_retained_messages(client_session)
+
+        # Init and start loop for handling client messages (publish, subscribe/unsubscribe, disconnect)
+        disconnect_waiter = ensure_future(handler.wait_disconnect(), loop=self._loop)
+        subscribe_waiter = ensure_future(handler.get_next_pending_subscription(), loop=self._loop)
+        unsubscribe_waiter = ensure_future(handler.get_next_pending_unsubscription(), loop=self._loop)
+        wait_deliver = ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop)
+        connected = True
+        while connected:
+            try:
+                done, pending = yield from asyncio.wait(
+                    [disconnect_waiter, subscribe_waiter, unsubscribe_waiter, wait_deliver],
+                    return_when=asyncio.FIRST_COMPLETED, loop=self._loop)
+                if disconnect_waiter in done:
+                    result = disconnect_waiter.result()
+                    self.logger.debug("%s Result from wait_diconnect: %s" % (client_session.client_id, result))
+                    if result is None:
+                        self.logger.debug("Will flag: %s" % client_session.will_flag)
+                        # Connection closed anormally, send will message
+                        if client_session.will_flag:
+                            self.logger.debug("Client %s disconnected abnormally, sending will message" %
+                                              format_client_message(client_session))
+                            yield from self._broadcast_message(
+                                client_session,
+                                client_session.will_topic,
+                                client_session.will_message,
+                                client_session.will_qos)
+                            if client_session.will_retain:
+                                self.retain_message(client_session,
+                                                    client_session.will_topic,
+                                                    client_session.will_message,
+                                                    client_session.will_qos)
+                    self.logger.debug("%s Disconnecting session" % client_session.client_id)
+                    yield from self._stop_handler(handler)
+                    client_session.transitions.disconnect()
+                    yield from self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id)
+                    connected = False
+                if unsubscribe_waiter in done:
+                    self.logger.debug("%s handling unsubscription" % client_session.client_id)
+                    unsubscription = unsubscribe_waiter.result()
+                    for topic in unsubscription['topics']:
+                        self._del_subscription(topic, client_session)
+                        yield from self.plugins_manager.fire_event(
+                            EVENT_BROKER_CLIENT_UNSUBSCRIBED,
+                            client_id=client_session.client_id,
+                            topic=topic)
+                    yield from handler.mqtt_acknowledge_unsubscription(unsubscription['packet_id'])
+                    unsubscribe_waiter = asyncio.Task(handler.get_next_pending_unsubscription(), loop=self._loop)
+                if subscribe_waiter in done:
+                    self.logger.debug("%s handling subscription" % client_session.client_id)
+                    subscriptions = subscribe_waiter.result()
+                    return_codes = []
+                    for subscription in subscriptions['topics']:
+                        return_codes.append(self.add_subscription(subscription, client_session))
+                    yield from handler.mqtt_acknowledge_subscription(subscriptions['packet_id'], return_codes)
+                    for index, subscription in enumerate(subscriptions['topics']):
+                        if return_codes[index] != 0x80:
+                            yield from self.plugins_manager.fire_event(
+                                EVENT_BROKER_CLIENT_SUBSCRIBED,
+                                client_id=client_session.client_id,
+                                topic=subscription[0],
+                                qos=subscription[1])
+                            yield from self.publish_retained_messages_for_subscription(subscription, client_session)
+                    subscribe_waiter = asyncio.Task(handler.get_next_pending_subscription(), loop=self._loop)
+                    self.logger.debug(repr(self._subscriptions))
+                if wait_deliver in done:
+                    if self.logger.isEnabledFor(logging.DEBUG):
+                        self.logger.debug("%s handling message delivery" % client_session.client_id)
+                    app_message = wait_deliver.result()
+                    if not app_message.topic:
+                        self.logger.warn("[MQTT-4.7.3-1] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id)
+                        break
+                    if "#" in app_message.topic or "+" in app_message.topic:
+                        self.logger.warn("[MQTT-3.3.2-2] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id)
+                        break
+                    yield from self.plugins_manager.fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
+                                                               client_id=client_session.client_id,
+                                                               message=app_message)
+                    yield from self._broadcast_message(client_session, app_message.topic, app_message.data)
+                    if app_message.publish_packet.retain_flag:
+                        self.retain_message(client_session, app_message.topic, app_message.data, app_message.qos)
+                    wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message(), loop=self._loop)
... 6125 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-hbmqtt.git



More information about the Python-modules-commits mailing list