File Coverage

lib/Redis/Fast.xs
Criterion Covered Total %
statement 343 686 50.0
branch 207 636 32.5
condition n/a
subroutine n/a
pod n/a
total 550 1322 41.6


line stmt bran cond sub pod time code
1             #include "EXTERN.h"
2             #include "perl.h"
3             #include "XSUB.h"
4              
5             #include "ppport.h"
6             #include "hiredis.h"
7             #include "async.h"
8              
9             #include
10             #include
11             #include
12             #include
13             #include
14             #include
15              
16             #define MAX_ERROR_SIZE 256
17              
18             #define WAIT_FOR_EVENT_OK 0
19             #define WAIT_FOR_EVENT_READ_TIMEOUT 1
20             #define WAIT_FOR_EVENT_WRITE_TIMEOUT 2
21             #define WAIT_FOR_EVENT_EXCEPTION 3
22              
23             #define FLAG_INSIDE_TRANSACTION 0x01
24             #define FLAG_INSIDE_WATCH 0x02
25              
26             #define DEBUG_MSG(fmt, ...) \
27             if (self->debug) { \
28             fprintf(stderr, "[%d][%d][%s:%d:%s]: ", getpid(), getppid(), __FILE__, __LINE__, __func__); \
29             fprintf(stderr, fmt, __VA_ARGS__); \
30             fprintf(stderr, "\n"); \
31             }
32              
33             #define EQUALS_COMMAND(len, cmd, expected) ((len) == sizeof(expected) - 1 && memcmp(cmd, expected, sizeof(expected) - 1) == 0)
34              
35             typedef struct redis_fast_s {
36             redisAsyncContext* ac;
37             char* hostname;
38             int port;
39             char* path;
40             char* error;
41             double reconnect;
42             int every;
43             int debug;
44             double cnx_timeout;
45             double read_timeout;
46             double write_timeout;
47             int current_database;
48             int need_reconnect;
49             int is_connected;
50             SV* on_connect;
51             SV* on_build_sock;
52             SV* data;
53             SV* reconnect_on_error;
54             double next_reconnect_on_error_at;
55             int proccess_sub_count;
56             int is_subscriber;
57             int expected_subs;
58             pid_t pid;
59             int flags;
60             } redis_fast_t, *Redis__Fast;
61              
62             typedef struct redis_fast_reply_s {
63             SV* result;
64             SV* error;
65             } redis_fast_reply_t;
66              
67             typedef redis_fast_reply_t (*CUSTOM_DECODE)(Redis__Fast self, redisReply* reply, int collect_errors);
68              
69             typedef struct redis_fast_sync_cb_s {
70             redis_fast_reply_t ret;
71             int collect_errors;
72             CUSTOM_DECODE custom_decode;
73             int on_flags;
74             int off_flags;
75             } redis_fast_sync_cb_t;
76              
77             typedef struct redis_fast_async_cb_s {
78             SV* cb;
79             int collect_errors;
80             CUSTOM_DECODE custom_decode;
81             int on_flags;
82             int off_flags;
83             const void* command_name;
84             STRLEN command_length;
85             } redis_fast_async_cb_t;
86              
87             typedef struct redis_fast_subscribe_cb_s {
88             Redis__Fast self;
89             SV* cb;
90             } redis_fast_subscribe_cb_t;
91              
92              
93             #define WAIT_FOR_READ 0x01
94             #define WAIT_FOR_WRITE 0x02
95             typedef struct redis_fast_event_s {
96             int flags;
97             Redis__Fast self;
98             } redis_fast_event_t;
99              
100              
101 203           static void AddRead(void *privdata) {
102             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
103 203           Redis__Fast self = e->self;
104 203           e->flags |= WAIT_FOR_READ;
105 203 50         DEBUG_MSG("flags = %x", e->flags);
106 203           }
107              
108 0           static void DelRead(void *privdata) {
109             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
110 0           Redis__Fast self = e->self;
111 0           e->flags &= ~WAIT_FOR_READ;
112 0 0         DEBUG_MSG("flags = %x", e->flags);
113 0           }
114              
115 137           static void AddWrite(void *privdata) {
116             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
117 137           Redis__Fast self = e->self;
118 137           e->flags |= WAIT_FOR_WRITE;
119 137 50         DEBUG_MSG("flags = %x", e->flags);
120 137           }
121              
122 136           static void DelWrite(void *privdata) {
123             redis_fast_event_t *e = (redis_fast_event_t*)privdata;
124 136           Redis__Fast self = e->self;
125 136           e->flags &= ~WAIT_FOR_WRITE;
126 136 50         DEBUG_MSG("flags = %x", e->flags);
127 136           }
128              
129 69           static void Cleanup(void *privdata) {
130 69           free(privdata);
131 69           }
132              
133 69           static int Attach(redisAsyncContext *ac) {
134 69           Redis__Fast self = (Redis__Fast)ac->data;
135             redis_fast_event_t *e;
136              
137             /* Nothing should be attached when something is already attached */
138 69 50         if (ac->ev.data != NULL)
139             return REDIS_ERR;
140              
141             /* Create container for context and r/w events */
142 69           e = (redis_fast_event_t*)malloc(sizeof(*e));
143 69           e->flags = 0;
144 69           e->self = self;
145              
146             /* Register functions to start/stop listening for events */
147 69           ac->ev.addRead = AddRead;
148 69           ac->ev.delRead = DelRead;
149 69           ac->ev.addWrite = AddWrite;
150 69           ac->ev.delWrite = DelWrite;
151 69           ac->ev.cleanup = Cleanup;
152 69           ac->ev.data = e;
153              
154 69           return REDIS_OK;
155             }
156              
157 55559           static int wait_for_event(Redis__Fast self, double read_timeout, double write_timeout) {
158             redisContext *c;
159             int fd;
160             redis_fast_event_t *e;
161             struct pollfd pollfd;
162             int rc;
163             double timeout = -1;
164             int timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
165             int ms;
166              
167 55559 50         if(self==NULL) return WAIT_FOR_EVENT_EXCEPTION;
168 55559 50         if(self->ac==NULL) return WAIT_FOR_EVENT_EXCEPTION;
169              
170             c = &(self->ac->c);
171 55559           fd = c->fd;
172 55559           e = (redis_fast_event_t*)self->ac->ev.data;
173 55559 50         if(e==NULL) return 0;
174              
175 55559 100         if((e->flags & (WAIT_FOR_READ|WAIT_FOR_WRITE)) == (WAIT_FOR_READ|WAIT_FOR_WRITE)) {
176 68 50         DEBUG_MSG("set READ and WRITE, compare read_timeout = %f and write_timeout = %f",
177             read_timeout, write_timeout);
178 68 100         if(read_timeout < 0 && write_timeout < 0) {
    50          
179             timeout = -1;
180             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
181 2 50         } else if(read_timeout < 0) {
182             timeout = write_timeout;
183             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
184 2 50         } else if(write_timeout < 0) {
185             timeout = read_timeout;
186             timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
187 55559 0         } else if(read_timeout < write_timeout) {
188             timeout = read_timeout;
189             timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
190             } else {
191             timeout = write_timeout;
192             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
193             }
194 55491 100         } else if(e->flags & WAIT_FOR_READ) {
195 68 50         DEBUG_MSG("set READ, read_timeout = %f", read_timeout);
196             timeout = read_timeout;
197             timeout_mode = WAIT_FOR_EVENT_READ_TIMEOUT;
198 55423 50         } else if(e->flags & WAIT_FOR_WRITE) {
199 55423 50         DEBUG_MSG("set WRITE, write_timeout = %f", write_timeout);
200             timeout = write_timeout;
201             timeout_mode = WAIT_FOR_EVENT_WRITE_TIMEOUT;
202             }
203              
204             START_POLL:
205 55559 100         if (timeout < 0) {
206             ms = -1;
207             } else {
208 4           ms = (int)(timeout * 1000 + 0.999);
209             }
210 55559 50         DEBUG_MSG("select start, timeout is %f", timeout);
211 55559           pollfd.fd = fd;
212 55559           pollfd.events = 0;
213 55559           pollfd.revents = 0;
214 55559 100         if(e->flags & WAIT_FOR_READ) { pollfd.events |= POLLIN; }
215 55559 100         if(e->flags & WAIT_FOR_WRITE) { pollfd.events |= POLLOUT; }
216             rc = poll(&pollfd, 1, ms);
217 55559 50         DEBUG_MSG("poll returns %d", rc);
218 55559 100         if(rc == 0) {
219 1 50         DEBUG_MSG("%s", "timeout");
220             return timeout_mode;
221             }
222              
223 55558 50         if(rc < 0) {
224 0 0         DEBUG_MSG("exception: %s", strerror(errno));
225 0 0         if( errno == EINTR ) {
226 0 0         PERL_ASYNC_CHECK();
227 0 0         DEBUG_MSG("%s", "recieved interrupt. retry wait_for_event");
228             goto START_POLL;
229             }
230             return WAIT_FOR_EVENT_EXCEPTION;
231             }
232 55558 50         if(self->ac && (pollfd.revents & POLLIN) != 0) {
    100          
233 67 50         DEBUG_MSG("ready to %s", "read");
234 67           redisAsyncHandleRead(self->ac);
235             }
236 55558 100         if(self->ac && (pollfd.revents & (POLLOUT|POLLHUP)) != 0) {
    100          
237 55491 50         DEBUG_MSG("ready to %s", "write");
238 55491           redisAsyncHandleWrite(self->ac);
239             }
240 55558 100         if((pollfd.revents & (POLLERR|POLLNVAL)) != 0) {
241 1 50         DEBUG_MSG(
    0          
    0          
242             "exception: %s%s",
243             (pollfd.revents & POLLERR) ? "POLLERR " : "",
244             (pollfd.revents & POLLNVAL) ? "POLLNVAL " : "");
245             return WAIT_FOR_EVENT_EXCEPTION;
246             }
247              
248 55557 50         DEBUG_MSG("%s", "finish");
249             return WAIT_FOR_EVENT_OK;
250             }
251              
252              
253 130           static int _wait_all_responses(Redis__Fast self) {
254 130 50         DEBUG_MSG("%s", "start");
255 265 100         while(self->ac && self->ac->replies.tail) {
    100          
256 136           int res = wait_for_event(self, self->read_timeout, self->write_timeout);
257 136 100         if (res != WAIT_FOR_EVENT_OK) {
258 1 50         DEBUG_MSG("error: %d", res);
259             return res;
260             }
261             }
262 129 50         DEBUG_MSG("%s", "finish");
263             return WAIT_FOR_EVENT_OK;
264             }
265              
266              
267 69           static void Redis__Fast_connect_cb(redisAsyncContext* c, int status) {
268 69           Redis__Fast self = (Redis__Fast)c->data;
269 69 50         DEBUG_MSG("connected status = %d", status);
270 69 100         if(status != REDIS_OK) {
271             // Connection Error!!
272             // Redis context will close automatically
273 1           self->ac = NULL;
274             } else {
275 68           self->is_connected = 1;
276             }
277 69           }
278              
279 68           static void Redis__Fast_disconnect_cb(redisAsyncContext* c, int status) {
280 68           Redis__Fast self = (Redis__Fast)c->data;
281             PERL_UNUSED_VAR(status);
282 68 50         DEBUG_MSG("disconnected status = %d", status);
283 68           self->ac = NULL;
284 68           }
285              
286 69           static redisAsyncContext* __build_sock(Redis__Fast self)
287             {
288             redisAsyncContext *ac;
289             double timeout;
290             int res;
291              
292 69 50         DEBUG_MSG("%s", "start");
293              
294 69 50         if(self->on_build_sock) {
295 0           dSP;
296              
297 0           ENTER;
298 0           SAVETMPS;
299              
300 0 0         PUSHMARK(SP);
301 0           call_sv(self->on_build_sock, G_DISCARD | G_NOARGS);
302              
303 0 0         FREETMPS;
304 0           LEAVE;
305             }
306              
307 69 100         if(self->path) {
308 66           ac = redisAsyncConnectUnix(self->path);
309             } else {
310 3           ac = redisAsyncConnect(self->hostname, self->port);
311             }
312              
313 69 50         if(ac == NULL) {
314 0 0         DEBUG_MSG("%s", "allocation error");
315             return NULL;
316             }
317 69 50         if(ac->err) {
318 0 0         DEBUG_MSG("connection error: %s", ac->errstr);
319 0           redisAsyncFree(ac);
320 0           return NULL;
321             }
322 69           ac->data = (void*)self;
323 69           self->ac = ac;
324 69           self->is_connected = 0;
325              
326 69           Attach(ac);
327 69           redisAsyncSetConnectCallback(ac, (redisConnectCallback*)Redis__Fast_connect_cb);
328 69           redisAsyncSetDisconnectCallback(ac, (redisDisconnectCallback*)Redis__Fast_disconnect_cb);
329              
330             // wait to connect...
331             timeout = -1;
332 69 50         if(self->cnx_timeout) {
333             timeout = self->cnx_timeout;
334             }
335 55491 100         while(!self->is_connected) {
336 55423           res = wait_for_event(self, timeout, timeout);
337 55423 100         if(self->ac == NULL) {
338             // set is_connected flag to reconnect.
339             // see https://github.com/shogo82148/Redis-Fast/issues/73
340 1           self->is_connected = 1;
341              
342 1           return NULL;
343             }
344 55422 50         if(res != WAIT_FOR_EVENT_OK) {
345 0 0         DEBUG_MSG("error: %d", res);
346              
347             // free the redis context
348 0           redisAsyncFree(self->ac);
349 0           _wait_all_responses(self);
350 0           self->ac = NULL;
351              
352             // set is_connected flag to reconnect.
353             // see https://github.com/shogo82148/Redis-Fast/issues/73
354 0           self->is_connected = 1;
355              
356 69           return NULL;
357             }
358             }
359 68 50         if(self->on_connect){
360 68           dSP;
361 68 50         PUSHMARK(SP);
362 68           call_sv(self->on_connect, G_DISCARD | G_NOARGS);
363             }
364              
365 68 50         DEBUG_MSG("%s", "finish");
366 68           return self->ac;
367             }
368              
369              
370 69           static void Redis__Fast_connect(Redis__Fast self) {
371             struct timeval start, end;
372              
373 69 50         DEBUG_MSG("%s", "start");
374              
375 69           self->flags = 0;
376              
377             //$self->{queue} = [];
378 69           self->pid = getpid();
379              
380 69 100         if(self->reconnect == 0) {
381 3 100         if(! __build_sock(self)) {
382 1 50         if(self->path) {
383 0           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s", self->path);
384             } else {
385 1           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s:%d", self->hostname, self->port);
386             }
387 1           croak("%s", self->error);
388             }
389 68           return ;
390             }
391              
392             // Reconnect...
393 66           gettimeofday(&start, NULL);
394             while (1) {
395             double elapsed_time;
396 66 50         if(__build_sock(self)) {
397             // Connected!
398 66 50         DEBUG_MSG("%s", "finish");
399             return;
400             }
401 0           gettimeofday(&end, NULL);
402 0           elapsed_time = (end.tv_sec-start.tv_sec) + 1E-6 * (end.tv_usec-start.tv_usec);
403 0 0         DEBUG_MSG("elapsed time:%f, reconnect:%lf", elapsed_time, self->reconnect);
404 0 0         if( elapsed_time > self->reconnect) {
405 0 0         if(self->path) {
406 0           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s", self->path);
407             } else {
408 0           snprintf(self->error, MAX_ERROR_SIZE, "Could not connect to Redis server at %s:%d", self->hostname, self->port);
409             }
410 0 0         DEBUG_MSG("%s", "timed out");
411 0           croak("%s", self->error);
412             return;
413             }
414 0 0         DEBUG_MSG("%s", "failed to connect. wait...");
415 0           usleep(self->every);
416 0           }
417             DEBUG_MSG("%s", "finish");
418             }
419              
420             // reconnect if the current connection is closed.
421             // the caller must check self->ac != 0 to continue.
422 71           static void Redis__Fast_reconnect(Redis__Fast self) {
423 71 50         DEBUG_MSG("%s", "start");
424 71 50         if(self->is_connected && !self->ac && self->reconnect > 0) {
    100          
    50          
425 0 0         DEBUG_MSG("%s", "connection not found. reconnect");
426 0           Redis__Fast_connect(self);
427             }
428 71 100         if(!self->ac) {
429 3 50         DEBUG_MSG("%s", "Not connected to any server");
430             }
431 71 50         DEBUG_MSG("%s", "finish");
432 71           }
433              
434 140           static redis_fast_reply_t Redis__Fast_decode_reply(Redis__Fast self, redisReply* reply, int collect_errors) {
435             redis_fast_reply_t res = {NULL, NULL};
436              
437 140           switch (reply->type) {
438             case REDIS_REPLY_ERROR:
439 14           res.error = sv_2mortal(newSVpvn(reply->str, reply->len));
440 14           break;
441             case REDIS_REPLY_STRING:
442             case REDIS_REPLY_STATUS:
443 51           res.result = sv_2mortal(newSVpvn(reply->str, reply->len));
444 51           break;
445              
446             case REDIS_REPLY_INTEGER:
447 37           res.result = sv_2mortal(newSViv(reply->integer));
448 37           break;
449             case REDIS_REPLY_NIL:
450             res.result = &PL_sv_undef;
451 18           break;
452              
453             case REDIS_REPLY_ARRAY: {
454 20           AV* av = newAV();
455             size_t i;
456 20           res.result = sv_2mortal(newRV_noinc((SV*)av));
457              
458 93 100         for (i = 0; i < reply->elements; i++) {
459 73           redis_fast_reply_t elem = Redis__Fast_decode_reply(self, reply->element[i], collect_errors);
460 73 100         if(collect_errors) {
461 3           AV* elem_av = (AV*)sv_2mortal((SV*)newAV());
462 3 100         if(elem.result) {
463 2           av_push(elem_av, SvREFCNT_inc(elem.result));
464             } else {
465 1           av_push(elem_av, newSV(0));
466             }
467 3 100         if(elem.error) {
468 1           av_push(elem_av, SvREFCNT_inc(elem.error));
469             } else {
470 2           av_push(elem_av, newSV(0));
471             }
472 3           av_push(av, newRV_inc((SV*)elem_av));
473             } else {
474 70 100         if(elem.result) {
475 68           av_push(av, SvREFCNT_inc(elem.result));
476             } else {
477 2           av_push(av, newSV(0));
478             }
479 70 100         if(elem.error && !res.error) {
    50          
480             res.error = elem.error;
481             }
482             }
483             }
484             break;
485             }
486             }
487              
488 140           return res;
489             }
490              
491 65           static int Redis__Fast_call_reconnect_on_error(Redis__Fast self, redis_fast_reply_t ret, const void *command_name, STRLEN command_length) {
492             int _need_reconnect = 0;
493             struct timeval current;
494             double current_sec;
495             SV* sv_ret;
496             SV* sv_err;
497             SV* sv_cmd;
498             int count;
499              
500 65 100         if (ret.error == NULL) {
501             return _need_reconnect;
502             }
503 13 50         if (self->reconnect_on_error == NULL) {
504             return _need_reconnect;
505             }
506              
507 0           gettimeofday(¤t, NULL);
508 0           current_sec = current.tv_sec + 1E-6 * current.tv_usec;
509 0 0         if( self->next_reconnect_on_error_at < 0 ||
    0          
510             self->next_reconnect_on_error_at < current_sec) {
511 0           dSP;
512 0           ENTER;
513 0           SAVETMPS;
514              
515 0 0         sv_ret = ret.result ? ret.result : &PL_sv_undef;
516             sv_err = ret.error;
517 0           sv_cmd = sv_2mortal(newSVpvn((const char*)command_name, command_length));
518              
519 0 0         PUSHMARK(SP);
520 0 0         XPUSHs(sv_err);
521 0 0         XPUSHs(sv_ret);
522 0 0         XPUSHs(sv_cmd);
523 0           PUTBACK;
524              
525 0           count = call_sv(self->reconnect_on_error, G_SCALAR);
526              
527 0           SPAGAIN;
528              
529 0 0         if (count != 1) {
530 0           croak("[BUG] retval count should be 1\n");
531             }
532 0 0         _need_reconnect = POPi;
533              
534 0           PUTBACK;
535 0 0         FREETMPS;
536 0           LEAVE;
537             }
538              
539             return _need_reconnect;
540             }
541              
542 67           static void Redis__Fast_sync_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
543 67           Redis__Fast self = (Redis__Fast)c->data;
544             redis_fast_sync_cb_t *cbt = (redis_fast_sync_cb_t*)privdata;
545 67 50         DEBUG_MSG("%p", (void*)privdata);
546 67 100         if(reply) {
547 66           self->flags = (self->flags | cbt->on_flags) & cbt->off_flags;
548 66 50         if(cbt->custom_decode) {
549 0           cbt->ret = (cbt->custom_decode)(self, (redisReply*)reply, cbt->collect_errors);
550             } else {
551 66           cbt->ret = Redis__Fast_decode_reply(self, (redisReply*)reply, cbt->collect_errors);
552             }
553 1 50         } else if(c->c.flags & REDIS_FREEING) {
554 1 50         DEBUG_MSG("%s", "redis freeing");
555 1           Safefree(cbt);
556             } else {
557 0 0         DEBUG_MSG("connect error: %s", c->errstr);
558 0           self->need_reconnect = 1;
559 0           cbt->ret.result = NULL;
560 0           cbt->ret.error = sv_2mortal( newSVpvn(c->errstr, strlen(c->errstr)) );
561             }
562 67 50         DEBUG_MSG("%s", "finish");
563 67           }
564              
565 1           static void Redis__Fast_async_reply_cb(redisAsyncContext* c, void* reply, void* privdata) {
566 1           Redis__Fast self = (Redis__Fast)c->data;
567             redis_fast_async_cb_t *cbt = (redis_fast_async_cb_t*)privdata;
568 1 50         DEBUG_MSG("%p, %p", reply, privdata);
569 1 50         if (reply) {
570 1           self->flags = (self->flags | cbt->on_flags) & cbt->off_flags;
571              
572             {
573             redis_fast_reply_t result;
574              
575 1           dSP;
576              
577 1           ENTER;
578 1           SAVETMPS;
579              
580 1 50         if(cbt->custom_decode) {
581 0           result = (cbt->custom_decode)(self, (redisReply*)reply, cbt->collect_errors);
582             } else {
583 1           result = Redis__Fast_decode_reply(self, (redisReply*)reply, cbt->collect_errors);
584             }
585              
586 1 50         if(result.result == NULL) result.result = &PL_sv_undef;
587 1 50         if(result.error == NULL) result.error = &PL_sv_undef;
588              
589 1 50         PUSHMARK(SP);
590 1 50         XPUSHs(result.result);
591 1 50         XPUSHs(result.error);
592 1           PUTBACK;
593              
594 1           call_sv(cbt->cb, G_DISCARD);
595              
596 1 50         FREETMPS;
597 1           LEAVE;
598             }
599              
600             {
601 1 50         if (0 < self->reconnect && !self->need_reconnect
    50          
602             // Avoid useless cost when reconnect_on_error is not set.
603 1 50         && self->reconnect_on_error != NULL) {
604             redis_fast_reply_t result;
605 0 0         if(cbt->custom_decode) {
606 0           result = (cbt->custom_decode)(
607             self, (redisReply*)reply, cbt->collect_errors
608             );
609             } else {
610 0           result = Redis__Fast_decode_reply(
611             self, (redisReply*)reply, cbt->collect_errors
612             );
613             }
614 0           self->need_reconnect = Redis__Fast_call_reconnect_on_error(
615             self, result, cbt->command_name, cbt->command_length
616             );
617             }
618             }
619             } else {
620 0 0         if (c->c.flags & REDIS_FREEING) {
621 0 0         DEBUG_MSG("%s", "redis freeing");
622             } else {
623 0 0         DEBUG_MSG("connect error: %s", c->errstr);
624             }
625              
626             {
627             redis_fast_reply_t result;
628             const char *msg;
629              
630 0           dSP;
631              
632 0           ENTER;
633 0           SAVETMPS;
634              
635             result.result = &PL_sv_undef;
636 0 0         if (c->c.flags & REDIS_FREEING) {
637             msg = "redis freeing";
638             } else {
639 0           msg = c->errstr;
640             }
641 0 0         DEBUG_MSG("error: %s", msg);
642 0           result.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
643              
644 0 0         PUSHMARK(SP);
645 0 0         XPUSHs(result.result);
646 0 0         XPUSHs(result.error);
647 0           PUTBACK;
648              
649 0           call_sv(cbt->cb, G_DISCARD);
650              
651 0 0         FREETMPS;
652 0           LEAVE;
653             }
654             }
655              
656 1           SvREFCNT_dec(cbt->cb);
657 1           Safefree(cbt);
658 1           }
659              
660 0           static void Redis__Fast_subscribe_cb(redisAsyncContext* c, void* reply, void* privdata) {
661             int is_need_free = 0;
662 0           Redis__Fast self = (Redis__Fast)c->data;
663             redis_fast_subscribe_cb_t *cbt = (redis_fast_subscribe_cb_t*)privdata;
664             redisReply* r = (redisReply*)reply;
665              
666 0 0         DEBUG_MSG("%s", "start");
667 0 0         if(!cbt) {
668 0 0         DEBUG_MSG("%s", "cbt is empty finished");
669             return ;
670             }
671              
672 0 0         if (r) {
673 0           char* stype = r->element[0]->str;
674 0           int pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
675             redis_fast_reply_t res;
676              
677 0           dSP;
678 0           ENTER;
679 0           SAVETMPS;
680              
681 0           res = Redis__Fast_decode_reply(self, r, 0);
682              
683 0 0         if (strcasecmp(stype+pvariant,"subscribe") == 0) {
684 0 0         DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
685 0           self->is_subscriber = r->element[2]->integer;
686 0           self->expected_subs--;
687 0 0         } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
688 0 0         DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
689 0           self->is_subscriber = r->element[2]->integer;
690             is_need_free = 1;
691 0           self->expected_subs--;
692             } else {
693 0 0         DEBUG_MSG("%s %s", r->element[0]->str, r->element[1]->str);
694 0           self->proccess_sub_count++;
695             }
696              
697 0 0         if(res.result == NULL) res.result = &PL_sv_undef;
698 0 0         if(res.error == NULL) res.error = &PL_sv_undef;
699              
700 0 0         PUSHMARK(SP);
701 0 0         XPUSHs(res.result);
702 0 0         XPUSHs(res.error);
703 0           PUTBACK;
704              
705 0           call_sv(cbt->cb, G_DISCARD);
706              
707 0 0         FREETMPS;
708 0           LEAVE;
709             } else {
710 0 0         DEBUG_MSG("connect error: %s", c->errstr);
711             is_need_free = 1;
712             }
713              
714 0 0         if(is_need_free) {
715             // destroy private data
716 0 0         DEBUG_MSG("destroy %p", cbt);
717 0 0         if(cbt->cb) {
718             SvREFCNT_dec(cbt->cb);
719 0           cbt->cb = NULL;
720             }
721 0           Safefree(cbt);
722             }
723 0 0         DEBUG_MSG("%s", "finish");
724             }
725              
726 0           static void Redis__Fast_quit(Redis__Fast self) {
727             redis_fast_sync_cb_t *cbt;
728              
729 0 0         if(!self->ac) {
730             return;
731             }
732              
733 0           Newx(cbt, sizeof(redis_fast_sync_cb_t), redis_fast_sync_cb_t);
734 0           cbt->ret.result = NULL;
735 0           cbt->ret.error = NULL;
736 0           cbt->custom_decode = NULL;
737              
738             // initialize, or self->flags will be corrupted.
739 0           cbt->on_flags = 0;
740 0           cbt->off_flags = 0;
741              
742 0           redisAsyncCommand(
743             self->ac, Redis__Fast_sync_reply_cb, cbt, "QUIT"
744             );
745 0           redisAsyncDisconnect(self->ac);
746 0 0         if(_wait_all_responses(self) == WAIT_FOR_EVENT_OK) {
747 0 0         DEBUG_MSG("%s", "wait_all_responses ok");
748 0 0         if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
    0          
749             } else {
750 0 0         DEBUG_MSG("%s", "wait_all_responses not ok");
751 0 0         if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
    0          
752             }
753 0 0         DEBUG_MSG("%s", "finish");
754             }
755              
756 68           static redis_fast_reply_t Redis__Fast_run_cmd(Redis__Fast self, int collect_errors, CUSTOM_DECODE custom_decode, SV* cb, int argc, const char** argv, size_t* argvlen) {
757             redis_fast_reply_t ret = {NULL, NULL};
758             int on_flags = 0, off_flags = ~0;
759              
760 68 50         DEBUG_MSG("start %s", argv[0]);
761              
762 68 50         DEBUG_MSG("pid check: previous pid is %d, now %d", self->pid, getpid());
763 68 50         if(self->pid != getpid()) {
764 0 0         DEBUG_MSG("%s", "pid changed. create new connection..");
765 0           Redis__Fast_connect(self);
766             }
767              
768 68 50         if(EQUALS_COMMAND(argvlen[0], argv[0], "MULTI")) {
    0          
769             on_flags = FLAG_INSIDE_TRANSACTION;
770 68 100         } else if(EQUALS_COMMAND(argvlen[0], argv[0], "EXEC") ||
    50          
    50          
771 0 0         EQUALS_COMMAND(argvlen[0], argv[0], "DISCARD")) {
772             off_flags = ~(FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH);
773 67 50         } else if(EQUALS_COMMAND(argvlen[0], argv[0], "WATCH")) {
    0          
774             on_flags = FLAG_INSIDE_WATCH;
775 67 50         } else if(EQUALS_COMMAND(argvlen[0], argv[0], "UNWATCH")) {
    0          
776             off_flags = ~FLAG_INSIDE_WATCH;
777             }
778              
779 68 100         if(cb) {
780             redis_fast_async_cb_t *cbt;
781 1           Newx(cbt, sizeof(redis_fast_async_cb_t), redis_fast_async_cb_t);
782 1           cbt->cb = SvREFCNT_inc(cb);
783 1           cbt->custom_decode = custom_decode;
784 1           cbt->collect_errors = collect_errors;
785 1           cbt->on_flags = on_flags;
786 1           cbt->off_flags = off_flags;
787 1           cbt->command_name = argv[0];
788 1           cbt->command_length = argvlen[0];
789 1           redisAsyncCommandArgv(
790             self->ac, Redis__Fast_async_reply_cb, cbt,
791             argc, argv, argvlen
792             );
793 1           ret.result = sv_2mortal(newSViv(1));
794             } else {
795             redis_fast_sync_cb_t *cbt;
796 67 100         int i, cnt = (self->reconnect == 0 ? 1 : 2);
797             int res = WAIT_FOR_EVENT_OK;
798 67 50         for(i = 0; i < cnt; i++) {
799 67           Newx(cbt, sizeof(redis_fast_sync_cb_t), redis_fast_sync_cb_t);
800 67           self->need_reconnect = 0;
801 67           cbt->ret.result = NULL;
802 67           cbt->ret.error = NULL;
803 67           cbt->custom_decode = custom_decode;
804 67           cbt->collect_errors = collect_errors;
805 67           cbt->on_flags = on_flags;
806 67           cbt->off_flags = off_flags;
807 67 50         DEBUG_MSG("%s", "send command in sync mode");
808 67           redisAsyncCommandArgv(
809             self->ac, Redis__Fast_sync_reply_cb, cbt,
810             argc, argv, argvlen
811             );
812 67 50         DEBUG_MSG("%s", "waiting response");
813 67           res = _wait_all_responses(self);
814 67 100         if(res == WAIT_FOR_EVENT_OK && self->need_reconnect == 0) {
    50          
815             int _need_reconnect = 0;
816 66 100         if (1 < cnt - i) {
817 65           _need_reconnect = Redis__Fast_call_reconnect_on_error(
818             self, cbt->ret, argv[0], argvlen[0]
819             );
820             // Should be quit before reconnect
821 65 50         if (_need_reconnect) {
822 0           Redis__Fast_quit(self);
823             }
824             }
825 66 50         if (!_need_reconnect) {
826 66           ret = cbt->ret;
827 66 100         if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
    50          
828 66 50         DEBUG_MSG("finish %s", argv[0]);
829 66           return ret;
830             }
831             }
832              
833 1 50         if( res == WAIT_FOR_EVENT_READ_TIMEOUT ) break;
834              
835 0 0         if(self->flags & (FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH)) {
836             char *msg = "reconnect disabled inside transaction or watch";
837 0 0         DEBUG_MSG("error: %s", msg);
838 0           ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
839 0           return ret;
840             }
841              
842 0           Redis__Fast_reconnect(self);
843 0 0         if(!self->ac) {
844             char *msg = "Not connected to any server";
845 0 0         DEBUG_MSG("error: %s", msg);
846 0           ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
847 0           return ret;
848             }
849             }
850              
851 1 50         if( res == WAIT_FOR_EVENT_OK && (cbt->ret.result || cbt->ret.error) ) Safefree(cbt);
    0          
    0          
852             // else destructor will release cbt
853              
854 1 50         if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) {
855 1           snprintf(self->error, MAX_ERROR_SIZE, "Error while reading from Redis server: %s", strerror(EAGAIN));
856 1           errno = EAGAIN;
857 1 50         DEBUG_MSG("error: %s", self->error);
858 1           ret.error = sv_2mortal(newSVpvn(self->error, strlen(self->error)));
859 1           return ret;
860             }
861 0 0         if(!self->ac) {
862             char *msg = "Not connected to any server";
863 0 0         DEBUG_MSG("error: %s", msg);
864 0           ret.error = sv_2mortal(newSVpvn(msg, strlen(msg)));
865 0           return ret;
866             }
867             }
868 1 50         DEBUG_MSG("Finish %s", argv[0]);
869 1           return ret;
870             }
871              
872 0           static redis_fast_reply_t Redis__Fast_keys_custom_decode(Redis__Fast self, redisReply* reply, int collect_errors) {
873             // TODO: Support redis <= 1.2.6
874 0           return Redis__Fast_decode_reply(self, reply, collect_errors);
875             }
876              
877 0           static redis_fast_reply_t Redis__Fast_info_custom_decode(Redis__Fast self, redisReply* reply, int collect_errors) {
878             redis_fast_reply_t res = {NULL, NULL};
879              
880 0 0         if(reply->type == REDIS_REPLY_STRING ||
881             reply->type == REDIS_REPLY_STATUS) {
882              
883 0           HV* hv = newHV();
884 0           char* str = reply->str;
885 0           size_t len = reply->len;
886 0           res.result = sv_2mortal(newRV_noinc((SV*)hv));
887              
888 0 0         while(len != 0) {
889 0           const char* line = (char*)memchr(str, '\r', len);
890             const char* sep;
891             size_t linelen;
892 0 0         if(line == NULL) {
893             linelen = len;
894             } else {
895 0           linelen = line - str;
896             }
897 0           sep = (char*)memchr(str, ':', linelen);
898 0 0         if(str[0] != '#' && sep != NULL) {
    0          
899             SV* val;
900             SV** ret;
901             size_t keylen;
902 0           keylen = sep - str;
903 0           val = sv_2mortal(newSVpvn(sep + 1, linelen - keylen - 1));
904 0           ret = hv_store(hv, str, keylen, SvREFCNT_inc(val), 0);
905 0 0         if (ret == NULL) {
906             SvREFCNT_dec(val);
907 0           croak("failed to hv_store");
908             }
909             }
910 0 0         if(line == NULL) {
911             break;
912             } else {
913 0           len -= linelen + 2;
914 0           str += linelen + 2;
915             }
916             }
917             } else {
918 0           res = Redis__Fast_decode_reply(self, reply, collect_errors);
919             }
920              
921 0           return res;
922             }
923              
924             MODULE = Redis::Fast PACKAGE = Redis::Fast
925              
926             SV*
927             _new(char* cls);
928             PREINIT:
929             redis_fast_t* self;
930             CODE:
931             {
932 70           Newxz(self, sizeof(redis_fast_t), redis_fast_t);
933 70 50         DEBUG_MSG("%s", "start");
934 70           self->error = (char*)malloc(MAX_ERROR_SIZE);
935 70           self->reconnect_on_error = NULL;
936 70           self->next_reconnect_on_error_at = -1;
937 70           self->is_connected = 1;
938 70           ST(0) = sv_newmortal();
939 70           sv_setref_pv(ST(0), cls, (void*)self);
940 70 50         DEBUG_MSG("return %p", ST(0));
941 70           XSRETURN(1);
942             }
943             OUTPUT:
944             RETVAL
945              
946             double
947             __set_reconnect(Redis::Fast self, double val)
948             CODE:
949             {
950 206           RETVAL = self->reconnect = val;
951             }
952             OUTPUT:
953             RETVAL
954              
955              
956             double
957             __get_reconnect(Redis::Fast self)
958             CODE:
959             {
960 68           RETVAL = self->reconnect;
961             }
962             OUTPUT:
963             RETVAL
964              
965              
966             int
967             __set_every(Redis::Fast self, int val)
968             CODE:
969             {
970 70           RETVAL = self->every = val;
971             }
972             OUTPUT:
973             RETVAL
974              
975              
976             int
977             __get_every(Redis::Fast self)
978             CODE:
979             {
980 0           RETVAL = self->every;
981             }
982             OUTPUT:
983             RETVAL
984              
985             int
986             __set_debug(Redis::Fast self, int val)
987             CODE:
988             {
989 70           RETVAL = self->debug = val;
990             }
991             OUTPUT:
992             RETVAL
993              
994             double
995             __set_cnx_timeout(Redis::Fast self, double val)
996             CODE:
997             {
998 70           RETVAL = self->cnx_timeout = val;
999             }
1000             OUTPUT:
1001             RETVAL
1002              
1003             double
1004             __get_cnx_timeout(Redis::Fast self)
1005             CODE:
1006             {
1007 0           RETVAL = self->cnx_timeout;
1008             }
1009             OUTPUT:
1010             RETVAL
1011              
1012              
1013             double
1014             __set_read_timeout(Redis::Fast self, double val)
1015             CODE:
1016             {
1017 70           RETVAL = self->read_timeout = val;
1018             }
1019             OUTPUT:
1020             RETVAL
1021              
1022             double
1023             __get_read_timeout(Redis::Fast self)
1024             CODE:
1025             {
1026 0           RETVAL = self->read_timeout;
1027             }
1028             OUTPUT:
1029             RETVAL
1030              
1031              
1032             double
1033             __set_write_timeout(Redis::Fast self, double val)
1034             CODE:
1035             {
1036 70           RETVAL = self->write_timeout = val;
1037             }
1038             OUTPUT:
1039             RETVAL
1040              
1041             double
1042             __get_write_timeout(Redis::Fast self)
1043             CODE:
1044             {
1045 0           RETVAL = self->write_timeout;
1046             }
1047             OUTPUT:
1048             RETVAL
1049              
1050              
1051             int
1052             __set_current_database(Redis::Fast self, int val)
1053             CODE:
1054             {
1055 0           RETVAL = self->current_database = val;
1056             }
1057             OUTPUT:
1058             RETVAL
1059              
1060              
1061             int
1062             __get_current_database(Redis::Fast self)
1063             CODE:
1064             {
1065 0           RETVAL = self->current_database;
1066             }
1067             OUTPUT:
1068             RETVAL
1069              
1070              
1071             int
1072             __sock(Redis::Fast self)
1073             CODE:
1074             {
1075 0 0         RETVAL = self->ac ? self->ac->c.fd : 0;
1076             }
1077             OUTPUT:
1078             RETVAL
1079              
1080              
1081             int
1082             __get_port(Redis::Fast self)
1083             CODE:
1084             {
1085             struct sockaddr_in addr;
1086             socklen_t len;
1087 0           len = sizeof( addr );
1088 0           getsockname( self->ac->c.fd, ( struct sockaddr *)&addr, &len );
1089 0           RETVAL = addr.sin_port;
1090             }
1091             OUTPUT:
1092             RETVAL
1093              
1094              
1095             void
1096             __set_on_connect(Redis::Fast self, SV* func)
1097             CODE:
1098             {
1099 70           self->on_connect = SvREFCNT_inc(func);
1100             }
1101              
1102             void
1103             __set_on_build_sock(Redis::Fast self, SV* func)
1104             CODE:
1105             {
1106 0           self->on_build_sock = SvREFCNT_inc(func);
1107             }
1108              
1109             void
1110             __set_data(Redis::Fast self, SV* data)
1111             CODE:
1112             {
1113 70           self->data = SvREFCNT_inc(data);
1114             }
1115              
1116             void
1117             __get_data(Redis::Fast self)
1118             CODE:
1119             {
1120 272           ST(0) = self->data;
1121 272           XSRETURN(1);
1122             }
1123              
1124             void
1125             __set_reconnect_on_error(Redis::Fast self, SV* func)
1126             CODE:
1127             {
1128 0           self->reconnect_on_error = SvREFCNT_inc(func);
1129             }
1130              
1131             double
1132             __set_next_reconnect_on_error_at(Redis::Fast self, double val)
1133             CODE:
1134             {
1135             struct timeval current;
1136             double current_sec;
1137              
1138 0 0         if ( -1 < val ) {
1139 0           gettimeofday(¤t, NULL);
1140 0           current_sec = current.tv_sec + 1E-6 * current.tv_usec;
1141 0           val += current_sec;
1142             }
1143              
1144 0           RETVAL = self->next_reconnect_on_error_at = val;
1145             }
1146             OUTPUT:
1147             RETVAL
1148              
1149             void
1150             is_subscriber(Redis::Fast self)
1151             CODE:
1152             {
1153 139           ST(0) = sv_2mortal(newSViv(self->is_subscriber));
1154 139           XSRETURN(1);
1155             }
1156              
1157              
1158             void
1159             DESTROY(Redis::Fast self);
1160             CODE:
1161             {
1162 70 50         DEBUG_MSG("%s", "start");
1163 70 100         if (self->ac) {
1164 62 50         DEBUG_MSG("%s", "free ac");
1165 62           redisAsyncFree(self->ac);
1166 62           _wait_all_responses(self);
1167 62           self->ac = NULL;
1168             }
1169              
1170 70 100         if(self->hostname) {
1171 4 50         DEBUG_MSG("%s", "free hostname");
1172 4           free(self->hostname);
1173 4           self->hostname = NULL;
1174             }
1175              
1176 70 100         if(self->path) {
1177 66 50         DEBUG_MSG("%s", "free path");
1178 66           free(self->path);
1179 66           self->path = NULL;
1180             }
1181              
1182 70 50         if(self->error) {
1183 70 50         DEBUG_MSG("%s", "free error");
1184 70           free(self->error);
1185 70           self->error = NULL;
1186             }
1187              
1188 70 50         if(self->on_connect) {
1189 70 50         DEBUG_MSG("%s", "free on_connect");
1190 70           SvREFCNT_dec(self->on_connect);
1191 70           self->on_connect = NULL;
1192             }
1193              
1194 70 50         if(self->on_build_sock) {
1195 0 0         DEBUG_MSG("%s", "free on_build_sock");
1196 0           SvREFCNT_dec(self->on_build_sock);
1197 0           self->on_build_sock = NULL;
1198             }
1199              
1200 70 50         if(self->data) {
1201 70 50         DEBUG_MSG("%s", "free data");
1202 70           SvREFCNT_dec(self->data);
1203 70           self->data = NULL;
1204             }
1205              
1206 70 50         if(self->reconnect_on_error) {
1207 0 0         DEBUG_MSG("%s", "free reconnect_on_error");
1208 0           SvREFCNT_dec(self->reconnect_on_error);
1209 0           self->reconnect_on_error = NULL;
1210             }
1211              
1212 70 50         DEBUG_MSG("%s", "finish");
1213 70           Safefree(self);
1214             }
1215              
1216              
1217             void
1218             __connection_info(Redis::Fast self, char* hostname, int port = 6379)
1219             CODE:
1220             {
1221 4 50         if(self->hostname) {
1222 0           free(self->hostname);
1223 0           self->hostname = NULL;
1224             }
1225              
1226 4 50         if(self->path) {
1227 0           free(self->path);
1228 0           self->path = NULL;
1229             }
1230              
1231 4 50         if(hostname) {
1232 4           self->hostname = (char*)malloc(strlen(hostname) + 1);
1233             strcpy(self->hostname, hostname);
1234             }
1235              
1236 4           self->port = port;
1237             }
1238              
1239             void
1240             __connection_info_unix(Redis::Fast self, char* path)
1241             CODE:
1242             {
1243 66 50         if(self->hostname) {
1244 0           free(self->hostname);
1245 0           self->hostname = NULL;
1246             }
1247              
1248 66 50         if(self->path) {
1249 0           free(self->path);
1250 0           self->path = NULL;
1251             }
1252              
1253 66 50         if(path) {
1254 66           self->path = (char*)malloc(strlen(path) + 1);
1255             strcpy(self->path, path);
1256             }
1257             }
1258              
1259              
1260             void
1261             connect(Redis::Fast self)
1262             CODE:
1263             {
1264 69           Redis__Fast_connect(self);
1265             }
1266              
1267             void
1268             wait_all_responses(Redis::Fast self)
1269             CODE:
1270             {
1271 1           int res = _wait_all_responses(self);
1272 1 50         if(res != WAIT_FOR_EVENT_OK) {
1273 0           croak("Error while reading from Redis server");
1274             }
1275              
1276 1 50         if (0 < self->reconnect && self->need_reconnect) {
    50          
1277             // Should be quit before reconnect
1278 0           Redis__Fast_quit(self);
1279 0           Redis__Fast_reconnect(self);
1280 0           self->need_reconnect = 0;
1281             }
1282             }
1283              
1284             void
1285             wait_one_response(Redis::Fast self)
1286             CODE:
1287             {
1288 0           int res = _wait_all_responses(self);
1289 0 0         if(res != WAIT_FOR_EVENT_OK) {
1290 0           croak("Error while reading from Redis server");
1291             }
1292              
1293 0 0         if (0 < self->reconnect && self->need_reconnect) {
    0          
1294             // Should be quit before reconnect
1295 0           Redis__Fast_quit(self);
1296 0           Redis__Fast_reconnect(self);
1297 0           self->need_reconnect = 0;
1298             }
1299             }
1300              
1301             void
1302             __std_cmd(Redis::Fast self, ...)
1303             PREINIT:
1304             redis_fast_reply_t ret;
1305             SV* cb;
1306             char** argv;
1307             size_t* argvlen;
1308             STRLEN len;
1309             int argc, i, collect_errors;
1310             CODE:
1311             {
1312 68           Redis__Fast_reconnect(self);
1313 68 50         if(!self->ac) {
1314 0           croak("Not connected to any server");
1315             }
1316              
1317 68           cb = ST(items - 1);
1318 68 100         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    50          
1319 1           argc = items - 2;
1320             } else {
1321             cb = NULL;
1322 67           argc = items - 1;
1323             }
1324 68 50         Newx(argv, sizeof(char*) * argc, char*);
1325 68 50         Newx(argvlen, sizeof(size_t) * argc, size_t);
1326              
1327 204 100         for (i = 0; i < argc; i++) {
1328 136 50         if(!sv_utf8_downgrade(ST(i + 1), 1)) {
1329 0           croak("command sent is not an octet sequence in the native encoding (Latin-1). Consider using debug mode to see the command itself.");
1330             }
1331 136 50         argv[i] = SvPV(ST(i + 1), len);
1332 136           argvlen[i] = len;
1333             }
1334              
1335             collect_errors = 0;
1336 68 100         if(cb && EQUALS_COMMAND(argvlen[0], argv[0], "EXEC"))
    50          
    50          
