File Coverage

rdkafkaxs.c
Criterion Covered Total %
statement 33 83 39.7
branch 21 66 31.8
condition n/a
subroutine n/a
pod n/a
total 54 149 36.2


line stmt bran cond sub pod time code
1             #include "rdkafkaxs.h"
2             #include "ppport.h"
3              
4             #define ERRSTR_SIZE 1024
5              
6             rd_kafka_topic_partition_list_t*
7 0           krd_parse_topic_partition_list(pTHX_ AV* tplist) {
8             char errstr[ERRSTR_SIZE];
9             rd_kafka_topic_partition_list_t* tpar;
10              
11 0           int tplen = av_len(tplist)+1;
12 0           tpar = rd_kafka_topic_partition_list_new(tplen);
13             int i;
14 0 0         for (i=0; i
15 0           SV** elemr = av_fetch(tplist, i, 0);
16 0 0         if (elemr == NULL)
17 0           continue;
18 0           SV* conf = *elemr;
19 0 0         if (!SvROK(conf) || strncmp(sv_reftype(SvRV(conf), 0), "HASH", 5) != 0) {
    0          
20 0           strncpy(errstr, "elements of topic partition list expected to be hashes", ERRSTR_SIZE);
21 0           goto CROAK;
22             }
23 0           HV* confhv = (HV*)SvRV(conf);
24 0           SV** topicsv = hv_fetch(confhv, "topic", 5, 0);
25 0 0         if (topicsv == NULL) {
26 0           snprintf(errstr, ERRSTR_SIZE, "topic is not specified for element %d of the list", i);
27 0           goto CROAK;
28             }
29             STRLEN len;
30 0 0         char* topic = SvPV(*topicsv, len);
31 0           SV** partitionsv = hv_fetch(confhv, "partition", 9, 0);
32 0 0         if (partitionsv == NULL) {
33 0           snprintf(errstr, ERRSTR_SIZE, "partition is not specified for element %d of the list", i);
34 0           goto CROAK;
35             }
36 0 0         int32_t partition = SvIV(*partitionsv);
37 0           rd_kafka_topic_partition_t* tp = rd_kafka_topic_partition_list_add(tpar, topic, partition);
38 0           hv_iterinit(confhv);
39             HE* he;
40 0 0         while ((he = hv_iternext(confhv)) != NULL) {
41 0 0         char* key = HePV(he, len);
    0          
42 0           SV* val = HeVAL(he);
43 0 0         if (strncmp(key, "topic", 6) == 0 || strncmp(key, "partition", 10) == 0) {
    0          
44             /* this we already handled */
45             ;
46 0 0         } else if (strncmp(key, "offset", 7) == 0) {
47 0 0         tp->offset = SvIV(val);
48 0 0         } else if (strncmp(key, "metadata", 9) == 0) {
49 0 0         tp->metadata = SvPV(val, len);
50 0           tp->metadata_size = len;
51             } else {
52 0           snprintf(errstr, ERRSTR_SIZE, "unknown option %s for element %d of the list", key, i);
53 0           goto CROAK;
54             }
55             }
56             }
57 0           return tpar;
58              
59             CROAK:
60 0           rd_kafka_topic_partition_list_destroy(tpar);
61 0           croak("%s", errstr);
62             return NULL;
63             }
64              
65 0           AV* krd_expand_topic_partition_list(pTHX_ rd_kafka_topic_partition_list_t* tpar) {
66 0           AV* tplist = newAV();
67             int i;
68 0 0         for (i = 0; i < tpar->cnt; i++) {
69 0           rd_kafka_topic_partition_t* elem = &(tpar->elems[i]);
70 0           HV* tp = newHV();
71 0           hv_stores(tp, "topic", newSVpv(elem->topic, 0));
72 0           hv_stores(tp, "partition", newSViv(elem->partition));
73 0           hv_stores(tp, "offset", newSViv(elem->offset));
74 0 0         if(elem->metadata_size > 0) {
75 0           hv_stores(tp, "metadata", newSVpvn(elem->metadata, elem->metadata_size));
76             }
77 0           av_push(tplist, newRV_noinc((SV*)tp));
78             }
79 0           return tplist;
80             }
81              
82 5           rd_kafka_conf_t* krd_parse_config(pTHX_ rdkafka_t *krd, HV* params) {
83             char errstr[ERRSTR_SIZE];
84             rd_kafka_conf_t* krdconf;
85             rd_kafka_conf_res_t res;
86             HE *he;
87              
88 5           krdconf = rd_kafka_conf_new();
89 5           rd_kafka_conf_set_opaque(krdconf, (void *)krd);
90 5           hv_iterinit(params);
91 8 100         while ((he = hv_iternext(params)) != NULL) {
92             STRLEN len;
93 6 50         char* key = HePV(he, len);
    0          
94 6           SV* val = HeVAL(he);
95 6 100         if (strncmp(key, "default_topic_config", len) == 0) {
96 3 50         if (!SvROK(val) || strncmp(sv_reftype(SvRV(val), 0), "HASH", 5) != 0) {
    100          
97 1           strncpy(errstr, "default_topic_config must be a hash reference", ERRSTR_SIZE);
98 3           goto CROAK;
99             }
100 2           rd_kafka_topic_conf_t* topconf = krd_parse_topic_config(aTHX_ (HV*)SvRV(val), errstr);
101 2 100         if (topconf == NULL) goto CROAK;
102 1           rd_kafka_conf_set_default_topic_conf(krdconf, topconf);
103             } else {
104             /* set named configuration property */
105 3 100         char *strval = SvPV(val, len);
106 3           res = rd_kafka_conf_set(
107             krdconf,
108             key,
109             strval,
110             errstr,
111             ERRSTR_SIZE);
112 3 100         if (res != RD_KAFKA_CONF_OK)
113 4           goto CROAK;
114             }
115             }
116              
117 2           return krdconf;
118              
119             CROAK:
120 3           rd_kafka_conf_destroy(krdconf);
121 3           croak("%s", errstr);
122             return NULL;
123             }
124              
125 4           rd_kafka_topic_conf_t* krd_parse_topic_config(pTHX_ HV *params, char* errstr) {
126 4           rd_kafka_topic_conf_t* topconf = rd_kafka_topic_conf_new();
127             rd_kafka_conf_res_t res;
128             HE *he;
129              
130 4           hv_iterinit(params);
131 5 100         while ((he = hv_iternext(params)) != NULL) {
132             STRLEN len;
133 2 50         char* key = HePV(he, len);
    0          
134 2           SV* val = HeVAL(he);
135 2 100         char *strval = SvPV(val, len);
136 2           res = rd_kafka_topic_conf_set(
137             topconf,
138             key,
139             strval,
140             errstr,
141             ERRSTR_SIZE);
142 2 100         if (res != RD_KAFKA_CONF_OK) {
143 1           rd_kafka_topic_conf_destroy(topconf);
144 2           return NULL;
145             }
146             }
147              
148 3           return topconf;
149             }