[Python-modules-commits] [python-confluent-kafka] 01/05: New upstream version 0.9.1.2
Christos Trochalakis
ctrochalakis-guest at moszumanska.debian.org
Fri Sep 23 08:51:55 UTC 2016
This is an automated email from the git hooks/post-receive script.
ctrochalakis-guest pushed a commit to branch master
in repository python-confluent-kafka.
commit ab89e71fb2d0fda472a811916c3e9f5fc0520286
Author: Christos Trochalakis <yatiohi at ideopolis.gr>
Date: Wed Sep 7 10:32:26 2016 +0300
New upstream version 0.9.1.2
---
LICENSE | 202 ++++
MANIFEST.in | 3 +
PKG-INFO | 10 +
README.md | 108 ++
confluent_kafka.egg-info/PKG-INFO | 10 +
confluent_kafka.egg-info/SOURCES.txt | 17 +
confluent_kafka.egg-info/dependency_links.txt | 1 +
confluent_kafka.egg-info/top_level.txt | 1 +
confluent_kafka/__init__.py | 2 +
confluent_kafka/kafkatest/__init__.py | 1 +
confluent_kafka/kafkatest/verifiable_client.py | 80 ++
confluent_kafka/kafkatest/verifiable_consumer.py | 288 +++++
confluent_kafka/kafkatest/verifiable_producer.py | 141 +++
confluent_kafka/src/Consumer.c | 721 ++++++++++++
confluent_kafka/src/Producer.c | 516 +++++++++
confluent_kafka/src/confluent_kafka.c | 1309 ++++++++++++++++++++++
confluent_kafka/src/confluent_kafka.h | 208 ++++
setup.cfg | 5 +
setup.py | 21 +
19 files changed, 3644 insertions(+)
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..e06d208
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright {yyyy} {name of copyright owner}
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..7b32b9e
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,3 @@
+include README.md
+include LICENSE
+include confluent_kafka/src/*.[ch]
diff --git a/PKG-INFO b/PKG-INFO
new file mode 100644
index 0000000..2ef20cb
--- /dev/null
+++ b/PKG-INFO
@@ -0,0 +1,10 @@
+Metadata-Version: 1.0
+Name: confluent-kafka
+Version: 0.9.1.2
+Summary: Confluent's Apache Kafka client for Python
+Home-page: https://github.com/confluentinc/confluent-kafka-python
+Author: Confluent Inc
+Author-email: support at confluent.io
+License: UNKNOWN
+Description: UNKNOWN
+Platform: UNKNOWN
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..23bf7ea
--- /dev/null
+++ b/README.md
@@ -0,0 +1,108 @@
+Confluent's Apache Kafka client for Python
+==========================================
+
+Confluent's Kafka client for Python wraps the librdkafka C library, providing
+full Kafka protocol support with great performance and reliability.
+
+The Python bindings provide a high-level Producer and Consumer with support
+for the balanced consumer groups of Apache Kafka 0.9.
+
+See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info.
+
+**License**: [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0)
+
+
+Usage
+=====
+
+**Producer:**
+
+```python
+from confluent_kafka import Producer
+
+p = Producer({'bootstrap.servers': 'mybroker,mybroker2'})
+for data in some_data_source:
+ p.produce('mytopic', data.encode('utf-8'))
+p.flush()
+```
+
+
+**High-level Consumer:**
+
+```python
+from confluent_kafka import Consumer
+
+c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup',
+ 'default.topic.config': {'auto.offset.reset': 'smallest'}})
+c.subscribe(['mytopic'])
+while running:
+ msg = c.poll()
+ if not msg.error():
+ print('Received message: %s' % msg.value().decode('utf-8'))
+c.close()
+```
+
+See [examples](examples) for more examples.
+
+
+Prerequisites
+=============
+
+ * Python >= 2.7 or Python 3.x
+ * [librdkafka](https://github.com/edenhill/librdkafka) >= 0.9.1
+
+
+Install
+=======
+
+**Install from PyPI:**
+
+ $ pip install confluent-kafka
+
+
+**Install from source / tarball:**
+
+ $ pip install .
+
+
+Build
+=====
+
+ $ python setup.py build
+
+If librdkafka is installed in a non-standard location, provide the include and library directories with:
+
+ $ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python setup.py ...
+
+
+Tests
+=====
+
+
+**Run unit-tests:**
+
+ $ py.test
+
+**NOTE**: Requires `py.test`; install it with `pip install pytest`.
+
+
+**Run integration tests:**
+
+ $ examples/integration_test.py <kafka-broker>
+
+**WARNING**: These tests require an active Kafka cluster and will make use of a topic named 'test'.
+
+
+
+
+Generate documentation
+======================
+Install sphinx and sphinx_rtd_theme packages and then:
+
+ $ make docs
+
+or:
+
+ $ python setup.py build_sphinx
+
+Documentation will be generated in `docs/_build/`.
diff --git a/confluent_kafka.egg-info/PKG-INFO b/confluent_kafka.egg-info/PKG-INFO
new file mode 100644
index 0000000..2ef20cb
--- /dev/null
+++ b/confluent_kafka.egg-info/PKG-INFO
@@ -0,0 +1,10 @@
+Metadata-Version: 1.0
+Name: confluent-kafka
+Version: 0.9.1.2
+Summary: Confluent's Apache Kafka client for Python
+Home-page: https://github.com/confluentinc/confluent-kafka-python
+Author: Confluent Inc
+Author-email: support at confluent.io
+License: UNKNOWN
+Description: UNKNOWN
+Platform: UNKNOWN
diff --git a/confluent_kafka.egg-info/SOURCES.txt b/confluent_kafka.egg-info/SOURCES.txt
new file mode 100644
index 0000000..bc78298
--- /dev/null
+++ b/confluent_kafka.egg-info/SOURCES.txt
@@ -0,0 +1,17 @@
+LICENSE
+MANIFEST.in
+README.md
+setup.py
+confluent_kafka/__init__.py
+confluent_kafka.egg-info/PKG-INFO
+confluent_kafka.egg-info/SOURCES.txt
+confluent_kafka.egg-info/dependency_links.txt
+confluent_kafka.egg-info/top_level.txt
+confluent_kafka/kafkatest/__init__.py
+confluent_kafka/kafkatest/verifiable_client.py
+confluent_kafka/kafkatest/verifiable_consumer.py
+confluent_kafka/kafkatest/verifiable_producer.py
+confluent_kafka/src/Consumer.c
+confluent_kafka/src/Producer.c
+confluent_kafka/src/confluent_kafka.c
+confluent_kafka/src/confluent_kafka.h
\ No newline at end of file
diff --git a/confluent_kafka.egg-info/dependency_links.txt b/confluent_kafka.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/confluent_kafka.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/confluent_kafka.egg-info/top_level.txt b/confluent_kafka.egg-info/top_level.txt
new file mode 100644
index 0000000..1975722
--- /dev/null
+++ b/confluent_kafka.egg-info/top_level.txt
@@ -0,0 +1 @@
+confluent_kafka
diff --git a/confluent_kafka/__init__.py b/confluent_kafka/__init__.py
new file mode 100644
index 0000000..d998bec
--- /dev/null
+++ b/confluent_kafka/__init__.py
@@ -0,0 +1,2 @@
+__all__ = ['cimpl','kafkatest']
+from .cimpl import *
diff --git a/confluent_kafka/kafkatest/__init__.py b/confluent_kafka/kafkatest/__init__.py
new file mode 100644
index 0000000..eea8ccb
--- /dev/null
+++ b/confluent_kafka/kafkatest/__init__.py
@@ -0,0 +1 @@
+""" Python client implementations of the official Kafka tests/kafkatest clients. """
diff --git a/confluent_kafka/kafkatest/verifiable_client.py b/confluent_kafka/kafkatest/verifiable_client.py
new file mode 100644
index 0000000..a0eb1e1
--- /dev/null
+++ b/confluent_kafka/kafkatest/verifiable_client.py
@@ -0,0 +1,80 @@
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import signal, socket, os, sys, time, json, re, datetime
+
+
+class VerifiableClient(object):
+ """
+ Generic base class for a kafkatest verifiable client.
+ Implements the common kafkatest protocol and semantics.
+ """
+ def __init__ (self, conf):
+ """
+ """
+ super(VerifiableClient, self).__init__()
+ self.conf = conf
+ self.conf['client.id'] = 'python@' + socket.gethostname()
+ self.run = True
+ signal.signal(signal.SIGTERM, self.sig_term)
+ self.dbg('Pid is %d' % os.getpid())
+
+ def sig_term (self, sig, frame):
+ self.dbg('SIGTERM')
+ self.run = False
+
+ @staticmethod
+ def _timestamp ():
+ return time.strftime('%H:%M:%S', time.localtime())
+
+ def dbg (self, s):
+ """ Debugging printout """
+ sys.stderr.write('%% %s DEBUG: %s\n' % (self._timestamp(), s))
+
+ def err (self, s, term=False):
+ """ Error printout, if term=True the process will terminate immediately. """
+ sys.stderr.write('%% %s ERROR: %s\n' % (self._timestamp(), s))
+ if term:
+ sys.stderr.write('%% FATAL ERROR ^\n')
+ sys.exit(1)
+
+ def send (self, d):
+ """ Send dict as JSON to stdout for consumption by kafkatest handler """
+ d['_time'] = str(datetime.datetime.now())
+ self.dbg('SEND: %s' % json.dumps(d))
+ sys.stdout.write('%s\n' % json.dumps(d))
+ sys.stdout.flush()
+
+
+ @staticmethod
+ def set_config (conf, args):
+ """ Set client config properties using args dict. """
+ for n,v in args.iteritems():
+ if v is None:
+ continue
+ # Things to ignore
+ if '.' not in n:
+ # App config, skip
+ continue
+ if n.startswith('topic.'):
+ # Set "topic.<...>" properties on default topic conf dict
+ conf['default.topic.config'][n[6:]] = v
+ elif n == 'partition.assignment.strategy':
+ # Convert Java class name to config value.
+ # "org.apache.kafka.clients.consumer.RangeAssignor" -> "range"
+ conf[n] = re.sub(r'org.apache.kafka.clients.consumer.(\w+)Assignor',
+ lambda x: x.group(1).lower(), v)
+ else:
+ conf[n] = v
diff --git a/confluent_kafka/kafkatest/verifiable_consumer.py b/confluent_kafka/kafkatest/verifiable_consumer.py
new file mode 100755
index 0000000..3e01367
--- /dev/null
+++ b/confluent_kafka/kafkatest/verifiable_consumer.py
@@ -0,0 +1,288 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse, sys
+from confluent_kafka import Consumer, KafkaError, KafkaException
+from verifiable_client import VerifiableClient
+
+class VerifiableConsumer(VerifiableClient):
+ """
+ confluent-kafka-python backed VerifiableConsumer class for use with
+ Kafka's kafkatests client tests.
+ """
+ def __init__ (self, conf):
+ """
+ \p conf is a config dict passed to confluent_kafka.Consumer()
+ """
+ super(VerifiableConsumer, self).__init__(conf)
+ self.conf['on_commit'] = self.on_commit
+ self.consumer = Consumer(**conf)
+ self.consumed_msgs = 0
+ self.consumed_msgs_last_reported = 0
+ self.consumed_msgs_at_last_commit = 0
+ self.use_auto_commit = False
+ self.use_async_commit = False
+ self.max_msgs = -1
+ self.assignment = []
+ self.assignment_dict = dict()
+
+
+ def find_assignment (self, topic, partition):
+ """ Find and return existing assignment based on \p topic and \p partition,
+ or None on miss. """
+ skey = '%s %d' % (topic, partition)
+ return self.assignment_dict.get(skey)
+
+
+ def send_records_consumed (self, immediate=False):
+ """ Send records_consumed, every 100 messages, on timeout,
+ or if immediate is set. """
+ if (self.consumed_msgs <= self.consumed_msgs_last_reported +
+ (0 if immediate else 100)):
+ return
+
+ if len(self.assignment) == 0:
+ return
+
+ d = {'name': 'records_consumed',
+ 'count': self.consumed_msgs - self.consumed_msgs_last_reported,
+ 'partitions': []}
+
+ for a in self.assignment:
+ if a.min_offset == -1:
+ # Skip partitions that haven't had any messages since last time.
+ # This is to circumvent some minOffset checks in kafkatest.
+ continue
+ d['partitions'].append(a.to_dict())
+ a.min_offset = -1
+
+ self.send(d)
+ self.consumed_msgs_last_reported = self.consumed_msgs
+
+
+ def send_assignment (self, evtype, partitions):
+ """ Send assignment update, \p evtype is either 'assigned' or 'revoked' """
+ d = { 'name': 'partitions_' + evtype,
+ 'partitions': [{'topic': x.topic, 'partition': x.partition} for x in partitions]}
+ self.send(d)
+
+
+ def on_assign (self, consumer, partitions):
+ """ Rebalance on_assign callback """
+ old_assignment = self.assignment
+ self.assignment = [AssignedPartition(p.topic, p.partition) for p in partitions]
+ # Move over our last seen offsets so that we can report a proper
+ # minOffset even after a rebalance loop.
+ for a in old_assignment:
+ b = self.find_assignment(a.topic, a.partition)
+ b.min_offset = a.min_offset
+
+ self.assignment_dict = {a.skey: a for a in self.assignment}
+ self.send_assignment('assigned', partitions)
+
+ def on_revoke (self, consumer, partitions):
+ """ Rebalance on_revoke callback """
+ # Send final consumed records prior to rebalancing to make sure
+ # latest consumed is on par with what is going to be committed.
+ self.send_records_consumed(immediate=True)
+ self.assignment = list()
+ self.assignment_dict = dict()
+ self.send_assignment('revoked', partitions)
+ self.do_commit(immediate=True)
+
+
+ def on_commit (self, err, partitions):
+ """ Offsets Committed callback """
+ if err is not None and err.code() == KafkaError._NO_OFFSET:
+ self.dbg('on_commit(): no offsets to commit')
+ return
+
+ # Report consumed messages to make sure consumed position >= committed position
+ self.send_records_consumed(immediate=True)
+
+ d = {'name': 'offsets_committed',
+ 'offsets': []}
+
+ if err is not None:
+ d['success'] = False
+ d['error'] = str(err)
+ else:
+ d['success'] = True
+ d['error'] = ''
+
+ for p in partitions:
+ pd = {'topic': p.topic, 'partition': p.partition,
+ 'offset': p.offset, 'error': str(p.error)}
+ d['offsets'].append(pd)
+
+ self.send(d)
+
+
+ def do_commit (self, immediate=False, async=None):
+ """ Commit every 1000 messages or whenever there is a consume timeout
+ or immediate. """
+ if (self.use_auto_commit or
+ self.consumed_msgs_at_last_commit + (0 if immediate else 1000) >
+ self.consumed_msgs):
+ return
+
+ # Make sure we report consumption before commit,
+ # otherwise tests may fail because of commit > consumed
+ if self.consumed_msgs_at_last_commit < self.consumed_msgs:
+ self.send_records_consumed(immediate=True)
+
+ if async is None:
+ async_mode = self.use_async_commit
+ else:
+ async_mode = async
+
+ self.dbg('Committing %d messages (Async=%s)' %
+ (self.consumed_msgs - self.consumed_msgs_at_last_commit,
+ async_mode))
+
+ try:
+ self.consumer.commit(async=async_mode)
+ except KafkaException as e:
+ if e.args[0].code() == KafkaError._WAIT_COORD:
+ self.dbg('Ignoring commit failure, still waiting for coordinator')
+ elif e.args[0].code() == KafkaError._NO_OFFSET:
+ self.dbg('No offsets to commit')
+ else:
+ raise
+
+ self.consumed_msgs_at_last_commit = self.consumed_msgs
+
+
+ def msg_consume (self, msg):
+ """ Handle consumed message (or error event) """
+ if msg.error():
+ if msg.error().code() == KafkaError._PARTITION_EOF:
+ # ignore EOF
+ pass
+ else:
+ self.err('Consume failed: %s' % msg.error(), term=True)
+ return
+
+ if False:
+ self.dbg('Read msg from %s [%d] @ %d' % \
+ (msg.topic(), msg.partition(), msg.offset()))
+
+ if self.max_msgs >= 0 and self.consumed_msgs >= self.max_msgs:
+ return # ignore extra messages
+
+ # Find assignment.
+ a = self.find_assignment(msg.topic(), msg.partition())
+ if a is None:
+ self.err('Received message on unassigned partition %s [%d] @ %d' %
+ (msg.topic(), msg.partition(), msg.offset()), term=True)
+
+ a.consumed_msgs += 1
+ if a.min_offset == -1:
+ a.min_offset = msg.offset()
+ if a.max_offset < msg.offset():
+ a.max_offset = msg.offset()
+
+ self.consumed_msgs += 1
+
+ self.send_records_consumed(immediate=False)
+ self.do_commit(immediate=False)
+
+
+class AssignedPartition(object):
+ """ Local state container for assigned partition. """
+ def __init__ (self, topic, partition):
+ super(AssignedPartition, self).__init__()
+ self.topic = topic
+ self.partition = partition
+ self.skey = '%s %d' % (self.topic, self.partition)
+ self.consumed_msgs = 0
+ self.min_offset = -1
+ self.max_offset = 0
+
+ def to_dict (self):
+ """ Return a dict of this partition's state """
+ return {'topic': self.topic, 'partition': self.partition,
+ 'minOffset': self.min_offset, 'maxOffset': self.max_offset}
+
+
+
+
+
+
+
+
+
+
+
+if __name__ == '__main__':
+
+ parser = argparse.ArgumentParser(description='Verifiable Python Consumer')
+ parser.add_argument('--topic', action='append', type=str, required=True)
+ parser.add_argument('--group-id', dest='group.id', required=True)
+ parser.add_argument('--broker-list', dest='bootstrap.servers', required=True)
+ parser.add_argument('--session-timeout', type=int, dest='session.timeout.ms', default=6000)
+ parser.add_argument('--enable-autocommit', action='store_true', dest='enable.auto.commit', default=False)
+ parser.add_argument('--max-messages', type=int, dest='max_messages', default=-1)
+ parser.add_argument('--assignment-strategy', dest='partition.assignment.strategy')
+ parser.add_argument('--reset-policy', dest='topic.auto.offset.reset', default='earliest')
+ parser.add_argument('--consumer.config', dest='consumer_config')
+ args = vars(parser.parse_args())
+
+ conf = {'broker.version.fallback': '0.9.0',
+ 'default.topic.config': dict()}
+
+ VerifiableClient.set_config(conf, args)
+
+ vc = VerifiableConsumer(conf)
+ vc.use_auto_commit = args['enable.auto.commit']
+ vc.max_msgs = args['max_messages']
+
+ vc.dbg('Using config: %s' % conf)
+
+ vc.dbg('Subscribing to %s' % args['topic'])
+ vc.consumer.subscribe(args['topic'],
+ on_assign=vc.on_assign, on_revoke=vc.on_revoke)
+
+
+ try:
+ while vc.run:
+ msg = vc.consumer.poll(timeout=1.0)
+ if msg is None:
+ # Timeout.
+ # Try reporting consumed messages
+ vc.send_records_consumed(immediate=True)
+ # Commit every poll() timeout instead of on every message.
+ # Also commit on every 1000 messages, whichever comes first.
+ vc.do_commit(immediate=True)
+ continue
+
+ # Handle message (or error event)
+ vc.msg_consume(msg)
+
+ except KeyboardInterrupt:
+ pass
+
+ vc.dbg('Closing consumer')
+ vc.send_records_consumed(immediate=True)
+ if not vc.use_auto_commit:
+ vc.do_commit(immediate=True, async=False)
+
+ vc.consumer.close()
+
+ vc.send({'name': 'shutdown_complete'})
+
+ vc.dbg('All done')
diff --git a/confluent_kafka/kafkatest/verifiable_producer.py b/confluent_kafka/kafkatest/verifiable_producer.py
new file mode 100755
index 0000000..af188a2
--- /dev/null
+++ b/confluent_kafka/kafkatest/verifiable_producer.py
@@ -0,0 +1,141 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse, time
+from confluent_kafka import Producer, KafkaError, KafkaException
+from verifiable_client import VerifiableClient
+
+class VerifiableProducer(VerifiableClient):
+ """
+ confluent-kafka-python backed VerifiableProducer class for use with
+ Kafka's kafkatests client tests.
+ """
+ def __init__ (self, conf):
+ """
+ \p conf is a config dict passed to confluent_kafka.Producer()
+ """
+ super(VerifiableProducer, self).__init__(conf)
+ self.conf['on_delivery'] = self.dr_cb
+ self.conf['default.topic.config']['produce.offset.report'] = True
+ self.producer = Producer(**self.conf)
+ self.num_acked = 0
+ self.num_sent = 0
+ self.num_err = 0
+
+ def dr_cb (self, err, msg):
+ """ Per-message Delivery report callback. Called from poll() """
+ if err:
+ self.num_err += 1
+ self.send({'name': 'producer_send_error',
+ 'message': str(err),
+ 'topic': msg.topic(),
+ 'key': msg.key(),
+ 'value': msg.value()})
+ else:
+ self.num_acked += 1
+ self.send({'name': 'producer_send_success',
+ 'topic': msg.topic(),
+ 'partition': msg.partition(),
+ 'offset': msg.offset(),
+ 'key': msg.key(),
+ 'value': msg.value()})
+
+ pass
+
+
+
+
+
+
+if __name__ == '__main__':
+
+ parser = argparse.ArgumentParser(description='Verifiable Python Producer')
+ parser.add_argument('--topic', type=str, required=True)
+ parser.add_argument('--throughput', type=int, default=0)
+ parser.add_argument('--broker-list', dest='bootstrap.servers', required=True)
+ parser.add_argument('--max-messages', type=int, dest='max_msgs', default=1000000) # avoid infinite
+ parser.add_argument('--value-prefix', dest='value_prefix', type=str, default=None)
+ parser.add_argument('--acks', type=int, dest='topic.request.required.acks', default=-1)
+ parser.add_argument('--producer.config', dest='producer_config')
+ args = vars(parser.parse_args())
+
+ conf = {'broker.version.fallback': '0.9.0',
+ 'default.topic.config': dict()}
+
+ VerifiableClient.set_config(conf, args)
+
+ vp = VerifiableProducer(conf)
+
+ vp.max_msgs = args['max_msgs']
+ throughput = args['throughput']
+ topic = args['topic']
+ if args['value_prefix'] is not None:
+ value_fmt = args['value_prefix'] + '.%d'
+ else:
+ value_fmt = '%d'
+
+ if throughput > 0:
+ delay = 1.0/throughput
+ else:
+ delay = 0
+
+ vp.dbg('Producing %d messages at a rate of %d/s' % (vp.max_msgs, throughput))
+
+ try:
+ for i in range(0, vp.max_msgs):
+ if not vp.run:
+ break
+
+ t_end = time.time() + delay
+ while vp.run:
+ try:
+ vp.producer.produce(topic, value=(value_fmt % i))
+ vp.num_sent += 1
+ except KafkaException as e:
+ self.err('produce() #%d/%d failed: %s' % \
+ (i, vp.max_msgs, str(e)))
+ vp.num_err += 1
+ except BufferError:
+ vp.dbg('Local produce queue full (produced %d/%d msgs), waiting for deliveries..' % \
+ (i, vp.max_msgs))
+ vp.producer.poll(timeout=0.5)
+ continue
+ break
+
+
+ # Delay to achieve desired throughput,
+ # but make sure poll is called at least once
+ # to serve DRs.
+ while True:
+ remaining = max(0, t_end - time.time())
+ vp.producer.poll(timeout=remaining)
+ if remaining <= 0.00000001:
+ break
+
+ except KeyboardInterrupt:
+ pass
+
+ # Flush remaining messages to broker.
+ vp.dbg('Flushing')
+ try:
+ vp.producer.flush()
+ except KeyboardInterrupt:
+ pass
+
+ vp.send({'name': 'shutdown_complete'})
+
+ vp.dbg('All done')
diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c
new file mode 100644
index 0000000..80e136a
--- /dev/null
+++ b/confluent_kafka/src/Consumer.c
@@ -0,0 +1,721 @@
+/**
+ * Copyright 2016 Confluent Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "confluent_kafka.h"
+
+
+/****************************************************************************
+ *
+ *
+ * Consumer
+ *
... 2786 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-confluent-kafka.git
More information about the Python-modules-commits
mailing list