1337             collect_errors = 1;
1338              
1339 68           ret = Redis__Fast_run_cmd(self, collect_errors, NULL, cb, argc, (const char**)argv, argvlen);
1340              
1341 68           Safefree(argv);
1342 68           Safefree(argvlen);
1343              
1344 68 100         ST(0) = ret.result ? ret.result : &PL_sv_undef;
1345 68 100         ST(1) = ret.error ? ret.error : &PL_sv_undef;
1346 68           XSRETURN(2);
1347             }
1348              
1349              
1350             void
1351             __quit(Redis::Fast self)
1352             CODE:
1353             {
1354 0 0         DEBUG_MSG("%s", "start QUIT");
1355 0 0         if(self->ac) {
1356 0           Redis__Fast_quit(self);
1357 0           ST(0) = sv_2mortal(newSViv(1));
1358 0           XSRETURN(1);
1359             } else {
1360 0 0         DEBUG_MSG("%s", "finish. there is no connection.");
1361 0           XSRETURN(0);
1362             }
1363             }
1364              
1365              
1366             void
1367             __shutdown(Redis::Fast self)
1368             CODE:
1369             {
1370 0 0         DEBUG_MSG("%s", "start SHUTDOWN");
1371 0 0         if(self->ac) {
1372 0           redisAsyncCommand(
1373             self->ac, NULL, NULL, "SHUTDOWN"
1374             );
1375 0           redisAsyncDisconnect(self->ac);
1376 0           _wait_all_responses(self);
1377 0           self->is_connected = 0;
1378 0           ST(0) = sv_2mortal(newSViv(1));
1379 0           XSRETURN(1);
1380             } else {
1381 0 0         DEBUG_MSG("%s", "redis server has alread shutdown");
1382 0           XSRETURN(0);
1383             }
1384             }
1385              
1386              
1387             void
1388             __keys(Redis::Fast self, ...)
1389             PREINIT:
1390             redis_fast_reply_t ret;
1391             SV* cb;
1392             char** argv;
1393             size_t* argvlen;
1394             STRLEN len;
1395             int argc, i;
1396             CODE:
1397             {
1398 0           Redis__Fast_reconnect(self);
1399 0 0         if(!self->ac) {
1400 0           croak("Not connected to any server");
1401             }
1402              
1403 0           cb = ST(items - 1);
1404 0 0         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    0          
1405 0           argc = items - 1;
1406             } else {
1407             cb = NULL;
1408             argc = items;
1409             }
1410 0 0         Newx(argv, sizeof(char*) * argc, char*);
1411 0 0         Newx(argvlen, sizeof(size_t) * argc, size_t);
1412              
1413 0           argv[0] = "KEYS";
1414 0           argvlen[0] = 4;
1415 0 0         for (i = 1; i < argc; i++) {
1416 0 0         argv[i] = SvPV(ST(i), len);
1417 0           argvlen[i] = len;
1418             }
1419              
1420 0           ret = Redis__Fast_run_cmd(self, 0, Redis__Fast_keys_custom_decode, cb, argc, (const char**)argv, argvlen);
1421 0           Safefree(argv);
1422 0           Safefree(argvlen);
1423              
1424 0 0         ST(0) = ret.result ? ret.result : &PL_sv_undef;
1425 0 0         ST(1) = ret.error ? ret.error : &PL_sv_undef;
1426 0           XSRETURN(2);
1427             }
1428              
1429              
1430             void
1431             __info(Redis::Fast self, ...)
1432             PREINIT:
1433             redis_fast_reply_t ret;
1434             SV* cb;
1435             char** argv;
1436             size_t* argvlen;
1437             STRLEN len;
1438             int argc, i;
1439             CODE:
1440             {
1441 3           Redis__Fast_reconnect(self);
1442 3 50         if(!self->ac) {
1443             char *msg = "Not connected to any server";
1444 3           ST(0) = &PL_sv_undef;
1445 3           ST(1) = sv_2mortal(newSVpvn(msg, strlen(msg)));
1446 3           XSRETURN(2);
1447             }
1448              
1449 0           cb = ST(items - 1);
1450 0 0         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    0          
1451 0           argc = items - 1;
1452             } else {
1453             cb = NULL;
1454             argc = items;
1455             }
1456 0 0         Newx(argv, sizeof(char*) * argc, char*);
1457 0 0         Newx(argvlen, sizeof(size_t) * argc, size_t);
1458              
1459 0           argv[0] = "INFO";
1460 0           argvlen[0] = 4;
1461 0 0         for (i = 1; i < argc; i++) {
1462 0 0         argv[i] = SvPV(ST(i), len);
1463 0           argvlen[i] = len;
1464             }
1465              
1466 0           ret = Redis__Fast_run_cmd(self, 0, Redis__Fast_info_custom_decode, cb, argc, (const char**)argv, argvlen);
1467 0           Safefree(argv);
1468 0           Safefree(argvlen);
1469              
1470 0 0         ST(0) = ret.result ? ret.result : &PL_sv_undef;
1471 0 0         ST(1) = ret.error ? ret.error : &PL_sv_undef;
1472 0           XSRETURN(2);
1473             }
1474              
1475              
1476             void
1477             __send_subscription_cmd(Redis::Fast self, ...)
1478             PREINIT:
1479             SV* cb;
1480             char** argv;
1481             size_t* argvlen;
1482             STRLEN len;
1483             int argc, i;
1484             redis_fast_subscribe_cb_t* cbt;
1485             int pvariant;
1486             CODE:
1487             {
1488 0 0         int cnt = (self->reconnect == 0 ? 1 : 2);
1489              
1490 0 0         DEBUG_MSG("%s", "start");
1491              
1492 0           Redis__Fast_reconnect(self);
1493 0 0         if(!self->ac) {
1494 0           croak("Not connected to any server");
1495             }
1496              
1497 0 0         if(!self->is_subscriber) {
1498 0           _wait_all_responses(self);
1499             }
1500 0           cb = ST(items - 1);
1501 0 0         if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
    0          
1502 0           argc = items - 2;
1503             } else {
1504             cb = NULL;
1505 0           argc = items - 1;
1506             }
1507 0 0         Newx(argv, sizeof(char*) * argc, char*);
1508 0 0         Newx(argvlen, sizeof(size_t) * argc, size_t);
1509              
1510 0 0         for (i = 0; i < argc; i++) {
1511 0 0         argv[i] = SvPV(ST(i+1), len);
1512 0           argvlen[i] = len;
1513 0 0         DEBUG_MSG("argv[%d] = %s", i, argv[i]);
1514             }
1515              
1516 0 0         for(i = 0; i < cnt; i++) {
1517 0           pvariant = tolower(argv[0][0]) == 'p';
1518 0 0         if (strcasecmp(argv[0]+pvariant,"unsubscribe") != 0) {
1519 0 0         DEBUG_MSG("%s", "command is not unsubscribe");
1520 0           Newx(cbt, sizeof(redis_fast_subscribe_cb_t), redis_fast_subscribe_cb_t);
1521 0           cbt->self = self;
1522 0           cbt->cb = SvREFCNT_inc(cb);
1523             } else {
1524 0 0         DEBUG_MSG("%s", "command is unsubscribe");
1525             cbt = NULL;
1526             }
1527 0 0         redisAsyncCommandArgv(
1528             self->ac, cbt ? Redis__Fast_subscribe_cb : NULL, cbt,
1529             argc, (const char**)argv, argvlen
1530             );
1531 0           self->expected_subs = argc - 1;
1532 0 0         while(self->expected_subs > 0 && wait_for_event(self, self->read_timeout, self->write_timeout) == WAIT_FOR_EVENT_OK) ;
    0          
1533 0 0         if(self->expected_subs == 0) break;
1534 0           Redis__Fast_reconnect(self);
1535 0 0         if(!self->ac) {
1536 0           Safefree(argv);
1537 0           Safefree(argvlen);
1538 0           croak("Not connected to any server");
1539             }
1540             }
1541              
1542 0           Safefree(argv);
1543 0           Safefree(argvlen);
1544 0 0         DEBUG_MSG("%s", "finish");
1545 0           XSRETURN(0);
1546             }
1547              
1548             void
1549             wait_for_messages(Redis::Fast self, double timeout = -1)
1550             CODE:
1551             {
1552 0 0         int i, cnt = (self->reconnect == 0 ? 1 : 2);
1553             int res = WAIT_FOR_EVENT_OK;
1554 0 0         DEBUG_MSG("%s", "start");
1555 0           self->proccess_sub_count = 0;
1556 0 0         for(i = 0; i < cnt; i++) {
1557 0 0         while((res = wait_for_event(self, timeout, timeout)) == WAIT_FOR_EVENT_OK) ;
1558 0 0         if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) break;
1559 0           Redis__Fast_reconnect(self);
1560 0 0         if(!self->ac) {
1561 0           croak("Not connected to any server");
1562             }
1563             }
1564 0 0         if(res == WAIT_FOR_EVENT_EXCEPTION) {
1565 0 0         if(!self->ac) {
1566 0 0         DEBUG_MSG("%s", "Connection not found");
1567 0           croak("EOF from server");
1568 0 0         } else if(self->ac->c.err == REDIS_ERR_EOF) {
1569 0 0         DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
1570 0           croak("EOF from server");
1571             } else {
1572 0 0         DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
1573 0           snprintf(self->error, MAX_ERROR_SIZE, "[WAIT_FOR_MESSAGES] %s", self->ac->c.errstr);
1574 0           croak("%s", self->error);
1575             }
1576             }
1577 0           ST(0) = sv_2mortal(newSViv(self->proccess_sub_count));
1578 0 0         DEBUG_MSG("finish with %d", res);
1579 0           XSRETURN(1);
1580             }
1581              
1582             void
1583             __wait_for_event(Redis::Fast self, double timeout = -1)
1584             CODE:
1585             {
1586 0 0         DEBUG_MSG("%s", "start");
1587 0           wait_for_event(self, timeout, timeout);
1588 0 0         DEBUG_MSG("%s", "finish");
1589 0           XSRETURN(0);
1590             }