File Coverage

lib/EV/Hiredis.xs
Criterion Covered Total %
statement 23 173 13.2
branch 14 98 14.2
condition n/a
subroutine n/a
pod n/a
total 37 271 13.6


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7              
8             #include "hiredis.h"
9             #include "async.h"
10             #include "libev_adapter.h"
11             #include "ngx-queue.h"
12              
13             typedef struct ev_hiredis_s ev_hiredis_t;
14             typedef struct ev_hiredis_cb_s ev_hiredis_cb_t;
15              
16             typedef ev_hiredis_t* EV__Hiredis;
17             typedef struct ev_loop* EV__Loop;
18              
19             struct ev_hiredis_s {
20             struct ev_loop* loop;
21             redisAsyncContext* ac;
22             SV* error_handler;
23             SV* connect_handler;
24             ngx_queue_t cb_queue; /* for long term callbacks such as subscribe */
25             };
26              
27             struct ev_hiredis_cb_s {
28             SV* cb;
29             ngx_queue_t queue;
30             int persist;
31             };
32              
33 0           static void emit_error(EV__Hiredis self, SV* error) {
34 0 0         if (NULL == self->error_handler) return;
35              
36 0           dSP;
37              
38 0           ENTER;
39 0           SAVETMPS;
40              
41 0 0         PUSHMARK(SP);
42 0 0         XPUSHs(error);
43 0           PUTBACK;
44              
45 0           call_sv(self->error_handler, G_DISCARD);
46              
47 0 0         FREETMPS;
48 0           LEAVE;
49             }
50              
51 0           static void emit_error_str(EV__Hiredis self, char* error) {
52 0 0         if (NULL == self->error_handler) return;
53 0           emit_error(self, sv_2mortal(newSVpv(error, 0)));
54             }
55              
56 1           static void remove_cb_queue(EV__Hiredis self) {
57             ngx_queue_t* q;
58             ev_hiredis_cb_t* cbt;
59              
60 1 50         while (!ngx_queue_empty(&self->cb_queue)) {
61             q = ngx_queue_last(&self->cb_queue);
62 0           cbt = ngx_queue_data(q, ev_hiredis_cb_t, queue);
63 0           ngx_queue_remove(q);
64              
65 0           SvREFCNT_dec(cbt->cb);
66 0           Safefree(cbt);
67             }
68 1           }
69              
70 0           static void EV__hiredis_connect_cb(redisAsyncContext* c, int status) {
71 0           EV__Hiredis self = (EV__Hiredis)c->data;
72              
73 0 0         if (REDIS_OK != status) {
74 0           self->ac = NULL;
75 0           emit_error_str(self, c->errstr);
76             }
77             else {
78 0 0         if (NULL == self->connect_handler) return;
79              
80 0           dSP;
81              
82 0           ENTER;
83 0           SAVETMPS;
84              
85 0 0         PUSHMARK(SP);
86 0           PUTBACK;
87              
88 0           call_sv(self->connect_handler, G_DISCARD);
89              
90 0 0         FREETMPS;
91 0           LEAVE;
92             }
93             }
94              
95 0           static void EV__hiredis_disconnect_cb(redisAsyncContext* c, int status) {
96 0           EV__Hiredis self = (EV__Hiredis)c->data;
97             SV* sv_error;
98              
99 0 0         if (REDIS_OK == status) {
100 0           self->ac = NULL;
101             }
102             else {
103 0           sv_error = sv_2mortal(newSVpv(c->errstr, 0));
104 0           self->ac = NULL;
105 0           emit_error(self, sv_error);
106             }
107              
108 0           remove_cb_queue(self);
109 0           }
110              
111 0           static void connect_common(EV__Hiredis self) {
112             int r;
113             SV* sv_error = NULL;
114              
115 0           self->ac->data = (void*)self;
116              
117 0           r = redisLibevAttach(self->loop, self->ac);
118 0 0         if (REDIS_OK != r) {
119 0           redisAsyncFree(self->ac);
120 0           self->ac = NULL;
121 0           emit_error_str(self, "connect error: cannot attach libev");
122 0           return;
123             }
124              
125 0           redisAsyncSetConnectCallback(self->ac, (redisConnectCallback*)EV__hiredis_connect_cb);
126 0           redisAsyncSetDisconnectCallback(self->ac, (redisDisconnectCallback*)EV__hiredis_disconnect_cb);
127              
128 0 0         if (self->ac->err) {
129 0           sv_error = sv_2mortal(newSVpvf("connect error: %s", self->ac->errstr));
130 0           redisAsyncFree(self->ac);
131 0           self->ac = NULL;
132 0           emit_error(self, sv_error);
133 0           return;
134             }
135             }
136              
137 0           static SV* EV__hiredis_decode_reply(redisReply* reply) {
138             SV* res = NULL;
139              
140 0           switch (reply->type) {
141             case REDIS_REPLY_STRING:
142             case REDIS_REPLY_ERROR:
143             case REDIS_REPLY_STATUS:
144 0           res = sv_2mortal(newSVpvn(reply->str, reply->len));
145 0           break;
146              
147             case REDIS_REPLY_INTEGER:
148 0           res = sv_2mortal(newSViv(reply->integer));
149 0           break;
150             case REDIS_REPLY_NIL:
151 0           res = sv_2mortal(newSV(0));
152 0           break;
153              
154             case REDIS_REPLY_ARRAY: {
155 0           AV* av = (AV*)sv_2mortal((SV*)newAV());
156 0           res = newRV_inc((SV*)av);
157              
158             size_t i;
159 0 0         for (i = 0; i < reply->elements; i++) {
160 0           av_push(av, SvREFCNT_inc(EV__hiredis_decode_reply(reply->element[i])));
161             }
162             break;
163             }
164             }
165              
166 0           return res;
167             }
168              
169 0           static void EV__hiredis_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
170             ev_hiredis_cb_t* cbt;
171             SV* sv_reply;
172             SV* sv_err;
173              
174             PERL_UNUSED_VAR(c);
175              
176             cbt = (ev_hiredis_cb_t*)privdata;
177              
178 0 0         if (NULL == reply) {
179 0           fprintf(stderr, "here error: %s\n", c->errstr);
180              
181 0           dSP;
182              
183 0           ENTER;
184 0           SAVETMPS;
185              
186 0           sv_err = sv_2mortal(newSVpv(c->errstr, 0));
187              
188 0 0         PUSHMARK(SP);
189 0           PUSHs(&PL_sv_undef);
190 0           PUSHs(sv_err);
191 0           PUTBACK;
192              
193 0           call_sv(cbt->cb, G_DISCARD);
194              
195 0 0         FREETMPS;
196 0           LEAVE;
197             }
198             else {
199 0           dSP;
200              
201 0           ENTER;
202 0           SAVETMPS;
203              
204 0 0         PUSHMARK(SP);
205 0           sv_reply = EV__hiredis_decode_reply((redisReply*)reply);
206 0 0         if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
207 0           PUSHs(&PL_sv_undef);
208 0           PUSHs(sv_reply);
209             }
210             else {
211 0           PUSHs(sv_reply);
212             }
213 0           PUTBACK;
214              
215 0           call_sv(cbt->cb, G_DISCARD);
216              
217 0 0         FREETMPS;
218 0           LEAVE;
219             }
220              
221 0 0         if (0 == cbt->persist) {
222 0           SvREFCNT_dec(cbt->cb);
223 0           ngx_queue_remove(&cbt->queue);
224 0           Safefree(cbt);
225             }
226 0           }
227              
228             MODULE = EV::Hiredis PACKAGE = EV::Hiredis
229              
230             BOOT:
231             {
232 11 50         I_EV_API("EV::Hiredis");
    50          
    50          
    50          
233             }
234              
235             EV::Hiredis
236             _new(char* class, EV::Loop loop);
237             CODE:
238             {
239             PERL_UNUSED_VAR(class);
240 1           Newxz(RETVAL, sizeof(ev_hiredis_t), ev_hiredis_t);
241 1           ngx_queue_init(&RETVAL->cb_queue);
242 1           RETVAL->loop = loop;
243             }
244             OUTPUT:
245             RETVAL
246              
247             void
248             DESTROY(EV::Hiredis self);
249             CODE:
250             {
251 1           self->loop = NULL;
252 1 50         if (NULL != self->ac) {
253 0           redisAsyncFree(self->ac);
254 0           self->ac = NULL;
255             }
256 1 50         if (NULL != self->error_handler) {
257             SvREFCNT_dec(self->error_handler);
258 1           self->error_handler = NULL;
259             }
260 1 50         if (NULL != self->connect_handler) {
261             SvREFCNT_dec(self->connect_handler);
262 0           self->connect_handler = NULL;
263             }
264              
265 1           remove_cb_queue(self);
266              
267 1           Safefree(self);
268             }
269              
270             void
271             connect(EV::Hiredis self, char* hostname, int port = 6379);
272             CODE:
273             {
274 0 0         if (NULL != self->ac) {
275 0           croak("already connected");
276             return;
277             }
278              
279 0           self->ac = redisAsyncConnect(hostname, port);
280 0 0         if (NULL == self->ac) {
281 0           croak("cannot allocate memory");
282             return;
283             }
284              
285 0           connect_common(self);
286             }
287              
288             void
289             connect_unix(EV::Hiredis self, char* path);
290             CODE:
291             {
292 0 0         if (NULL != self->ac) {
293 0           croak("already connected");
294             return;
295             }
296              
297 0           self->ac = redisAsyncConnectUnix(path);
298 0 0         if (NULL == self->ac) {
299 0           croak("cannot allocate memory");
300             return;
301             }
302              
303 0           connect_common(self);
304             }
305              
306             void
307             disconnect(EV::Hiredis self);
308             CODE:
309             {
310 0 0         if (NULL == self->ac) {
311 0           emit_error_str(self, "not connected");
312 0           return;
313             }
314              
315 0           redisAsyncDisconnect(self->ac);
316             }
317              
318             CV*
319             on_error(EV::Hiredis self, CV* handler = NULL);
320             CODE:
321             {
322 1 50         if (NULL != self->error_handler) {
323             SvREFCNT_dec(self->error_handler);
324 0           self->error_handler = NULL;
325             }
326              
327 1 50         if (NULL != handler) {
328 1           self->error_handler = SvREFCNT_inc(handler);
329             }
330              
331 1           RETVAL = (CV*)self->error_handler;
332             }
333             OUTPUT:
334             RETVAL
335              
336             void
337             on_connect(EV::Hiredis self, CV* handler = NULL);
338             CODE:
339             {
340 0 0         if (NULL != handler) {
341 0 0         if (NULL != self->connect_handler) {
342             SvREFCNT_dec(self->connect_handler);
343 0           self->connect_handler = NULL;
344             }
345              
346 0           self->connect_handler = SvREFCNT_inc(handler);
347             }
348              
349 0 0         if (self->connect_handler) {
350 0           ST(0) = self->connect_handler;
351 0           XSRETURN(1);
352             }
353             else {
354 0           XSRETURN(0);
355             }
356             }
357              
358             int
359             command(EV::Hiredis self, ...);
360             PREINIT:
361             SV* cb;
362             char** argv;
363             size_t* argvlen;
364             STRLEN len;
365             int argc, i;
366             ev_hiredis_cb_t* cbt;
367             CODE:
368             {
369 1 50         if (items <= 2) {
370 0           croak("Usage: command(\"command\", ..., $callback)");
371             }
372              
373 1           cb = ST(items - 1);
374 1 50         if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) {
    50          
375 0           croak("last arguments should be CODE reference");
376             }
377              
378 1 50         if (NULL == self->ac) {
379 1           croak("connect required before call command");
380             }
381              
382 0           argc = items - 2;
383 0 0         Newx(argv, sizeof(char*) * argc, char*);
384 0 0         Newx(argvlen, sizeof(size_t) * argc, size_t);
385              
386 0 0         for (i = 0; i < argc; i++) {
387 0 0         argv[i] = SvPV(ST(i + 1), len);
388 0           argvlen[i] = len;
389             }
390              
391 0           Newx(cbt, sizeof(ev_hiredis_cb_t), ev_hiredis_cb_t);
392 0           cbt->cb = SvREFCNT_inc(cb);
393 0           ngx_queue_init(&cbt->queue);
394 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
395              
396 0 0         if (0 == strcasecmp(argv[0], "subscribe")
397 0 0         || 0 == strcasecmp(argv[0], "psubscribe")
398 0 0         || 0 == strcasecmp(argv[0], "monitor")
399             ) {
400 0           cbt->persist = 1;
401             }
402             else {
403 0           cbt->persist = 0;
404             }
405              
406 0           RETVAL = redisAsyncCommandArgv(
407             self->ac, EV__hiredis_reply_cb, (void*)cbt,
408             argc, (const char**)argv, argvlen
409             );
410              
411 0           Safefree(argv);
412 0           Safefree(argvlen);
413             }
414             OUTPUT:
415             RETVAL
416