[Python-modules-commits] [python-confluent-kafka] 01/06: Import python-confluent-kafka_0.9.2.orig.tar.gz

Christos Trochalakis ctrochalakis at moszumanska.debian.org
Mon Jan 16 14:03:49 UTC 2017


This is an automated email from the git hooks/post-receive script.

ctrochalakis pushed a commit to tag debian/0.9.2-1
in repository python-confluent-kafka.

commit bd39861dc7688213d0d92f04020ccc777f2a5434
Author: Christos Trochalakis <yatiohi at ideopolis.gr>
Date:   Thu Jan 12 14:59:27 2017 +0200

    Import python-confluent-kafka_0.9.2.orig.tar.gz
---
 PKG-INFO                              |   2 +-
 README.md                             |  35 ++++++-
 confluent_kafka.egg-info/PKG-INFO     |   2 +-
 confluent_kafka.egg-info/SOURCES.txt  |  10 +-
 confluent_kafka/src/Consumer.c        | 169 +++++++++++++++++++--------------
 confluent_kafka/src/Producer.c        | 111 +++++++++++-----------
 confluent_kafka/src/confluent_kafka.c | 173 +++++++++++++++++++++++++++++-----
 confluent_kafka/src/confluent_kafka.h | 124 +++++++++++++++++-------
 setup.py                              |   2 +-
 tests/test_Consumer.py                |  62 ++++++++++++
 tests/test_KafkaError.py              |  30 ++++++
 tests/test_Producer.py                |  38 ++++++++
 tests/test_TopicPartition.py          |  38 ++++++++
 tests/test_docs.py                    |  29 ++++++
 tests/test_enums.py                   |   9 ++
 tests/test_misc.py                    |  16 ++++
 tests/test_threads.py                 |  69 ++++++++++++++
 17 files changed, 725 insertions(+), 194 deletions(-)

diff --git a/PKG-INFO b/PKG-INFO
index 2ef20cb..90c0183 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.0
 Name: confluent-kafka
-Version: 0.9.1.2
+Version: 0.9.2
 Summary: Confluent's Apache Kafka client for Python
 Home-page: https://github.com/confluentinc/confluent-kafka-python
 Author: Confluent Inc
diff --git a/README.md b/README.md
index 23bf7ea..7362208 100644
--- a/README.md
+++ b/README.md
@@ -30,21 +30,48 @@ p.flush()
 **High-level Consumer:**
 
 ```python
-from confluent_kafka import Consumer
+from confluent_kafka import Consumer, KafkaError
 
 c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup',
               'default.topic.config': {'auto.offset.reset': 'smallest'}})
 c.subscribe(['mytopic'])
+running = True
 while running:
     msg = c.poll()
     if not msg.error():
         print('Received message: %s' % msg.value().decode('utf-8'))
+    elif msg.error().code() != KafkaError._PARTITION_EOF:
+        print(msg.error())
+        running = False
 c.close()
 ```
 
 See [examples](examples) for more examples.
 
 
+Broker compatibility
+====================
+The Python client (as well as the underlying C library librdkafka) supports
+all broker versions >= 0.8.
+But due to the nature of the Kafka protocol in broker versions 0.8 and 0.9 it
+is not safe for a client to assume what protocol version is actually supported
+by the broker, thus you will need to hint the Python client what protocol
+version it may use. This is done through two configuration settings:
+
+ * `broker.version.fallback=YOUR_BROKER_VERSION` (default 0.9.0.1)
+ * `api.version.request=true|false` (default false)
+
+When using a Kafka 0.10 broker or later you only need to set
+`api.version.request=true`.
+If you use Kafka broker 0.9 or 0.8 you should leave
+`api.version.request=false` (default) and set
+`broker.version.fallback` to your broker version,
+e.g `broker.version.fallback=0.9.0.1`.
+
+More info here:
+https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
+
+
 Prerequisites
 =============
 
@@ -68,11 +95,11 @@ Install
 Build
 =====
 
-    $ python setup.by build
+    $ python setup.py build
 
 If librdkafka is installed in a non-standard location provide the include and library directories with:
 
-    $ CPLUS_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python setup.py ...
+    $ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python setup.py ...
 
 
 Tests
@@ -103,6 +130,6 @@ Install sphinx and sphinx_rtd_theme packages and then:
 
 or:
 
-    $ python setup.by build_sphinx
+    $ python setup.py build_sphinx
 
 Documentation will be generated in `docs/_build/`.
