[Python-modules-commits] [python-confluent-kafka] 01/08: Import python-confluent-kafka_0.9.4.orig.tar.gz

Christos Trochalakis ctrochalakis at moszumanska.debian.org
Tue Mar 14 14:39:45 UTC 2017


This is an automated email from the git hooks/post-receive script.

ctrochalakis pushed a commit to branch master
in repository python-confluent-kafka.

commit 8cc9a07a0b0cab90ab301153c2b0d3ae36add318
Author: Christos Trochalakis <yatiohi at ideopolis.gr>
Date:   Thu Mar 9 15:58:23 2017 +0200

    Import python-confluent-kafka_0.9.4.orig.tar.gz
---
 PKG-INFO                                           |   2 +-
 README.md                                          |  59 +++-
 confluent_kafka.egg-info/PKG-INFO                  |   2 +-
 confluent_kafka.egg-info/SOURCES.txt               |  22 +-
 confluent_kafka.egg-info/requires.txt              |   6 +
 confluent_kafka.egg-info/top_level.txt             |   1 +
 confluent_kafka/__init__.py                        |  18 +-
 confluent_kafka/avro/__init__.py                   | 162 +++++++++
 .../avro/cached_schema_registry_client.py          | 314 ++++++++++++++++++
 confluent_kafka/avro/serializer/__init__.py        |  40 +++
 .../avro/serializer/message_serializer.py          | 212 ++++++++++++
 confluent_kafka/src/Consumer.c                     | 364 ++++++++++++++-------
 confluent_kafka/src/Producer.c                     | 222 +++++++++----
 confluent_kafka/src/confluent_kafka.c              | 258 +++++++++++----
 confluent_kafka/src/confluent_kafka.h              |  34 +-
 setup.py                                           |  15 +-
 tests/avro/__init__.py                             |   0
 tests/avro/data_gen.py                             | 101 ++++++
 tests/avro/mock_registry.py                        | 189 +++++++++++
 tests/avro/mock_schema_registry_client.py          | 149 +++++++++
 tests/avro/test_avro_producer.py                   |  84 +++++
 tests/avro/test_cached_client.py                   | 130 ++++++++
 tests/avro/test_message_serializer.py              |  79 +++++
 tests/avro/test_mock_client.py                     | 120 +++++++
 tests/avro/test_util.py                            |  37 +++
 tests/test_Consumer.py                             |  62 ----
 tests/test_KafkaError.py                           |  30 --
 tests/test_Producer.py                             |  38 ---
 tests/test_TopicPartition.py                       |  38 ---
 tests/test_docs.py                                 |  29 --
 tests/test_enums.py                                |   9 -
 tests/test_misc.py                                 |  16 -
 tests/test_threads.py                              |  69 ----
 33 files changed, 2353 insertions(+), 558 deletions(-)

diff --git a/PKG-INFO b/PKG-INFO
index 90c0183..4fc2d8f 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.0
 Name: confluent-kafka
-Version: 0.9.2
+Version: 0.9.4
 Summary: Confluent's Apache Kafka client for Python
 Home-page: https://github.com/confluentinc/confluent-kafka-python
 Author: Confluent Inc
diff --git a/README.md b/README.md
index 7362208..9490751 100644
--- a/README.md
+++ b/README.md
@@ -46,6 +46,47 @@ while running:
 c.close()
 ```
 
+**AvroProducer**
+
+```python
+from confluent_kafka import avro 
+from confluent_kafka.avro import AvroProducer
+
+value_schema = avro.load('ValueSchema.avsc')
+key_schema = avro.load('KeySchema.avsc')
+value = {"name": "Value"}
+key = {"name": "Key"}
+
+avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schema_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
+avroProducer.produce(topic='my_topic', value=value, key=key)
+```
+
+**AvroConsumer**
+
+```python
+from confluent_kafka import KafkaError
+from confluent_kafka.avro import AvroConsumer
+from confluent_kafka.avro.serializer import SerializerError
+
+c = AvroConsumer({'bootstrap.servers': 'mybroker,mybroker2', 'group.id': 'groupid', 'schema.registry.url': 'http://127.0.0.1:8081'})
+c.subscribe(['my_topic'])
+running = True
+while running:
+    try:
+        msg = c.poll(10)
+        if msg:
+            if not msg.error():
+                print(msg.value())
+            elif msg.error().code() != KafkaError._PARTITION_EOF:
+                print(msg.error())
+                running = False
+    except SerializerError as e:
+        print("Message deserialization failed for %s: %s" % (msg, e))
+        running = False
+        
+c.close()
+```
+
 See [examples](examples) for more examples.
 
 
@@ -85,12 +126,18 @@ Install
 **Install from PyPi:**
 
     $ pip install confluent-kafka
+    
+    # for AvroProducer or AvroConsumer
+    $ pip install confluent-kafka[avro]
 
 
 **Install from source / tarball:**
 
     $ pip install .
 
+    # for AvroProducer or AvroConsumer
+    $ pip install .[avro]
+
 
 Build
 =====
@@ -108,16 +155,20 @@ Tests
 
 **Run unit-tests:**
 
-    $ py.test
+In order to run the full test suite, simply execute:
 
-**NOTE**: Requires `py.test`, install by `pip install pytest`
+    $ tox -r
+
+**NOTE**: Requires `tox` (please install with `pip install tox`), several supported versions of Python on your path, and `librdkafka` [installed](tools/bootstrap-librdkafka.sh) into `tmp-build`.
 
 
 **Run integration tests:**
 
-    $ examples/integration_test.py <kafka-broker>
+To run the integration tests, uncomment the following line from `tox.ini` and add the paths to your Kafka and Confluent Schema Registry instances. If no Schema Registry path is provided, no Avro tests will be run. You can also run the integration tests outside of `tox` by running this command from the source root.
+
+    examples/integration_test.py <kafka-broker> [<test-topic>] [<schema-registry>]
 
-**WARNING**: These tests require an active Kafka cluster and will make use of a topic named 'test'.
+**WARNING**: These tests require an active Kafka cluster and will create new topics.
 
 
 
diff --git a/confluent_kafka.egg-info/PKG-INFO b/confluent_kafka.egg-info/PKG-INFO
index 90c0183..4fc2d8f 100644
--- a/confluent_kafka.egg-info/PKG-INFO
+++ b/confluent_kafka.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.0
 Name: confluent-kafka
-Version: 0.9.2
+Version: 0.9.4
 Summary: Confluent's Apache Kafka client for Python
 Home-page: https://github.com/confluentinc/confluent-kafka-python
 Author: Confluent Inc
diff --git a/confluent_kafka.egg-info/SOURCES.txt b/confluent_kafka.egg-info/SOURCES.txt
index d99b51b..e92f2d2 100644
--- a/confluent_kafka.egg-info/SOURCES.txt
+++ b/confluent_kafka.egg-info/SOURCES.txt
@@ -6,7 +6,12 @@ confluent_kafka/__init__.py
 confluent_kafka.egg-info/PKG-INFO
 confluent_kafka.egg-info/SOURCES.txt
 confluent_kafka.egg-info/dependency_links.txt
+confluent_kafka.egg-info/requires.txt
 confluent_kafka.egg-info/top_level.txt
+confluent_kafka/avro/__init__.py
+confluent_kafka/avro/cached_schema_registry_client.py
+confluent_kafka/avro/serializer/__init__.py
+confluent_kafka/avro/serializer/message_serializer.py
 confluent_kafka/kafkatest/__init__.py
 confluent_kafka/kafkatest/verifiable_client.py
 confluent_kafka/kafkatest/verifiable_consumer.py
@@ -15,11 +20,12 @@ confluent_kafka/src/Consumer.c
 confluent_kafka/src/Producer.c
 confluent_kafka/src/confluent_kafka.c
 confluent_kafka/src/confluent_kafka.h
-tests/test_Consumer.py
-tests/test_KafkaError.py
-tests/test_Producer.py
-tests/test_TopicPartition.py
-tests/test_docs.py
-tests/test_enums.py
-tests/test_misc.py
-tests/test_threads.py
\ No newline at end of file
+tests/avro/__init__.py
+tests/avro/data_gen.py
+tests/avro/mock_registry.py
+tests/avro/mock_schema_registry_client.py
+tests/avro/test_avro_producer.py
+tests/avro/test_cached_client.py
+tests/avro/test_message_serializer.py
+tests/avro/test_mock_client.py
+tests/avro/test_util.py
\ No newline at end of file
diff --git a/confluent_kafka.egg-info/requires.txt b/confluent_kafka.egg-info/requires.txt
new file mode 100644
index 0000000..0110967
--- /dev/null
+++ b/confluent_kafka.egg-info/requires.txt
@@ -0,0 +1,6 @@
+
+
+[avro]
+fastavro
+requests
+avro
\ No newline at end of file
diff --git a/confluent_kafka.egg-info/top_level.txt b/confluent_kafka.egg-info/top_level.txt
index 1975722..eb43e45 100644
--- a/confluent_kafka.egg-info/top_level.txt
+++ b/confluent_kafka.egg-info/top_level.txt
@@ -1 +1,2 @@
 confluent_kafka
+tests
diff --git a/confluent_kafka/__init__.py b/confluent_kafka/__init__.py
index d998bec..89ea508 100644
--- a/confluent_kafka/__init__.py
+++ b/confluent_kafka/__init__.py
@@ -1,2 +1,16 @@
-__all__ = ['cimpl','kafkatest']
-from .cimpl import *
+__all__ = ['cimpl', 'avro', 'kafkatest']
+from .cimpl import (Consumer,
+                    KafkaError,
+                    KafkaException,
+                    Message,
+                    Producer,
+                    TopicPartition,
+                    libversion,
+                    version,
+                    TIMESTAMP_NOT_AVAILABLE,
+                    TIMESTAMP_CREATE_TIME,
+                    TIMESTAMP_LOG_APPEND_TIME,
+                    OFFSET_BEGINNING,
+                    OFFSET_END,
+                    OFFSET_STORED,
+                    OFFSET_INVALID)
diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py
new file mode 100644
index 0000000..846efd8
--- /dev/null
+++ b/confluent_kafka/avro/__init__.py
@@ -0,0 +1,162 @@
+"""
+    Avro schema registry module: Deals with encoding and decoding of messages with avro schemas
+
+"""
+import sys
+
+from confluent_kafka import Producer, Consumer
+
+VALID_LEVELS = ['NONE', 'FULL', 'FORWARD', 'BACKWARD']
+
+
+def loads(schema_str):
+    """ Parse a schema given a schema string """
+    if sys.version_info[0] < 3:
+        return schema.parse(schema_str)
+    else:
+        return schema.Parse(schema_str)
+
+
+def load(fp):
+    """ Parse a schema from a file path """
+    with open(fp) as f:
+        return loads(f.read())
+
+
+# avro.schema.RecordSchema and avro.schema.PrimitiveSchema classes are not hashable, so define __hash__ explicitly as a quick fix.
+def _hash_func(self):
+    return hash(str(self))
+
+
+try:
+    from avro import schema
+
+    schema.RecordSchema.__hash__ = _hash_func
+    schema.PrimitiveSchema.__hash__ = _hash_func
+except ImportError:
+    pass
+
+
+class ClientError(Exception):
+    """ Error thrown by Schema Registry clients """
+
+    def __init__(self, message, http_code=None):
+        self.message = message
+        self.http_code = http_code
+        super(ClientError, self).__init__(self.__str__())
+
+    def __repr__(self):
+        return "ClientError(error={error})".format(error=self.message)
+
+    def __str__(self):
+        return self.message
+
+
+from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
+from confluent_kafka.avro.serializer import (SerializerError,
+                                             KeySerializerError,
+                                             ValueSerializerError)
+from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
+
+
+class AvroProducer(Producer):
+    """
+        Kafka Producer client which does avro schema encoding to messages.
+        Handles schema registration, Message serialization.
+
+        Constructor takes below parameters
+
+        @:param: config: dict object with config parameters containing url for schema registry (schema.registry.url).
+        @:param: default_key_schema: Optional avro schema for key
+        @:param: default_value_schema: Optional avro schema for value
+    """
+
+    def __init__(self, config, default_key_schema=None,
+                 default_value_schema=None):
+        if ('schema.registry.url' not in config.keys()):
+            raise ValueError("Missing parameter: schema.registry.url")
+        schema_registry_url = config["schema.registry.url"]
+        del config["schema.registry.url"]
+
+        super(AvroProducer, self).__init__(config)
+        self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schema_registry_url))
+        self._key_schema = default_key_schema
+        self._value_schema = default_value_schema
+
+    def produce(self, **kwargs):
+        """
+            Sends message to kafka by encoding with specified avro schema
+            @:param: topic: topic name
+            @:param: value: An object to serialize
+            @:param: value_schema : Avro schema for value
+            @:param: key: An object to serialize
+            @:param: key_schema : Avro schema for key
+            @:exception: SerializerError
+        """
+        # get schemas from  kwargs if defined
+        key_schema = kwargs.pop('key_schema', self._key_schema)
+        value_schema = kwargs.pop('value_schema', self._value_schema)
+        topic = kwargs.pop('topic', None)
+        if not topic:
+            raise ClientError("Topic name not specified.")
+        value = kwargs.pop('value', None)
+        key = kwargs.pop('key', None)
+
+        if value:
+            if value_schema:
+                value = self._serializer.encode_record_with_schema(topic, value_schema, value)
+            else:
+                raise ValueSerializerError("Avro schema required for values")
+
+        if key:
+            if key_schema:
+                key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
+            else:
+                raise KeySerializerError("Avro schema required for key")
+
+
+        super(AvroProducer, self).produce(topic, value, key, **kwargs)
+
+
+class AvroConsumer(Consumer):
+    """
+    Kafka Consumer client which does avro schema decoding of messages.
+    Handles message deserialization.
+
+    Constructor takes below parameters
+
+    @:param: config: dict object with config parameters containing url for schema registry (schema.registry.url).
+    """
+    def __init__(self, config):
+
+        if ('schema.registry.url' not in config.keys()):
+            raise ValueError("Missing parameter: schema.registry.url")
+        schema_registry_url = config["schema.registry.url"]
+        del config["schema.registry.url"]
+
+        super(AvroConsumer, self).__init__(config)
+        self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schema_registry_url))
+
+    def poll(self, timeout=None):
+        """
+        This is an overridden method from the confluent_kafka.Consumer class. It handles
+        message deserialization using the Avro schema.
+
+        @:param timeout
+        @:return message object with deserialized key and value as dict objects
+        """
+        if timeout is None:
+            timeout = -1
+        message = super(AvroConsumer, self).poll(timeout)
+        if message is None:
+            return None
+        if not message.value() and not message.key():
+            return message
+        if not message.error():
+            if message.value() is not None:
+                decoded_value = self._serializer.decode_message(message.value())
+                message.set_value(decoded_value)
+            if message.key() is not None:
+                decoded_key = self._serializer.decode_message(message.key())
+                message.set_key(decoded_key)
+        return message
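
The `produce()` method above also accepts per-message `key_schema` and `value_schema` keyword arguments that override the constructor defaults. A minimal sketch of that call pattern (the topic name, schema files and broker/registry addresses are placeholders):

```python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

# Placeholder schema files; any parsed Avro schema works here.
value_schema = avro.load('user_value.avsc')
key_schema = avro.load('user_key.avsc')

p = AvroProducer({'bootstrap.servers': 'mybroker',
                  'schema.registry.url': 'http://127.0.0.1:8081'})

# Schemas passed per message take precedence over the constructor defaults.
p.produce(topic='my_topic',
          key={'name': 'Key'}, key_schema=key_schema,
          value={'name': 'Value'}, value_schema=value_schema)
p.flush()
```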
diff --git a/confluent_kafka/avro/cached_schema_registry_client.py b/confluent_kafka/avro/cached_schema_registry_client.py
new file mode 100644
index 0000000..97735c3
--- /dev/null
+++ b/confluent_kafka/avro/cached_schema_registry_client.py
@@ -0,0 +1,314 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+#
+# derived from https://github.com/verisign/python-confluent-schemaregistry.git
+#
+import json
+import logging
+from collections import defaultdict
+
+import requests
+
+from . import ClientError, VALID_LEVELS
+from . import loads
+
+# Common accept header sent
+ACCEPT_HDR = "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json"
+log = logging.getLogger(__name__)
+
+
+class CachedSchemaRegistryClient(object):
+    """
+    A client that talks to a Schema Registry over HTTP
+
+    See http://confluent.io/docs/current/schema-registry/docs/intro.html
+
+    Errors communicating to the server will result in a ClientError being raised.
+
+    @:param: url: url to schema registry
+    """
+
+    def __init__(self, url, max_schemas_per_subject=1000):
+        """Construct a client by passing in the base URL of the schema registry server"""
+
+        self.url = url.rstrip('/')
+
+        self.max_schemas_per_subject = max_schemas_per_subject
+        # subj => { schema => id }
+        self.subject_to_schema_ids = defaultdict(dict)
+        # id => avro_schema
+        self.id_to_schema = defaultdict(dict)
+        # subj => { schema => version }
+        self.subject_to_schema_versions = defaultdict(dict)
+
+    def _send_request(self, url, method='GET', body=None, headers=None):
+        if body:
+            body = json.dumps(body)
+            body = body.encode('utf8')
+        _headers = dict()
+        _headers["Accept"] = ACCEPT_HDR
+        if body:
+            _headers["Content-Length"] = str(len(body))
+            _headers["Content-Type"] = "application/vnd.schemaregistry.v1+json"
+
+        if headers:
+            for header_name in headers:
+                _headers[header_name] = headers[header_name]
+        if method == 'GET':
+            response = requests.get(url, headers=_headers)
+        elif method == 'POST':
+            response = requests.post(url, body, headers=_headers)
+        elif method == 'PUT':
+            response = requests.put(url, body, headers=_headers)
+        elif method == 'DELETE':
+            response = requests.delete(url, headers=_headers)
+        else:
+            raise ClientError("Invalid HTTP request type")
+
+        result = json.loads(response.text)
+        return (result, response.status_code)
+
+    def _add_to_cache(self, cache, subject, schema, value):
+        sub_cache = cache[subject]
+        sub_cache[schema] = value
+
+    def _cache_schema(self, schema, schema_id, subject=None, version=None):
+        # don't overwrite anything
+        if schema_id in self.id_to_schema:
+            schema = self.id_to_schema[schema_id]
+        else:
+            self.id_to_schema[schema_id] = schema
+
+        if subject:
+            self._add_to_cache(self.subject_to_schema_ids,
+                               subject, schema, schema_id)
+            if version:
+                self._add_to_cache(self.subject_to_schema_versions,
+                                   subject, schema, version)
+
+    def register(self, subject, avro_schema):
+        """
+        POST /subjects/(string: subject)/versions
+        Register a schema with the registry under the given subject
+        and receive a schema id.
+
+        avro_schema must be a parsed schema from the python avro library
+
+        Multiple instances of the same schema will result in cache misses.
+
+        @:param: subject: subject name
+        @:param: avro_schema: Avro schema to be registered
+        @:returns: schema_id: int value
+        """
+
+        schemas_to_id = self.subject_to_schema_ids[subject]
+        schema_id = schemas_to_id.get(avro_schema, None)
+        if schema_id is not None:
+            return schema_id
+        # send it up
+        url = '/'.join([self.url, 'subjects', subject, 'versions'])
+        # body is { schema : json_string }
+
+        body = {'schema': json.dumps(avro_schema.to_json())}
+        result, code = self._send_request(url, method='POST', body=body)
+        if code == 409:
+            raise ClientError("Incompatible Avro schema:" + str(code))
+        elif code == 422:
+            raise ClientError("Invalid Avro schema:" + str(code))
+        elif not (code >= 200 and code <= 299):
+            raise ClientError("Unable to register schema. Error code:" + str(code))
+        # result is a dict
+        schema_id = result['id']
+        # cache it
+        self._cache_schema(avro_schema, schema_id, subject)
+        return schema_id
+
+    def get_by_id(self, schema_id):
+        """
+        GET /schemas/ids/{int: id}
+        Retrieve a parsed avro schema by id or None if not found
+        @:param: schema_id: int value
+        @:returns: Avro schema
+        """
+        if schema_id in self.id_to_schema:
+            return self.id_to_schema[schema_id]
+        # fetch from the registry
+        url = '/'.join([self.url, 'schemas', 'ids', str(schema_id)])
+
+        result, code = self._send_request(url)
+        if code == 404:
+            log.error("Schema not found:" + str(code))
+            return None
+        elif not (code >= 200 and code <= 299):
+            log.error("Unable to get schema for the specific ID:" + str(code))
+            return None
+        else:
+            # need to parse the schema
+            schema_str = result.get("schema")
+            try:
+                result = loads(schema_str)
+                # cache it
+                self._cache_schema(result, schema_id)
+                return result
+            except:
+                # bad schema - should not happen
+                raise ClientError("Received bad schema from registry.")
+
+    def get_latest_schema(self, subject):
+        """
+        GET /subjects/(string: subject)/versions/(versionId: version)
+
+        Return the latest 3-tuple of:
+        (the schema id, the parsed avro schema, the schema version)
+        for a particular subject.
+
+        This call always contacts the registry.
+
+        If the subject is not found, (None,None,None) is returned.
+        @:param: subject: subject name
+        @:returns: (schema_id, schema, version)
+        """
+        url = '/'.join([self.url, 'subjects', subject, 'versions', 'latest'])
+
+        result, code = self._send_request(url)
+        if code == 404:
+            log.error("Schema not found:" + str(code))
+            return (None, None, None)
+        elif code == 422:
+            log.error("Invalid version:" + str(code))
+            return (None, None, None)
+        elif not (code >= 200 and code <= 299):
+            return (None, None, None)
+        schema_id = result['id']
+        version = result['version']
+        if schema_id in self.id_to_schema:
+            schema = self.id_to_schema[schema_id]
+        else:
+            try:
+                schema = loads(result['schema'])
+            except:
+                # bad schema - should not happen
+                raise ClientError("Received bad schema from registry.")
+
+        self._cache_schema(schema, schema_id, subject, version)
+        return (schema_id, schema, version)
+
+    def get_version(self, subject, avro_schema):
+        """
+        POST /subjects/(string: subject)
+
+        Get the version of a schema for a given subject.
+
+        Returns None if not found.
+        @:param: subject: subject name
+        @:param: avro_schema: Avro schema
+        @:returns: version
+        """
+        schemas_to_version = self.subject_to_schema_versions[subject]
+        version = schemas_to_version.get(avro_schema, None)
+        if version is not None:
+            return version
+
+        url = '/'.join([self.url, 'subjects', subject])
+        body = {'schema': json.dumps(avro_schema.to_json())}
+
+        result, code = self._send_request(url, method='POST', body=body)
+        if code == 404:
+            log.error("Not found:" + str(code))
+            return None
+        elif not (code >= 200 and code <= 299):
+            log.error("Unable to get version of a schema:" + str(code))
+            return None
+        schema_id = result['id']
+        version = result['version']
+        self._cache_schema(avro_schema, schema_id, subject, version)
+        return version
+
+    def test_compatibility(self, subject, avro_schema, version='latest'):
+        """
+        POST /compatibility/subjects/(string: subject)/versions/(versionId: version)
+
+        Test the compatibility of a candidate parsed schema for a given subject.
+
+        By default the latest version is checked against.
+        @:param: subject: subject name
+        @:param: avro_schema: Avro schema
+        @:return: True if compatible, False if not compatible
+        """
+        url = '/'.join([self.url, 'compatibility', 'subjects', subject,
+                        'versions', str(version)])
+        body = {'schema': json.dumps(avro_schema.to_json())}
+        try:
+            result, code = self._send_request(url, method='POST', body=body)
+            if code == 404:
+                log.error(("Subject or version not found:" + str(code)))
+                return False
+            elif code == 422:
+                log.error(("Invalid subject or schema:" + str(code)))
+                return False
+            elif code >= 200 and code <= 299:
+                return result.get('is_compatible')
+            else:
+                log.error("Unable to check the compatibility")
+                return False
+        except:
+            return False
+
+    def update_compatibility(self, level, subject=None):
+        """
+        PUT /config/(string: subject)
+
+        Update the compatibility level for a subject.
+
+        @:param: level: one of 'NONE', 'FULL', 'FORWARD' or 'BACKWARD'
+        """
+        if level not in VALID_LEVELS:
+            raise ClientError("Invalid level specified: %s" % (str(level)))
+
+        url = '/'.join([self.url, 'config'])
+        if subject:
+            url += '/' + subject
+
+        body = {"compatibility": level}
+        result, code = self._send_request(url, method='PUT', body=body)
+        if code >= 200 and code <= 299:
+            return result['compatibility']
+        else:
+            raise ClientError("Unable to update level: %s. Error code: %d" % (str(level)), code)
+
+    def get_compatibility(self, subject=None):
+        """
+        GET /config
+        Get the current compatibility level for a subject.
+
+        @:param: subject: subject name
+        @:return: one of 'NONE', 'FULL', 'FORWARD' or 'BACKWARD'
+        """
+        url = '/'.join([self.url, 'config'])
+        if subject:
+            url += '/' + subject
+
+        result, code = self._send_request(url)
+        compatibility = None
+        if code >= 200 and code <= 299:
+            compatibility = result.get('compatibility', None)
+        if not compatibility:
+            compatibility = result.get('compatibilityLevel')
+
+        return compatibility
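
The schema registry client added above can also be used on its own; each call maps to one of the REST endpoints listed in the docstrings, with results cached per subject. A minimal sketch (the registry URL and subject name are placeholders):

```python
from confluent_kafka import avro
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

client = CachedSchemaRegistryClient(url='http://127.0.0.1:8081')

schema = avro.loads('{"type": "record", "name": "User",'
                    ' "fields": [{"name": "name", "type": "string"}]}')

# POST /subjects/<subject>/versions - returns (and caches) the schema id
schema_id = client.register('user-value', schema)

# GET /subjects/<subject>/versions/latest - always contacts the registry
latest_id, latest_schema, version = client.get_latest_schema('user-value')

# POST /compatibility/subjects/<subject>/versions/latest
is_compatible = client.test_compatibility('user-value', schema)
```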
diff --git a/confluent_kafka/avro/serializer/__init__.py b/confluent_kafka/avro/serializer/__init__.py
new file mode 100644
index 0000000..b3ba458
--- /dev/null
+++ b/confluent_kafka/avro/serializer/__init__.py
@@ -0,0 +1,40 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+class SerializerError(Exception):
+    """Generic error from serializer package"""
+
+    def __init__(self, message):
+        self.message = message
+
+    def __repr__(self):
+        return '{klass}(error={error})'.format(
+            klass=self.__class__.__name__,
+            error=self.message
+        )
+
+    def __str__(self):
+        return self.message
+
+
+class KeySerializerError(SerializerError):
+    pass
+
+
+class ValueSerializerError(SerializerError):
+    pass
diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
new file mode 100644
index 0000000..47eab5d
--- /dev/null
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -0,0 +1,212 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+#
+# derived from https://github.com/verisign/python-confluent-schemaregistry.git
+#
+import io
+import logging
+import struct
+import sys
+import traceback
+
+import avro
+import avro.io
+
+from confluent_kafka.avro import ClientError
+from confluent_kafka.avro.serializer import (SerializerError,
+                                             KeySerializerError,
+                                             ValueSerializerError)
+
+log = logging.getLogger(__name__)
+
+MAGIC_BYTE = 0
+
+HAS_FAST = False
+try:
+    from fastavro.reader import read_data
+
+    HAS_FAST = True
+except:
+    pass
+
+
+class ContextStringIO(io.BytesIO):
+    """
+    Wrapper to allow use of StringIO via 'with' constructs.
+    """
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+        return False
+
+
+class MessageSerializer(object):
+    """
+    A helper class that can serialize and deserialize messages
+    that need to be encoded or decoded using the schema registry.
+
+    All encode_* methods return a buffer that can be sent to kafka.
+    All decode_* methods expect a buffer received from kafka.
+    """
+
+    def __init__(self, registry_client):
+        self.registry_client = registry_client
+        self.id_to_decoder_func = {}
+        self.id_to_writers = {}
+
+    '''
+
+    '''
+
+    def encode_record_with_schema(self, topic, schema, record, is_key=False):
+        """
+        Given a parsed avro schema, encode a record for the given topic.  The
+        record is expected to be a dictionary.
+
+        The schema is registered with the subject of 'topic-value'
+        @:param topic : Topic name
+        @:param schema : Avro Schema
+        @:param record : An object to serialize
+        @:param is_key : If the record is a key
+        @:returns : Encoded record with schema ID as bytes
+        """
+        serialize_err = KeySerializerError if is_key else ValueSerializerError
+
+        subject_suffix = ('-key' if is_key else '-value')
+        # get the latest schema for the subject
+        subject = topic + subject_suffix
+        # register it
+        schema_id = self.registry_client.register(subject, schema)
+        if not schema_id:
+            message = "Unable to retrieve schema id for subject %s" % (subject)
+            raise serialize_err(message)
+
+        # cache writer
+        self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
+
+        return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
+
+    def encode_record_with_schema_id(self, schema_id, record, is_key=False):
+        """
+        Encode a record with a given schema id.  The record must
+        be a python dictionary.
+        @:param: schema_id : integer ID
+        @:param: record : An object to serialize
+        @:param is_key : If the record is a key
+        @:returns: Encoded record with schema ID as bytes
+        """
+        serialize_err = KeySerializerError if is_key else ValueSerializerError
+
+        # use slow avro
+        if schema_id not in self.id_to_writers:
+            # get the writer + schema
+
+            try:
+                schema = self.registry_client.get_by_id(schema_id)
+                if not schema:
+                    raise serialize_err("Schema does not exist")
+                self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
+            except ClientError as e:
+                exc_type, exc_value, exc_traceback = sys.exc_info()
+                raise serialize_err(repr(
+                    traceback.format_exception(exc_type, exc_value, exc_traceback)))
+
+        # get the writer
+        writer = self.id_to_writers[schema_id]
+        with ContextStringIO() as outf:
+            # write the header
+            # magic byte
+
+            outf.write(struct.pack('b', MAGIC_BYTE))
+
+            # write the schema ID in network byte order (big end)
+
+            outf.write(struct.pack('>I', schema_id))
+
+            # write the record to the rest of it
+            # Create an encoder that we'll write to
+            encoder = avro.io.BinaryEncoder(outf)
+            # write the record itself as Avro to the in-memory buffer
+            # (the magic byte and schema id were written above)
+            writer.write(record, encoder)
+
+            return outf.getvalue()
+
+    # Decoder support
+    def _get_decoder_func(self, schema_id, payload):
+        if schema_id in self.id_to_decoder_func:
+            return self.id_to_decoder_func[schema_id]
+
+        # fetch from schema reg
+        try:
+            schema = self.registry_client.get_by_id(schema_id)
+        except:
+            schema = None
+
+        if not schema:
+            err = "unable to fetch schema with id %d" % (schema_id)
+            raise SerializerError(err)
+
+        curr_pos = payload.tell()
+        if HAS_FAST:
+            # try to use fast avro
+            try:
+                schema_dict = schema.to_json()
+                obj = read_data(payload, schema_dict)
+                # here means we passed so this is something fastavro can do
+                # seek back since it will be called again for the
+                # same payload - one time hit
+
+                payload.seek(curr_pos)
+                decoder_func = lambda p: read_data(p, schema_dict)
+                self.id_to_decoder_func[schema_id] = decoder_func
+                return self.id_to_decoder_func[schema_id]
+            except:
+                pass
+
+        # here means we should just delegate to slow avro
+        # rewind
+        payload.seek(curr_pos)
+        avro_reader = avro.io.DatumReader(schema)
+
+        def decoder(p):
+            bin_decoder = avro.io.BinaryDecoder(p)
+            return avro_reader.read(bin_decoder)
+
+        self.id_to_decoder_func[schema_id] = decoder
+        return self.id_to_decoder_func[schema_id]
+
+    def decode_message(self, message):
+        """
+        Decode a message from kafka that has been encoded for use with
+        the schema registry.
+        @:param: message
+        """
+        if len(message) <= 5:
+            raise SerializerError("message is too small to decode")
+
+        with ContextStringIO(message) as payload:
+            magic, schema_id = struct.unpack('>bI', payload.read(5))
+            if magic != MAGIC_BYTE:
+                raise SerializerError("message does not start with magic byte")
+            decoder_func = self._get_decoder_func(schema_id, payload)
+            return decoder_func(payload)
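
`encode_record_with_schema_id()` and `decode_message()` above fix the wire format used with the registry: one magic byte (0), the schema id as a 4-byte big-endian unsigned integer, then the Avro-encoded body. A minimal standalone sketch of splitting that header, mirroring the `struct` calls above:

```python
import struct

MAGIC_BYTE = 0

def split_header(message_bytes):
    """Return (schema_id, avro_payload) for a registry-framed message."""
    if len(message_bytes) <= 5:
        raise ValueError("message is too small to decode")
    # '>bI': signed byte (magic) followed by big-endian unsigned 32-bit schema id
    magic, schema_id = struct.unpack('>bI', message_bytes[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("message does not start with magic byte")
    return schema_id, message_bytes[5:]
```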
diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c
index 443a8a3..820ed84 100644
--- a/confluent_kafka/src/Consumer.c
+++ b/confluent_kafka/src/Consumer.c
... 2686 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-confluent-kafka.git


