File Coverage

Rdkafka.xs
Criterion Covered Total %
statement 27 120 22.5
branch 10 78 12.8
condition n/a
subroutine n/a
pod n/a
total 37 198 18.6


line stmt bran cond sub pod time code
1             /* vim: set expandtab sts=4: */
2             #define PERL_NO_GET_CONTEXT
3             #include
4             #include
5             #include
6              
7             #include "ppport.h"
8             #include "rdkafkaxs.h"
9              
10             MODULE = Kafka::Librd PACKAGE = Kafka::Librd PREFIX = krd_
11             PROTOTYPES: DISABLE
12              
13             INCLUDE: const_xs.inc
14              
15             int
16             krd_rd_kafka_version()
17             CODE:
18 1           RETVAL = rd_kafka_version();
19             OUTPUT:
20             RETVAL
21              
22             const char*
23             krd_rd_kafka_version_str()
24             CODE:
25 1           RETVAL = rd_kafka_version_str();
26             OUTPUT:
27             RETVAL
28              
29             rdkafka_t*
30             krd__new(type, params)
31             int type
32             HV* params
33             PREINIT:
34             rd_kafka_conf_t* conf;
35             rd_kafka_t* rk;
36             char errstr[1024];
37             CODE:
38 5           Newx(RETVAL, 1, rdkafka_t);
39 5           conf = krd_parse_config(aTHX_ RETVAL, params);
40 2           rk = rd_kafka_new(type, conf, errstr, 1024);
41 2 50         if (rk == NULL) {
42 0           croak("%s", errstr);
43             }
44 2           RETVAL->rk = rk;
45 2           RETVAL->thx = (IV)PERL_GET_THX;
46             OUTPUT:
47             RETVAL
48              
49             int
50             krd_brokers_add(rdk, brokerlist)
51             rdkafka_t* rdk
52             char* brokerlist
53             CODE:
54 0           RETVAL = rd_kafka_brokers_add(rdk->rk, brokerlist);
55             OUTPUT:
56             RETVAL
57              
58             int
59             krd_subscribe(rdk, topics)
60             rdkafka_t* rdk
61             AV* topics
62             PREINIT:
63             STRLEN strl;
64             int i, len;
65             rd_kafka_topic_partition_list_t* topic_list;
66             char* topic;
67             SV** topic_sv;
68             CODE:
69 0           len = av_len(topics) + 1;
70 0           topic_list = rd_kafka_topic_partition_list_new(len);
71 0 0         for (i=0; i < len; i++) {
72 0           topic_sv = av_fetch(topics, i, 0);
73 0 0         if (topic_sv != NULL) {
74 0 0         topic = SvPV(*topic_sv, strl);
75 0           rd_kafka_topic_partition_list_add(topic_list, topic, -1);
76             }
77             }
78 0           RETVAL = rd_kafka_subscribe(rdk->rk, topic_list);
79 0           rd_kafka_topic_partition_list_destroy(topic_list);
80             OUTPUT:
81             RETVAL
82              
83             int
84             krd_unsubscribe(rdk)
85             rdkafka_t* rdk
86             CODE:
87 0           RETVAL = rd_kafka_unsubscribe(rdk->rk);
88             OUTPUT:
89             RETVAL
90              
91             SV*
92             krd_subscription(rdk)
93             rdkafka_t* rdk
94             PREINIT:
95             rd_kafka_topic_partition_list_t* tpar;
96             rd_kafka_resp_err_t err;
97             AV* tp;
98             CODE:
99 0           err = rd_kafka_subscription(rdk->rk, &tpar);
100 0 0         if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
101 0           croak("Error retrieving subscriptions: %s", rd_kafka_err2str(err));
102             }
103 0           tp = krd_expand_topic_partition_list(aTHX_ tpar);
104 0           rd_kafka_topic_partition_list_destroy(tpar);
105 0           RETVAL = newRV_noinc((SV*)tp);
106             OUTPUT:
107             RETVAL
108              
109             int
110             krd_assign(rdk, tplistsv = NULL)
111             rdkafka_t* rdk
112             SV* tplistsv
113             PREINIT:
114             AV* tplist;
115 0           rd_kafka_topic_partition_list_t* tpar = NULL;
116             CODE:
117 0 0         if (tplistsv != NULL && SvOK(tplistsv)) {
    0          
    0          
    0          
118 0 0         if (!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
    0          
119 0           croak("first argument must be an array reference");
120             }
121 0           tplist = (AV*)SvRV(tplistsv);
122 0           tpar = krd_parse_topic_partition_list(aTHX_ tplist);
123             }
124 0           RETVAL = rd_kafka_assign(rdk->rk, tpar);
125 0 0         if (tpar != NULL)
126 0           rd_kafka_topic_partition_list_destroy(tpar);
127             OUTPUT:
128             RETVAL
129              
130             SV*
131             krd_assignment(rdk)
132             rdkafka_t *rdk
133             PREINIT:
134             rd_kafka_topic_partition_list_t* tpar;
135             rd_kafka_resp_err_t err;
136             AV* tp;
137             CODE:
138 0           err = rd_kafka_assignment(rdk->rk, &tpar);
139 0 0         if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
140 0           croak("Error retrieving assignments: %s", rd_kafka_err2str(err));
141             }
142 0           tp = krd_expand_topic_partition_list(aTHX_ tpar);
143 0           rd_kafka_topic_partition_list_destroy(tpar);
144 0           RETVAL = newRV_noinc((SV*)tp);
145             OUTPUT:
146             RETVAL
147              
148             int
149             krd_commit(rdk, tplistsv = NULL, async = 0)
150             rdkafka_t* rdk
151             SV* tplistsv
152             int async
153             PREINIT:
154             AV* tplist;
155 0           rd_kafka_topic_partition_list_t* tpar = NULL;
156             CODE:
157 0 0         if (tplistsv != NULL && SvOK(tplistsv)) {
    0          
    0          
    0          
158 0 0         if(!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
    0          
159 0           croak("first argument must be an array reference");
160             }
161 0           tplist = (AV*)SvRV(tplistsv);
162 0           tpar = krd_parse_topic_partition_list(aTHX_ tplist);
163             }
164 0           RETVAL = rd_kafka_commit(rdk->rk, tpar, async);
165 0 0         if (tpar != NULL)
166 0           rd_kafka_topic_partition_list_destroy(tpar);
167             OUTPUT:
168             RETVAL
169              
170             int
171             krd_commit_message(rdk, msg, async = 0)
172             rdkafka_t* rdk
173             rd_kafka_message_t* msg
174             int async
175             CODE:
176 0           RETVAL = rd_kafka_commit_message(rdk->rk, msg, async);
177             OUTPUT:
178             RETVAL
179              
180             SV*
181             krd_committed(rdk, tplistsv, timeout_ms)
182             rdkafka_t* rdk
183             SV* tplistsv
184             int timeout_ms
185             PREINIT:
186             AV* tplist;
187 0           rd_kafka_topic_partition_list_t* tpar = NULL;
188             rd_kafka_resp_err_t err;
189             AV* tp;
190             CODE:
191 0 0         if (!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
    0          
192 0           croak("first argument must be an array reference");
193             }
194 0           tplist = (AV*)SvRV(tplistsv);
195 0           tpar = krd_parse_topic_partition_list(aTHX_ tplist);
196 0           err = rd_kafka_committed(rdk->rk, tpar, timeout_ms);
197 0 0         if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
198 0           rd_kafka_topic_partition_list_destroy(tpar);
199 0           croak("Error retrieving commited offsets: %s", rd_kafka_err2str(err));
200             }
201 0           tp = krd_expand_topic_partition_list(aTHX_ tpar);
202 0           rd_kafka_topic_partition_list_destroy(tpar);
203 0           RETVAL = newRV_noinc((SV*)tp);
204             OUTPUT:
205             RETVAL
206              
207             SV*
208             krd_position(rdk, tplistsv)
209             rdkafka_t* rdk
210             SV* tplistsv
211             PREINIT:
212             AV* tplist;
213 0           rd_kafka_topic_partition_list_t* tpar = NULL;
214             rd_kafka_resp_err_t err;
215             AV* tp;
216             CODE:
217 0 0         if (!SvROK(tplistsv) || strncmp(sv_reftype(SvRV(tplistsv), 0), "ARRAY", 6)) {
    0          
218 0           croak("first argument must be an array reference");
219             }
220 0           tplist = (AV*)SvRV(tplistsv);
221 0           tpar = krd_parse_topic_partition_list(aTHX_ tplist);
222 0           err = rd_kafka_position(rdk->rk, tpar);
223 0 0         if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
224 0           rd_kafka_topic_partition_list_destroy(tpar);
225 0           croak("Error retrieving positions: %s", rd_kafka_err2str(err));
226             }
227 0           tp = krd_expand_topic_partition_list(aTHX_ tpar);
228 0           rd_kafka_topic_partition_list_destroy(tpar);
229 0           RETVAL = newRV_noinc((SV*)tp);
230             OUTPUT:
231             RETVAL
232              
233             rd_kafka_message_t*
234             krd_consumer_poll(rdk, timeout_ms)
235             rdkafka_t* rdk
236             int timeout_ms
237             CODE:
238 0           RETVAL = rd_kafka_consumer_poll(rdk->rk, timeout_ms);
239             OUTPUT:
240             RETVAL
241              
242             int
243             krd_consumer_close(rdk)
244             rdkafka_t* rdk
245             CODE:
246 0           RETVAL = rd_kafka_consumer_close(rdk->rk);
247             OUTPUT:
248             RETVAL
249              
250             rd_kafka_topic_t*
251             krd_topic(rdk, topic, params)
252             rdkafka_t* rdk
253             char *topic
254             HV* params
255             PREINIT:
256             rd_kafka_topic_conf_t* tcon;
257             char errstr[1024];
258             CODE:
259 2           tcon = krd_parse_topic_config(aTHX_ params, errstr);
260 2 50         if (tcon == NULL)
261 0           croak("Couldn't parse topic config: %s", errstr);
262 2           RETVAL = rd_kafka_topic_new(rdk->rk, topic, tcon);
263 2           tcon = NULL;
264             OUTPUT:
265             RETVAL
266              
267             int
268             krd_poll(rdk, timeout_ms)
269             rdkafka_t* rdk
270             int timeout_ms
271             CODE:
272 0           RETVAL = rd_kafka_poll(rdk->rk, timeout_ms);
273             OUTPUT:
274             RETVAL
275              
276             int
277             krd_outq_len(rdk)
278             rdkafka_t* rdk
279             CODE:
280 0           RETVAL = rd_kafka_outq_len(rdk->rk);
281             OUTPUT:
282             RETVAL
283              
284             void
285             krd_flush(rdk, timeout_ms)
286             rdkafka_t* rdk
287             int timeout_ms
288             CODE:
289 0           rd_kafka_flush(rdk->rk, timeout_ms);
290              
291             void
292             krd_DESTROY(rdk)
293             rdkafka_t* rdk
294             CODE:
295 2 50         if (rdk->thx == (IV)PERL_GET_THX) {
296 2           Safefree(rdk);
297             }
298              
299             void
300             krd_destroy(rdk)
301             rdkafka_t* rdk
302             CODE:
303 0           rd_kafka_destroy(rdk->rk);
304              
305             void
306             krd_dump(rdk)
307             rdkafka_t* rdk
308             CODE:
309 0           rd_kafka_dump(stdout, rdk->rk);
310              
311             int
312             krd_rd_kafka_wait_destroyed(timeout_ms)
313             int timeout_ms
314             CODE:
315 0           RETVAL = rd_kafka_wait_destroyed(timeout_ms);
316             OUTPUT:
317             RETVAL
318              
319             MODULE = Kafka::Librd PACKAGE = Kafka::Librd::Topic PREFIX = krdt_
320             PROTOTYPES: DISABLE
321              
322             int
323             krdt_produce(rkt, partition, msgflags, payload, key)
324             rd_kafka_topic_t* rkt
325             int partition
326             int msgflags
327             SV* payload
328             SV* key
329             PREINIT:
330             STRLEN plen, klen;
331             char *plptr, *keyptr;
332             CODE:
333 2 50         plptr = SvPVbyte(payload, plen);
334 2 50         if (SvOK(key)) {
    0          
    0          
335 2 50         keyptr = SvPVbyte(key, klen);
336             } else {
337 0           keyptr = NULL;
338 0           klen = 0;
339             }
340 2           RETVAL = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY | msgflags, plptr, plen, keyptr, klen, NULL);
341             OUTPUT:
342             RETVAL
343              
344             void
345             krdt_DESTROY(rkt)
346             rd_kafka_topic_t* rkt
347             CODE:
348 1           rd_kafka_topic_destroy(rkt);
349              
350             void
351             krdt_destroy(rkt)
352             rd_kafka_topic_t* rkt
353             CODE:
354 0           rd_kafka_topic_destroy(rkt);
355              
356             MODULE = Kafka::Librd PACKAGE = Kafka::Librd::Message PREFIX = krdm_
357             PROTOTYPES: DISABLE
358              
359             int
360             krdm_err(msg)
361             rd_kafka_message_t* msg
362             CODE:
363 0           RETVAL = msg->err;
364             OUTPUT:
365             RETVAL
366              
367             int
368             krdm_partition(msg)
369             rd_kafka_message_t* msg
370             CODE:
371 0           RETVAL = msg->partition;
372             OUTPUT:
373             RETVAL
374              
375             const char*
376             krdm_topic(msg)
377             rd_kafka_message_t* msg
378             CODE:
379 0           RETVAL = rd_kafka_topic_name(msg->rkt);
380             OUTPUT:
381             RETVAL
382              
383             SV*
384             krdm_payload(msg)
385             rd_kafka_message_t* msg
386             CODE:
387 0           RETVAL = newSVpvn(msg->payload, msg->len);
388             OUTPUT:
389             RETVAL
390              
391             SV*
392             krdm_key(msg)
393             rd_kafka_message_t* msg
394             CODE:
395 0 0         if (msg->err == 0) {
396 0           RETVAL = newSVpvn(msg->key, msg->key_len);
397             } else {
398 0           RETVAL = &PL_sv_undef;
399             }
400             OUTPUT:
401             RETVAL
402              
403             long
404             krdm_offset(msg)
405             rd_kafka_message_t* msg
406             CODE:
407             /* that will truncate offset if perl doesn't support 64bit ints */
408 0           RETVAL = msg->offset;
409             OUTPUT:
410             RETVAL
411              
412             long
413             krdm_timestamp(msg,...)
414             rd_kafka_message_t* msg
415             CODE:
416             rd_kafka_timestamp_type_t tstype;
417 0           RETVAL = rd_kafka_message_timestamp(msg, &tstype);
418 0 0         if (items > 1) {
419 0 0         if (!SvROK(ST(1)) || strncmp(sv_reftype(SvRV(ST(1)), 0), "SCALAR", 7)) {
    0          
420 0           croak("second argument tstype must be a scalar reference");
421             }
422 0           sv_setiv(SvRV(ST(1)), tstype);
423             }
424             OUTPUT:
425             RETVAL
426              
427             void
428             krdm_DESTROY(msg)
429             rd_kafka_message_t* msg
430             CODE:
431 0           rd_kafka_message_destroy(msg);
432              
433             MODULE = Kafka::Librd PACKAGE = Kafka::Librd::Error PREFIX = krde_
434             PROTOTYPES: DISABLE
435              
436             HV*
437             krde_rd_kafka_get_err_descs()
438             PREINIT:
439             const struct rd_kafka_err_desc* descs;
440             size_t cnt;
441             int i;
442             CODE:
443 6           rd_kafka_get_err_descs(&descs, &cnt);
444 6           RETVAL = newHV();
445 1698 100         for (i = 0; i < cnt; i++) {
446 1692 100         if (descs[i].name != NULL) {
447 834           hv_store(RETVAL, descs[i].name, strnlen(descs[i].name, 1024), newSViv(descs[i].code), 0);
448             }
449             }
450             OUTPUT:
451             RETVAL
452              
453             const char*
454             krde_to_string(code)
455             int code
456             CODE:
457 139           RETVAL = rd_kafka_err2str(code);
458             OUTPUT:
459             RETVAL
460              
461             const char*
462             krde_to_name(code)
463             int code
464             CODE:
465 137           RETVAL = rd_kafka_err2name(code);
466             OUTPUT:
467             RETVAL
468              
469             int
470             krde_last_error()
471             CODE:
472 4           RETVAL = rd_kafka_last_error();
473             OUTPUT:
474             RETVAL
475              
476