File Coverage

lib/EV/Hiredis.xs
Criterion Covered Total %
statement 25 197 12.6
branch 16 114 14.0
condition n/a
subroutine n/a
pod n/a
total 41 311 13.1


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4             #include "ppport.h"
5              
6             #include "EVAPI.h"
7              
8             #include "hiredis.h"
9             #include "async.h"
10             #include "libev_adapter.h"
11             #include "ngx-queue.h"
12              
13             typedef struct ev_hiredis_s ev_hiredis_t;
14             typedef struct ev_hiredis_cb_s ev_hiredis_cb_t;
15              
16             typedef ev_hiredis_t* EV__Hiredis;
17             typedef struct ev_loop* EV__Loop;
18              
19             struct ev_hiredis_s {
20             struct ev_loop* loop;
21             redisAsyncContext* ac;
22             SV* error_handler;
23             SV* connect_handler;
24             struct timeval* connect_timeout;
25             struct timeval* command_timeout;
26             ngx_queue_t cb_queue; /* for long term callbacks such as subscribe */
27             };
28              
29             struct ev_hiredis_cb_s {
30             SV* cb;
31             ngx_queue_t queue;
32             int persist;
33             };
34              
35 0           static void emit_error(EV__Hiredis self, SV* error) {
36 0 0         if (NULL == self->error_handler) return;
37              
38 0           dSP;
39              
40 0           ENTER;
41 0           SAVETMPS;
42              
43 0 0         PUSHMARK(SP);
44 0 0         XPUSHs(error);
45 0           PUTBACK;
46              
47 0           call_sv(self->error_handler, G_DISCARD);
48              
49 0 0         FREETMPS;
50 0           LEAVE;
51             }
52              
53 0           static void emit_error_str(EV__Hiredis self, char* error) {
54 0 0         if (NULL == self->error_handler) return;
55 0           emit_error(self, sv_2mortal(newSVpv(error, 0)));
56             }
57              
58 1           static void remove_cb_queue(EV__Hiredis self) {
59             ngx_queue_t* q;
60             ev_hiredis_cb_t* cbt;
61              
62 1 50         while (!ngx_queue_empty(&self->cb_queue)) {
63             q = ngx_queue_last(&self->cb_queue);
64 0           cbt = ngx_queue_data(q, ev_hiredis_cb_t, queue);
65 0           ngx_queue_remove(q);
66              
67 0           SvREFCNT_dec(cbt->cb);
68 0           Safefree(cbt);
69             }
70 1           }
71              
72 0           static void EV__hiredis_connect_cb(redisAsyncContext* c, int status) {
73 0           EV__Hiredis self = (EV__Hiredis)c->data;
74              
75 0 0         if (REDIS_OK != status) {
76 0           self->ac = NULL;
77 0           emit_error_str(self, c->errstr);
78             }
79             else {
80 0 0         if (NULL == self->connect_handler) return;
81              
82 0           dSP;
83              
84 0           ENTER;
85 0           SAVETMPS;
86              
87 0 0         PUSHMARK(SP);
88 0           PUTBACK;
89              
90 0           call_sv(self->connect_handler, G_DISCARD);
91              
92 0 0         FREETMPS;
93 0           LEAVE;
94             }
95             }
96              
97 0           static void EV__hiredis_disconnect_cb(redisAsyncContext* c, int status) {
98 0           EV__Hiredis self = (EV__Hiredis)c->data;
99             SV* sv_error;
100              
101 0 0         if (REDIS_OK == status) {
102 0           self->ac = NULL;
103             }
104             else {
105 0           sv_error = sv_2mortal(newSVpv(c->errstr, 0));
106 0           self->ac = NULL;
107 0           emit_error(self, sv_error);
108             }
109              
110 0           remove_cb_queue(self);
111 0           }
112              
113             static void pre_connect_common(EV__Hiredis self, redisOptions* opts) {
114 0 0         if (NULL != self->connect_timeout) {
    0          
115 0           opts->connect_timeout = self->connect_timeout;
116             }
117 0 0         if (NULL != self->command_timeout) {
    0          
118 0           opts->command_timeout = self->command_timeout;
119             }
120             }
121              
122 0           static void connect_common(EV__Hiredis self) {
123             int r;
124             SV* sv_error = NULL;
125              
126 0           self->ac->data = (void*)self;
127              
128 0           r = redisLibevAttach(self->loop, self->ac);
129 0 0         if (REDIS_OK != r) {
130 0           redisAsyncFree(self->ac);
131 0           self->ac = NULL;
132 0           emit_error_str(self, "connect error: cannot attach libev");
133 0           return;
134             }
135              
136 0           redisAsyncSetConnectCallback(self->ac, (redisConnectCallback*)EV__hiredis_connect_cb);
137 0           redisAsyncSetDisconnectCallback(self->ac, (redisDisconnectCallback*)EV__hiredis_disconnect_cb);
138              
139 0 0         if (self->ac->err) {
140 0           sv_error = sv_2mortal(newSVpvf("connect error: %s", self->ac->errstr));
141 0           redisAsyncFree(self->ac);
142 0           self->ac = NULL;
143 0           emit_error(self, sv_error);
144 0           return;
145             }
146             }
147              
148 0           static SV* EV__hiredis_decode_reply(redisReply* reply) {
149             SV* res = NULL;
150              
151 0           switch (reply->type) {
152             case REDIS_REPLY_STRING:
153             case REDIS_REPLY_ERROR:
154             case REDIS_REPLY_STATUS:
155 0           res = newSVpvn(reply->str, reply->len);
156 0           break;
157              
158             case REDIS_REPLY_INTEGER:
159 0           res = newSViv(reply->integer);
160 0           break;
161             case REDIS_REPLY_NIL:
162 0           res = newSV(0);
163 0           break;
164              
165             case REDIS_REPLY_ARRAY: {
166 0           AV* av = newAV();
167 0           av_extend(av, (SSize_t)reply->elements);
168             size_t i;
169 0 0         for (i = 0; i < reply->elements; i++) {
170 0           av_push(av, EV__hiredis_decode_reply(reply->element[i]));
171             }
172 0           res = newRV_noinc((SV*)av);
173 0           break;
174             }
175             }
176              
177 0           return res;
178             }
179              
180 0           static void EV__hiredis_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
181             ev_hiredis_cb_t* cbt;
182             SV* sv_reply;
183             SV* sv_err;
184              
185             PERL_UNUSED_VAR(c);
186              
187             cbt = (ev_hiredis_cb_t*)privdata;
188              
189 0 0         if (NULL == reply) {
190 0           fprintf(stderr, "here error: %s\n", c->errstr);
191              
192 0           dSP;
193              
194 0           ENTER;
195 0           SAVETMPS;
196              
197 0           sv_err = sv_2mortal(newSVpv(c->errstr, 0));
198              
199 0 0         PUSHMARK(SP);
200 0           PUSHs(&PL_sv_undef);
201 0           PUSHs(sv_err);
202 0           PUTBACK;
203              
204 0           call_sv(cbt->cb, G_DISCARD);
205              
206 0 0         FREETMPS;
207 0           LEAVE;
208             }
209             else {
210 0           dSP;
211              
212 0           ENTER;
213 0           SAVETMPS;
214              
215 0 0         PUSHMARK(SP);
216 0           sv_reply = sv_2mortal(EV__hiredis_decode_reply((redisReply*)reply));
217 0 0         if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
218 0           PUSHs(&PL_sv_undef);
219 0           PUSHs(sv_reply);
220             }
221             else {
222 0           PUSHs(sv_reply);
223             }
224 0           PUTBACK;
225              
226 0           call_sv(cbt->cb, G_DISCARD);
227              
228 0 0         FREETMPS;
229 0           LEAVE;
230             }
231              
232 0 0         if (0 == cbt->persist) {
233 0           SvREFCNT_dec(cbt->cb);
234 0           ngx_queue_remove(&cbt->queue);
235 0           Safefree(cbt);
236             }
237 0           }
238              
239             MODULE = EV::Hiredis PACKAGE = EV::Hiredis
240              
241             BOOT:
242             {
243 11 50         I_EV_API("EV::Hiredis");
    50          
    50          
    50          
244             }
245              
246             EV::Hiredis
247             _new(char* class, EV::Loop loop);
248             CODE:
249             {
250             PERL_UNUSED_VAR(class);
251 1           Newxz(RETVAL, 1, ev_hiredis_t);
252 1           ngx_queue_init(&RETVAL->cb_queue);
253 1           RETVAL->loop = loop;
254             }
255             OUTPUT:
256             RETVAL
257              
258             void
259             DESTROY(EV::Hiredis self);
260             CODE:
261             {
262 1           self->loop = NULL;
263 1 50         if (NULL != self->ac) {
264 0           redisAsyncFree(self->ac);
265 0           self->ac = NULL;
266             }
267 1 50         if (NULL != self->error_handler) {
268             SvREFCNT_dec(self->error_handler);
269 1           self->error_handler = NULL;
270             }
271 1 50         if (NULL != self->connect_handler) {
272             SvREFCNT_dec(self->connect_handler);
273 0           self->connect_handler = NULL;
274             }
275 1 50         if (NULL != self->connect_timeout) {
276 0           Safefree(self->connect_timeout);
277 0           self->connect_timeout = NULL;
278             }
279 1 50         if (NULL != self->command_timeout) {
280 0           Safefree(self->command_timeout);
281 0           self->command_timeout = NULL;
282             }
283              
284 1           remove_cb_queue(self);
285              
286 1           Safefree(self);
287             }
288              
289             void
290             connect(EV::Hiredis self, char* hostname, int port = 6379);
291             CODE:
292             {
293 0 0         if (NULL != self->ac) {
294 0           croak("already connected");
295             return;
296             }
297              
298 0           redisOptions opts = {0};
299             pre_connect_common(self, &opts);
300 0           REDIS_OPTIONS_SET_TCP(&opts, hostname, port);
301 0           self->ac = redisAsyncConnectWithOptions(&opts);
302 0 0         if (NULL == self->ac) {
303 0           croak("cannot allocate memory");
304             return;
305             }
306              
307 0           connect_common(self);
308             }
309              
310             void
311             connect_unix(EV::Hiredis self, const char* path);
312             CODE:
313             {
314 0 0         if (NULL != self->ac) {
315 0           croak("already connected");
316             return;
317             }
318              
319 0           redisOptions opts = {0};
320             pre_connect_common(self, &opts);
321 0           REDIS_OPTIONS_SET_UNIX(&opts, path);
322 0           self->ac = redisAsyncConnectWithOptions(&opts);
323 0 0         if (NULL == self->ac) {
324 0           croak("cannot allocate memory");
325             return;
326             }
327              
328 0           connect_common(self);
329             }
330              
331             void
332             disconnect(EV::Hiredis self);
333             CODE:
334             {
335 0 0         if (NULL == self->ac) {
336 0           emit_error_str(self, "not connected");
337 0           return;
338             }
339              
340 0           redisAsyncDisconnect(self->ac);
341             }
342              
343             void
344             connect_timeout(EV::Hiredis self, int timeout_ms);
345             CODE:
346             {
347 0 0         if (NULL == self->connect_timeout) {
348 0           Newx(self->connect_timeout, 1, struct timeval);
349             }
350 0           self->connect_timeout->tv_sec = timeout_ms / 1000;
351 0           self->connect_timeout->tv_usec = (timeout_ms % 1000) * 1000;
352             }
353              
354             void
355             command_timeout(EV::Hiredis self, int timeout_ms);
356             CODE:
357             {
358 0 0         if (NULL == self->command_timeout) {
359 0           Newx(self->command_timeout, 1, struct timeval);
360             }
361 0           self->command_timeout->tv_sec = timeout_ms / 1000;
362 0           self->command_timeout->tv_usec = (timeout_ms % 1000) * 1000;
363             }
364              
365             CV*
366             on_error(EV::Hiredis self, CV* handler = NULL);
367             CODE:
368             {
369 1 50         if (NULL != self->error_handler) {
370             SvREFCNT_dec(self->error_handler);
371 0           self->error_handler = NULL;
372             }
373              
374 1 50         if (NULL != handler) {
375 1           self->error_handler = SvREFCNT_inc(handler);
376             }
377              
378 1           RETVAL = (CV*)self->error_handler;
379             }
380             OUTPUT:
381             RETVAL
382              
383             void
384             on_connect(EV::Hiredis self, CV* handler = NULL);
385             CODE:
386             {
387 0 0         if (NULL != handler) {
388 0 0         if (NULL != self->connect_handler) {
389             SvREFCNT_dec(self->connect_handler);
390 0           self->connect_handler = NULL;
391             }
392              
393 0           self->connect_handler = SvREFCNT_inc(handler);
394             }
395              
396 0 0         if (self->connect_handler) {
397 0           ST(0) = self->connect_handler;
398 0           XSRETURN(1);
399             }
400             else {
401 0           XSRETURN(0);
402             }
403             }
404              
405             int
406             command(EV::Hiredis self, ...);
407             PREINIT:
408             SV* cb;
409             char** argv;
410             size_t* argvlen;
411             STRLEN len;
412             int argc, i;
413             ev_hiredis_cb_t* cbt;
414             CODE:
415             {
416 1 50         if (items <= 2) {
417 0           croak("Usage: command(\"command\", ..., $callback)");
418             }
419              
420 1           cb = ST(items - 1);
421 1 50         if (!(SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV)) {
    50          
422 0           croak("last arguments should be CODE reference");
423             }
424              
425 1 50         if (NULL == self->ac) {
426 1           croak("connect required before call command");
427             }
428              
429 0           argc = items - 2;
430 0 0         Newx(argv, argc, char*);
431 0 0         Newx(argvlen, argc, size_t);
432              
433 0 0         for (i = 0; i < argc; i++) {
434 0 0         argv[i] = SvPV(ST(i + 1), len);
435 0           argvlen[i] = len;
436             }
437              
438 0           Newx(cbt, 1, ev_hiredis_cb_t);
439 0           cbt->cb = SvREFCNT_inc(cb);
440 0           ngx_queue_init(&cbt->queue);
441 0           ngx_queue_insert_tail(&self->cb_queue, &cbt->queue);
442              
443 0 0         if (0 == strcasecmp(argv[0], "subscribe")
444 0 0         || 0 == strcasecmp(argv[0], "psubscribe")
445 0 0         || 0 == strcasecmp(argv[0], "monitor")
446             ) {
447 0           cbt->persist = 1;
448             }
449             else {
450 0           cbt->persist = 0;
451             }
452              
453 0           RETVAL = redisAsyncCommandArgv(
454             self->ac, EV__hiredis_reply_cb, (void*)cbt,
455             argc, (const char**)argv, argvlen
456             );
457              
458 0           Safefree(argv);
459 0           Safefree(argvlen);
460             }
461             OUTPUT:
462             RETVAL