[Python-modules-commits] [python-hbmqtt] 01/01: Import python-hbmqtt_0.9.orig.tar.gz
Stein Magnus Jodal
jodal at moszumanska.debian.org
Wed Jul 26 11:21:33 UTC 2017
This is an automated email from the git hooks/post-receive script.
jodal pushed a commit to branch upstream
in repository python-hbmqtt.
commit da5faab0b6d7197934f241864deef23388cf2248
Author: Stein Magnus Jodal <jodal at debian.org>
Date: Wed Jul 26 10:40:29 2017 +0200
Import python-hbmqtt_0.9.orig.tar.gz
---
MANIFEST.in | 2 +
PKG-INFO | 21 +
hbmqtt.egg-info/PKG-INFO | 21 +
hbmqtt.egg-info/SOURCES.txt | 71 ++++
hbmqtt.egg-info/dependency_links.txt | 1 +
hbmqtt.egg-info/entry_points.txt | 19 +
hbmqtt.egg-info/requires.txt | 5 +
hbmqtt.egg-info/top_level.txt | 3 +
hbmqtt/__init__.py | 5 +
hbmqtt/adapters.py | 217 ++++++++++
hbmqtt/broker.py | 754 +++++++++++++++++++++++++++++++++
hbmqtt/client.py | 518 ++++++++++++++++++++++
hbmqtt/codecs.py | 120 ++++++
hbmqtt/errors.py | 31 ++
hbmqtt/mqtt/__init__.py | 44 ++
hbmqtt/mqtt/connack.py | 84 ++++
hbmqtt/mqtt/connect.py | 328 ++++++++++++++
hbmqtt/mqtt/constants.py | 7 +
hbmqtt/mqtt/disconnect.py | 21 +
hbmqtt/mqtt/packet.py | 240 +++++++++++
hbmqtt/mqtt/pingreq.py | 21 +
hbmqtt/mqtt/pingresp.py | 25 ++
hbmqtt/mqtt/protocol/__init__.py | 0
hbmqtt/mqtt/protocol/broker_handler.py | 185 ++++++++
hbmqtt/mqtt/protocol/client_handler.py | 181 ++++++++
hbmqtt/mqtt/protocol/handler.py | 567 +++++++++++++++++++++++++
hbmqtt/mqtt/puback.py | 35 ++
hbmqtt/mqtt/pubcomp.py | 35 ++
hbmqtt/mqtt/publish.py | 158 +++++++
hbmqtt/mqtt/pubrec.py | 35 ++
hbmqtt/mqtt/pubrel.py | 34 ++
hbmqtt/mqtt/suback.py | 65 +++
hbmqtt/mqtt/subscribe.py | 63 +++
hbmqtt/mqtt/unsuback.py | 27 ++
hbmqtt/mqtt/unsubscribe.py | 57 +++
hbmqtt/plugins/__init__.py | 4 +
hbmqtt/plugins/authentication.py | 91 ++++
hbmqtt/plugins/logging.py | 46 ++
hbmqtt/plugins/manager.py | 210 +++++++++
hbmqtt/plugins/persistence.py | 65 +++
hbmqtt/plugins/sys/__init__.py | 0
hbmqtt/plugins/sys/broker.py | 167 ++++++++
hbmqtt/session.py | 171 ++++++++
hbmqtt/utils.py | 48 +++
hbmqtt/version.py | 54 +++
license.txt | 21 +
scripts/__init__.py | 0
scripts/broker_script.py | 81 ++++
scripts/default_broker.yaml | 10 +
scripts/default_client.yaml | 7 +
scripts/pub_script.py | 168 ++++++++
scripts/sub_script.py | 144 +++++++
setup.cfg | 8 +
setup.py | 61 +++
tests/mqtt/__init__.py | 1 +
tests/mqtt/protocol/__init__.py | 1 +
tests/mqtt/protocol/test_handler.py | 460 ++++++++++++++++++++
tests/mqtt/test_connect.py | 123 ++++++
tests/mqtt/test_packet.py | 50 +++
tests/mqtt/test_puback.py | 25 ++
tests/mqtt/test_pubcomp.py | 25 ++
tests/mqtt/test_publish.py | 121 ++++++
tests/mqtt/test_pubrec.py | 25 ++
tests/mqtt/test_pubrel.py | 25 ++
tests/mqtt/test_suback.py | 35 ++
tests/mqtt/test_subscribe.py | 37 ++
tests/mqtt/test_unsuback.py | 26 ++
tests/mqtt/test_unsubscribe.py | 28 ++
tests/plugins/__init__.py | 1 +
tests/plugins/test_authentication.py | 111 +++++
tests/plugins/test_manager.py | 100 +++++
tests/plugins/test_persistence.py | 59 +++
72 files changed, 6609 insertions(+)
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..1640d6e
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,2 @@
+recursive-include scripts *.yaml
+include license.txt
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..4adc081
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,21 @@
+Metadata-Version: 1.1
+Name: hbmqtt
+Version: 0.9
+Summary: MQTT client/brocker using Python 3.4 asyncio library
+Home-page: https://github.com/beerfactory/hbmqtt
+Author: Nicolas Jouanin
+Author-email: nico at beerfactory.org
+License: MIT
+Description: UNKNOWN
+Platform: all
+Classifier: Development Status :: 3 - Alpha
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: MIT License
+Classifier: Operating System :: POSIX
+Classifier: Operating System :: MacOS
+Classifier: Operating System :: Microsoft :: Windows
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Programming Language :: Python :: 3.5
+Classifier: Programming Language :: Python :: 3.6
+Classifier: Topic :: Communications
+Classifier: Topic :: Internet
diff --git a/hbmqtt.egg-info/PKG-INFO b/hbmqtt.egg-info/PKG-INFO
new file mode 100644
index 0000000..4adc081
--- /dev/null
+++ b/hbmqtt.egg-info/PKG-INFO
@@ -0,0 +1,21 @@
+Metadata-Version: 1.1
+Name: hbmqtt
+Version: 0.9
+Summary: MQTT client/brocker using Python 3.4 asyncio library
+Home-page: https://github.com/beerfactory/hbmqtt
+Author: Nicolas Jouanin
+Author-email: nico at beerfactory.org
+License: MIT
+Description: UNKNOWN
+Platform: all
+Classifier: Development Status :: 3 - Alpha
+Classifier: Intended Audience :: Developers
+Classifier: License :: OSI Approved :: MIT License
+Classifier: Operating System :: POSIX
+Classifier: Operating System :: MacOS
+Classifier: Operating System :: Microsoft :: Windows
+Classifier: Programming Language :: Python :: 3.4
+Classifier: Programming Language :: Python :: 3.5
+Classifier: Programming Language :: Python :: 3.6
+Classifier: Topic :: Communications
+Classifier: Topic :: Internet
diff --git a/hbmqtt.egg-info/SOURCES.txt b/hbmqtt.egg-info/SOURCES.txt
new file mode 100644
index 0000000..f713a5e
--- /dev/null
+++ b/hbmqtt.egg-info/SOURCES.txt
@@ -0,0 +1,71 @@
+MANIFEST.in
+license.txt
+setup.cfg
+setup.py
+hbmqtt/__init__.py
+hbmqtt/adapters.py
+hbmqtt/broker.py
+hbmqtt/client.py
+hbmqtt/codecs.py
+hbmqtt/errors.py
+hbmqtt/session.py
+hbmqtt/utils.py
+hbmqtt/version.py
+hbmqtt.egg-info/PKG-INFO
+hbmqtt.egg-info/SOURCES.txt
+hbmqtt.egg-info/dependency_links.txt
+hbmqtt.egg-info/entry_points.txt
+hbmqtt.egg-info/requires.txt
+hbmqtt.egg-info/top_level.txt
+hbmqtt/mqtt/__init__.py
+hbmqtt/mqtt/connack.py
+hbmqtt/mqtt/connect.py
+hbmqtt/mqtt/constants.py
+hbmqtt/mqtt/disconnect.py
+hbmqtt/mqtt/packet.py
+hbmqtt/mqtt/pingreq.py
+hbmqtt/mqtt/pingresp.py
+hbmqtt/mqtt/puback.py
+hbmqtt/mqtt/pubcomp.py
+hbmqtt/mqtt/publish.py
+hbmqtt/mqtt/pubrec.py
+hbmqtt/mqtt/pubrel.py
+hbmqtt/mqtt/suback.py
+hbmqtt/mqtt/subscribe.py
+hbmqtt/mqtt/unsuback.py
+hbmqtt/mqtt/unsubscribe.py
+hbmqtt/mqtt/protocol/__init__.py
+hbmqtt/mqtt/protocol/broker_handler.py
+hbmqtt/mqtt/protocol/client_handler.py
+hbmqtt/mqtt/protocol/handler.py
+hbmqtt/plugins/__init__.py
+hbmqtt/plugins/authentication.py
+hbmqtt/plugins/logging.py
+hbmqtt/plugins/manager.py
+hbmqtt/plugins/persistence.py
+hbmqtt/plugins/sys/__init__.py
+hbmqtt/plugins/sys/broker.py
+scripts/__init__.py
+scripts/broker_script.py
+scripts/default_broker.yaml
+scripts/default_client.yaml
+scripts/pub_script.py
+scripts/sub_script.py
+tests/mqtt/__init__.py
+tests/mqtt/test_connect.py
+tests/mqtt/test_packet.py
+tests/mqtt/test_puback.py
+tests/mqtt/test_pubcomp.py
+tests/mqtt/test_publish.py
+tests/mqtt/test_pubrec.py
+tests/mqtt/test_pubrel.py
+tests/mqtt/test_suback.py
+tests/mqtt/test_subscribe.py
+tests/mqtt/test_unsuback.py
+tests/mqtt/test_unsubscribe.py
+tests/mqtt/protocol/__init__.py
+tests/mqtt/protocol/test_handler.py
+tests/plugins/__init__.py
+tests/plugins/test_authentication.py
+tests/plugins/test_manager.py
+tests/plugins/test_persistence.py
\ No newline at end of file
diff --git a/hbmqtt.egg-info/dependency_links.txt b/hbmqtt.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/hbmqtt.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/hbmqtt.egg-info/entry_points.txt b/hbmqtt.egg-info/entry_points.txt
new file mode 100644
index 0000000..eb496b3
--- /dev/null
+++ b/hbmqtt.egg-info/entry_points.txt
@@ -0,0 +1,19 @@
+[console_scripts]
+hbmqtt = scripts.broker_script:main
+hbmqtt_pub = scripts.pub_script:main
+hbmqtt_sub = scripts.sub_script:main
+
+[hbmqtt.broker.plugins]
+auth_anonymous = hbmqtt.plugins.authentication:AnonymousAuthPlugin
+auth_file = hbmqtt.plugins.authentication:FileAuthPlugin
+broker_sys = hbmqtt.plugins.sys.broker:BrokerSysPlugin
+packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin
+
+[hbmqtt.client.plugins]
+packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin
+
+[hbmqtt.test.plugins]
+event_plugin = tests.plugins.test_manager:EventTestPlugin
+packet_logger_plugin = hbmqtt.plugins.logging:PacketLoggerPlugin
+test_plugin = tests.plugins.test_manager:TestPlugin
+
diff --git a/hbmqtt.egg-info/requires.txt b/hbmqtt.egg-info/requires.txt
new file mode 100644
index 0000000..0ade1d9
--- /dev/null
+++ b/hbmqtt.egg-info/requires.txt
@@ -0,0 +1,5 @@
+transitions==0.2.5
+websockets
+passlib
+docopt
+pyyaml
diff --git a/hbmqtt.egg-info/top_level.txt b/hbmqtt.egg-info/top_level.txt
new file mode 100644
index 0000000..16eab0d
--- /dev/null
+++ b/hbmqtt.egg-info/top_level.txt
@@ -0,0 +1,3 @@
+hbmqtt
+scripts
+tests
diff --git a/hbmqtt/__init__.py b/hbmqtt/__init__.py
new file mode 100644
index 0000000..f53cbc0
--- /dev/null
+++ b/hbmqtt/__init__.py
@@ -0,0 +1,5 @@
+# Copyright (c) 2015 Nicolas JOUANIN
+#
+# See the file license.txt for copying permission.
+
+VERSION = (0, 9, 0, 'final', 0)
diff --git a/hbmqtt/adapters.py b/hbmqtt/adapters.py
new file mode 100644
index 0000000..090a518
--- /dev/null
+++ b/hbmqtt/adapters.py
@@ -0,0 +1,217 @@
+# Copyright (c) 2015 Nicolas JOUANIN
+#
+# See the file license.txt for copying permission.
+import asyncio
+import io
+from websockets.protocol import WebSocketCommonProtocol
+from websockets.exceptions import ConnectionClosed
+from asyncio import StreamReader, StreamWriter
+import logging
+
+
+class ReaderAdapter:
+ """
+ Base class for all network protocol reader adapters.
+
+ Reader adapters are used to adapt read operations on the network depending on the protocol used
+ """
+
+ @asyncio.coroutine
+ def read(self, n=-1) -> bytes:
+ """
+ Read up to n bytes. If n is not provided, or set to -1, read until EOF and return all read bytes.
+ If the EOF was received and the internal buffer is empty, return an empty bytes object.
+ :return: packet read as bytes data
+ """
+
+ def feed_eof(self):
+ """
+ Acknowledge EOF
+ """
+
+
+class WriterAdapter:
+ """
+ Base class for all network protocol writer adapters.
+
+ Writer adapters are used to adapt write operations on the network depending on the protocol used
+ """
+
+ def write(self, data):
+ """
+ write some data to the protocol layer
+ """
+
+ @asyncio.coroutine
+ def drain(self):
+ """
+ Give the write buffer of the underlying transport a chance to be flushed.
+ """
+
+ def get_peer_info(self):
+ """
+ Return peer socket info (remote address and remote port) as a tuple
+ """
+
+ @asyncio.coroutine
+ def close(self):
+ """
+ Close the protocol connection
+ """
+
+
+class WebSocketsReader(ReaderAdapter):
+ """
+ WebSockets API reader adapter
+ This adapter relies on WebSocketCommonProtocol to read from a WebSocket.
+ """
+ def __init__(self, protocol: WebSocketCommonProtocol):
+ self._protocol = protocol
+ self._stream = io.BytesIO(b'')
+
+ @asyncio.coroutine
+ def read(self, n=-1) -> bytes:
+ yield from self._feed_buffer(n)
+ data = self._stream.read(n)
+ return data
+
+ @asyncio.coroutine
+ def _feed_buffer(self, n=1):
+ """
+ Feed the data buffer by reading a Websocket message.
+ :param n: if given, feed buffer until it contains at least n bytes
+ """
+ buffer = bytearray(self._stream.read())
+ while len(buffer) < n:
+ try:
+ message = yield from self._protocol.recv()
+ except ConnectionClosed:
+ message = None
+ if message is None:
+ break
+ if not isinstance(message, bytes):
+ raise TypeError("message must be bytes")
+ buffer.extend(message)
+ self._stream = io.BytesIO(buffer)
+
+
+class WebSocketsWriter(WriterAdapter):
+ """
+ WebSockets API writer adapter
+ This adapter relies on WebSocketCommonProtocol to write to a WebSocket.
+ """
+ def __init__(self, protocol: WebSocketCommonProtocol):
+ self._protocol = protocol
+ self._stream = io.BytesIO(b'')
+
+ def write(self, data):
+ """
+ write some data to the protocol layer
+ """
+ self._stream.write(data)
+
+ @asyncio.coroutine
+ def drain(self):
+ """
+ Give the write buffer of the underlying transport a chance to be flushed.
+ """
+ data = self._stream.getvalue()
+ if len(data):
+ yield from self._protocol.send(data)
+ self._stream = io.BytesIO(b'')
+
+ def get_peer_info(self):
+ extra_info = self._protocol.writer.get_extra_info('peername')
+ return extra_info[0], extra_info[1]
+
+ @asyncio.coroutine
+ def close(self):
+ yield from self._protocol.close()
+
+
+class StreamReaderAdapter(ReaderAdapter):
+ """
+ Asyncio Streams API protocol adapter
+ This adapter relies on StreamReader to read from a TCP socket.
+ Because API is very close, this class is trivial
+ """
+ def __init__(self, reader: StreamReader):
+ self._reader = reader
+
+ @asyncio.coroutine
+ def read(self, n=-1) -> bytes:
+ return (yield from self._reader.read(n))
+
+ def feed_eof(self):
+ return self._reader.feed_eof()
+
+
+class StreamWriterAdapter(WriterAdapter):
+ """
+ Asyncio Streams API protocol adapter
+ This adapter relies on StreamWriter to write to a TCP socket.
+ Because API is very close, this class is trivial
+ """
+ def __init__(self, writer: StreamWriter):
+ self.logger = logging.getLogger(__name__)
+ self._writer = writer
+
+ def write(self, data):
+ self._writer.write(data)
+
+ @asyncio.coroutine
+ def drain(self):
+ yield from self._writer.drain()
+
+ def get_peer_info(self):
+ extra_info = self._writer.get_extra_info('peername')
+ return extra_info[0], extra_info[1]
+
+ @asyncio.coroutine
+ def close(self):
+ yield from self._writer.drain()
+ if self._writer.can_write_eof():
+ self._writer.write_eof()
+ self._writer.close()
+
+
+class BufferReader(ReaderAdapter):
+ """
+ Byte Buffer reader adapter
+ This adapter simply adapts reading from a byte buffer.
+ """
+ def __init__(self, buffer: bytes):
+ self._stream = io.BytesIO(buffer)
+
+ @asyncio.coroutine
+ def read(self, n=-1) -> bytes:
+ return self._stream.read(n)
+
+
+class BufferWriter(WriterAdapter):
+ """
+ ByteBuffer writer adapter
+ This adapter simply adapts writing to a byte buffer.
+ """
+ def __init__(self, buffer=b''):
+ self._stream = io.BytesIO(buffer)
+
+ def write(self, data):
+ """
+ write some data to the protocol layer
+ """
+ self._stream.write(data)
+
+ @asyncio.coroutine
+ def drain(self):
+ pass
+
+ def get_buffer(self):
+ return self._stream.getvalue()
+
+ def get_peer_info(self):
+ return "BufferWriter", 0
+
+ @asyncio.coroutine
+ def close(self):
+ self._stream.close()
diff --git a/hbmqtt/broker.py b/hbmqtt/broker.py
new file mode 100644
index 0000000..d2d4311
--- /dev/null
+++ b/hbmqtt/broker.py
@@ -0,0 +1,754 @@
+# Copyright (c) 2015 Nicolas JOUANIN
+#
+# See the file license.txt for copying permission.
+import logging
+import ssl
+import websockets
+import asyncio
+import sys
+import re
+from asyncio import Queue, CancelledError
+if sys.version_info < (3, 5):
+ from asyncio import async as ensure_future
+else:
+ from asyncio import ensure_future
+from collections import deque
+
+from functools import partial
+from transitions import Machine, MachineError
+from hbmqtt.session import Session
+from hbmqtt.mqtt.protocol.broker_handler import BrokerProtocolHandler
+from hbmqtt.errors import HBMQTTException, MQTTException
+from hbmqtt.utils import format_client_message, gen_client_id
+from hbmqtt.adapters import (
+ StreamReaderAdapter,
+ StreamWriterAdapter,
+ ReaderAdapter,
+ WriterAdapter,
+ WebSocketsReader,
+ WebSocketsWriter)
+from .plugins.manager import PluginManager, BaseContext
+
+_defaults = {
+ 'timeout-disconnect-delay': 2,
+ 'auth': {
+ 'allow-anonymous': True,
+ 'password-file': None
+ },
+}
+
+EVENT_BROKER_PRE_START = 'broker_pre_start'
+EVENT_BROKER_POST_START = 'broker_post_start'
+EVENT_BROKER_PRE_SHUTDOWN = 'broker_pre_shutdown'
+EVENT_BROKER_POST_SHUTDOWN = 'broker_post_shutdown'
+EVENT_BROKER_CLIENT_CONNECTED = 'broker_client_connected'
+EVENT_BROKER_CLIENT_DISCONNECTED = 'broker_client_disconnected'
+EVENT_BROKER_CLIENT_SUBSCRIBED = 'broker_client_subscribed'
+EVENT_BROKER_CLIENT_UNSUBSCRIBED = 'broker_client_unsubscribed'
+EVENT_BROKER_MESSAGE_RECEIVED = 'broker_message_received'
+
+
+class BrokerException(BaseException):
+ pass
+
+
+class RetainedApplicationMessage:
+ def __init__(self, source_session, topic, data, qos=None):
+ self.source_session = source_session
+ self.topic = topic
+ self.data = data
+ self.qos = qos
+
+
+class Server:
+ def __init__(self, listener_name, server_instance, max_connections=-1, loop=None):
+ self.logger = logging.getLogger(__name__)
+ self.instance = server_instance
+ self.conn_count = 0
+ self.listener_name = listener_name
+ if loop is not None:
+ self._loop = loop
+ else:
+ self._loop = asyncio.get_event_loop()
+
+ self.max_connections = max_connections
+ if self.max_connections > 0:
+ self.semaphore = asyncio.Semaphore(self.max_connections, loop=self._loop)
+ else:
+ self.semaphore = None
+
+ @asyncio.coroutine
+ def acquire_connection(self):
+ if self.semaphore:
+ yield from self.semaphore.acquire()
+ self.conn_count += 1
+ if self.max_connections > 0:
+ self.logger.info("Listener '%s': %d/%d connections acquired" %
+ (self.listener_name, self.conn_count, self.max_connections))
+ else:
+ self.logger.info("Listener '%s': %d connections acquired" %
+ (self.listener_name, self.conn_count))
+
+ def release_connection(self):
+ if self.semaphore:
+ self.semaphore.release()
+ self.conn_count -= 1
+ if self.max_connections > 0:
+ self.logger.info("Listener '%s': %d/%d connections acquired" %
+ (self.listener_name, self.conn_count, self.max_connections))
+ else:
+ self.logger.info("Listener '%s': %d connections acquired" %
+ (self.listener_name, self.conn_count))
+
+ @asyncio.coroutine
+ def close_instance(self):
+ if self.instance:
+ self.instance.close()
+ yield from self.instance.wait_closed()
+
+
+class BrokerContext(BaseContext):
+ """
+ BrokerContext is used as the context passed to plugins interacting with the broker.
+ It acts as an adapter to broker services from plugins developed for HBMQTT broker
+ """
+ def __init__(self, broker):
+ super().__init__()
+ self.config = None
+ self._broker_instance = broker
+
+ @asyncio.coroutine
+ def broadcast_message(self, topic, data, qos=None):
+ yield from self._broker_instance.internal_message_broadcast(topic, data, qos)
+
+ def retain_message(self, topic_name, data, qos=None):
+ self._broker_instance.retain_message(None, topic_name, data, qos)
+
+ @property
+ def sessions(self):
+ for k, session in self._broker_instance._sessions.items():
+ yield session[0]
+
+ @property
+ def retained_messages(self):
+ return self._broker_instance._retained_messages
+
+ @property
+ def subscriptions(self):
+ return self._broker_instance._subscriptions
+
+
+class Broker:
+ """
+ MQTT 3.1.1 compliant broker implementation
+
+ :param config: Example Yaml config
+ :param loop: asyncio loop to use. Defaults to ``asyncio.get_event_loop()`` if none is given
+ :param plugin_namespace: Plugin namespace to use when loading plugin entry_points. Defaults to ``hbmqtt.broker.plugins``
+
+ """
+ states = ['new', 'starting', 'started', 'not_started', 'stopping', 'stopped', 'not_stopped', 'stopped']
+
+ def __init__(self, config=None, loop=None, plugin_namespace=None):
+ self.logger = logging.getLogger(__name__)
+ self.config = _defaults
+ if config is not None:
+ self.config.update(config)
+ self._build_listeners_config(self.config)
+
+ if loop is not None:
+ self._loop = loop
+ else:
+ self._loop = asyncio.get_event_loop()
+
+ self._servers = dict()
+ self._init_states()
+ self._sessions = dict()
+ self._subscriptions = dict()
+ self._retained_messages = dict()
+ self._broadcast_queue = asyncio.Queue(loop=self._loop)
+
+ self._broadcast_task = None
+
+ # Init plugins manager
+ context = BrokerContext(self)
+ context.config = self.config
+ if plugin_namespace:
+ namespace = plugin_namespace
+ else:
+ namespace = 'hbmqtt.broker.plugins'
+ self.plugins_manager = PluginManager(namespace, context, self._loop)
+
+ def _build_listeners_config(self, broker_config):
+ self.listeners_config = dict()
+ try:
+ listeners_config = broker_config['listeners']
+ defaults = listeners_config['default']
+ for listener in listeners_config:
+ config = dict(defaults)
+ config.update(listeners_config[listener])
+ self.listeners_config[listener] = config
+ except KeyError as ke:
+ raise BrokerException("Listener config not found invalid: %s" % ke)
+
+ def _init_states(self):
+ self.transitions = Machine(states=Broker.states, initial='new')
+ self.transitions.add_transition(trigger='start', source='new', dest='starting')
+ self.transitions.add_transition(trigger='starting_fail', source='starting', dest='not_started')
+ self.transitions.add_transition(trigger='starting_success', source='starting', dest='started')
+ self.transitions.add_transition(trigger='shutdown', source='started', dest='stopping')
+ self.transitions.add_transition(trigger='stopping_success', source='stopping', dest='stopped')
+ self.transitions.add_transition(trigger='stopping_failure', source='stopping', dest='not_stopped')
+ self.transitions.add_transition(trigger='start', source='stopped', dest='starting')
+
+ @asyncio.coroutine
+ def start(self):
+ """
+ Start the broker to serve with the given configuration
+
+ Start method opens network sockets and will start listening for incoming connections.
+
+ This method is a *coroutine*.
+ """
+ try:
+ self._sessions = dict()
+ self._subscriptions = dict()
+ self._retained_messages = dict()
+ self.transitions.start()
+ self.logger.debug("Broker starting")
+ except MachineError as me:
+ self.logger.warn("[WARN-0001] Invalid method call at this moment: %s" % me)
+ raise BrokerException("Broker instance can't be started: %s" % me)
+
+ yield from self.plugins_manager.fire_event(EVENT_BROKER_PRE_START)
+ try:
+ # Start network listeners
+ for listener_name in self.listeners_config:
+ listener = self.listeners_config[listener_name]
+
+ if 'bind' not in listener:
+ self.logger.debug("Listener configuration '%s' is not bound" % listener_name)
+ else:
+ # Max connections
+ try:
+ max_connections = listener['max_connections']
+ except KeyError:
+ max_connections = -1
+
+ # SSL Context
+ sc = None
+
+ # accept string "on" / "off" or boolean
+ ssl_active = listener.get('ssl', False)
+ if isinstance(ssl_active, str):
+ ssl_active = ssl_active.upper() == 'ON'
+
+ if ssl_active:
+ try:
+ sc = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
+ sc.load_cert_chain(listener['certfile'], listener['keyfile'])
+ sc.verify_mode = ssl.CERT_OPTIONAL
+ except KeyError as ke:
+ raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke)
+ except FileNotFoundError as fnfe:
+ raise BrokerException("Can't read cert files '%s' or '%s' : %s" %
+ (listener['certfile'], listener['keyfile'], fnfe))
+
+ address, s_port = listener['bind'].split(':')
+ port = 0
+ try:
+ port = int(s_port)
+ except ValueError as ve:
+ raise BrokerException("Invalid port value in bind value: %s" % listener['bind'])
+
+ if listener['type'] == 'tcp':
+ cb_partial = partial(self.stream_connected, listener_name=listener_name)
+ instance = yield from asyncio.start_server(cb_partial,
+ address,
+ port,
+ ssl=sc,
+ loop=self._loop)
+ self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
+ elif listener['type'] == 'ws':
+ cb_partial = partial(self.ws_connected, listener_name=listener_name)
+ instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop,
+ subprotocols=['mqtt'])
+ self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
+
+ self.logger.info("Listener '%s' bind to %s (max_connections=%d)" %
+ (listener_name, listener['bind'], max_connections))
+
+ self.transitions.starting_success()
+ yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_START)
+
+ #Start broadcast loop
+ self._broadcast_task = ensure_future(self._broadcast_loop(), loop=self._loop)
+
+ self.logger.debug("Broker started")
+ except Exception as e:
+ self.logger.error("Broker startup failed: %s" % e)
+ self.transitions.starting_fail()
+ raise BrokerException("Broker instance can't be started: %s" % e)
+
+ @asyncio.coroutine
+ def shutdown(self):
+ """
+ Stop broker instance.
+
+ Closes all connected sessions, stops listening on network sockets and frees resources.
+ """
+ try:
+ self._sessions = dict()
+ self._subscriptions = dict()
+ self._retained_messages = dict()
+ self.transitions.shutdown()
+ except MachineError as me:
+ self.logger.debug("Invalid method call at this moment: %s" % me)
+ raise BrokerException("Broker instance can't be stopped: %s" % me)
+
+ # Fire broker_shutdown event to plugins
+ yield from self.plugins_manager.fire_event(EVENT_BROKER_PRE_SHUTDOWN)
+
+ # Stop broadcast loop
+ if self._broadcast_task:
+ self._broadcast_task.cancel()
+ if self._broadcast_queue.qsize() > 0:
+ self.logger.warning("%d messages not broadcasted" % self._broadcast_queue.qsize())
+
+ for listener_name in self._servers:
+ server = self._servers[listener_name]
+ yield from server.close_instance()
+ self.logger.debug("Broker closing")
+ self.logger.info("Broker closed")
+ yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_SHUTDOWN)
+ self.transitions.stopping_success()
+
+ @asyncio.coroutine
+ def internal_message_broadcast(self, topic, data, qos=None):
+ return (yield from self._broadcast_message(None, topic, data))
+
+ @asyncio.coroutine
+ def ws_connected(self, websocket, uri, listener_name):
+ yield from self.client_connected(listener_name, WebSocketsReader(websocket), WebSocketsWriter(websocket))
+
+ @asyncio.coroutine
+ def stream_connected(self, reader, writer, listener_name):
+ yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
+
+ @asyncio.coroutine
+ def client_connected(self, listener_name, reader: ReaderAdapter, writer: WriterAdapter):
+ # Wait for connection available on listener
+ server = self._servers.get(listener_name, None)
+ if not server:
+ raise BrokerException("Invalid listener name '%s'" % listener_name)
+ yield from server.acquire_connection()
+
+ remote_address, remote_port = writer.get_peer_info()
+ self.logger.info("Connection from %s:%d on listener '%s'" % (remote_address, remote_port, listener_name))
+
+ # Wait for first packet and expect a CONNECT
+ try:
+ handler, client_session = yield from BrokerProtocolHandler.init_from_connect(reader, writer, self.plugins_manager, loop=self._loop)
+ except HBMQTTException as exc:
+ self.logger.warn("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" %
+ (format_client_message(address=remote_address, port=remote_port), exc))
+ #yield from writer.close()
+ self.logger.debug("Connection closed")
+ return
+ except MQTTException as me:
+ self.logger.error('Invalid connection from %s : %s' %
+ (format_client_message(address=remote_address, port=remote_port), me))
+ yield from writer.close()
+ self.logger.debug("Connection closed")
+ return
+
+ if client_session.clean_session:
+ # Delete existing session and create a new one
+ if client_session.client_id is not None:
+ self.delete_session(client_session.client_id)
+ else:
+ client_session.client_id = gen_client_id()
+ client_session.parent = 0
+ else:
+ # Get session from cache
+ if client_session.client_id in self._sessions:
+ self.logger.debug("Found old session %s" % repr(self._sessions[client_session.client_id]))
+ (client_session,h) = self._sessions[client_session.client_id]
+ client_session.parent = 1
+ else:
+ client_session.parent = 0
+ if client_session.keep_alive > 0:
+ client_session.keep_alive += self.config['timeout-disconnect-delay']
+ self.logger.debug("Keep-alive timeout=%d" % client_session.keep_alive)
+
+ handler.attach(client_session, reader, writer)
+ self._sessions[client_session.client_id] = (client_session, handler)
+
+ authenticated = yield from self.authenticate(client_session, self.listeners_config[listener_name])
+ if not authenticated:
+ yield from writer.close()
+ return
+
+ while True:
+ try:
+ client_session.transitions.connect()
+ break
+ except MachineError:
+ self.logger.warning("Client %s is reconnecting too quickly, make it wait" % client_session.client_id)
+ # Wait a bit; the client may be reconnecting too fast
+ yield from asyncio.sleep(1, loop=self._loop)
+ yield from handler.mqtt_connack_authorize(authenticated)
+
+ yield from self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_CONNECTED, client_id=client_session.client_id)
+
+ self.logger.debug("%s Start messages handling" % client_session.client_id)
+ yield from handler.start()
+ self.logger.debug("Retained messages queue size: %d" % client_session.retained_messages.qsize())
+ yield from self.publish_session_retained_messages(client_session)
+
+ # Init and start loop for handling client messages (publish, subscribe/unsubscribe, disconnect)
+ disconnect_waiter = ensure_future(handler.wait_disconnect(), loop=self._loop)
+ subscribe_waiter = ensure_future(handler.get_next_pending_subscription(), loop=self._loop)
+ unsubscribe_waiter = ensure_future(handler.get_next_pending_unsubscription(), loop=self._loop)
+ wait_deliver = ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop)
+ connected = True
+ while connected:
+ try:
+ done, pending = yield from asyncio.wait(
+ [disconnect_waiter, subscribe_waiter, unsubscribe_waiter, wait_deliver],
+ return_when=asyncio.FIRST_COMPLETED, loop=self._loop)
+ if disconnect_waiter in done:
+ result = disconnect_waiter.result()
+ self.logger.debug("%s Result from wait_diconnect: %s" % (client_session.client_id, result))
+ if result is None:
+ self.logger.debug("Will flag: %s" % client_session.will_flag)
+ # Connection closed abnormally, send will message
+ if client_session.will_flag:
+ self.logger.debug("Client %s disconnected abnormally, sending will message" %
+ format_client_message(client_session))
+ yield from self._broadcast_message(
+ client_session,
+ client_session.will_topic,
+ client_session.will_message,
+ client_session.will_qos)
+ if client_session.will_retain:
+ self.retain_message(client_session,
+ client_session.will_topic,
+ client_session.will_message,
+ client_session.will_qos)
+ self.logger.debug("%s Disconnecting session" % client_session.client_id)
+ yield from self._stop_handler(handler)
+ client_session.transitions.disconnect()
+ yield from self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id)
+ connected = False
+ if unsubscribe_waiter in done:
+ self.logger.debug("%s handling unsubscription" % client_session.client_id)
+ unsubscription = unsubscribe_waiter.result()
+ for topic in unsubscription['topics']:
+ self._del_subscription(topic, client_session)
+ yield from self.plugins_manager.fire_event(
+ EVENT_BROKER_CLIENT_UNSUBSCRIBED,
+ client_id=client_session.client_id,
+ topic=topic)
+ yield from handler.mqtt_acknowledge_unsubscription(unsubscription['packet_id'])
+ unsubscribe_waiter = asyncio.Task(handler.get_next_pending_unsubscription(), loop=self._loop)
+ if subscribe_waiter in done:
+ self.logger.debug("%s handling subscription" % client_session.client_id)
+ subscriptions = subscribe_waiter.result()
+ return_codes = []
+ for subscription in subscriptions['topics']:
+ return_codes.append(self.add_subscription(subscription, client_session))
+ yield from handler.mqtt_acknowledge_subscription(subscriptions['packet_id'], return_codes)
+ for index, subscription in enumerate(subscriptions['topics']):
+ if return_codes[index] != 0x80:
+ yield from self.plugins_manager.fire_event(
+ EVENT_BROKER_CLIENT_SUBSCRIBED,
+ client_id=client_session.client_id,
+ topic=subscription[0],
+ qos=subscription[1])
+ yield from self.publish_retained_messages_for_subscription(subscription, client_session)
+ subscribe_waiter = asyncio.Task(handler.get_next_pending_subscription(), loop=self._loop)
+ self.logger.debug(repr(self._subscriptions))
+ if wait_deliver in done:
+ if self.logger.isEnabledFor(logging.DEBUG):
+ self.logger.debug("%s handling message delivery" % client_session.client_id)
+ app_message = wait_deliver.result()
+ if not app_message.topic:
+ self.logger.warn("[MQTT-4.7.3-1] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id)
+ break
+ if "#" in app_message.topic or "+" in app_message.topic:
+ self.logger.warn("[MQTT-3.3.2-2] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id)
+ break
+ yield from self.plugins_manager.fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
+ client_id=client_session.client_id,
+ message=app_message)
+ yield from self._broadcast_message(client_session, app_message.topic, app_message.data)
+ if app_message.publish_packet.retain_flag:
+ self.retain_message(client_session, app_message.topic, app_message.data, app_message.qos)
+ wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message(), loop=self._loop)
... 6125 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-hbmqtt.git
More information about the Python-modules-commits
mailing list