diff --git a/confluent_kafka.egg-info/PKG-INFO b/confluent_kafka.egg-info/PKG-INFO
index 2ef20cb..90c0183 100644
--- a/confluent_kafka.egg-info/PKG-INFO
+++ b/confluent_kafka.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
 Metadata-Version: 1.0
 Name: confluent-kafka
-Version: 0.9.1.2
+Version: 0.9.2
 Summary: Confluent's Apache Kafka client for Python
 Home-page: https://github.com/confluentinc/confluent-kafka-python
 Author: Confluent Inc
diff --git a/confluent_kafka.egg-info/SOURCES.txt b/confluent_kafka.egg-info/SOURCES.txt
index bc78298..d99b51b 100644
--- a/confluent_kafka.egg-info/SOURCES.txt
+++ b/confluent_kafka.egg-info/SOURCES.txt
@@ -14,4 +14,12 @@ confluent_kafka/kafkatest/verifiable_producer.py
 confluent_kafka/src/Consumer.c
 confluent_kafka/src/Producer.c
 confluent_kafka/src/confluent_kafka.c
-confluent_kafka/src/confluent_kafka.h
\ No newline at end of file
+confluent_kafka/src/confluent_kafka.h
+tests/test_Consumer.py
+tests/test_KafkaError.py
+tests/test_Producer.py
+tests/test_TopicPartition.py
+tests/test_docs.py
+tests/test_enums.py
+tests/test_misc.py
+tests/test_threads.py
\ No newline at end of file
diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c
index 80e136a..443a8a3 100644
--- a/confluent_kafka/src/Consumer.c
+++ b/confluent_kafka/src/Consumer.c
@@ -28,23 +28,26 @@
  ****************************************************************************/
 
 
