Branch Coverage

lib/Redis/Fast.xs
Criterion Covered Total %
branch 205 636 32.2


line true false branch
106 0 203 DEBUG_MSG("flags = %x", e->flags);
113 0 0 DEBUG_MSG("flags = %x", e->flags);
120 0 136 DEBUG_MSG("flags = %x", e->flags);
127 0 136 DEBUG_MSG("flags = %x", e->flags);
139 68 0 if (ac->ev.data != NULL)
168 55239 0 if(self==NULL) return WAIT_FOR_EVENT_EXCEPTION;
169 55239 0 if(self->ac==NULL) return WAIT_FOR_EVENT_EXCEPTION;
174 55239 0 if(e==NULL) return 0;
176 68 55171 if((e->flags & (WAIT_FOR_READ|WAIT_FOR_WRITE)) == (WAIT_FOR_READ|WAIT_FOR_WRITE)) {
177 0 68 DEBUG_MSG("set READ and WRITE, compare read_timeout = %f and write_timeout = %f",
179 66 2 if(read_timeout < 0 && write_timeout < 0) {
0 66 if(read_timeout < 0 && write_timeout < 0) {
182 2 0 } else if(read_timeout < 0) {
185 0 2 } else if(write_timeout < 0) {
188 0 0 } else if(read_timeout < write_timeout) {
195 68 55103 } else if(e->flags & WAIT_FOR_READ) {
196 68 0 DEBUG_MSG("set READ, read_timeout = %f", read_timeout);
199 0 55103 } else if(e->flags & WAIT_FOR_WRITE) {
200 55103 0 DEBUG_MSG("set WRITE, write_timeout = %f", write_timeout);
206 4 55235 if (timeout < 0) {
211 0 55239 DEBUG_MSG("select start, timeout is %f", timeout);
215 136 55103 if(e->flags & WAIT_FOR_READ) { pollfd.events |= POLLIN; }
216 55171 68 if(e->flags & WAIT_FOR_WRITE) { pollfd.events |= POLLOUT; }
218 0 55239 DEBUG_MSG("poll returns %d", rc);
219 1 55238 if(rc == 0) {
220 0 1 DEBUG_MSG("%s", "timeout");
224 0 55238 if(rc < 0) {
225 0 0 DEBUG_MSG("exception: %s", strerror(errno));
226 0 0 if( errno == EINTR ) {
227 0 0 PERL_ASYNC_CHECK();
228 0 0 DEBUG_MSG("%s", "recieved interrupt. retry wait_for_event");
233 55238 0 if(self->ac && (pollfd.revents & POLLIN) != 0) {
67 55171 if(self->ac && (pollfd.revents & POLLIN) != 0) {
234 0 67 DEBUG_MSG("ready to %s", "read");
237 55232 6 if(self->ac && (pollfd.revents & (POLLOUT|POLLHUP)) != 0) {
55171 61 if(self->ac && (pollfd.revents & (POLLOUT|POLLHUP)) != 0) {
238 0 55171 DEBUG_MSG("ready to %s", "write");
241 0 55238 if((pollfd.revents & (POLLERR|POLLNVAL)) != 0) {
242 0 0 DEBUG_MSG(
0 0 DEBUG_MSG(
0 0 DEBUG_MSG(
249 0 55238 DEBUG_MSG("%s", "finish");
255 0 130 DEBUG_MSG("%s", "start");
256 197 68 while(self->ac && self->ac->replies.tail) {
136 61 while(self->ac && self->ac->replies.tail) {
258 135 1 if (res != WAIT_FOR_EVENT_OK) {
259 0 1 DEBUG_MSG("error: %d", res);
263 0 129 DEBUG_MSG("%s", "finish");
270 0 68 DEBUG_MSG("connected status = %d", status);
271 0 68 if(status != REDIS_OK) {
283 0 68 DEBUG_MSG("disconnected status = %d", status);
293 0 69 DEBUG_MSG("%s", "start");
295 0 69 if(self->on_build_sock) {
301 0 0 PUSHMARK(SP);
304 0 0 FREETMPS;
308 66 3 if(self->path) {
314 0 69 if(ac == NULL) {
315 0 0 DEBUG_MSG("%s", "allocation error");
318 1 68 if(ac->err) {
319 0 1 DEBUG_MSG("connection error: %s", ac->errstr);
333 68 0 if(self->cnx_timeout) {
336 55103 68 while(!self->is_connected) {
338 0 55103 if(self->ac == NULL) {
345 0 55103 if(res != WAIT_FOR_EVENT_OK) {
346 0 0 DEBUG_MSG("error: %d", res);
360 68 0 if(self->on_connect){
362 0 68 PUSHMARK(SP);
366 0 68 DEBUG_MSG("%s", "finish");
374 0 69 DEBUG_MSG("%s", "start");
381 3 66 if(self->reconnect == 0) {
382 1 2 if(! __build_sock(aTHX_ self)) {
383 0 1 if(self->path) {
397 66 0 if(__build_sock(aTHX_ self)) {
399 0 66 DEBUG_MSG("%s", "finish");
404 0 0 DEBUG_MSG("elapsed time:%f, reconnect:%lf", elapsed_time, self->reconnect);
405 0 0 if( elapsed_time > self->reconnect) {
406 0 0 if(self->path) {
411 0 0 DEBUG_MSG("%s", "timed out");
415 0 0 DEBUG_MSG("%s", "failed to connect. wait...");
424 0 71 DEBUG_MSG("%s", "start");
425 71 0 if(self->is_connected && !self->ac && self->reconnect > 0) {
3 68 if(self->is_connected && !self->ac && self->reconnect > 0) {
0 3 if(self->is_connected && !self->ac && self->reconnect > 0) {
426 0 0 DEBUG_MSG("%s", "connection not found. reconnect");
429 3 68 if(!self->ac) {
430 0 3 DEBUG_MSG("%s", "Not connected to any server");
432 0 71 DEBUG_MSG("%s", "finish");
459 73 20 for (i = 0; i < reply->elements; i++) {
461 3 70 if(collect_errors) {
463 2 1 if(elem.result) {
468 1 2 if(elem.error) {
475 68 2 if(elem.result) {
480 2 68 if(elem.error && !res.error) {
2 0 if(elem.error && !res.error) {
501 13 52 if (ret.error == NULL) {
504 0 13 if (self->reconnect_on_error == NULL) {
510 0 0 if( self->next_reconnect_on_error_at < 0 ||
0 0 if( self->next_reconnect_on_error_at < 0 ||
516 0 0 sv_ret = ret.result ? ret.result : &PL_sv_undef;
520 0 0 PUSHMARK(SP);
521 0 0 XPUSHs(sv_err);
522 0 0 XPUSHs(sv_ret);
523 0 0 XPUSHs(sv_cmd);
530 0 0 if (count != 1) {
533 0 0 _need_reconnect = POPi;
536 0 0 FREETMPS;
548 0 67 DEBUG_MSG("%p", (void*)privdata);
549 66 1 if(reply) {
551 0 66 if(cbt->custom_decode) {
556 1 0 } else if(c->c.flags & REDIS_FREEING) {
557 0 1 DEBUG_MSG("%s", "redis freeing");
560 0 0 DEBUG_MSG("connect error: %s", c->errstr);
565 0 67 DEBUG_MSG("%s", "finish");
573 0 1 DEBUG_MSG("%p, %p", reply, privdata);
574 1 0 if (reply) {
585 0 1 if(cbt->custom_decode) {
591 0 1 if(result.result == NULL) result.result = &PL_sv_undef;
592 1 0 if(result.error == NULL) result.error = &PL_sv_undef;
594 0 1 PUSHMARK(SP);
595 0 1 XPUSHs(result.result);
596 0 1 XPUSHs(result.error);
601 1 0 FREETMPS;
606 1 0 if (0 < self->reconnect && !self->need_reconnect
1 0 if (0 < self->reconnect && !self->need_reconnect
608 0 1 && self->reconnect_on_error != NULL) {
610 0 0 if(cbt->custom_decode) {
625 0 0 if (c->c.flags & REDIS_FREEING) {
626 0 0 DEBUG_MSG("%s", "redis freeing");
628 0 0 DEBUG_MSG("connect error: %s", c->errstr);
641 0 0 if (c->c.flags & REDIS_FREEING) {
646 0 0 DEBUG_MSG("error: %s", msg);
649 0 0 PUSHMARK(SP);
650 0 0 XPUSHs(result.result);
651 0 0 XPUSHs(result.error);
656 0 0 FREETMPS;
673 0 0 DEBUG_MSG("%s", "start");
674 0 0 if(!cbt) {
675 0 0 DEBUG_MSG("%s", "cbt is empty finished");
679 0 0 if (r) {
690 0 0 if (strcasecmp(stype+pvariant,"subscribe") == 0) {
691 0 0 DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
694 0 0 } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
695 0 0 DEBUG_MSG("%s %s %lld", r->element[0]->str, r->element[1]->str, r->element[2]->integer);
700 0 0 DEBUG_MSG("%s %s", r->element[0]->str, r->element[1]->str);
704 0 0 if(res.result == NULL) res.result = &PL_sv_undef;
705 0 0 if(res.error == NULL) res.error = &PL_sv_undef;
707 0 0 PUSHMARK(SP);
708 0 0 XPUSHs(res.result);
709 0 0 XPUSHs(res.error);
714 0 0 FREETMPS;
717 0 0 DEBUG_MSG("connect error: %s", c->errstr);
721 0 0 if(is_need_free) {
723 0 0 DEBUG_MSG("destroy %p", cbt);
724 0 0 if(cbt->cb) {
730 0 0 DEBUG_MSG("%s", "finish");
736 0 0 if(!self->ac) {
753 0 0 if(_wait_all_responses(aTHX_ self) == WAIT_FOR_EVENT_OK) {
754 0 0 DEBUG_MSG("%s", "wait_all_responses ok");
755 0 0 if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
0 0 if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
757 0 0 DEBUG_MSG("%s", "wait_all_responses not ok");
758 0 0 if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
0 0 if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
760 0 0 DEBUG_MSG("%s", "finish");
767 0 68 DEBUG_MSG("start %s", argv[0]);
769 0 68 DEBUG_MSG("pid check: previous pid is %d, now %d", self->pid, getpid());
770 0 68 if(self->pid != getpid()) {
771 0 0 DEBUG_MSG("%s", "pid changed. create new connection..");
775 0 68 if(EQUALS_COMMAND(argvlen[0], argv[0], "MULTI")) {
0 0 if(EQUALS_COMMAND(argvlen[0], argv[0], "MULTI")) {
777 1 67 } else if(EQUALS_COMMAND(argvlen[0], argv[0], "EXEC") ||
0 1 } else if(EQUALS_COMMAND(argvlen[0], argv[0], "EXEC") ||
0 67 } else if(EQUALS_COMMAND(argvlen[0], argv[0], "EXEC") ||
778 0 0 EQUALS_COMMAND(argvlen[0], argv[0], "DISCARD")) {
780 0 67 } else if(EQUALS_COMMAND(argvlen[0], argv[0], "WATCH")) {
0 0 } else if(EQUALS_COMMAND(argvlen[0], argv[0], "WATCH")) {
782 0 67 } else if(EQUALS_COMMAND(argvlen[0], argv[0], "UNWATCH")) {
0 0 } else if(EQUALS_COMMAND(argvlen[0], argv[0], "UNWATCH")) {
786 1 67 if(cb) {
803 65 2 int i, cnt = (self->reconnect == 0 ? 1 : 2);
805 67 0 for(i = 0; i < cnt; i++) {
814 0 67 DEBUG_MSG("%s", "send command in sync mode");
819 0 67 DEBUG_MSG("%s", "waiting response");
821 66 1 if(res == WAIT_FOR_EVENT_OK && self->need_reconnect == 0) {
66 0 if(res == WAIT_FOR_EVENT_OK && self->need_reconnect == 0) {
823 65 1 if (1 < cnt - i) {
828 0 65 if (_need_reconnect) {
832 66 0 if (!_need_reconnect) {
834 11 55 if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
11 0 if(cbt->ret.result || cbt->ret.error) Safefree(cbt);
835 0 66 DEBUG_MSG("finish %s", argv[0]);
840 0 1 if( res == WAIT_FOR_EVENT_READ_TIMEOUT ) break;
842 0 0 if(self->flags & (FLAG_INSIDE_TRANSACTION | FLAG_INSIDE_WATCH)) {
844 0 0 DEBUG_MSG("error: %s", msg);
850 0 0 if(!self->ac) {
852 0 0 DEBUG_MSG("error: %s", msg);
858 0 1 if( res == WAIT_FOR_EVENT_OK && (cbt->ret.result || cbt->ret.error) ) Safefree(cbt);
0 0 if( res == WAIT_FOR_EVENT_OK && (cbt->ret.result || cbt->ret.error) ) Safefree(cbt);
0 0 if( res == WAIT_FOR_EVENT_OK && (cbt->ret.result || cbt->ret.error) ) Safefree(cbt);
861 1 0 if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) {
864 0 1 DEBUG_MSG("error: %s", self->error);
868 0 0 if(!self->ac) {
870 0 0 DEBUG_MSG("error: %s", msg);
875 0 1 DEBUG_MSG("Finish %s", argv[0]);
887 0 0 if(reply->type == REDIS_REPLY_STRING ||
895 0 0 while(len != 0) {
899 0 0 if(line == NULL) {
905 0 0 if(str[0] != '#' && sep != NULL) {
0 0 if(str[0] != '#' && sep != NULL) {
912 0 0 if (ret == NULL) {
917 0 0 if(line == NULL) {
940 0 70 DEBUG_MSG("%s", "start");
947 0 70 DEBUG_MSG("return %p", ST(0));
1082 0 0 RETVAL = self->ac ? self->ac->c.fd : 0;
1145 0 0 if ( -1 < val ) {
1169 0 70 DEBUG_MSG("%s", "start");
1170 62 8 if (self->ac) {
1171 0 62 DEBUG_MSG("%s", "free ac");
1177 4 66 if(self->hostname) {
1178 0 4 DEBUG_MSG("%s", "free hostname");
1183 66 4 if(self->path) {
1184 0 66 DEBUG_MSG("%s", "free path");
1189 70 0 if(self->error) {
1190 0 70 DEBUG_MSG("%s", "free error");
1195 70 0 if(self->on_connect) {
1196 0 70 DEBUG_MSG("%s", "free on_connect");
1201 0 70 if(self->on_build_sock) {
1202 0 0 DEBUG_MSG("%s", "free on_build_sock");
1207 70 0 if(self->data) {
1208 0 70 DEBUG_MSG("%s", "free data");
1213 0 70 if(self->reconnect_on_error) {
1214 0 0 DEBUG_MSG("%s", "free reconnect_on_error");
1219 0 70 DEBUG_MSG("%s", "finish");
1228 0 4 if(self->hostname) {
1233 0 4 if(self->path) {
1238 4 0 if(hostname) {
1250 0 66 if(self->hostname) {
1255 0 66 if(self->path) {
1260 66 0 if(path) {
1279 0 1 if(res != WAIT_FOR_EVENT_OK) {
1283 1 0 if (0 < self->reconnect && self->need_reconnect) {
0 1 if (0 < self->reconnect && self->need_reconnect) {
1296 0 0 if(res != WAIT_FOR_EVENT_OK) {
1300 0 0 if (0 < self->reconnect && self->need_reconnect) {
0 0 if (0 < self->reconnect && self->need_reconnect) {
1320 0 68 if(!self->ac) {
1325 1 67 if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
1 0 if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
1331 0 68 Newx(argv, sizeof(char*) * argc, char*);
1332 0 68 Newx(argvlen, sizeof(size_t) * argc, size_t);
1334 136 68 for (i = 0; i < argc; i++) {
1335 0 136 if(!sv_utf8_downgrade(ST(i + 1), 1)) {
1338 136 0 argv[i] = SvPV(ST(i + 1), len);
1343 1 67 if(cb && EQUALS_COMMAND(argvlen[0], argv[0], "EXEC"))
1 0 if(cb && EQUALS_COMMAND(argvlen[0], argv[0], "EXEC"))
1 0 if(cb && EQUALS_COMMAND(argvlen[0], argv[0], "EXEC"))
1351 56 12 ST(0) = ret.result ? ret.result : &PL_sv_undef;
1352 14 54 ST(1) = ret.error ? ret.error : &PL_sv_undef;
1361 0 0 DEBUG_MSG("%s", "start QUIT");
1362 0 0 if(self->ac) {
1367 0 0 DEBUG_MSG("%s", "finish. there is no connection.");
1377 0 0 DEBUG_MSG("%s", "start SHUTDOWN");
1378 0 0 if(self->ac) {
1388 0 0 DEBUG_MSG("%s", "redis server has alread shutdown");
1406 0 0 if(!self->ac) {
1411 0 0 if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
0 0 if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
1417 0 0 Newx(argv, sizeof(char*) * argc, char*);
1418 0 0 Newx(argvlen, sizeof(size_t) * argc, size_t);
1422 0 0 for (i = 1; i < argc; i++) {
1423 0 0 argv[i] = SvPV(ST(i), len);
1431 0 0 ST(0) = ret.result ? ret.result : &PL_sv_undef;
1432 0 0 ST(1) = ret.error ? ret.error : &PL_sv_undef;
1449 3 0 if(!self->ac) {
1457 0 0 if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
0 0 if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
1463 0 0 Newx(argv, sizeof(char*) * argc, char*);
1464 0 0 Newx(argvlen, sizeof(size_t) * argc, size_t);
1468 0 0 for (i = 1; i < argc; i++) {
1469 0 0 argv[i] = SvPV(ST(i), len);
1477 0 0 ST(0) = ret.result ? ret.result : &PL_sv_undef;
1478 0 0 ST(1) = ret.error ? ret.error : &PL_sv_undef;
1495 0 0 int cnt = (self->reconnect == 0 ? 1 : 2);
1497 0 0 DEBUG_MSG("%s", "start");
1500 0 0 if(!self->ac) {
1504 0 0 if(!self->is_subscriber) {
1508 0 0 if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
0 0 if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
1514 0 0 Newx(argv, sizeof(char*) * argc, char*);
1515 0 0 Newx(argvlen, sizeof(size_t) * argc, size_t);
1517 0 0 for (i = 0; i < argc; i++) {
1518 0 0 argv[i] = SvPV(ST(i+1), len);
1520 0 0 DEBUG_MSG("argv[%d] = %s", i, argv[i]);
1523 0 0 for(i = 0; i < cnt; i++) {
1525 0 0 if (strcasecmp(argv[0]+pvariant,"unsubscribe") != 0) {
1526 0 0 DEBUG_MSG("%s", "command is not unsubscribe");
1531 0 0 DEBUG_MSG("%s", "command is unsubscribe");
1534 0 0 redisAsyncCommandArgv(
1539 0 0 while(self->expected_subs > 0 && wait_for_event(aTHX_ self, self->read_timeout, self->write_timeout) == WAIT_FOR_EVENT_OK) ;
0 0 while(self->expected_subs > 0 && wait_for_event(aTHX_ self, self->read_timeout, self->write_timeout) == WAIT_FOR_EVENT_OK) ;
1540 0 0 if(self->expected_subs == 0) break;
1542 0 0 if(!self->ac) {
1551 0 0 DEBUG_MSG("%s", "finish");
1559 0 0 int i, cnt = (self->reconnect == 0 ? 1 : 2);
1561 0 0 DEBUG_MSG("%s", "start");
1563 0 0 for(i = 0; i < cnt; i++) {
1564 0 0 while((res = wait_for_event(aTHX_ self, timeout, timeout)) == WAIT_FOR_EVENT_OK) ;
1565 0 0 if(res == WAIT_FOR_EVENT_READ_TIMEOUT || res == WAIT_FOR_EVENT_WRITE_TIMEOUT) break;
1567 0 0 if(!self->ac) {
1571 0 0 if(res == WAIT_FOR_EVENT_EXCEPTION) {
1572 0 0 if(!self->ac) {
1573 0 0 DEBUG_MSG("%s", "Connection not found");
1575 0 0 } else if(self->ac->c.err == REDIS_ERR_EOF) {
1576 0 0 DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
1579 0 0 DEBUG_MSG("hiredis returns error: %s", self->ac->c.errstr);
1585 0 0 DEBUG_MSG("finish with %d", res);
1593 0 0 DEBUG_MSG("%s", "start");
1595 0 0 DEBUG_MSG("%s", "finish");