[Python-modules-commits] [python-confluent-kafka] 01/06: Import python-confluent-kafka_0.9.2.orig.tar.gz
Christos Trochalakis
ctrochalakis at moszumanska.debian.org
Mon Jan 16 14:03:49 UTC 2017
This is an automated email from the git hooks/post-receive script.
ctrochalakis pushed a commit to tag debian/0.9.2-1
in repository python-confluent-kafka.
commit bd39861dc7688213d0d92f04020ccc777f2a5434
Author: Christos Trochalakis <yatiohi at ideopolis.gr>
Date: Thu Jan 12 14:59:27 2017 +0200
Import python-confluent-kafka_0.9.2.orig.tar.gz
---
PKG-INFO | 2 +-
README.md | 35 ++++++-
confluent_kafka.egg-info/PKG-INFO | 2 +-
confluent_kafka.egg-info/SOURCES.txt | 10 +-
confluent_kafka/src/Consumer.c | 169 +++++++++++++++++++--------------
confluent_kafka/src/Producer.c | 111 +++++++++++-----------
confluent_kafka/src/confluent_kafka.c | 173 +++++++++++++++++++++++++++++-----
confluent_kafka/src/confluent_kafka.h | 124 +++++++++++++++++-------
setup.py | 2 +-
tests/test_Consumer.py | 62 ++++++++++++
tests/test_KafkaError.py | 30 ++++++
tests/test_Producer.py | 38 ++++++++
tests/test_TopicPartition.py | 38 ++++++++
tests/test_docs.py | 29 ++++++
tests/test_enums.py | 9 ++
tests/test_misc.py | 16 ++++
tests/test_threads.py | 69 ++++++++++++++
17 files changed, 725 insertions(+), 194 deletions(-)
diff --git a/PKG-INFO b/PKG-INFO
index 2ef20cb..90c0183 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.0
Name: confluent-kafka
-Version: 0.9.1.2
+Version: 0.9.2
Summary: Confluent's Apache Kafka client for Python
Home-page: https://github.com/confluentinc/confluent-kafka-python
Author: Confluent Inc
diff --git a/README.md b/README.md
index 23bf7ea..7362208 100644
--- a/README.md
+++ b/README.md
@@ -30,21 +30,48 @@ p.flush()
**High-level Consumer:**
```python
-from confluent_kafka import Consumer
+from confluent_kafka import Consumer, KafkaError
c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup',
'default.topic.config': {'auto.offset.reset': 'smallest'}})
c.subscribe(['mytopic'])
+running = True
while running:
msg = c.poll()
if not msg.error():
print('Received message: %s' % msg.value().decode('utf-8'))
+ elif msg.error().code() != KafkaError._PARTITION_EOF:
+ print(msg.error())
+ running = False
c.close()
```
See [examples](examples) for more examples.
+Broker compatibility
+====================
+The Python client (as well as the underlying C library librdkafka) supports
+all broker versions >= 0.8.
+But due to the nature of the Kafka protocol in broker versions 0.8 and 0.9 it
+is not safe for a client to assume which protocol version the broker actually
+supports, so you will need to hint to the Python client which protocol
+version it may use. This is done through two configuration settings:
+
+ * `broker.version.fallback=YOUR_BROKER_VERSION` (default 0.9.0.1)
+ * `api.version.request=true|false` (default false)
+
+When using a Kafka 0.10 broker or later you only need to set
+`api.version.request=true`.
+If you use a Kafka 0.9 or 0.8 broker you should leave
+`api.version.request=false` (the default) and set
+`broker.version.fallback` to your broker version,
+e.g. `broker.version.fallback=0.9.0.1`.
+
+More info here:
+https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
+
+
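For reference, a minimal sketch (broker address hypothetical) of how these two
settings are passed through the client's configuration dict:

```python
from confluent_kafka import Consumer

# Kafka 0.10+ broker: let the client query the broker for its protocol version.
c_new = Consumer({'bootstrap.servers': 'mybroker',
                  'group.id': 'mygroup',
                  'api.version.request': 'true'})

# Kafka 0.8/0.9 broker: keep api.version.request=false and pin the fallback.
c_old = Consumer({'bootstrap.servers': 'mybroker',
                  'group.id': 'mygroup',
                  'api.version.request': 'false',
                  'broker.version.fallback': '0.9.0.1'})
```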
Prerequisites
=============
@@ -68,11 +95,11 @@ Install
Build
=====
- $ python setup.by build
+ $ python setup.py build
If librdkafka is installed in a non-standard location provide the include and library directories with:
- $ CPLUS_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python setup.py ...
+ $ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python setup.py ...
Tests
@@ -103,6 +130,6 @@ Install sphinx and sphinx_rtd_theme packages and then:
or:
- $ python setup.by build_sphinx
+ $ python setup.py build_sphinx
Documentation will be generated in `docs/_build/`.
diff --git a/confluent_kafka.egg-info/PKG-INFO b/confluent_kafka.egg-info/PKG-INFO
index 2ef20cb..90c0183 100644
--- a/confluent_kafka.egg-info/PKG-INFO
+++ b/confluent_kafka.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.0
Name: confluent-kafka
-Version: 0.9.1.2
+Version: 0.9.2
Summary: Confluent's Apache Kafka client for Python
Home-page: https://github.com/confluentinc/confluent-kafka-python
Author: Confluent Inc
diff --git a/confluent_kafka.egg-info/SOURCES.txt b/confluent_kafka.egg-info/SOURCES.txt
index bc78298..d99b51b 100644
--- a/confluent_kafka.egg-info/SOURCES.txt
+++ b/confluent_kafka.egg-info/SOURCES.txt
@@ -14,4 +14,12 @@ confluent_kafka/kafkatest/verifiable_producer.py
confluent_kafka/src/Consumer.c
confluent_kafka/src/Producer.c
confluent_kafka/src/confluent_kafka.c
-confluent_kafka/src/confluent_kafka.h
\ No newline at end of file
+confluent_kafka/src/confluent_kafka.h
+tests/test_Consumer.py
+tests/test_KafkaError.py
+tests/test_Producer.py
+tests/test_TopicPartition.py
+tests/test_docs.py
+tests/test_enums.py
+tests/test_misc.py
+tests/test_threads.py
\ No newline at end of file
diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c
index 80e136a..443a8a3 100644
--- a/confluent_kafka/src/Consumer.c
+++ b/confluent_kafka/src/Consumer.c
@@ -28,23 +28,26 @@
****************************************************************************/
-static int Consumer_clear (Consumer *self) {
- if (self->on_assign) {
- Py_DECREF(self->on_assign);
- self->on_assign = NULL;
+static int Consumer_clear (Handle *self) {
+ if (self->u.Consumer.on_assign) {
+ Py_DECREF(self->u.Consumer.on_assign);
+ self->u.Consumer.on_assign = NULL;
}
- if (self->on_revoke) {
- Py_DECREF(self->on_revoke);
- self->on_revoke = NULL;
+ if (self->u.Consumer.on_revoke) {
+ Py_DECREF(self->u.Consumer.on_revoke);
+ self->u.Consumer.on_revoke = NULL;
}
- if (self->on_commit) {
- Py_DECREF(self->on_commit);
- self->on_commit = NULL;
+ if (self->u.Consumer.on_commit) {
+ Py_DECREF(self->u.Consumer.on_commit);
+ self->u.Consumer.on_commit = NULL;
}
+
+ Handle_clear(self);
+
return 0;
}
-static void Consumer_dealloc (Consumer *self) {
+static void Consumer_dealloc (Handle *self) {
PyObject_GC_UnTrack(self);
Consumer_clear(self);
@@ -55,12 +58,17 @@ static void Consumer_dealloc (Consumer *self) {
Py_TYPE(self)->tp_free((PyObject *)self);
}
-static int Consumer_traverse (Consumer *self,
- visitproc visit, void *arg) {
- if (self->on_assign)
- Py_VISIT(self->on_assign);
- if (self->on_revoke)
- Py_VISIT(self->on_revoke);
+static int Consumer_traverse (Handle *self,
+ visitproc visit, void *arg) {
+ if (self->u.Consumer.on_assign)
+ Py_VISIT(self->u.Consumer.on_assign);
+ if (self->u.Consumer.on_revoke)
+ Py_VISIT(self->u.Consumer.on_revoke);
+ if (self->u.Consumer.on_commit)
+ Py_VISIT(self->u.Consumer.on_commit);
+
+ Handle_traverse(self, visit, arg);
+
return 0;
}
@@ -69,7 +77,7 @@ static int Consumer_traverse (Consumer *self,
-static PyObject *Consumer_subscribe (Consumer *self, PyObject *args,
+static PyObject *Consumer_subscribe (Handle *self, PyObject *args,
PyObject *kwargs) {
rd_kafka_topic_partition_list_t *topics;
@@ -100,7 +108,7 @@ static PyObject *Consumer_subscribe (Consumer *self, PyObject *args,
return NULL;
}
- topics = rd_kafka_topic_partition_list_new(PyList_Size(tlist));
+ topics = rd_kafka_topic_partition_list_new((int)PyList_Size(tlist));
for (pos = 0 ; pos < PyList_Size(tlist) ; pos++) {
PyObject *o = PyList_GetItem(tlist, pos);
PyObject *uo;
@@ -130,29 +138,29 @@ static PyObject *Consumer_subscribe (Consumer *self, PyObject *args,
/*
* Update rebalance callbacks
*/
- if (self->on_assign) {
- Py_DECREF(self->on_assign);
- self->on_assign = NULL;
+ if (self->u.Consumer.on_assign) {
+ Py_DECREF(self->u.Consumer.on_assign);
+ self->u.Consumer.on_assign = NULL;
}
if (on_assign) {
- self->on_assign = on_assign;
- Py_INCREF(self->on_assign);
+ self->u.Consumer.on_assign = on_assign;
+ Py_INCREF(self->u.Consumer.on_assign);
}
- if (self->on_revoke) {
- Py_DECREF(self->on_revoke);
- self->on_revoke = NULL;
+ if (self->u.Consumer.on_revoke) {
+ Py_DECREF(self->u.Consumer.on_revoke);
+ self->u.Consumer.on_revoke = NULL;
}
if (on_revoke) {
- self->on_revoke = on_revoke;
- Py_INCREF(self->on_revoke);
+ self->u.Consumer.on_revoke = on_revoke;
+ Py_INCREF(self->u.Consumer.on_revoke);
}
Py_RETURN_NONE;
}
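The refactor above only moves the callback references into the Handle union;
the Python-facing API is unchanged. A minimal sketch of subscribe() with the
rebalance callbacks parsed in this function (broker and topic hypothetical):

```python
from confluent_kafka import Consumer

c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup'})

def on_assign(consumer, partitions):
    # Invoked on rebalance with the newly assigned TopicPartition list.
    print('Assigned:', partitions)

def on_revoke(consumer, partitions):
    # Invoked on rebalance before the listed partitions are revoked.
    print('Revoked:', partitions)

c.subscribe(['mytopic'], on_assign=on_assign, on_revoke=on_revoke)
```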
-static PyObject *Consumer_unsubscribe (Consumer *self,
+static PyObject *Consumer_unsubscribe (Handle *self,
PyObject *ignore) {
rd_kafka_resp_err_t err;
@@ -169,7 +177,7 @@ static PyObject *Consumer_unsubscribe (Consumer *self,
}
-static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
+static PyObject *Consumer_assign (Handle *self, PyObject *tlist) {
rd_kafka_topic_partition_list_t *c_parts;
rd_kafka_resp_err_t err;
@@ -177,7 +185,7 @@ static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
if (!(c_parts = py_to_c_parts(tlist)))
return NULL;
- self->rebalance_assigned++;
+ self->u.Consumer.rebalance_assigned++;
err = rd_kafka_assign(self->rk, c_parts);
@@ -185,7 +193,7 @@ static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
if (err) {
cfl_PyErr_Format(err,
- "Failed to set assignemnt: %s",
+ "Failed to set assignment: %s",
rd_kafka_err2str(err));
return NULL;
}
@@ -194,11 +202,11 @@ static PyObject *Consumer_assign (Consumer *self, PyObject *tlist) {
}
-static PyObject *Consumer_unassign (Consumer *self, PyObject *ignore) {
+static PyObject *Consumer_unassign (Handle *self, PyObject *ignore) {
rd_kafka_resp_err_t err;
- self->rebalance_assigned++;
+ self->u.Consumer.rebalance_assigned++;
err = rd_kafka_assign(self->rk, NULL);
if (err) {
@@ -213,7 +221,7 @@ static PyObject *Consumer_unassign (Consumer *self, PyObject *ignore) {
-static PyObject *Consumer_commit (Consumer *self, PyObject *args,
+static PyObject *Consumer_commit (Handle *self, PyObject *args,
PyObject *kwargs) {
rd_kafka_resp_err_t err;
@@ -281,7 +289,7 @@ static PyObject *Consumer_commit (Consumer *self, PyObject *args,
-static PyObject *Consumer_committed (Consumer *self, PyObject *args,
+static PyObject *Consumer_committed (Handle *self, PyObject *args,
PyObject *kwargs) {
PyObject *plist;
@@ -317,7 +325,7 @@ static PyObject *Consumer_committed (Consumer *self, PyObject *args,
}
-static PyObject *Consumer_position (Consumer *self, PyObject *args,
+static PyObject *Consumer_position (Handle *self, PyObject *args,
PyObject *kwargs) {
PyObject *plist;
@@ -352,26 +360,23 @@ static PyObject *Consumer_position (Consumer *self, PyObject *args,
-static PyObject *Consumer_poll (Consumer *self, PyObject *args,
+static PyObject *Consumer_poll (Handle *self, PyObject *args,
PyObject *kwargs) {
double tmout = -1.0f;
static char *kws[] = { "timeout", NULL };
rd_kafka_message_t *rkm;
PyObject *msgobj;
+ CallState cs;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
return NULL;
- self->callback_crashed = 0;
- self->thread_state = PyEval_SaveThread();
+ CallState_begin(self, &cs);
rkm = rd_kafka_consumer_poll(self->rk, tmout >= 0 ?
(int)(tmout * 1000.0f) : -1);
- PyEval_RestoreThread(self->thread_state);
- self->thread_state = NULL;
-
- if (self->callback_crashed)
+ if (!CallState_end(self, &cs))
return NULL;
if (!rkm)
@@ -384,10 +389,22 @@ static PyObject *Consumer_poll (Consumer *self, PyObject *args,
}
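CallState_begin/CallState_end replace the hand-rolled thread-state
bookkeeping: the GIL is still released around rd_kafka_consumer_poll(), and an
exception raised in a callback dispatched during the poll is re-raised to the
caller. From Python (reusing c from the sketch above) that behaves roughly
like:

```python
def failing_on_assign(consumer, partitions):
    raise RuntimeError('callback failed')

c.subscribe(['mytopic'], on_assign=failing_on_assign)
try:
    msg = c.poll(timeout=1.0)  # dispatches the rebalance callback
except RuntimeError as e:
    print('propagated from poll():', e)
```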
-static PyObject *Consumer_close (Consumer *self, PyObject *ignore) {
- self->thread_state = PyEval_SaveThread();
+static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
+ CallState cs;
+ int raise = 0;
+
+ CallState_begin(self, &cs);
+
rd_kafka_consumer_close(self->rk);
- PyEval_RestoreThread(self->thread_state);
+
+ raise = !CallState_end(self, &cs);
+
+ rd_kafka_destroy(self->rk);
+ self->rk = NULL;
+
+ if (raise)
+ return NULL;
+
Py_RETURN_NONE;
}
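With this change close() also destroys the underlying rd_kafka handle rather
than leaving that to the destructor, while still propagating any callback
exception raised during shutdown; from Python:

```python
c.close()  # leaves the consumer group; the client handle is destroyed too
```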
@@ -523,7 +540,7 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
PyTypeObject ConsumerType = {
PyVarObject_HEAD_INIT(NULL, 0)
"cimpl.Consumer", /*tp_name*/
- sizeof(Consumer), /*tp_basicsize*/
+ sizeof(Handle), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)Consumer_dealloc, /*tp_dealloc*/
0, /*tp_print*/
@@ -584,14 +601,17 @@ PyTypeObject ConsumerType = {
static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *c_parts,
void *opaque) {
- Consumer *self = opaque;
+ Handle *self = opaque;
+ CallState *cs;
- PyEval_RestoreThread(self->thread_state);
+ cs = CallState_get(self);
- self->rebalance_assigned = 0;
+ self->u.Consumer.rebalance_assigned = 0;
- if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS && self->on_assign) ||
- (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS && self->on_revoke)) {
+ if ((err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS &&
+ self->u.Consumer.on_assign) ||
+ (err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS &&
+ self->u.Consumer.on_revoke)) {
PyObject *parts;
PyObject *args, *result;
@@ -605,21 +625,22 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
if (!args) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
"Unable to build callback args");
- self->thread_state = PyEval_SaveThread();
- self->callback_crashed++;
+ CallState_crash(cs);
+ CallState_resume(cs);
return;
}
result = PyObject_CallObject(
err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ?
- self->on_assign : self->on_revoke, args);
+ self->u.Consumer.on_assign :
+ self->u.Consumer.on_revoke, args);
Py_DECREF(args);
if (result)
Py_DECREF(result);
else {
- self->callback_crashed++;
+ CallState_crash(cs);
rd_kafka_yield(rk);
}
}
@@ -628,33 +649,37 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
* to synchronize state, if the user did not do this from callback,
* or there was no callback, or the callback failed, then we perform
* that assign() call here instead. */
- if (!self->rebalance_assigned) {
+ if (!self->u.Consumer.rebalance_assigned) {
if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
rd_kafka_assign(rk, c_parts);
else
rd_kafka_assign(rk, NULL);
}
- self->thread_state = PyEval_SaveThread();
+ CallState_resume(cs);
}
static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
rd_kafka_topic_partition_list_t *c_parts,
void *opaque) {
- Consumer *self = opaque;
+ Handle *self = opaque;
PyObject *parts, *k_err, *args, *result;
+ CallState *cs;
- if (!self->on_commit)
+ if (!self->u.Consumer.on_commit)
return;
- PyEval_RestoreThread(self->thread_state);
+ cs = CallState_get(self);
/* Instantiate error object */
k_err = KafkaError_new_or_None(err, NULL);
/* Construct list of TopicPartition based on 'c_parts' */
- parts = c_parts_to_py(c_parts);
+ if (c_parts)
+ parts = c_parts_to_py(c_parts);
+ else
+ parts = PyList_New(0);
args = Py_BuildValue("(OO)", k_err, parts);
@@ -664,39 +689,39 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
if (!args) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
"Unable to build callback args");
- self->thread_state = PyEval_SaveThread();
- self->callback_crashed++;
+ CallState_crash(cs);
+ CallState_resume(cs);
return;
}
- result = PyObject_CallObject(self->on_commit, args);
+ result = PyObject_CallObject(self->u.Consumer.on_commit, args);
Py_DECREF(args);
if (result)
Py_DECREF(result);
else {
- self->callback_crashed++;
+ CallState_crash(cs);
rd_kafka_yield(rk);
}
- self->thread_state = PyEval_SaveThread();
+ CallState_resume(cs);
}
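The guard above handles a NULL partition list from librdkafka by passing an
empty list to the callback instead of dereferencing NULL. The callback itself
is installed through the consumer configuration (the on_commit key handled by
this extension); a minimal sketch, with broker hypothetical:

```python
from confluent_kafka import Consumer

def on_commit(err, partitions):
    # err is a KafkaError or None; partitions may now be an empty list.
    if err is not None:
        print('Offset commit failed:', err)
    else:
        print('Committed offsets for:', partitions)

c = Consumer({'bootstrap.servers': 'mybroker',
              'group.id': 'mygroup',
              'on_commit': on_commit})
```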
static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
PyObject *kwargs) {
- Consumer *self;
+ Handle *self;
char errstr[256];
rd_kafka_conf_t *conf;
- self = (Consumer *)ConsumerType.tp_alloc(&ConsumerType, 0);
+ self = (Handle *)ConsumerType.tp_alloc(&ConsumerType, 0);
if (!self)
return NULL;
if (!(conf = common_conf_setup(RD_KAFKA_CONSUMER, self,
- args, kwargs))) {
+ args, kwargs))) {
Py_DECREF(self);
return NULL;
}
diff --git a/confluent_kafka/src/Producer.c b/confluent_kafka/src/Producer.c
index c69ca07..a5b16bd 100644
--- a/confluent_kafka/src/Producer.c
+++ b/confluent_kafka/src/Producer.c
@@ -46,7 +46,7 @@
* Per-message state.
*/
struct Producer_msgstate {
- Producer *self;
+ Handle *self;
PyObject *dr_cb;
PyObject *partitioner_cb;
};
@@ -57,8 +57,8 @@ struct Producer_msgstate {
* Returns NULL if neither dr_cb or partitioner_cb is set.
*/
static __inline struct Producer_msgstate *
-Producer_msgstate_new (Producer *self,
- PyObject *dr_cb, PyObject *partitioner_cb) {
+Producer_msgstate_new (Handle *self,
+ PyObject *dr_cb, PyObject *partitioner_cb) {
struct Producer_msgstate *msgstate;
if (!dr_cb && !partitioner_cb)
@@ -88,19 +88,22 @@ Producer_msgstate_destroy (struct Producer_msgstate *msgstate) {
}
-static int Producer_clear (Producer *self) {
- if (self->default_dr_cb) {
- Py_DECREF(self->default_dr_cb);
- self->default_dr_cb = NULL;
+static int Producer_clear (Handle *self) {
+ if (self->u.Producer.default_dr_cb) {
+ Py_DECREF(self->u.Producer.default_dr_cb);
+ self->u.Producer.default_dr_cb = NULL;
}
- if (self->partitioner_cb) {
- Py_DECREF(self->partitioner_cb);
- self->partitioner_cb = NULL;
+ if (self->u.Producer.partitioner_cb) {
+ Py_DECREF(self->u.Producer.partitioner_cb);
+ self->u.Producer.partitioner_cb = NULL;
}
+
+ Handle_clear(self);
+
return 0;
}
-static void Producer_dealloc (Producer *self) {
+static void Producer_dealloc (Handle *self) {
PyObject_GC_UnTrack(self);
Producer_clear(self);
@@ -111,12 +114,15 @@ static void Producer_dealloc (Producer *self) {
Py_TYPE(self)->tp_free((PyObject *)self);
}
-static int Producer_traverse (Producer *self,
- visitproc visit, void *arg) {
- if (self->default_dr_cb)
- Py_VISIT(self->default_dr_cb);
- if (self->partitioner_cb)
- Py_VISIT(self->partitioner_cb);
+static int Producer_traverse (Handle *self,
+ visitproc visit, void *arg) {
+ if (self->u.Producer.default_dr_cb)
+ Py_VISIT(self->u.Producer.default_dr_cb);
+ if (self->u.Producer.partitioner_cb)
+ Py_VISIT(self->u.Producer.partitioner_cb);
+
+ Handle_traverse(self, visit, arg);
+
return 0;
}
@@ -124,7 +130,8 @@ static int Producer_traverse (Producer *self,
static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
void *opaque) {
struct Producer_msgstate *msgstate = rkm->_private;
- Producer *self = opaque;
+ Handle *self = opaque;
+ CallState *cs;
PyObject *args;
PyObject *result;
PyObject *msgobj;
@@ -132,7 +139,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
if (!msgstate)
return;
- PyEval_RestoreThread(self->thread_state);
+ cs = CallState_get(self);
if (!msgstate->dr_cb) {
/* No callback defined */
@@ -150,7 +157,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
if (!args) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
"Unable to build callback args");
- self->callback_crashed++;
+ CallState_crash(cs);
goto done;
}
@@ -160,13 +167,13 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
if (result)
Py_DECREF(result);
else {
- self->callback_crashed++;
+ CallState_crash(cs);
rd_kafka_yield(rk);
}
done:
Producer_msgstate_destroy(msgstate);
- self->thread_state = PyEval_SaveThread();
+ CallState_resume(cs);
}
@@ -178,7 +185,7 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
size_t keylen,
int32_t partition_cnt,
void *rkt_opaque, void *msg_opaque) {
- Producer *self = rkt_opaque;
+ Handle *self = rkt_opaque;
struct Producer_msgstate *msgstate = msg_opaque;
PyGILState_STATE gstate;
PyObject *result;
@@ -188,9 +195,9 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
if (!msgstate) {
/* Fall back on default C partitioner if neither a per-msg
* partitioner nor a default Python partitioner is available */
- return self->c_partitioner_cb(rkt, keydata, keylen,
- partition_cnt, rkt_opaque,
- msg_opaque);
+ return self->u.Producer.c_partitioner_cb(rkt, keydata, keylen,
+ partition_cnt,
+ rkt_opaque, msg_opaque);
}
gstate = PyGILState_Ensure();
@@ -198,9 +205,11 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
if (!msgstate->partitioner_cb) {
/* Fall back on default C partitioner if neither a per-msg
* partitioner nor a default Python partitioner is available */
- r = msgstate->self->c_partitioner_cb(rkt, keydata, keylen,
- partition_cnt, rkt_opaque,
- msg_opaque);
+ r = msgstate->self->u.Producer.c_partitioner_cb(rkt,
+ keydata, keylen,
+ partition_cnt,
+ rkt_opaque,
+ msg_opaque);
goto done;
}
@@ -210,7 +219,6 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
if (!args) {
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
"Unable to build callback args");
- printf("Failed to build args\n");
goto done;
}
@@ -219,7 +227,7 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
Py_DECREF(args);
if (result) {
- r = PyLong_AsLong(result);
+ r = (int32_t)PyLong_AsLong(result);
if (PyErr_Occurred())
printf("FIXME: partition_cb returned wrong type "
"(expected long), how to propagate?\n");
@@ -237,7 +245,7 @@ int32_t Producer_partitioner_cb (const rd_kafka_topic_t *rkt,
-static PyObject *Producer_produce (Producer *self, PyObject *args,
+static PyObject *Producer_produce (Handle *self, PyObject *args,
PyObject *kwargs) {
const char *topic, *value = NULL, *key = NULL;
int value_len = 0, key_len = 0;
@@ -271,10 +279,10 @@ static PyObject *Producer_produce (Producer *self, PyObject *args,
return NULL;
}
- if (!dr_cb)
- dr_cb = self->default_dr_cb;
- if (!partitioner_cb)
- partitioner_cb = self->partitioner_cb;
+ if (!dr_cb || dr_cb == Py_None)
+ dr_cb = self->u.Producer.default_dr_cb;
+ if (!partitioner_cb || partitioner_cb == Py_None)
+ partitioner_cb = self->u.Producer.partitioner_cb;
/* Create msgstate if necessary, may return NULL if no callbacks
* are wanted. */
@@ -311,28 +319,23 @@ static PyObject *Producer_produce (Producer *self, PyObject *args,
* @returns -1 if callback crashed (or poll() failed), else the number
* of events served.
*/
-static int Producer_poll0 (Producer *self, int tmout) {
+static int Producer_poll0 (Handle *self, int tmout) {
int r;
+ CallState cs;
- self->callback_crashed = 0;
- self->thread_state = PyEval_SaveThread();
+ CallState_begin(self, &cs);
r = rd_kafka_poll(self->rk, tmout);
- PyEval_RestoreThread(self->thread_state);
- self->thread_state = NULL;
-
- if (PyErr_CheckSignals() == -1)
- return -1;
-
- if (self->callback_crashed)
+ if (!CallState_end(self, &cs)) {
return -1;
+ }
return r;
}
-static PyObject *Producer_poll (Producer *self, PyObject *args,
+static PyObject *Producer_poll (Handle *self, PyObject *args,
PyObject *kwargs) {
double tmout;
int r;
@@ -349,7 +352,7 @@ static PyObject *Producer_poll (Producer *self, PyObject *args,
}
-static PyObject *Producer_flush (Producer *self, PyObject *ignore) {
+static PyObject *Producer_flush (Handle *self, PyObject *ignore) {
while (rd_kafka_outq_len(self->rk) > 0) {
if (Producer_poll0(self, 500) == -1)
return NULL;
@@ -367,7 +370,7 @@ static PyMethodDef Producer_methods[] = {
" This is an asynchronous operation, an application may use the "
"``callback`` (alias ``on_delivery``) argument to pass a function "
"(or lambda) that will be called from :py:func:`poll()` when the "
- "message has been succesfully delivered or permanently fails delivery.\n"
+ "message has been successfully delivered or permanently fails delivery.\n"
"\n"
" :param str topic: Topic to produce message to\n"
" :param str|bytes value: Message payload\n"
@@ -375,7 +378,7 @@ static PyMethodDef Producer_methods[] = {
" :param int partition: Partition to produce to, elses uses the "
"configured partitioner.\n"
" :param func on_delivery(err,msg): Delivery report callback to call "
- "(from :py:func:`poll()` or :py:func:`flush()`) on succesful or "
+ "(from :py:func:`poll()` or :py:func:`flush()`) on successful or "
"failed delivery\n"
"\n"
" :rtype: None\n"
@@ -415,7 +418,7 @@ static PyMethodDef Producer_methods[] = {
};
-static Py_ssize_t Producer__len__ (Producer *self) {
+static Py_ssize_t Producer__len__ (Handle *self) {
return rd_kafka_outq_len(self->rk);
}
@@ -431,7 +434,7 @@ static PyObject *Producer_new (PyTypeObject *type, PyObject *args,
PyTypeObject ProducerType = {
PyVarObject_HEAD_INIT(NULL, 0)
"cimpl.Producer", /*tp_name*/
- sizeof(Producer), /*tp_basicsize*/
+ sizeof(Handle), /*tp_basicsize*/
0, /*tp_itemsize*/
(destructor)Producer_dealloc, /*tp_dealloc*/
0, /*tp_print*/
@@ -484,11 +487,11 @@ PyTypeObject ProducerType = {
static PyObject *Producer_new (PyTypeObject *type, PyObject *args,
PyObject *kwargs) {
- Producer *self;
+ Handle *self;
char errstr[256];
rd_kafka_conf_t *conf;
- self = (Producer *)ProducerType.tp_alloc(&ProducerType, 0);
+ self = (Handle *)ProducerType.tp_alloc(&ProducerType, 0);
if (!self)
return NULL;
diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c
index e85ef20..0ab967b 100644
--- a/confluent_kafka/src/confluent_kafka.c
+++ b/confluent_kafka/src/confluent_kafka.c
@@ -133,7 +133,7 @@ static PyObject* KafkaError_richcompare (KafkaError *self, PyObject *o2,
if (Py_TYPE(o2) == &KafkaErrorType)
code2 = ((KafkaError *)o2)->code;
else
- code2 = PyLong_AsLong(o2);
+ code2 = (int)PyLong_AsLong(o2);
switch (op)
{
@@ -164,7 +164,7 @@ static PyObject* KafkaError_richcompare (KafkaError *self, PyObject *o2,
Py_INCREF(result);
return result;
}
-
+
static PyTypeObject KafkaErrorType = {
PyVarObject_HEAD_INIT(NULL, 0)
@@ -320,7 +320,7 @@ static PyObject *Message_partition (Message *self, PyObject *ignore) {
static PyObject *Message_offset (Message *self, PyObject *ignore) {
if (self->offset >= 0)
- return PyLong_FromLong(self->offset);
+ return PyLong_FromLongLong(self->offset);
else
Py_RETURN_NONE;
}
@@ -560,7 +560,7 @@ static PyObject *TopicPartition_new (PyTypeObject *type, PyObject *args,
return TopicPartition_new0(topic, partition, offset, 0);
}
-
+
static int TopicPartition_traverse (TopicPartition *self,
@@ -758,7 +758,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
}
return parts;
-
+
}
/**
@@ -776,7 +776,7 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
return NULL;
}
- c_parts = rd_kafka_topic_partition_list_new(PyList_Size(plist));
+ c_parts = rd_kafka_topic_partition_list_new((int)PyList_Size(plist));
for (i = 0 ; i < PyList_Size(plist) ; i++) {
TopicPartition *tp = (TopicPartition *)
@@ -801,6 +801,40 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
}
+/****************************************************************************
+ *
+ *
+ * Common callbacks
+ *
+ *
+ *
+ *
+ ****************************************************************************/
+static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
+ Handle *h = opaque;
+ PyObject *eo, *result;
+ CallState *cs;
+
+ cs = CallState_get(h);
+ if (!h->error_cb) {
+ /* No callback defined */
+ goto done;
+ }
+
+ eo = KafkaError_new0(err, "%s", reason);
+ result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
+ Py_DECREF(eo);
+
+ if (result) {
+ Py_DECREF(result);
+ } else {
+ CallState_crash(cs);
+ rd_kafka_yield(h->rk);
+ }
+
+ done:
+ CallState_resume(cs);
+}
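The new common error_cb forwards client-level errors (e.g. all brokers down)
to a Python callable, installed via the configuration dict (the error_cb key
used by this extension); a minimal sketch, with broker hypothetical:

```python
from confluent_kafka import Consumer

def error_cb(err):
    # err is a KafkaError describing a client-level (non-message) error.
    print('Client error:', err)

c = Consumer({'bootstrap.servers': 'mybroker',
              'group.id': 'mygroup',
              'error_cb': error_cb})
```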
/****************************************************************************
@@ -814,6 +848,30 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
****************************************************************************/
+
+/**
+ * Clear Python object references in Handle
+ */
+void Handle_clear (Handle *h) {
+ if (h->error_cb) {
+ Py_DECREF(h->error_cb);
+ }
+
+ PyThread_delete_key(h->tlskey);
+}
+
+/**
+ * GC traversal for Python object references
+ */
+int Handle_traverse (Handle *h, visitproc visit, void *arg) {
+ if (h->error_cb)
+ Py_VISIT(h->error_cb);
+
+ return 0;
+}
+
+
+
/**
* Populate topic conf from provided dict.
*
@@ -879,7 +937,7 @@ static int populate_topic_conf (rd_kafka_topic_conf_t *tconf, const char *what,
*
* @returns 1 if handled, 0 if unknown, or -1 on failure (exception raised).
*/
-static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf,
+static int producer_conf_set_special (Handle *self, rd_kafka_conf_t *conf,
rd_kafka_topic_conf_t *tconf,
const char *name, PyObject *valobj) {
PyObject *vs;
@@ -894,8 +952,8 @@ static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf,
return -1;
}
- self->default_dr_cb = valobj;
- Py_INCREF(self->default_dr_cb);
+ self->u.Producer.default_dr_cb = valobj;
+ Py_INCREF(self->u.Producer.default_dr_cb);
return 1;
@@ -947,11 +1005,11 @@ static int producer_conf_set_special (Producer *self, rd_kafka_conf_t *conf,
return -1;
}
- if (self->partitioner_cb)
- Py_DECREF(self->partitioner_cb);
+ if (self->u.Producer.partitioner_cb)
+ Py_DECREF(self->u.Producer.partitioner_cb);
- self->partitioner_cb = valobj;
- Py_INCREF(self->partitioner_cb);
... 664 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-confluent-kafka.git