-static int Consumer_clear (Consumer *self) {
-	if (self->on_assign) {
-		Py_DECREF(self->on_assign);
-		self->on_assign = NULL;
+static int Consumer_clear (Handle *self) {
+	if (self->u.Consumer.on_assign) {
+		Py_DECREF(self->u.Consumer.on_assign);
+		self->u.Consumer.on_assign = NULL;
 	}
-	if (self->on_revoke) {
-		Py_DECREF(self->on_revoke);
-		self->on_revoke = NULL;
+	if (self->u.Consumer.on_revoke) {
+		Py_DECREF(self->u.Consumer.on_revoke);
+		self->u.Consumer.on_revoke = NULL;
 	}
-	if (self->on_commit) {
-		Py_DECREF(self->on_commit);
-		self->on_commit = NULL;
+	if (self->u.Consumer.on_commit) {
+		Py_DECREF(self->u.Consumer.on_commit);
+		self->u.Consumer.on_commit = NULL;
 	}
+
+	Handle_clear(self);
+
 	return 0;
 }
 
-static void Consumer_dealloc (Consumer *self) {
+static void Consumer_dealloc (Handle *self) {
 	PyObject_GC_UnTrack(self);
 
 	Consumer_clear(self);
@@ -55,12 +58,17 @@ static void Consumer_dealloc (Consumer *self) {
 	Py_TYPE(self)->tp_free((PyObject *)self);
 }
 
-static int Consumer_traverse (Consumer *self,
-				  visitproc visit, void *arg) {
-	if (self->on_assign)
-		Py_VISIT(self->on_assign);
-	if (self->on_revoke)
-		Py_VISIT(self->on_revoke);
+static int Consumer_traverse (Handle *self,
+			      visitproc visit, void *arg) {
+	if (self->u.Consumer.on_assign)
+		Py_VISIT(self->u.Consumer.on_assign);
+	if (self->u.Consumer.on_revoke)
+		Py_VISIT(self->u.Consumer.on_revoke);
+	if (self->u.Consumer.on_commit)
+		Py_VISIT(self->u.Consumer.on_commit);
+
+	Handle_traverse(self, visit, arg);
+
 	return 0;
 }
 
@@ -69,7 +77,7 @@ static int Consumer_traverse (Consumer *self,
 
 
 
-static PyObject *Consumer_subscribe (Consumer *self, PyObject *args,
+static PyObject *Consumer_subscribe (Handle *self, PyObject *args,
 					 PyObject *kwargs) {
 
 	rd_kafka_topic_partition_list_t *topics;
@@ -100,7 +108,7 @@ static PyObject *Consumer_subscribe (Consumer *self, PyObject *args,
 		return NULL;
 	}
 
-	topics = rd_kafka_topic_partition_list_new(PyList_Size(tlist));
+	topics = rd_kafka_topic_partition_list_new((int)PyList_Size(tlist));
 	for (pos = 0 ; pos < PyList_Size(tlist) ; pos++) {
 		PyObject *o = PyList_GetItem(tlist, pos);
 		PyObject *uo;
@@ -130,29 +138,29 @@ static PyObject *Consumer_subscribe (Consumer *self, PyObject *args,
 	/*
 	 * Update rebalance callbacks
 	 */
-	if (self->on_assign) {
-		Py_DECREF(self->on_assign);
-		self->on_assign = NULL;
+	if (self->u.Consumer.on_assign) {
+		Py_DECREF(self->u.Consumer.on_assign);
+		self->u.Consumer.on_assign = NULL;
 	}
 	if (on_assign) {
-		self->on_assign = on_assign;
-		Py_INCREF(self->on_assign);
+		self->u.Consumer.on_assign = on_assign;
+		Py_INCREF(self->u.Consumer.on_assign);
 	}
 
-	if (self->on_revoke) {
-		Py_DECREF(self->on_revoke);
-		self->on_revoke = NULL;
+	if (self->u.Consumer.on_revoke) {
+		Py_DECREF(self->u.Consumer.on_revoke);
+		self->u.Consumer.on_revoke = NULL;
 	}
 	if (on_revoke) {
-		self->on_revoke = on_revoke;
-		Py_INCREF(self->on_revoke);
+		self->u.Consumer.on_revoke = on_revoke;
+		Py_INCREF(self->u.Consumer.on_revoke);
 	}
 
 	Py_RETURN_NONE;
 }
 
 
-static PyObject *Consumer_unsubscribe (Consumer *self,
+static PyObject *Consumer_unsubscribe (Handle *self,
 					   PyObject *ignore) {
 
 	rd_kafka_resp_err_t err;
@@ -169,7 +177,7 @@ static PyObject *Consumer_unsubscribe (Consumer *self,
 }
 
 
-static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
+static PyObject *Consumer_assign (Handle *self, PyObject *tlist) {
 
 	rd_kafka_topic_partition_list_t *c_parts;
 	rd_kafka_resp_err_t err;
@@ -177,7 +185,7 @@ static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
 	if (!(c_parts = py_to_c_parts(tlist)))
 		return NULL;
 
-	self->rebalance_assigned++;
+	self->u.Consumer.rebalance_assigned++;
 
 	err = rd_kafka_assign(self->rk, c_parts);
 
@@ -185,7 +193,7 @@ static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
 
 	if (err) {
 		cfl_PyErr_Format(err,
-				 "Failed to set assignemnt: %s",
+				 "Failed to set assignment: %s",
 				 rd_kafka_err2str(err));
 		return NULL;
 	}
@@ -194,11 +202,11 @@ static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
 }
 
 
-static PyObject *Consumer_unassign (Consumer *self, PyObject *ignore) {
+static PyObject *Consumer_unassign (Handle *self, PyObject *ignore) {
 
 	rd_kafka_resp_err_t err;
 
-	self->rebalance_assigned++;
+	self->u.Consumer.rebalance_assigned++;
 
 	err = rd_kafka_assign(self->rk, NULL);
 	if (err) {
@@ -213,7 +221,7 @@ static PyObject *Consumer_unassign (Consumer *self, PyObject *ignore) {
 
 
 
-static PyObject *Consumer_commit (Consumer *self, PyObject *args,
+static PyObject *Consumer_commit (Handle *self, PyObject *args,
 					PyObject *kwargs) {
 
 	rd_kafka_resp_err_t err;
@@ -281,7 +289,7 @@ static PyObject *Consumer_commit (Consumer *self, PyObject *args,
 
 
 
-static PyObject *Consumer_committed (Consumer *self, PyObject *args,
+static PyObject *Consumer_committed (Handle *self, PyObject *args,
 					 PyObject *kwargs) {
 
 	PyObject *plist;
@@ -317,7 +325,7 @@ static PyObject *Consumer_committed (Consumer *self, PyObject *args,
 }
 
 
-static PyObject *Consumer_position (Consumer *self, PyObject *args,
+static PyObject *Consumer_position (Handle *self, PyObject *args,
 					PyObject *kwargs) {
 
 	PyObject *plist;
@@ -352,26 +360,23 @@ static PyObject *Consumer_position (Consumer *self, PyObject *args,
 
 
 
-static PyObject *Consumer_poll (Consumer *self, PyObject *args,
+static PyObject *Consumer_poll (Handle *self, PyObject *args,
 				    PyObject *kwargs) {
 	double tmout = -1.0f;
 	static char *kws[] = { "timeout", NULL };
 	rd_kafka_message_t *rkm;
 	PyObject *msgobj;
+	CallState cs;
 
 	if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
 		return NULL;
 
-	self->callback_crashed = 0;
-	self->thread_state = PyEval_SaveThread();
+	CallState_begin(self, &cs);
 
 	rkm = rd_kafka_consumer_poll(self->rk, tmout >= 0 ?
 				     (int)(tmout * 1000.0f) : -1);
 
-	PyEval_RestoreThread(self->thread_state);
-	self->thread_state = NULL;
-
-	if (self->callback_crashed)
+	if (!CallState_end(self, &cs))
 		return NULL;
 
 	if (!rkm)
@@ -384,10 +389,22 @@ static PyObject *Consumer_poll (Consumer *self, PyObject *args,
 }
 
 
-static PyObject *Consumer_close (Consumer *self, PyObject *ignore) {
-	self->thread_state = PyEval_SaveThread();
+static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
+	CallState cs;
+	int raise = 0;
+
+	CallState_begin(self, &cs);
+
 	rd_kafka_consumer_close(self->rk);
-	PyEval_RestoreThread(self->thread_state);
+
+	raise = !CallState_end(self, &cs);
+
+	rd_kafka_destroy(self->rk);
+	self->rk = NULL;
+
+	if (raise)
+		return NULL;
+
 	Py_RETURN_NONE;
 }
 
@@ -523,7 +540,7 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
 PyTypeObject ConsumerType = {
 	PyVarObject_HEAD_INIT(NULL, 0)
 	"cimpl.Consumer",        /*tp_name*/
-	sizeof(Consumer),      /*tp_basicsize*/
+	sizeof(Handle),          /*tp_basicsize*/
 	0,                         /*tp_itemsize*/
 	(destructor)Consumer_dealloc, /*tp_dealloc*/
 	0,                         /*tp_print*/
@@ -584,14 +601,17 @@ PyTypeObject ConsumerType = {
 static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
 				   rd_kafka_topic_partition_list_t *c_parts,
 				   void *opaque) {
-	Consumer *self = opaque;
+	Handle *self = opaque;
+	CallState *cs;
 
-	PyEval_RestoreThread(self->thread_state);
+	cs = CallState_get(self);
 
-	self->rebalance_assigned = 0;
+	self->u.Consumer.rebalance_assigned = 0;
 
-	if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS && self->on_assign) ||
-	    (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS && self->on_revoke)) {
+	if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS &&
+	     self->u.Consumer.on_assign) ||
+	    (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS &&
+	     self->u.Consumer.on_revoke)) {
 		PyObject *parts;
 		PyObject *args, *result;
 
@@ -605,21 +625,22 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
 		if (!args) {
 			cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
 					 "Unable to build callback args");
-			self->thread_state = PyEval_SaveThread();
-			self->callback_crashed++;
+			CallState_crash(cs);
+			CallState_resume(cs);
 			return;
 		}
 
 		result = PyObject_CallObject(
 			err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
-			self->on_assign : self->on_revoke, args);
+			self->u.Consumer.on_assign :
+			self->u.Consumer.on_revoke, args);
 
 		Py_DECREF(args);
 
 		if (result)
 			Py_DECREF(result);
 		else {
-			self->callback_crashed++;
+			CallState_crash(cs);
 			rd_kafka_yield(rk);
 		}
 	}
@@ -628,33 +649,37 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
 	 * to synchronize state, if the user did not do this from callback,
 	 * or there was no callback, or the callback failed, then we perform
 	 * that assign() call here instead. */
-	if (!self->rebalance_assigned) {
+	if (!self->u.Consumer.rebalance_assigned) {
 		if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
 			rd_kafka_assign(rk, c_parts);
 		else
 			rd_kafka_assign(rk, NULL);
 	}
 
-	self->thread_state = PyEval_SaveThread();
+	CallState_resume(cs);
 }
 
 
 static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
 				       rd_kafka_topic_partition_list_t *c_parts,
 				       void *opaque) {
-	Consumer *self = opaque;
+	Handle *self = opaque;
 	PyObject *parts, *k_err, *args, *result;
+	CallState *cs;
 
-	if (!self->on_commit)
+	if (!self->u.Consumer.on_commit)
 		return;
 
-	PyEval_RestoreThread(self->thread_state);
+	cs = CallState_get(self);
 
 	/* Insantiate error object */
 	k_err = KafkaError_new_or_None(err, NULL);
 
 	/* Construct list of TopicPartition based on 'c_parts' */
-	parts = c_parts_to_py(c_parts);
+	if (c_parts)
+		parts = c_parts_to_py(c_parts);
+	else
+		parts = PyList_New(0);
 
 	args = Py_BuildValue("(OO)", k_err, parts);
 
@@ -664,39 +689,39 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
 	if (!args) {
 		cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
 				 "Unable to build callback args");
-		self->thread_state = PyEval_SaveThread();
-		self->callback_crashed++;
+		CallState_crash(cs);
+		CallState_resume(cs);
 		return;
 	}
 
-	result = PyObject_CallObject(self->on_commit, args);
+	result = PyObject_CallObject(self->u.Consumer.on_commit, args);
 
 	Py_DECREF(args);
 
 	if (result)
 		Py_DECREF(result);
 	else {
-		self->callback_crashed++;
+		CallState_crash(cs);
 		rd_kafka_yield(rk);
 	}
 
-	self->thread_state = PyEval_SaveThread();
+	CallState_resume(cs);
 }
 
 
 
 static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
 				   PyObject *kwargs) {
-	Consumer *self;
+	Handle *self;
 	char errstr[256];
 	rd_kafka_conf_t *conf;
 
-	self = (Consumer *)ConsumerType.tp_alloc(&ConsumerType, 0);
+	self = (Handle *)ConsumerType.tp_alloc(&ConsumerType, 0);
 	if (!self)
 		return NULL;
 
 	if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
-					   args, kwargs))) {
+				       args, kwargs))) {
 		Py_DECREF(self);
 		return NULL;
 	}
diff --git a/confluent_kafka/src/Producer.c b/confluent_kafka/src/Producer.c
index c69ca07..a5b16bd 100644
--- a/confluent_kafka/src/Producer.c
+++ b/confluent_kafka/src/Producer.c
@@ -46,7 +46,7 @@
  * Per-message state.
  */
 struct Producer_msgstate {
-	Producer *self;
+	Handle   *self;
 	PyObject *dr_cb;
 	PyObject *partitioner_cb;
 };
@@ -57,8 +57,8 @@ struct Producer_msgstate {
  * Returns NULL if neither dr_cb or partitioner_cb is set.
  */
 static __inline struct Producer_msgstate *
-Producer_msgstate_new (Producer *self,
-			   PyObject *dr_cb, PyObject *partitioner_cb) {
+Producer_msgstate_new (Handle *self,
+		       PyObject *dr_cb, PyObject *partitioner_cb) {
 	struct Producer_msgstate *msgstate;
 
 	if (!dr_cb && !partitioner_cb)
@@ -88,19 +88,22 @@ Producer_msgstate_destroy (struct Producer_msgstate *msgstate) {
 }
 
 
-static int Producer_clear (Producer *self) {
-	if (self->default_dr_cb) {
-		Py_DECREF(self->default_dr_cb);
-		self->default_dr_cb = NULL;
+static int Producer_clear (Handle *self) {
+	if (self->u.Producer.default_dr_cb) {
+		Py_DECREF(self->u.Producer.default_dr_cb);
+		self->u.Producer.default_dr_cb = NULL;
 	}
-	if (self->partitioner_cb) {
-		Py_DECREF(self->partitioner_cb);
-		self->partitioner_cb = NULL;
+	if (self->u.Producer.partitioner_cb) {
+		Py_DECREF(self->u.Producer.partitioner_cb);
+		self->u.Producer.partitioner_cb = NULL;
 	}
+
+	Handle_clear(self);
+
 	return 0;
 }
 
-static void Producer_dealloc (Producer *self) {
+static void Producer_dealloc (Handle *self) {
 	PyObject_GC_UnTrack(self);
 
 	Producer_clear(self);
@@ -111,12 +114,15 @@ static void Producer_dealloc (Producer *self) {
 	Py_TYPE(self)->tp_free((PyObject *)self);
 }
 
-static int Producer_traverse (Producer *self,
-				  visitproc visit, void *arg) {
-	if (self->default_dr_cb)
-		Py_VISIT(self->default_dr_cb);
-	if (self->partitioner_cb)
-		Py_VISIT(self->partitioner_cb);
+static int Producer_traverse (Handle *self,
+			      visitproc visit, void *arg) {
+	if (self->u.Producer.default_dr_cb)
+		Py_VISIT(self->u.Producer.default_dr_cb);
+	if (self->u.Producer.partitioner_cb)
+		Py_VISIT(self->u.Producer.partitioner_cb);
+
+	Handle_traverse(self, visit, arg);
+
 	return 0;
 }
 
@@ -124,7 +130,8 @@ static int Producer_traverse (Producer *self,
 static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
 			   void *opaque) {
 	struct Producer_msgstate *msgstate = rkm->_private;
-	Producer *self = opaque;
+	Handle *self = opaque;
+	CallState *cs;
 	PyObject *args;
 	PyObject *result;
 	PyObject *msgobj;
@@ -132,7 +139,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
 	if (!msgstate)
 		return;
 
-	PyEval_RestoreThread(self->thread_state);
+	cs = CallState_get(self);
 
 	if (!msgstate->dr_cb) {
 		/* No callback defined */
@@ -150,7 +157,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
 	if (!args) {
 		cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
 				 "Unable to build callback args");
-		self->callback_crashed++;
+		CallState_crash(cs);
 		goto done;
 	}
 
@@ -160,13 +167,13 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
 	if (result)
 		Py_DECREF(result);
 	else {
-		self->callback_crashed++;
+		CallState_crash(cs);
 		rd_kafka_yield(rk);
 	}
 
  done:
 	Producer_msgstate_destroy(msgstate);
-	self->thread_state = PyEval_SaveThread();
+	CallState_resume(cs);
 }
 
 
@@ -178,7 +185,7 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
 				 size_t keylen,
 				 int32_t partition_cnt,
 				 void *rkt_opaque, void *msg_opaque) {
-	Producer *self = rkt_opaque;
+	Handle *self = rkt_opaque;
 	struct Producer_msgstate *msgstate = msg_opaque;
 	PyGILState_STATE gstate;
 	PyObject *result;
@@ -188,9 +195,9 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
 	if (!msgstate) {
 		/* Fall back on default C partitioner if neither a per-msg
 		 * partitioner nor a default Python partitioner is available */
-		return self->c_partitioner_cb(rkt, keydata, keylen,
-					      partition_cnt, rkt_opaque,
-					      msg_opaque);
+		return self->u.Producer.c_partitioner_cb(rkt, keydata, keylen,
+							 partition_cnt,
+							 rkt_opaque, msg_opaque);
 	}
 
 	gstate = PyGILState_Ensure();
@@ -198,9 +205,11 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
 	if (!msgstate->partitioner_cb) {
 		/* Fall back on default C partitioner if neither a per-msg
 		 * partitioner nor a default Python partitioner is available */
-		r = msgstate->self->c_partitioner_cb(rkt, keydata, keylen,
-						     partition_cnt, rkt_opaque,
-						     msg_opaque);
+		r = msgstate->self->u.Producer.c_partitioner_cb(rkt,
+								keydata, keylen,
+								partition_cnt,
+								rkt_opaque,
+								msg_opaque);
 		goto done;
 	}
 
@@ -210,7 +219,6 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
 	if (!args) {
 		cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
 				 "Unable to build callback args");
-		printf("Failed to build args\n");
 		goto done;
 	}
 
@@ -219,7 +227,7 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
 	Py_DECREF(args);
 
 	if (result) {
-		r = PyLong_AsLong(result);
+		r = (int32_t)PyLong_AsLong(result);
 		if (PyErr_Occurred())
 			printf("FIXME: partition_cb returned wrong type "
 			       "(expected long), how to propagate?\n");
@@ -237,7 +245,7 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
 
 
 
-static PyObject *Producer_produce (Producer *self, PyObject *args,
+static PyObject *Producer_produce (Handle *self, PyObject *args,
 				       PyObject *kwargs) {
 	const char *topic, *value = NULL, *key = NULL;
 	int value_len = 0, key_len = 0;
@@ -271,10 +279,10 @@ static PyObject *Producer_produce (Producer *self, PyObject *args,
 		return NULL;
 	}
 
-	if (!dr_cb)
-		dr_cb = self->default_dr_cb;
-	if (!partitioner_cb)
-		partitioner_cb = self->partitioner_cb;
+	if (!dr_cb || dr_cb == Py_None)
+		dr_cb = self->u.Producer.default_dr_cb;
+	if (!partitioner_cb || partitioner_cb == Py_None)
+		partitioner_cb = self->u.Producer.partitioner_cb;
 
 	/* Create msgstate if necessary, may return NULL if no callbacks
 	 * are wanted. */
@@ -311,28 +319,23 @@ static PyObject *Producer_produce (Producer *self, PyObject *args,
  * @returns -1 if callback crashed (or poll() failed), else the number
  * of events served.
  */
-static int Producer_poll0 (Producer *self, int tmout) {
+static int Producer_poll0 (Handle *self, int tmout) {
 	int r;
+	CallState cs;
 
-	self->callback_crashed = 0;
-	self->thread_state = PyEval_SaveThread();
+	CallState_begin(self, &cs);
 
 	r = rd_kafka_poll(self->rk, tmout);
 
-	PyEval_RestoreThread(self->thread_state);
-	self->thread_state = NULL;
-
-	if (PyErr_CheckSignals() == -1)
-		return -1;
-
-	if (self->callback_crashed)
+	if (!CallState_end(self, &cs)) {
 		return -1;
+	}
 
 	return r;
 }
 
 
-static PyObject *Producer_poll (Producer *self, PyObject *args,
+static PyObject *Producer_poll (Handle *self, PyObject *args,
 				    PyObject *kwargs) {
 	double tmout;
 	int r;
@@ -349,7 +352,7 @@ static PyObject *Producer_poll (Producer *self, PyObject *args,
 }
 
 
-static PyObject *Producer_flush (Producer *self, PyObject *ignore) {
+static PyObject *Producer_flush (Handle *self, PyObject *ignore) {
 	while (rd_kafka_outq_len(self->rk) > 0) {
 		if (Producer_poll0(self, 500) == -1)
 			return NULL;
@@ -367,7 +370,7 @@ static PyMethodDef Producer_methods[] = {
 	  "  This is an asynchronous operation, an application may use the "
 	  "``callback`` (alias ``on_delivery``) argument to pass a function "
 	  "(or lambda) that will be called from :py:func:`poll()` when the "
-	  "message has been succesfully delivered or permanently fails delivery.\n"
+	  "message has been successfully delivered or permanently fails delivery.\n"
 	  "\n"
 	  "  :param str topic: Topic to produce message to\n"
 	  "  :param str|bytes value: Message payload\n"
@@ -375,7 +378,7 @@ static PyMethodDef Producer_methods[] = {
 	  "  :param int partition: Partition to produce to, elses uses the "
 	  "configured partitioner.\n"
 	  "  :param func on_delivery(err,msg): Delivery report callback to call "
-	  "(from :py:func:`poll()` or :py:func:`flush()`) on succesful or "
+	  "(from :py:func:`poll()` or :py:func:`flush()`) on successful or "
 	  "failed delivery\n"
 	  "\n"
 	  "  :rtype: None\n"
@@ -415,7 +418,7 @@ static PyMethodDef Producer_methods[] = {
 };
 
 
-static Py_ssize_t Producer__len__ (Producer *self) {
+static Py_ssize_t Producer__len__ (Handle *self) {
 	return rd_kafka_outq_len(self->rk);
 }
 
@@ -431,7 +434,7 @@ static PyObject *Producer_new (PyTypeObject *type, PyObject *args,
 PyTypeObject ProducerType = {
 	PyVarObject_HEAD_INIT(NULL, 0)
 	"cimpl.Producer",        /*tp_name*/
-	sizeof(Producer),      /*tp_basicsize*/
+	sizeof(Handle),      /*tp_basicsize*/
 	0,                         /*tp_itemsize*/
 	(destructor)Producer_dealloc, /*tp_dealloc*/
 	0,                         /*tp_print*/
@@ -484,11 +487,11 @@ PyTypeObject ProducerType = {
 
 static PyObject *Producer_new (PyTypeObject *type, PyObject *args,
 				   PyObject *kwargs) {
-	Producer *self;
+	Handle *self;
 	char errstr[256];
 	rd_kafka_conf_t *conf;
 
-	self = (Producer *)ProducerType.tp_alloc(&ProducerType, 0);
+	self = (Handle *)ProducerType.tp_alloc(&ProducerType, 0);
 	if (!self)
 		return NULL;
 
diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c
index e85ef20..0ab967b 100644
--- a/confluent_kafka/src/confluent_kafka.c
+++ b/confluent_kafka/src/confluent_kafka.c
@@ -133,7 +133,7 @@ static PyObject* KafkaError_richcompare (KafkaError *self, PyObject *o2,
 	if (Py_TYPE(o2) == &KafkaErrorType)
 		code2 = ((KafkaError *)o2)->code;
 	else
-		code2 = PyLong_AsLong(o2);
+		code2 = (int)PyLong_AsLong(o2);
 
 	switch (op)
 	{
@@ -164,7 +164,7 @@ static PyObject* KafkaError_richcompare (KafkaError *self, PyObject *o2,
 	Py_INCREF(result);
 	return result;
 }
-				       
+
 
 static PyTypeObject KafkaErrorType = {
 	PyVarObject_HEAD_INIT(NULL, 0)
@@ -320,7 +320,7 @@ static PyObject *Message_partition (Message *self, PyObject *ignore) {
 
 static PyObject *Message_offset (Message *self, PyObject *ignore) {
 	if (self->offset >= 0)
-		return PyLong_FromLong(self->offset);
+		return PyLong_FromLongLong(self->offset);
 	else
 		Py_RETURN_NONE;
 }
@@ -560,7 +560,7 @@ static PyObject *TopicPartition_new (PyTypeObject *type, PyObject *args,
 
 	return TopicPartition_new0(topic, partition, offset, 0);
 }
-	
+
 
 
 static int TopicPartition_traverse (TopicPartition *self,
@@ -758,7 +758,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
 	}
 
 	return parts;
-		
+
 }
 
 /**
@@ -776,7 +776,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
 		return NULL;
 	}
 
-	c_parts = rd_kafka_topic_partition_list_new(PyList_Size(plist));
+	c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));
 
 	for (i = 0 ; i < PyList_Size(plist) ; i++) {
 		TopicPartition *tp = (TopicPartition *)
@@ -801,6 +801,40 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
 }
 
 
+/****************************************************************************
+ *
+ *
+ * Common callbacks
+ *
+ *
+ *
+ *
+ ****************************************************************************/
+static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
+	Handle *h = opaque;
+	PyObject *eo, *result;
+	CallState *cs;
+
+	cs = CallState_get(h);
+	if (!h->error_cb) {
+		/* No callback defined */
+		goto done;
+	}
+
+	eo = KafkaError_new0(err, "%s", reason);
+	result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
+	Py_DECREF(eo);
+
+	if (result) {
+		Py_DECREF(result);
+	} else {
+		CallState_crash(cs);
+		rd_kafka_yield(h->rk);
+	}
+
+ done:
+	CallState_resume(cs);
+}
 
 
 /****************************************************************************
@@ -814,6 +848,30 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
  ****************************************************************************/
 
 
+
+/**
+ * Clear Python object references in Handle
+ */
+void Handle_clear (Handle *h) {
+	if (h->error_cb) {
+		Py_DECREF(h->error_cb);
+	}
+
+	PyThread_delete_key(h->tlskey);
+}
+
+/**
+ * GC traversal for Python object references
+ */
+int Handle_traverse (Handle *h, visitproc visit, void *arg) {
+	if (h->error_cb)
+		Py_VISIT(h->error_cb);
+
+	return 0;
+}
+
+
+
 /**
  * Populate topic conf from provided dict.
  *
@@ -879,7 +937,7 @@ static int populate_topic_conf (rd_kafka_topic_conf_t *tconf, const char *what,
  *
  * @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
  */
-static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf,
+static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
 				      rd_kafka_topic_conf_t *tconf,
 				      const char *name, PyObject *valobj) {
 	PyObject *vs;
@@ -894,8 +952,8 @@ static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf,
 			return -1;
 		}
 
-		self->default_dr_cb = valobj;
-		Py_INCREF(self->default_dr_cb);
+		self->u.Producer.default_dr_cb = valobj;
+		Py_INCREF(self->u.Producer.default_dr_cb);
 
 		return 1;
 
@@ -947,11 +1005,11 @@ static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf,
 				return -1;
 			}
 
-			if (self->partitioner_cb)
-				Py_DECREF(self->partitioner_cb);
+			if (self->u.Producer.partitioner_cb)
+				Py_DECREF(self->u.Producer.partitioner_cb);
 
-			self->partitioner_cb = valobj;
-			Py_INCREF(self->partitioner_cb);
... 664 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-confluent-kafka.git



More information about the Python-modules-commits mailing list