[Python-modules-commits] [python-confluent-kafka] 01/06: Import python-confluent-kafka_0.11.0.orig.tar.gz

Christos Trochalakis ctrochalakis at moszumanska.debian.org
Thu Sep 14 07:32:56 UTC 2017


This is an automated email from the git hooks/post-receive script.

ctrochalakis pushed a commit to branch master
in repository python-confluent-kafka.

commit fe76fbdcf9b80186f9a3e4daef5a50b7e8380d22
Author: Christos Trochalakis <ctrochalakis at debian.org>
Date:   Tue Sep 12 17:16:27 2017 +0300

    Import python-confluent-kafka_0.11.0.orig.tar.gz
---
 PKG-INFO                                           |  2 +-
 README.md                                          | 48 ++++++++++---
 confluent_kafka.egg-info/PKG-INFO                  |  2 +-
 confluent_kafka.egg-info/SOURCES.txt               |  2 +
 confluent_kafka.egg-info/requires.txt              |  3 +-
 confluent_kafka/__init__.py                        |  2 +-
 confluent_kafka/avro/__init__.py                   | 84 ++++++----------------
 .../avro/cached_schema_registry_client.py          |  8 ++-
 confluent_kafka/avro/error.py                      | 31 ++++++++
 confluent_kafka/avro/load.py                       | 47 ++++++++++++
 .../avro/serializer/message_serializer.py          | 22 +++---
 confluent_kafka/kafkatest/verifiable_client.py     | 26 ++++---
 confluent_kafka/kafkatest/verifiable_consumer.py   | 59 ++++++---------
 confluent_kafka/kafkatest/verifiable_producer.py   | 26 +++----
 confluent_kafka/src/Consumer.c                     |  4 +-
 confluent_kafka/src/Producer.c                     | 16 +++--
 confluent_kafka/src/confluent_kafka.c              | 69 ++++++++++++------
 confluent_kafka/src/confluent_kafka.h              |  6 +-
 setup.cfg                                          |  1 -
 setup.py                                           | 12 ++--
 tests/avro/data_gen.py                             |  8 +--
 tests/avro/mock_registry.py                        | 13 ++--
 tests/avro/test_avro_producer.py                   | 23 +++++-
 tests/avro/test_message_serializer.py              |  5 ++
 24 files changed, 323 insertions(+), 196 deletions(-)

diff --git a/PKG-INFO b/PKG-INFO
index 4fc2d8f..d1a6f13 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.0
 Name: confluent-kafka
-Version: 0.9.4
+Version: 0.11.0
 Summary: Confluent's Apache Kafka client for Python
 Home-page: https://github.com/confluentinc/confluent-kafka-python
 Author: Confluent Inc
diff --git a/README.md b/README.md
index 9490751..704dd78 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,27 @@
-Confluent's Apache Kafka client for Python
-==========================================
+Confluent's Python Client for Apache Kafka<sup>TM</sup>
+=======================================================
 
-Confluent's Kafka client for Python wraps the librdkafka C library, providing
-full Kafka protocol support with great performance and reliability.
+**confluent-kafka-python** is Confluent's Python client for [Apache Kafka](http://kafka.apache.org/) and the
+[Confluent Platform](https://www.confluent.io/product/compare/).
+
+Features:
+
+- **High performance** - confluent-kafka-python is a lightweight wrapper around
+[librdkafka](https://github.com/edenhill/librdkafka), a finely tuned C
+client.
+
+- **Reliability** - There are a lot of details to get right when writing an Apache Kafka
+client. We get them right in one place (librdkafka) and leverage this work
+across all of our clients (also [confluent-kafka-go](https://github.com/confluentinc/confluent-kafka-go)
+and [confluent-kafka-dotnet](https://github.com/confluentinc/confluent-kafka-dotnet)).
+
+- **Supported** - Commercial support is offered by
+[Confluent](https://confluent.io/).
+
+- **Future proof** - Confluent, founded by the
+creators of Kafka, is building a [streaming platform](https://www.confluent.io/product/compare/)
+with Apache Kafka at its core. It's a high priority for us that client features keep
+pace with core Apache Kafka and components of the [Confluent Platform](https://www.confluent.io/product/compare/).
 
 The Python bindings provide a high-level Producer and Consumer with support
 for the balanced consumer groups of Apache Kafka 0.9.
@@ -90,7 +109,7 @@ c.close()
 See [examples](examples) for more examples.
 
 
-Broker compatibility
+Broker Compatibility
 ====================
 The Python client (as well as the underlying C library librdkafka) supports
 all broker versions >= 0.8.
@@ -118,6 +137,15 @@ Prerequisites
 
  * Python >= 2.7 or Python 3.x
  * [librdkafka](https://github.com/edenhill/librdkafka) >= 0.9.1
+ 
+ 
+For **Debian/Ubuntu**-based systems, add this APT repo and then do `sudo apt-get install librdkafka-dev python-dev`:
+http://docs.confluent.io/current/installation.html#installation-apt
+
+For **RedHat** and **RPM**-based distros, add this YUM repo and then do `sudo yum install librdkafka-devel python-devel`:
+http://docs.confluent.io/current/installation.html#rpm-packages-via-yum
+
+On **OSX**, use **Homebrew** and do `brew install librdkafka`.
 
 
 Install
@@ -164,7 +192,7 @@ In order to run full test suite, simply execute:
 
 **Run integration tests:**
 
-To run the integration tests, uncomment the following line from `tox.ini` and add the paths to your Kafka and Confluent Schema Registry instances. If no Schema Registry path is provided then no AVRO tests will by run. You can also run the integration tests outside of `tox` by running this command from the source root.
+To run the integration tests, uncomment the following line from `tox.ini` and add the paths to your Kafka and Confluent Schema Registry instances. You can also run the integration tests outside of `tox` by running this command from the source root.
 
     examples/integration_test.py <kafka-broker> [<test-topic>] [<schema-registry>]
 
@@ -173,9 +201,13 @@ To run the integration tests, uncomment the following line from `tox.ini` and ad
 
 
 
-Generate documentation
+Generate Documentation
 ======================
-Install sphinx and sphinx_rtd_theme packages and then:
+Install sphinx and sphinx_rtd_theme packages:
+
+    $ pip install sphinx sphinx_rtd_theme
+
+Build HTML docs:
 
     $ make docs
 
diff --git a/confluent_kafka.egg-info/PKG-INFO b/confluent_kafka.egg-info/PKG-INFO
index 4fc2d8f..d1a6f13 100644
--- a/confluent_kafka.egg-info/PKG-INFO
+++ b/confluent_kafka.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.0
 Name: confluent-kafka
-Version: 0.9.4
+Version: 0.11.0
 Summary: Confluent's Apache Kafka client for Python
 Home-page: https://github.com/confluentinc/confluent-kafka-python
 Author: Confluent Inc
diff --git a/confluent_kafka.egg-info/SOURCES.txt b/confluent_kafka.egg-info/SOURCES.txt
index e92f2d2..59e68f9 100644
--- a/confluent_kafka.egg-info/SOURCES.txt
+++ b/confluent_kafka.egg-info/SOURCES.txt
@@ -10,6 +10,8 @@ confluent_kafka.egg-info/requires.txt
 confluent_kafka.egg-info/top_level.txt
 confluent_kafka/avro/__init__.py
 confluent_kafka/avro/cached_schema_registry_client.py
+confluent_kafka/avro/error.py
+confluent_kafka/avro/load.py
 confluent_kafka/avro/serializer/__init__.py
 confluent_kafka/avro/serializer/message_serializer.py
 confluent_kafka/kafkatest/__init__.py
diff --git a/confluent_kafka.egg-info/requires.txt b/confluent_kafka.egg-info/requires.txt
index 0110967..ccada03 100644
--- a/confluent_kafka.egg-info/requires.txt
+++ b/confluent_kafka.egg-info/requires.txt
@@ -1,6 +1,5 @@
 
-
 [avro]
 fastavro
 requests
-avro
\ No newline at end of file
+avro
diff --git a/confluent_kafka/__init__.py b/confluent_kafka/__init__.py
index 89ea508..ef6e59b 100644
--- a/confluent_kafka/__init__.py
+++ b/confluent_kafka/__init__.py
@@ -1,5 +1,5 @@
 __all__ = ['cimpl', 'avro', 'kafkatest']
-from .cimpl import (Consumer,
+from .cimpl import (Consumer,  # noqa
                     KafkaError,
                     KafkaException,
                     Message,
diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py
index 846efd8..118ab2a 100644
--- a/confluent_kafka/avro/__init__.py
+++ b/confluent_kafka/avro/__init__.py
@@ -2,58 +2,12 @@
     Avro schema registry module: Deals with encoding and decoding of messages with avro schemas
 
 """
-import sys
 
 from confluent_kafka import Producer, Consumer
-
-VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
-
-
-def loads(schema_str):
-    """ Parse a schema given a schema string """
-    if sys.version_info[0] < 3:
-        return schema.parse(schema_str)
-    else:
-        return schema.Parse(schema_str)
-
-
-def load(fp):
-    """ Parse a schema from a file path """
-    with open(fp) as f:
-        return loads(f.read())
-
-
-# avro.schema.RecordSchema and avro.schema.PrimitiveSchema classes are not hashable. Hence defining them explicitely as a quick fix
-def _hash_func(self):
-    return hash(str(self))
-
-
-try:
-    from avro import schema
-
-    schema.RecordSchema.__hash__ = _hash_func
-    schema.PrimitiveSchema.__hash__ = _hash_func
-except ImportError:
-    pass
-
-
-class ClientError(Exception):
-    """ Error thrown by Schema Registry clients """
-
-    def __init__(self, message, http_code=None):
-        self.message = message
-        self.http_code = http_code
-        super(ClientError, self).__init__(self.__str__())
-
-    def __repr__(self):
-        return "ClientError(error={error})".format(error=self.message)
-
-    def __str__(self):
-        return self.message
-
-
+from confluent_kafka.avro.error import ClientError
+from confluent_kafka.avro.load import load, loads  # noqa
 from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
-from confluent_kafka.avro.serializer import (SerializerError,
+from confluent_kafka.avro.serializer import (SerializerError,  # noqa
                                              KeySerializerError,
                                              ValueSerializerError)
 from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
@@ -72,14 +26,17 @@ class AvroProducer(Producer):
     """
 
     def __init__(self, config, default_key_schema=None,
-                 default_value_schema=None):
-        if ('schema.registry.url' not in config.keys()):
-            raise ValueError("Missing parameter: schema.registry.url")
-        schem_registry_url = config["schema.registry.url"]
-        del config["schema.registry.url"]
+                 default_value_schema=None, schema_registry=None):
+        schema_registry_url = config.pop("schema.registry.url", None)
+        if schema_registry is None:
+            if schema_registry_url is None:
+                raise ValueError("Missing parameter: schema.registry.url")
+            schema_registry = CachedSchemaRegistryClient(url=schema_registry_url)
+        elif schema_registry_url is not None:
+            raise ValueError("Cannot pass schema_registry along with schema.registry.url config")
 
         super(AvroProducer, self).__init__(config)
-        self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schem_registry_url))
+        self._serializer = MessageSerializer(schema_registry)
         self._key_schema = default_key_schema
         self._value_schema = default_value_schema
 
@@ -114,7 +71,6 @@ class AvroProducer(Producer):
             else:
                 raise KeySerializerError("Avro schema required for key")
 
-
         super(AvroProducer, self).produce(topic, value, key, **kwargs)
 
 
@@ -127,15 +83,17 @@ class AvroConsumer(Consumer):
 
     @:param: config: dict object with config parameters containing url for schema registry (schema.registry.url).
     """
-    def __init__(self, config):
-
-        if ('schema.registry.url' not in config.keys()):
-            raise ValueError("Missing parameter: schema.registry.url")
-        schem_registry_url = config["schema.registry.url"]
-        del config["schema.registry.url"]
+    def __init__(self, config, schema_registry=None):
+        schema_registry_url = config.pop("schema.registry.url", None)
+        if schema_registry is None:
+            if schema_registry_url is None:
+                raise ValueError("Missing parameter: schema.registry.url")
+            schema_registry = CachedSchemaRegistryClient(url=schema_registry_url)
+        elif schema_registry_url is not None:
+            raise ValueError("Cannot pass schema_registry along with schema.registry.url config")
 
         super(AvroConsumer, self).__init__(config)
-        self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schem_registry_url))
+        self._serializer = MessageSerializer(schema_registry)
 
     def poll(self, timeout=None):
         """
diff --git a/confluent_kafka/avro/cached_schema_registry_client.py b/confluent_kafka/avro/cached_schema_registry_client.py
index 97735c3..5573b5b 100644
--- a/confluent_kafka/avro/cached_schema_registry_client.py
+++ b/confluent_kafka/avro/cached_schema_registry_client.py
@@ -25,9 +25,11 @@ from collections import defaultdict
 
 import requests
 
-from . import ClientError, VALID_LEVELS
+from .error import ClientError
 from . import loads
 
+VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
+
 # Common accept header sent
 ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json"
 log = logging.getLogger(__name__)
@@ -119,7 +121,7 @@ class CachedSchemaRegistryClient(object):
 
         schemas_to_id = self.subject_to_schema_ids[subject]
         schema_id = schemas_to_id.get(avro_schema, None)
-        if schema_id != None:
+        if schema_id is not None:
             return schema_id
         # send it up
         url = '/'.join([self.url, 'subjects', subject, 'versions'])
@@ -222,7 +224,7 @@ class CachedSchemaRegistryClient(object):
         """
         schemas_to_version = self.subject_to_schema_versions[subject]
         version = schemas_to_version.get(avro_schema, None)
-        if version != None:
+        if version is not None:
             return version
 
         url = '/'.join([self.url, 'subjects', subject])
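
The hunks above let AvroProducer and AvroConsumer take either the schema.registry.url config key or a pre-built client via the new schema_registry argument, but not both. A minimal usage sketch, assuming the optional Avro dependencies are installed (confluent-kafka[avro]) and a local broker and Schema Registry; the topic name is illustrative:

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer, CachedSchemaRegistryClient

    value_schema = avro.loads('{"type": "string"}')

    # Existing style: registry URL passed through the config dict.
    p1 = AvroProducer({'bootstrap.servers': 'localhost:9092',
                       'schema.registry.url': 'http://localhost:8081'},
                      default_value_schema=value_schema)

    # New style: pass a CachedSchemaRegistryClient directly. Supplying both
    # schema_registry and schema.registry.url raises ValueError.
    registry = CachedSchemaRegistryClient(url='http://localhost:8081')
    p2 = AvroProducer({'bootstrap.servers': 'localhost:9092'},
                      default_value_schema=value_schema,
                      schema_registry=registry)

    p2.produce(topic='my_topic', value='hello')
    p2.flush()
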
diff --git a/confluent_kafka/avro/error.py b/confluent_kafka/avro/error.py
new file mode 100644
index 0000000..b879c4b
--- /dev/null
+++ b/confluent_kafka/avro/error.py
@@ -0,0 +1,31 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+class ClientError(Exception):
+    """ Error thrown by Schema Registry clients """
+
+    def __init__(self, message, http_code=None):
+        self.message = message
+        self.http_code = http_code
+        super(ClientError, self).__init__(self.__str__())
+
+    def __repr__(self):
+        return "ClientError(error={error})".format(error=self.message)
+
+    def __str__(self):
+        return self.message
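
ClientError now lives in its own module but stays importable from confluent_kafka.avro. A short sketch of catching it around a registry call; the URL and subject name are illustrative, and whether a call raises depends on the registry's response:

    from confluent_kafka.avro import CachedSchemaRegistryClient, ClientError

    client = CachedSchemaRegistryClient(url='http://localhost:8081')
    try:
        result = client.get_latest_schema('my_topic-value')
    except ClientError as e:
        # ClientError carries the registry message and, when available, the HTTP code.
        print('Schema Registry error ({}): {}'.format(e.http_code, e.message))
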
diff --git a/confluent_kafka/avro/load.py b/confluent_kafka/avro/load.py
new file mode 100644
index 0000000..5c683dc
--- /dev/null
+++ b/confluent_kafka/avro/load.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+
+def loads(schema_str):
+    """ Parse a schema given a schema string """
+    if sys.version_info[0] < 3:
+        return schema.parse(schema_str)
+    else:
+        return schema.Parse(schema_str)
+
+
+def load(fp):
+    """ Parse a schema from a file path """
+    with open(fp) as f:
+        return loads(f.read())
+
+
+# avro.schema.RecordSchema and avro.schema.PrimitiveSchema classes are not hashable. Hence defining them explicitly as
+# a quick fix
+def _hash_func(self):
+    return hash(str(self))
+
+
+try:
+    from avro import schema
+
+    schema.RecordSchema.__hash__ = _hash_func
+    schema.PrimitiveSchema.__hash__ = _hash_func
+except ImportError:
+    schema = None
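
The relocated helpers keep their public confluent_kafka.avro entry points; a small sketch of parsing schemas, assuming the optional avro package is installed (the file name is illustrative):

    from confluent_kafka import avro

    # Parse an Avro schema from a JSON string ...
    key_schema = avro.loads('{"type": "string"}')

    # ... or from a file on disk.
    # value_schema = avro.load('user_value.avsc')
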
diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
index 47eab5d..c3a4b94 100644
--- a/confluent_kafka/avro/serializer/message_serializer.py
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -125,10 +125,9 @@ class MessageSerializer(object):
                 if not schema:
                     raise serialize_err("Schema does not exist")
                 self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
-            except ClientError as e:
+            except ClientError:
                 exc_type, exc_value, exc_traceback = sys.exc_info()
-                raise serialize_err( + repr(
-                    traceback.format_exception(exc_type, exc_value, exc_traceback)))
+                raise serialize_err(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
 
         # get the writer
         writer = self.id_to_writers[schema_id]
@@ -171,14 +170,15 @@ class MessageSerializer(object):
             # try to use fast avro
             try:
                 schema_dict = schema.to_json()
-                obj = read_data(payload, schema_dict)
-                # here means we passed so this is something fastavro can do
-                # seek back since it will be called again for the
-                # same payload - one time hit
+                read_data(payload, schema_dict)
 
+                # If we reach this point, this means we have fastavro and it can
+                # do this deserialization. Rewind since this method just determines
+                # the reader function and we need to deserialize again along the
+                # normal path.
                 payload.seek(curr_pos)
-                decoder_func = lambda p: read_data(p, schema_dict)
-                self.id_to_decoder_func[schema_id] = decoder_func
+
+                self.id_to_decoder_func[schema_id] = lambda p: read_data(p, schema_dict)
                 return self.id_to_decoder_func[schema_id]
             except:
                 pass
@@ -201,6 +201,10 @@ class MessageSerializer(object):
         the schema registry.
         @:param: message
         """
+
+        if message is None:
+            return None
+
         if len(message) <= 5:
             raise SerializerError("message is too small to decode")
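
The None guard added above makes decode_message pass empty (None) payloads straight through, e.g. Kafka tombstone records. A sketch of a consumer loop that tolerates such values; broker, registry, group id and topic are illustrative:

    from confluent_kafka.avro import AvroConsumer

    c = AvroConsumer({'bootstrap.servers': 'localhost:9092',
                      'group.id': 'example-group',
                      'schema.registry.url': 'http://localhost:8081'})
    c.subscribe(['my_topic'])

    try:
        while True:
            msg = c.poll(1.0)
            if msg is None or msg.error():
                continue
            if msg.value() is None:
                # Tombstone (null value): decoded to None instead of raising.
                continue
            print(msg.value())
    finally:
        c.close()
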
 
diff --git a/confluent_kafka/kafkatest/verifiable_client.py b/confluent_kafka/kafkatest/verifiable_client.py
index a0eb1e1..d001527 100644
--- a/confluent_kafka/kafkatest/verifiable_client.py
+++ b/confluent_kafka/kafkatest/verifiable_client.py
@@ -13,7 +13,14 @@
 # limitations under the License.
 
 
-import signal, socket, os, sys, time, json, re, datetime
+import datetime
+import json
+import os
+import re
+import signal
+import socket
+import sys
+import time
 
 
 class VerifiableClient(object):
@@ -21,7 +28,7 @@ class VerifiableClient(object):
     Generic base class for a kafkatest verifiable client.
     Implements the common kafkatest protocol and semantics.
     """
-    def __init__ (self, conf):
+    def __init__(self, conf):
         """
         """
         super(VerifiableClient, self).__init__()
@@ -31,37 +38,36 @@ class VerifiableClient(object):
         signal.signal(signal.SIGTERM, self.sig_term)
         self.dbg('Pid is %d' % os.getpid())
 
-    def sig_term (self, sig, frame):
+    def sig_term(self, sig, frame):
         self.dbg('SIGTERM')
         self.run = False
 
     @staticmethod
-    def _timestamp ():
+    def _timestamp():
         return time.strftime('%H:%M:%S', time.localtime())
 
-    def dbg (self, s):
+    def dbg(self, s):
         """ Debugging printout """
         sys.stderr.write('%% %s DEBUG: %s\n' % (self._timestamp(), s))
 
-    def err (self, s, term=False):
+    def err(self, s, term=False):
         """ Error printout, if term=True the process will terminate immediately. """
         sys.stderr.write('%% %s ERROR: %s\n' % (self._timestamp(), s))
         if term:
             sys.stderr.write('%% FATAL ERROR ^\n')
             sys.exit(1)
 
-    def send (self, d):
+    def send(self, d):
         """ Send dict as JSON to stdout for consumption by kafkatest handler """
         d['_time'] = str(datetime.datetime.now())
         self.dbg('SEND: %s' % json.dumps(d))
         sys.stdout.write('%s\n' % json.dumps(d))
         sys.stdout.flush()
 
-
     @staticmethod
-    def set_config (conf, args):
+    def set_config(conf, args):
         """ Set client config properties using args dict. """
-        for n,v in args.iteritems():
+        for n, v in args.iteritems():
             if v is None:
                 continue
             # Things to ignore
diff --git a/confluent_kafka/kafkatest/verifiable_consumer.py b/confluent_kafka/kafkatest/verifiable_consumer.py
index 3e01367..1184afe 100755
--- a/confluent_kafka/kafkatest/verifiable_consumer.py
+++ b/confluent_kafka/kafkatest/verifiable_consumer.py
@@ -15,16 +15,17 @@
 # limitations under the License.
 #
 
-import argparse, sys
+import argparse
 from confluent_kafka import Consumer, KafkaError, KafkaException
 from verifiable_client import VerifiableClient
 
+
 class VerifiableConsumer(VerifiableClient):
     """
     confluent-kafka-python backed VerifiableConsumer class for use with
     Kafka's kafkatests client tests.
     """
-    def __init__ (self, conf):
+    def __init__(self, conf):
         """
         \p conf is a config dict passed to confluent_kafka.Consumer()
         """
@@ -40,19 +41,16 @@ class VerifiableConsumer(VerifiableClient):
         self.assignment = []
         self.assignment_dict = dict()
 
-
-    def find_assignment (self, topic, partition):
+    def find_assignment(self, topic, partition):
         """ Find and return existing assignment based on \p topic and \p partition,
         or None on miss. """
         skey = '%s %d' % (topic, partition)
         return self.assignment_dict.get(skey)
 
-
-    def send_records_consumed (self, immediate=False):
+    def send_records_consumed(self, immediate=False):
         """ Send records_consumed, every 100 messages, on timeout,
             or if immediate is set. """
-        if (self.consumed_msgs <= self.consumed_msgs_last_reported +
-            (0 if immediate else 100)):
+        if self.consumed_msgs <= self.consumed_msgs_last_reported + (0 if immediate else 100):
             return
 
         if len(self.assignment) == 0:
@@ -73,15 +71,13 @@ class VerifiableConsumer(VerifiableClient):
         self.send(d)
         self.consumed_msgs_last_reported = self.consumed_msgs
 
-
-    def send_assignment (self, evtype, partitions):
+    def send_assignment(self, evtype, partitions):
         """ Send assignment update, \p evtype is either 'assigned' or 'revoked' """
-        d = { 'name': 'partitions_' + evtype,
-              'partitions': [{'topic': x.topic, 'partition': x.partition} for x in partitions]}
+        d = {'name': 'partitions_' + evtype,
+             'partitions': [{'topic': x.topic, 'partition': x.partition} for x in partitions]}
         self.send(d)
 
-
-    def on_assign (self, consumer, partitions):
+    def on_assign(self, consumer, partitions):
         """ Rebalance on_assign callback """
         old_assignment = self.assignment
         self.assignment = [AssignedPartition(p.topic, p.partition) for p in partitions]
@@ -94,7 +90,7 @@ class VerifiableConsumer(VerifiableClient):
         self.assignment_dict = {a.skey: a for a in self.assignment}
         self.send_assignment('assigned', partitions)
 
-    def on_revoke (self, consumer, partitions):
+    def on_revoke(self, consumer, partitions):
         """ Rebalance on_revoke callback """
         # Send final consumed records prior to rebalancing to make sure
         # latest consumed is in par with what is going to be committed.
@@ -104,8 +100,7 @@ class VerifiableConsumer(VerifiableClient):
         self.send_assignment('revoked', partitions)
         self.do_commit(immediate=True)
 
-
-    def on_commit (self, err, partitions):
+    def on_commit(self, err, partitions):
         """ Offsets Committed callback """
         if err is not None and err.code() == KafkaError._NO_OFFSET:
             self.dbg('on_commit(): no offsets to commit')
@@ -131,13 +126,12 @@ class VerifiableConsumer(VerifiableClient):
 
         self.send(d)
 
-
-    def do_commit (self, immediate=False, async=None):
+    def do_commit(self, immediate=False, async=None):
         """ Commit every 1000 messages or whenever there is a consume timeout
             or immediate. """
-        if (self.use_auto_commit or
-            self.consumed_msgs_at_last_commit + (0 if immediate else 1000) >
-            self.consumed_msgs):
+        if (self.use_auto_commit
+                or self.consumed_msgs_at_last_commit + (0 if immediate else 1000) >
+                self.consumed_msgs):
             return
 
         # Make sure we report consumption before commit,
@@ -166,8 +160,7 @@ class VerifiableConsumer(VerifiableClient):
 
         self.consumed_msgs_at_last_commit = self.consumed_msgs
 
-
-    def msg_consume (self, msg):
+    def msg_consume(self, msg):
         """ Handle consumed message (or error event) """
         if msg.error():
             if msg.error().code() == KafkaError._PARTITION_EOF:
@@ -178,11 +171,11 @@ class VerifiableConsumer(VerifiableClient):
             return
 
         if False:
-            self.dbg('Read msg from %s [%d] @ %d' % \
+            self.dbg('Read msg from %s [%d] @ %d' %
                      (msg.topic(), msg.partition(), msg.offset()))
 
         if self.max_msgs >= 0 and self.consumed_msgs >= self.max_msgs:
-            return # ignore extra messages
+            return  # ignore extra messages
 
         # Find assignment.
         a = self.find_assignment(msg.topic(), msg.partition())
@@ -204,7 +197,7 @@ class VerifiableConsumer(VerifiableClient):
 
 class AssignedPartition(object):
     """ Local state container for assigned partition. """
-    def __init__ (self, topic, partition):
+    def __init__(self, topic, partition):
         super(AssignedPartition, self).__init__()
         self.topic = topic
         self.partition = partition
@@ -213,21 +206,12 @@ class AssignedPartition(object):
         self.min_offset = -1
         self.max_offset = 0
 
-    def to_dict (self):
+    def to_dict(self):
         """ Return a dict of this partition's state """
         return {'topic': self.topic, 'partition': self.partition,
                 'minOffset': self.min_offset, 'maxOffset': self.max_offset}
 
 
-        
-
-    
-
-
-
-
-        
-
 if __name__ == '__main__':
 
     parser = argparse.ArgumentParser(description='Verifiable Python Consumer')
@@ -257,7 +241,6 @@ if __name__ == '__main__':
     vc.consumer.subscribe(args['topic'],
                           on_assign=vc.on_assign, on_revoke=vc.on_revoke)
 
-
     try:
         while vc.run:
             msg = vc.consumer.poll(timeout=1.0)
diff --git a/confluent_kafka/kafkatest/verifiable_producer.py b/confluent_kafka/kafkatest/verifiable_producer.py
index af188a2..46042cf 100755
--- a/confluent_kafka/kafkatest/verifiable_producer.py
+++ b/confluent_kafka/kafkatest/verifiable_producer.py
@@ -15,16 +15,18 @@
 # limitations under the License.
 #
 
-import argparse, time
-from confluent_kafka import Producer, KafkaError, KafkaException
+import argparse
+import time
+from confluent_kafka import Producer, KafkaException
 from verifiable_client import VerifiableClient
 
+
 class VerifiableProducer(VerifiableClient):
     """
     confluent-kafka-python backed VerifiableProducer class for use with
     Kafka's kafkatests client tests.
     """
-    def __init__ (self, conf):
+    def __init__(self, conf):
         """
         \p conf is a config dict passed to confluent_kafka.Producer()
         """
@@ -36,7 +38,7 @@ class VerifiableProducer(VerifiableClient):
         self.num_sent = 0
         self.num_err = 0
 
-    def dr_cb (self, err, msg):
+    def dr_cb(self, err, msg):
         """ Per-message Delivery report callback. Called from poll() """
         if err:
             self.num_err += 1
@@ -53,13 +55,9 @@ class VerifiableProducer(VerifiableClient):
                        'offset': msg.offset(),
                        'key': msg.key(),
                        'value': msg.value()})
-                       
-        pass
-
-
 
+        pass
 
-        
 
 if __name__ == '__main__':
 
@@ -67,7 +65,7 @@ if __name__ == '__main__':
     parser.add_argument('--topic', type=str, required=True)
     parser.add_argument('--throughput', type=int, default=0)
     parser.add_argument('--broker-list', dest='bootstrap.servers', required=True)
-    parser.add_argument('--max-messages', type=int, dest='max_msgs', default=1000000) # avoid infinite
+    parser.add_argument('--max-messages', type=int, dest='max_msgs', default=1000000)  # avoid infinite
     parser.add_argument('--value-prefix', dest='value_prefix', type=str, default=None)
     parser.add_argument('--acks', type=int, dest='topic.request.required.acks', default=-1)
     parser.add_argument('--producer.config', dest='producer_config')
@@ -106,17 +104,15 @@ if __name__ == '__main__':
                     vp.producer.produce(topic, value=(value_fmt % i))
                     vp.num_sent += 1
                 except KafkaException as e:
-                    self.err('produce() #%d/%d failed: %s' % \
-                             (i, vp.max_msgs, str(e)))
+                    vp.err('produce() #%d/%d failed: %s' % (i, vp.max_msgs, str(e)))
                     vp.num_err += 1
                 except BufferError:
-                    vp.dbg('Local produce queue full (produced %d/%d msgs), waiting for deliveries..' % \
+                    vp.dbg('Local produce queue full (produced %d/%d msgs), waiting for deliveries..' %
                            (i, vp.max_msgs))
                     vp.producer.poll(timeout=0.5)
                     continue
                 break
 
-
             # Delay to achieve desired throughput,
             # but make sure poll is called at least once
             # to serve DRs.
@@ -135,7 +131,7 @@ if __name__ == '__main__':
         vp.producer.flush()
     except KeyboardInterrupt:
         pass
-    
+
     vp.send({'name': 'shutdown_complete'})
 
     vp.dbg('All done')
diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c
index 820ed84..f835edd 100644
--- a/confluent_kafka/src/Consumer.c
+++ b/confluent_kafka/src/Consumer.c
@@ -463,7 +463,7 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
         if (!rkm)
                 Py_RETURN_NONE;
 
-        msgobj = Message_new0(rkm);
+        msgobj = Message_new0(self, rkm);
         rd_kafka_message_destroy(rkm);
 
         return msgobj;
@@ -770,6 +770,8 @@ static int Consumer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
                 return -1;
         }
 
+        self->type = RD_KAFKA_CONSUMER;
+
         if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
                                        args, kwargs)))
                 return -1; /* Exception raised by ..conf_setup() */
diff --git a/confluent_kafka/src/Producer.c b/confluent_kafka/src/Producer.c
index 8ba69a9..01a1bdd 100644
--- a/confluent_kafka/src/Producer.c
+++ b/confluent_kafka/src/Producer.c
@@ -152,7 +152,11 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
 		goto done;
 	}
 
-	msgobj = Message_new0(rkm);
+        /* Skip callback if delivery.report.only.error=true */
+        if (self->u.Producer.dr_only_error && !rkm->err)
+                goto done;
+
+	msgobj = Message_new0(self, rkm);
 	
 	args = Py_BuildValue("(OO)",
 			     Message_error((Message *)msgobj, NULL),
@@ -324,9 +328,11 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
 #if !HAVE_PRODUCEV
         if (timestamp) {
                 PyErr_Format(PyExc_NotImplementedError,
-                             "Producer timestamps require librdkafka "
-                             "version >=v0.9.4 (currently on %s)",
-                             rd_kafka_version_str());
+                             "Producer timestamps require "
+                             "confluent-kafka-python built for librdkafka "
+                             "version >=v0.9.4 (librdkafka runtime 0x%x, "
+                             "buildtime 0x%x)",
+                             rd_kafka_version(), RD_KAFKA_VERSION);
                 return NULL;
         }
 #endif
@@ -527,6 +533,8 @@ static int Producer_init (PyObject *selfobj, PyObject *args, PyObject *kwargs) {
                 return -1;
         }
 
+        self->type = RD_KAFKA_PRODUCER;
+
         if (!(conf = common_conf_setup(RD_KAFKA_PRODUCER, self,
                                        args, kwargs)))
                 return -1;
diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c
index 1d087cd..5dbc32e 100644
--- a/confluent_kafka/src/confluent_kafka.c
+++ b/confluent_kafka/src/confluent_kafka.c
@@ -258,7 +258,10 @@ PyObject *KafkaError_new0 (rd_kafka_resp_err_t err, const char *fmt, ...) {
  PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str) {
 	if (!err)
 		Py_RETURN_NONE;
-	return KafkaError_new0(err, "%s", str);
+        if (str)
+                return KafkaError_new0(err, "%s", str);
+        else
+                return KafkaError_new0(err, NULL);
 }
 
 
@@ -529,17 +532,19 @@ PyTypeObject MessageType = {
 /**
  * @brief Internal factory to create Message object from message_t
  */
-PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
+PyObject *Message_new0 (const Handle *handle, const rd_kafka_message_t *rkm) {
 	Message *self;
 
 	self = (Message *)MessageType.tp_alloc(&MessageType, 0);
 	if (!self)
 		return NULL;
 
-	self->error = KafkaError_new_or_None(rkm->err,
-					     rkm->err ?
-					     rd_kafka_message_errstr(rkm) :
-					     NULL);
+        /* Only use message error string on Consumer, for Producers
+         * it will contain the original message payload. */
+        self->error = KafkaError_new_or_None(
+                rkm->err,
+                (rkm->err && handle->type != RD_KAFKA_PRODUCER) ?
+                rd_kafka_message_errstr(rkm) : NULL);
 
 	if (rkm->rkt)
 		self->topic = cfl_PyUnistr(
@@ -1119,7 +1124,26 @@ static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
 		}
 
 		return 1;
-	}
+
+        } else if (!strcmp(name, "delivery.report.only.error")) {
+                /* Since we allocate msgstate for each produced message
+                 * with a callback we can't use delivery.report.only.error
+                 * as-is, as we wouldn't be able to ever free those msgstates.
+                 * Instead we shortcut this setting in the Python client,
+                 * providing the same functionality from dr_msg_cb trampoline.
+                 */
+
+                if (!PyBool_Check(valobj)) {
+                        cfl_PyErr_Format(
+                                RD_KAFKA_RESP_ERR__INVALID_ARG,
+                                "%s requires bool", name);
+                        return -1;
+                }
+
+                self->u.Producer.dr_only_error = valobj == Py_True;
+
+                return 1;
+        }
 
 	return 0; /* Not handled */
 }
@@ -1278,16 +1302,21 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
 		/*
 		 * Pass configuration property through to librdkafka.
 		 */
-		if (!(vs = cfl_PyObject_Unistr(vo))) {
-			PyErr_SetString(PyExc_TypeError,
-					"expected configuration property "
-					"value as type unicode string");
-			rd_kafka_topic_conf_destroy(tconf);
-			rd_kafka_conf_destroy(conf);
-			Py_DECREF(ks);
-			return NULL;
-		}
-		v = cfl_PyUnistr_AsUTF8(vs);
+                if (vo == Py_None) {
+                        v = NULL;
+                } else {
+                        if (!(vs = cfl_PyObject_Unistr(vo))) {
+                                PyErr_SetString(PyExc_TypeError,
+                                                "expected configuration "
+                                                "property value as type "
+                                                "unicode string");
+                                rd_kafka_topic_conf_destroy(tconf);
+                                rd_kafka_conf_destroy(conf);
+                                Py_DECREF(ks);
+                                return NULL;
+                        }
+                        v = cfl_PyUnistr_AsUTF8(vs);
+                }
 
 		if (rd_kafka_conf_set(conf, k, v, errstr, sizeof(errstr)) !=
 		    RD_KAFKA_CONF_OK) {
@@ -1295,12 +1324,12 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
 					  "%s", errstr);
 			rd_kafka_topic_conf_destroy(tconf);
 			rd_kafka_conf_destroy(conf);
-			Py_DECREF(vs);
+                        Py_XDECREF(vs);
 			Py_DECREF(ks);
 			return NULL;
 		}
 
-		Py_DECREF(vs);
+                Py_XDECREF(vs);
 		Py_DECREF(ks);
 	}
 
@@ -1398,7 +1427,7 @@ static PyObject *libversion (PyObject *self, PyObject *args) {
 }
 
 static PyObject *version (PyObject *self, PyObject *args) {
-	return Py_BuildValue("si", "0.9.2", 0x00090100);
+	return Py_BuildValue("si", "0.11.0", 0x000b0000);
 }
 
 static PyMethodDef cimpl_methods[] = {
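
Together, the dr_msg_cb change in Producer.c and the producer_conf_set_special branch above implement delivery.report.only.error on the Python side. A minimal configuration sketch; broker address and topic are illustrative:

    from confluent_kafka import Producer

    def on_delivery(err, msg):
        # With delivery.report.only.error=True the trampoline skips successful
        # deliveries, so this callback only sees failures.
        print('delivery failed for {}: {}'.format(msg.topic(), err))

    p = Producer({'bootstrap.servers': 'localhost:9092',
                  'delivery.report.only.error': True,
                  'on_delivery': on_delivery})

    p.produce('my_topic', value='hello')
    p.flush()
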
diff --git a/confluent_kafka/src/confluent_kafka.h b/confluent_kafka/src/confluent_kafka.h
index fe2d6d8..899220b 100644
--- a/confluent_kafka/src/confluent_kafka.h
+++ b/confluent_kafka/src/confluent_kafka.h
@@ -105,6 +105,7 @@ PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str);
... 205 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-confluent-kafka.git