[Python-modules-commits] [python-confluent-kafka] 01/05: New upstream version 0.9.1.2

Christos Trochalakis ctrochalakis-guest at moszumanska.debian.org
Fri Sep 23 08:51:55 UTC 2016


This is an automated email from the git hooks/post-receive script.

ctrochalakis-guest pushed a commit to branch master
in repository python-confluent-kafka.

commit ab89e71fb2d0fda472a811916c3e9f5fc0520286
Author: Christos Trochalakis <yatiohi at ideopolis.gr>
Date:   Wed Sep 7 10:32:26 2016 +0300

    New upstream version 0.9.1.2
---
 LICENSE                                          |  202 ++++
 MANIFEST.in                                      |    3 +
 PKG-INFO                                         |   10 +
 README.md                                        |  108 ++
 confluent_kafka.egg-info/PKG-INFO                |   10 +
 confluent_kafka.egg-info/SOURCES.txt             |   17 +
 confluent_kafka.egg-info/dependency_links.txt    |    1 +
 confluent_kafka.egg-info/top_level.txt           |    1 +
 confluent_kafka/__init__.py                      |    2 +
 confluent_kafka/kafkatest/__init__.py            |    1 +
 confluent_kafka/kafkatest/verifiable_client.py   |   80 ++
 confluent_kafka/kafkatest/verifiable_consumer.py |  288 +++++
 confluent_kafka/kafkatest/verifiable_producer.py |  141 +++
 confluent_kafka/src/Consumer.c                   |  721 ++++++++++++
 confluent_kafka/src/Producer.c                   |  516 +++++++++
 confluent_kafka/src/confluent_kafka.c            | 1309 ++++++++++++++++++++++
 confluent_kafka/src/confluent_kafka.h            |  208 ++++
 setup.cfg                                        |    5 +
 setup.py                                         |   21 +
 19 files changed, 3644 insertions(+)

diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..e06d208
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..7b32b9e
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,3 @@
+include README.md
+include LICENSE
+include confluent_kafka/src/*.[ch]
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..2ef20cb
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,10 @@
+Metadata-Version: 1.0
+Name: confluent-kafka
+Version: 0.9.1.2
+Summary: Confluent's Apache Kafka client for Python
+Home-page: https://github.com/confluentinc/confluent-kafka-python
+Author: Confluent Inc
+Author-email: support@confluent.io
+License: UNKNOWN
+Description: UNKNOWN
+Platform: UNKNOWN
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..23bf7ea
--- /dev/null
+++ b/README.md
@@ -0,0 +1,108 @@
+Confluent's Apache Kafka client for Python
+==========================================
+
+Confluent's Kafka client for Python wraps the librdkafka C library, providing
+full Kafka protocol support with great performance and reliability.
+
+The Python bindings provide a high-level Producer and Consumer with support
+for the balanced consumer groups of Apache Kafka 0.9.
+
+See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info.
+
+**License**: [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0)
+
+
+Usage
+=====
+
+**Producer:**
+
+```python
+from confluent_kafka import Producer
+
+p = Producer({'bootstrap.servers': 'mybroker,mybroker2'})
+for data in some_data_source:
+    p.produce('mytopic', data.encode('utf-8'))
+p.flush()
+```
+
+
+**High-level Consumer:**
+
+```python
+from confluent_kafka import Consumer
+
+c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup',
+              'default.topic.config': {'auto.offset.reset': 'smallest'}})
+c.subscribe(['mytopic'])
+while running:
+    msg = c.poll()
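+    # poll() without a timeout blocks until the next message or event arrives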
+    if not msg.error():
+        print('Received message: %s' % msg.value().decode('utf-8'))
+c.close()
+```
+
+See [examples](examples) for more examples.
+
+
+Prerequisites
+=============
+
+ * Python >= 2.7 or Python 3.x
+ * [librdkafka](https://github.com/edenhill/librdkafka) >= 0.9.1
+
+
+Install
+=======
+
+**Install from PyPI:**
+
+    $ pip install confluent-kafka
+
+
+**Install from source / tarball:**
+
+    $ pip install .
+
+
+Build
+=====
+
+    $ python setup.py build
+
+If librdkafka is installed in a non-standard location, provide the include and library directories with:
+
+    $ CPLUS_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python setup.py ...
+
+
+Tests
+=====
+
+
+**Run unit-tests:**
+
+    $ py.test
+
+**NOTE**: Requires `py.test`; install it with `pip install pytest`.
+
+
+**Run integration tests:**
+
+    $ examples/integration_test.py <kafka-broker>
+
+**WARNING**: These tests require an active Kafka cluster and will make use of a topic named 'test'.
+
+
+
+
+Generate documentation
+======================
+Install the sphinx and sphinx_rtd_theme packages, then run:
+
+    $ make docs
+
+or:
+
+    $ python setup.py build_sphinx
+
+Documentation will be generated in `docs/_build/`.
diff --git a/confluent_kafka.egg-info/PKG-INFO b/confluent_kafka.egg-info/PKG-INFO
new file mode 100644
index 0000000..2ef20cb
--- /dev/null
+++ b/confluent_kafka.egg-info/PKG-INFO
@@ -0,0 +1,10 @@
+Metadata-Version: 1.0
+Name: confluent-kafka
+Version: 0.9.1.2
+Summary: Confluent's Apache Kafka client for Python
+Home-page: https://github.com/confluentinc/confluent-kafka-python
+Author: Confluent Inc
+Author-email: support@confluent.io
+License: UNKNOWN
+Description: UNKNOWN
+Platform: UNKNOWN
diff --git a/confluent_kafka.egg-info/SOURCES.txt b/confluent_kafka.egg-info/SOURCES.txt
new file mode 100644
index 0000000..bc78298
--- /dev/null
+++ b/confluent_kafka.egg-info/SOURCES.txt
@@ -0,0 +1,17 @@
+LICENSE
+MANIFEST.in
+README.md
+setup.py
+confluent_kafka/__init__.py
+confluent_kafka.egg-info/PKG-INFO
+confluent_kafka.egg-info/SOURCES.txt
+confluent_kafka.egg-info/dependency_links.txt
+confluent_kafka.egg-info/top_level.txt
+confluent_kafka/kafkatest/__init__.py
+confluent_kafka/kafkatest/verifiable_client.py
+confluent_kafka/kafkatest/verifiable_consumer.py
+confluent_kafka/kafkatest/verifiable_producer.py
+confluent_kafka/src/Consumer.c
+confluent_kafka/src/Producer.c
+confluent_kafka/src/confluent_kafka.c
+confluent_kafka/src/confluent_kafka.h
\ No newline at end of file
diff --git a/confluent_kafka.egg-info/dependency_links.txt b/confluent_kafka.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/confluent_kafka.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/confluent_kafka.egg-info/top_level.txt b/confluent_kafka.egg-info/top_level.txt
new file mode 100644
index 0000000..1975722
--- /dev/null
+++ b/confluent_kafka.egg-info/top_level.txt
@@ -0,0 +1 @@
+confluent_kafka
diff --git a/confluent_kafka/__init__.py b/confluent_kafka/__init__.py
new file mode 100644
index 0000000..d998bec
--- /dev/null
+++ b/confluent_kafka/__init__.py
@@ -0,0 +1,2 @@
+__all__ = ['cimpl','kafkatest']
+from .cimpl import *
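
The package's top-level names are re-exported from the compiled `cimpl`
extension module, so the usual entry points are importable directly from the
package. A minimal sketch (illustrative only; these classes are the ones the
kafkatest clients below import):

```python
# Illustrative: these classes are implemented in the C extension
# confluent_kafka.cimpl and re-exported by the package __init__ above.
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
```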
diff --git a/confluent_kafka/kafkatest/__init__.py b/confluent_kafka/kafkatest/__init__.py
new file mode 100644
index 0000000..eea8ccb
--- /dev/null
+++ b/confluent_kafka/kafkatest/__init__.py
@@ -0,0 +1 @@
+""" Python client implementations of the official Kafka tests/kafkatest clients. """
diff --git a/confluent_kafka/kafkatest/verifiable_client.py b/confluent_kafka/kafkatest/verifiable_client.py
new file mode 100644
index 0000000..a0eb1e1
--- /dev/null
+++ b/confluent_kafka/kafkatest/verifiable_client.py
@@ -0,0 +1,80 @@
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import signal, socket, os, sys, time, json, re, datetime
+
+
+class VerifiableClient(object):
+    """
+    Generic base class for a kafkatest verifiable client.
+    Implements the common kafkatest protocol and semantics.
+    """
+    def __init__ (self, conf):
+        """
+        """
+        super(VerifiableClient, self).__init__()
+        self.conf = conf
+        self.conf['client.id'] = 'python@' + socket.gethostname()
+        self.run = True
+        signal.signal(signal.SIGTERM, self.sig_term)
+        self.dbg('Pid is %d' % os.getpid())
+
+    def sig_term (self, sig, frame):
+        self.dbg('SIGTERM')
+        self.run = False
+
+    @staticmethod
+    def _timestamp ():
+        return time.strftime('%H:%M:%S', time.localtime())
+
+    def dbg (self, s):
+        """ Debugging printout """
+        sys.stderr.write('%% %s DEBUG: %s\n' % (self._timestamp(), s))
+
+    def err (self, s, term=False):
+        """ Error printout, if term=True the process will terminate immediately. """
+        sys.stderr.write('%% %s ERROR: %s\n' % (self._timestamp(), s))
+        if term:
+            sys.stderr.write('%% FATAL ERROR ^\n')
+            sys.exit(1)
+
+    def send (self, d):
+        """ Send dict as JSON to stdout for consumtion by kafkatest handler """
+        d['_time'] = str(datetime.datetime.now())
+        self.dbg('SEND: %s' % json.dumps(d))
+        sys.stdout.write('%s\n' % json.dumps(d))
+        sys.stdout.flush()
+
+
+    @staticmethod
+    def set_config (conf, args):
+        """ Set client config properties using args dict. """
+        for n,v in args.iteritems():
+            if v is None:
+                continue
+            # Things to ignore
+            if '.' not in n:
+                # App config, skip
+                continue
+            if n.startswith('topic.'):
+                # Set "topic.<...>" properties on default topic conf dict
+                conf['default.topic.config'][n[6:]] = v
+            elif n == 'partition.assignment.strategy':
+                # Convert Java class name to config value.
+                # "org.apache.kafka.clients.consumer.RangeAssignor" -> "range"
+                conf[n] = re.sub(r'org.apache.kafka.clients.consumer.(\w+)Assignor',
+                                 lambda x: x.group(1).lower(), v)
+            else:
+                conf[n] = v
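
For reference, a sketch of how `set_config` folds kafkatest-style args into a
librdkafka-style config dict (argument values hypothetical; Python 2, as used
by these clients):

```python
conf = {'default.topic.config': dict()}
args = {
    'bootstrap.servers': 'mybroker:9092',                   # plain property
    'topic.auto.offset.reset': 'earliest',                  # -> default.topic.config
    'partition.assignment.strategy':
        'org.apache.kafka.clients.consumer.RangeAssignor',  # -> 'range'
    'max_messages': 100,                                    # no dot: app arg, skipped
}
VerifiableClient.set_config(conf, args)
# conf == {'bootstrap.servers': 'mybroker:9092',
#          'partition.assignment.strategy': 'range',
#          'default.topic.config': {'auto.offset.reset': 'earliest'}}
```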
diff --git a/confluent_kafka/kafkatest/verifiable_consumer.py b/confluent_kafka/kafkatest/verifiable_consumer.py
new file mode 100755
index 0000000..3e01367
--- /dev/null
+++ b/confluent_kafka/kafkatest/verifiable_consumer.py
@@ -0,0 +1,288 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse, sys
+from confluent_kafka import Consumer, KafkaError, KafkaException
+from verifiable_client import VerifiableClient
+
+class VerifiableConsumer(VerifiableClient):
+    """
+    confluent-kafka-python backed VerifiableConsumer class for use with
+    Kafka's kafkatest client tests.
+    """
+    def __init__ (self, conf):
+        """
+        \p conf is a config dict passed to confluent_kafka.Consumer()
+        """
+        super(VerifiableConsumer, self).__init__(conf)
+        self.conf['on_commit'] = self.on_commit
+        self.consumer = Consumer(**conf)
+        self.consumed_msgs = 0
+        self.consumed_msgs_last_reported = 0
+        self.consumed_msgs_at_last_commit = 0
+        self.use_auto_commit = False
+        self.use_async_commit = False
+        self.max_msgs = -1
+        self.assignment = []
+        self.assignment_dict = dict()
+
+
+    def find_assignment (self, topic, partition):
+        """ Find and return existing assignment based on \p topic and \p partition,
+        or None on miss. """
+        skey = '%s %d' % (topic, partition)
+        return self.assignment_dict.get(skey)
+
+
+    def send_records_consumed (self, immediate=False):
+        """ Send records_consumed, every 100 messages, on timeout,
+            or if immediate is set. """
+        if (self.consumed_msgs <= self.consumed_msgs_last_reported +
+            (0 if immediate else 100)):
+            return
+
+        if len(self.assignment) == 0:
+            return
+
+        d = {'name': 'records_consumed',
+             'count': self.consumed_msgs - self.consumed_msgs_last_reported,
+             'partitions': []}
+
+        for a in self.assignment:
+            if a.min_offset == -1:
+                # Skip partitions that haven't had any messages since last time.
+                # This is to circumvent some minOffset checks in kafkatest.
+                continue
+            d['partitions'].append(a.to_dict())
+            a.min_offset = -1
+
+        self.send(d)
+        self.consumed_msgs_last_reported = self.consumed_msgs
+
+
+    def send_assignment (self, evtype, partitions):
+        """ Send assignment update, \p evtype is either 'assigned' or 'revoked' """
+        d = { 'name': 'partitions_' + evtype,
+              'partitions': [{'topic': x.topic, 'partition': x.partition} for x in partitions]}
+        self.send(d)
+
+
+    def on_assign (self, consumer, partitions):
+        """ Rebalance on_assign callback """
+        old_assignment = self.assignment
+        self.assignment = [AssignedPartition(p.topic, p.partition) for p in partitions]
+        self.assignment_dict = {a.skey: a for a in self.assignment}
+        # Move over our last seen offsets so that we can report a proper
+        # minOffset even after a rebalance loop. Guard against partitions
+        # that were not part of the previous assignment.
+        for a in old_assignment:
+            b = self.find_assignment(a.topic, a.partition)
+            if b is not None:
+                b.min_offset = a.min_offset
+
+        self.send_assignment('assigned', partitions)
+
+    def on_revoke (self, consumer, partitions):
+        """ Rebalance on_revoke callback """
+        # Send final consumed records prior to rebalancing to make sure
+        # latest consumed is on par with what is going to be committed.
+        self.send_records_consumed(immediate=True)
+        self.assignment = list()
+        self.assignment_dict = dict()
+        self.send_assignment('revoked', partitions)
+        self.do_commit(immediate=True)
+
+
+    def on_commit (self, err, partitions):
+        """ Offsets Committed callback """
+        if err is not None and err.code() == KafkaError._NO_OFFSET:
+            self.dbg('on_commit(): no offsets to commit')
+            return
+
+        # Report consumed messages to make sure consumed position >= committed position
+        self.send_records_consumed(immediate=True)
+
+        d = {'name': 'offsets_committed',
+             'offsets': []}
+
+        if err is not None:
+            d['success'] = False
+            d['error'] = str(err)
+        else:
+            d['success'] = True
+            d['error'] = ''
+
+        for p in partitions:
+            pd = {'topic': p.topic, 'partition': p.partition,
+                  'offset': p.offset, 'error': str(p.error)}
+            d['offsets'].append(pd)
+
+        self.send(d)
+
+
+    def do_commit (self, immediate=False, async=None):
+        """ Commit every 1000 messages or whenever there is a consume timeout
+            or immediate. """
+        if (self.use_auto_commit or
+            self.consumed_msgs_at_last_commit + (0 if immediate else 1000) >
+            self.consumed_msgs):
+            return
+
+        # Make sure we report consumption before commit,
+        # otherwise tests may fail because of commit > consumed
+        if self.consumed_msgs_at_last_commit < self.consumed_msgs:
+            self.send_records_consumed(immediate=True)
+
+        if async is None:
+            async_mode = self.use_async_commit
+        else:
+            async_mode = async
+
+        self.dbg('Committing %d messages (Async=%s)' %
+                 (self.consumed_msgs - self.consumed_msgs_at_last_commit,
+                  async_mode))
+
+        try:
+            self.consumer.commit(async=async_mode)
+        except KafkaException as e:
+            if e.args[0].code() == KafkaError._WAIT_COORD:
+                self.dbg('Ignoring commit failure, still waiting for coordinator')
+            elif e.args[0].code() == KafkaError._NO_OFFSET:
+                self.dbg('No offsets to commit')
+            else:
+                raise
+
+        self.consumed_msgs_at_last_commit = self.consumed_msgs
+
+
+    def msg_consume (self, msg):
+        """ Handle consumed message (or error event) """
+        if msg.error():
+            if msg.error().code() == KafkaError._PARTITION_EOF:
+                # ignore EOF
+                pass
+            else:
+                self.err('Consume failed: %s' % msg.error(), term=True)
+            return
+
+        if False:
+            self.dbg('Read msg from %s [%d] @ %d' % \
+                     (msg.topic(), msg.partition(), msg.offset()))
+
+        if self.max_msgs >= 0 and self.consumed_msgs >= self.max_msgs:
+            return # ignore extra messages
+
+        # Find assignment.
+        a = self.find_assignment(msg.topic(), msg.partition())
+        if a is None:
+            self.err('Received message on unassigned partition %s [%d] @ %d' %
+                     (msg.topic(), msg.partition(), msg.offset()), term=True)
+
+        a.consumed_msgs += 1
+        if a.min_offset == -1:
+            a.min_offset = msg.offset()
+        if a.max_offset < msg.offset():
+            a.max_offset = msg.offset()
+
+        self.consumed_msgs += 1
+
+        self.send_records_consumed(immediate=False)
+        self.do_commit(immediate=False)
+
+
+class AssignedPartition(object):
+    """ Local state container for assigned partition. """
+    def __init__ (self, topic, partition):
+        super(AssignedPartition, self).__init__()
+        self.topic = topic
+        self.partition = partition
+        self.skey = '%s %d' % (self.topic, self.partition)
+        self.consumed_msgs = 0
+        self.min_offset = -1
+        self.max_offset = 0
+
+    def to_dict (self):
+        """ Return a dict of this partition's state """
+        return {'topic': self.topic, 'partition': self.partition,
+                'minOffset': self.min_offset, 'maxOffset': self.max_offset}
+
+
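
For context, every event emitted by `send()` is one JSON object per line on
stdout. A hypothetical `records_consumed` event as produced by
`send_records_consumed()` above (all field values invented for illustration):

```python
import json

event = {'name': 'records_consumed',
         'count': 100,
         'partitions': [{'topic': 'test', 'partition': 0,
                         'minOffset': 400, 'maxOffset': 499}],
         '_time': '2016-09-07 10:32:26.000000'}
print(json.dumps(event))  # one line, flushed to stdout
```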
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(description='Verifiable Python Consumer')
+    parser.add_argument('--topic', action='append', type=str, required=True)
+    parser.add_argument('--group-id', dest='group.id', required=True)
+    parser.add_argument('--broker-list', dest='bootstrap.servers', required=True)
+    parser.add_argument('--session-timeout', type=int, dest='session.timeout.ms', default=6000)
+    parser.add_argument('--enable-autocommit', action='store_true', dest='enable.auto.commit', default=False)
+    parser.add_argument('--max-messages', type=int, dest='max_messages', default=-1)
+    parser.add_argument('--assignment-strategy', dest='partition.assignment.strategy')
+    parser.add_argument('--reset-policy', dest='topic.auto.offset.reset', default='earliest')
+    parser.add_argument('--consumer.config', dest='consumer_config')
+    args = vars(parser.parse_args())
+
+    conf = {'broker.version.fallback': '0.9.0',
+            'default.topic.config': dict()}
+
+    VerifiableClient.set_config(conf, args)
+
+    vc = VerifiableConsumer(conf)
+    vc.use_auto_commit = args['enable.auto.commit']
+    vc.max_msgs = args['max_messages']
+
+    vc.dbg('Using config: %s' % conf)
+
+    vc.dbg('Subscribing to %s' % args['topic'])
+    vc.consumer.subscribe(args['topic'],
+                          on_assign=vc.on_assign, on_revoke=vc.on_revoke)
+
+
+    try:
+        while vc.run:
+            msg = vc.consumer.poll(timeout=1.0)
+            if msg is None:
+                # Timeout.
+                # Try reporting consumed messages
+                vc.send_records_consumed(immediate=True)
+                # Commit every poll() timeout instead of on every message.
+                # Also commit on every 1000 messages, whichever comes first.
+                vc.do_commit(immediate=True)
+                continue
+
+            # Handle message (or error event)
+            vc.msg_consume(msg)
+
+    except KeyboardInterrupt:
+        pass
+
+    vc.dbg('Closing consumer')
+    vc.send_records_consumed(immediate=True)
+    if not vc.use_auto_commit:
+        vc.do_commit(immediate=True, async=False)
+
+    vc.consumer.close()
+
+    vc.send({'name': 'shutdown_complete'})
+
+    vc.dbg('All done')
diff --git a/confluent_kafka/kafkatest/verifiable_producer.py b/confluent_kafka/kafkatest/verifiable_producer.py
new file mode 100755
index 0000000..af188a2
--- /dev/null
+++ b/confluent_kafka/kafkatest/verifiable_producer.py
@@ -0,0 +1,141 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse, time
+from confluent_kafka import Producer, KafkaError, KafkaException
+from verifiable_client import VerifiableClient
+
+class VerifiableProducer(VerifiableClient):
+    """
+    confluent-kafka-python backed VerifiableProducer class for use with
+    Kafka's kafkatest client tests.
+    """
+    def __init__ (self, conf):
+        """
+        \p conf is a config dict passed to confluent_kafka.Producer()
+        """
+        super(VerifiableProducer, self).__init__(conf)
+        self.conf['on_delivery'] = self.dr_cb
+        self.conf['default.topic.config']['produce.offset.report'] = True
+        self.producer = Producer(**self.conf)
+        self.num_acked = 0
+        self.num_sent = 0
+        self.num_err = 0
+
+    def dr_cb (self, err, msg):
+        """ Per-message Delivery report callback. Called from poll() """
+        if err:
+            self.num_err += 1
+            self.send({'name': 'producer_send_error',
+                       'message': str(err),
+                       'topic': msg.topic(),
+                       'key': msg.key(),
+                       'value': msg.value()})
+        else:
+            self.num_acked += 1
+            self.send({'name': 'producer_send_success',
+                       'topic': msg.topic(),
+                       'partition': msg.partition(),
+                       'offset': msg.offset(),
+                       'key': msg.key(),
+                       'value': msg.value()})
+
+
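
Outside the kafkatest harness, the same per-message delivery-report pattern
can be used with the Producer directly. A minimal sketch (broker address and
topic hypothetical):

```python
from confluent_kafka import Producer

def on_delivery(err, msg):
    # Invoked from poll()/flush() once per produced message.
    if err is not None:
        print('delivery failed: %s' % err)
    else:
        print('delivered to %s [%d] @ %d'
              % (msg.topic(), msg.partition(), msg.offset()))

p = Producer({'bootstrap.servers': 'mybroker'})        # hypothetical broker
p.produce('mytopic', 'payload', callback=on_delivery)  # per-message callback
p.flush()                                              # serves delivery reports
```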
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(description='Verifiable Python Producer')
+    parser.add_argument('--topic', type=str, required=True)
+    parser.add_argument('--throughput', type=int, default=0)
+    parser.add_argument('--broker-list', dest='bootstrap.servers', required=True)
+    parser.add_argument('--max-messages', type=int, dest='max_msgs', default=1000000) # avoid infinite
+    parser.add_argument('--value-prefix', dest='value_prefix', type=str, default=None)
+    parser.add_argument('--acks', type=int, dest='topic.request.required.acks', default=-1)
+    parser.add_argument('--producer.config', dest='producer_config')
+    args = vars(parser.parse_args())
+
+    conf = {'broker.version.fallback': '0.9.0',
+            'default.topic.config': dict()}
+
+    VerifiableClient.set_config(conf, args)
+
+    vp = VerifiableProducer(conf)
+
+    vp.max_msgs = args['max_msgs']
+    throughput = args['throughput']
+    topic = args['topic']
+    if args['value_prefix'] is not None:
+        value_fmt = args['value_prefix'] + '.%d'
+    else:
+        value_fmt = '%d'
+
+    if throughput > 0:
+        delay = 1.0/throughput
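+        # e.g. --throughput 100 -> 0.01s pause between produced messages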
+    else:
+        delay = 0
+
+    vp.dbg('Producing %d messages at a rate of %d/s' % (vp.max_msgs, throughput))
+
+    try:
+        for i in range(0, vp.max_msgs):
+            if not vp.run:
+                break
+
+            t_end = time.time() + delay
+            while vp.run:
+                try:
+                    vp.producer.produce(topic, value=(value_fmt % i))
+                    vp.num_sent += 1
+                except KafkaException as e:
+                    vp.err('produce() #%d/%d failed: %s' % \
+                           (i, vp.max_msgs, str(e)))
+                    vp.num_err += 1
+                except BufferError:
+                    vp.dbg('Local produce queue full (produced %d/%d msgs), waiting for deliveries..' % \
+                           (i, vp.max_msgs))
+                    vp.producer.poll(timeout=0.5)
+                    continue
+                break
+
+
+            # Delay to achieve desired throughput,
+            # but make sure poll is called at least once
+            # to serve DRs.
+            while True:
+                remaining = max(0, t_end - time.time())
+                vp.producer.poll(timeout=remaining)
+                if remaining <= 0.00000001:
+                    break
+
+    except KeyboardInterrupt:
+        pass
+
+    # Flush remaining messages to broker.
+    vp.dbg('Flushing')
+    try:
+        vp.producer.flush()
+    except KeyboardInterrupt:
+        pass
+
+    vp.send({'name': 'shutdown_complete'})
+
+    vp.dbg('All done')
diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c
new file mode 100644
index 0000000..80e136a
--- /dev/null
+++ b/confluent_kafka/src/Consumer.c
@@ -0,0 +1,721 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "confluent_kafka.h"
+
+
+/****************************************************************************
+ *
+ *
+ * Consumer
+ *
... 2786 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-confluent-kafka.git


