File Coverage

lib/Redis/Jet.xs
Criterion Covered Total %
statement 244 372 65.5
branch 123 324 37.9
condition n/a
subroutine n/a
pod n/a
total 367 696 52.7


line stmt bran cond sub pod time code
1             #ifdef __cplusplus
2             extern "C" {
3             #endif
4              
5             #define PERL_NO_GET_CONTEXT /* we want efficiency */
6             #include
7             #include
8             #include
9             #include
10             #include
11              
12             #ifdef __cplusplus
13             } /* extern "C" */
14             #endif
15              
16             #define NEED_newRV_noinc
17             #define NEED_sv_2pv_flags
18             #include "ppport.h"
19              
20             #ifndef STATIC_INLINE /* a public perl API from 5.13.4 */
21             # if defined(__GNUC__) || defined(__cplusplus) || (defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L))
22             # define STATIC_INLINE static inline
23             # else
24             # define STATIC_INLINE static
25             # endif
26             #endif /* STATIC_INLINE */
27              
28             #define READ_MAX 16384
29             #define REQUEST_BUF_SIZE 4096
30              
31             #define PIPELINE(a) a == 1
32             #define FIGURES(a) (a==0) ? 1 : (int)log10(a) + 1
33              
34             struct jet_response_st {
35             SV * data;
36             };
37              
38             struct redis_jet_s {
39             SV * server;
40             double connect_timeout;
41             double io_timeout;
42             int utf8;
43             int noreply;
44             int reconnect_attempts;
45             double reconnect_delay;
46             int fileno;
47             HV * bucket;
48             char * request_buf;
49             char * read_buf;
50             long int request_buf_len;
51             long int read_buf_len;
52             struct jet_response_st * response_st;
53             long int response_st_len;
54             };
55             typedef struct redis_jet_s Redis_Jet;
56              
57             STATIC_INLINE
58             void
59 6           _smaller_zero_err(const char * key) {
60 6           croak("%s must be larger than zero", key);
61             }
62              
63             STATIC_INLINE
64             int
65 17           hv_fetch_iv(pTHX_ HV * hv, const char * key, const int defaultval ) {
66             SV **ssv;
67 17           ssv = hv_fetch(hv, key, strlen(key), 0);
68 17 50         if (ssv) {
69 17 50         return SvIV(*ssv);
70             }
71             return defaultval;
72             }
73              
74             STATIC_INLINE
75             int
76             hv_fetch_iv_positive_number(pTHX_ HV * hv, const char * key, const int defaultval) {
77 17           int ret = hv_fetch_iv(aTHX_ (hv), key, defaultval);
78 17 100         if (ret < 0) _smaller_zero_err(key);
    100          
    100          
79             return ret;
80             }
81              
82             STATIC_INLINE
83             double
84 16           hv_fetch_nv(pTHX_ HV * hv, const char * key, const double defaultval ) {
85             SV **ssv;
86 16           ssv = hv_fetch(hv, key, strlen(key), 0);
87 16 50         if (ssv) {
88 16 100         return SvNV(*ssv);
89             }
90             return defaultval;
91             }
92              
93             STATIC_INLINE
94             double
95             hv_fetch_nv_positive_number(pTHX_ HV * hv, const char * key, const double defaultval) {
96 16           double ret = hv_fetch_nv(aTHX_ (hv), key, defaultval);
97 16 100         if (ret < 0) _smaller_zero_err(key);
    100          
    100          
98             return ret;
99             }
100              
101             STATIC_INLINE
102             void
103             memcat( char * dst, ssize_t *dst_len, const char * src, const ssize_t src_len ) {
104 8           memcpy(&dst[*dst_len], src, src_len);
105 8           *dst_len += src_len;
106             }
107              
108             STATIC_INLINE
109             char *
110 8           svpv2char(pTHX_ SV *string, STRLEN *len, const int utf8) {
111 8 50         if ( utf8 ) {
112 0 0         SvGETMAGIC(string);
113 0 0         if (!SvUTF8(string)) {
114 0           string = sv_mortalcopy(string);
115 0           sv_utf8_encode(string);
116             }
117             }
118 8 50         return (char *)SvPV(string, *len);
119             }
120              
121              
122              
123             STATIC_INLINE
124             void
125             renewmem(pTHX_ char **d, ssize_t *cur, const ssize_t req) {
126 25 0         if ( req > *cur ) {
    0          
    0          
    50          
    50          
    50          
    0          
    0          
    0          
    50          
127 1           *cur = req - (req % 4096) + 4096;
128 1           Renew(*d, *cur, char);
129             }
130             }
131              
132             STATIC_INLINE
133             void
134             memcat_i(char * dst, ssize_t *dst_len, ssize_t snum, const int fig ) {
135 8           int dlen = *dst_len + fig - 1;
136             do {
137 8           dst[dlen] = '0' + (snum % 10);
138 8           dlen--;
139 8 0         } while ( snum /= 10);
    0          
    50          
    0          
    0          
140 8           *dst_len += fig;
141             }
142              
143             STATIC_INLINE
144             long int
145 24           _index_crlf(const char * buf, const ssize_t buf_len, ssize_t offset) {
146             ssize_t ret = -1;
147             char *p;
148 24           p = memchr(&buf[offset],'\n',buf_len);
149 24 50         if (!p) {
150             return ret;
151             }
152 24 50         if ( *(p-1) == '\r' ) {
153 24           return p - &buf[offset] + offset - 1;
154             }
155 0 0         while( offset < buf_len -1 ) {
156 0 0         if (buf[offset] == 13 && buf[offset+1] == 10 ) {
    0          
157             ret = offset;
158             break;
159             }
160 0           offset++;
161             }
162             return ret;
163             }
164              
165             STATIC_INLINE
166             void
167 11           _av_push(pTHX_ AV * data_av, const char * buf, const ssize_t copy_len, const int utf8) {
168             SV * dst;
169 11           dst = newSVpvn(buf, copy_len);
170 11           SvPOK_only(dst);
171 11 100         if ( utf8 ) { SvUTF8_on(dst); }
172 11           (void)av_push(data_av, dst);
173 11           }
174              
175             STATIC_INLINE
176             void
177 7           _sv_store(pTHX_ SV * data_sv, char * buf, ssize_t copy_len, int utf8) {
178             char * d;
179 7 50         d = SvGROW(data_sv, copy_len);
    50          
180 7           memcpy(d, buf, copy_len);
181 7           SvCUR_set(data_sv, copy_len);
182 7           SvPOK_only(data_sv);
183 7 50         if ( utf8 ) {
184 0           SvUTF8_on(data_sv);
185             }
186 7           }
187              
188             /*
189             == -2 incomplete
190             == -1 broken
191             */
192             STATIC_INLINE
193             long int
194 13           _parse_message(pTHX_ char * buf, const ssize_t buf_len, SV * data_sv, SV * error_sv, const int utf8) {
195             long int first_crlf;
196             long int m_first_crlf;
197             ssize_t v_size;
198             ssize_t m_size;
199             ssize_t m_v_size;
200             ssize_t m_buf_len;
201             ssize_t m_read;
202             ssize_t j;
203             char * m_buf;
204             AV * av_list;
205              
206 13 50         if ( buf_len < 2 ) {
207             return -2;
208             }
209 13           first_crlf = _index_crlf(buf,buf_len,0);
210 13 50         if ( first_crlf < 0 ) {
211             return -2;
212             }
213              
214 13           switch ( buf[0] ) {
215             case '+':
216             case ':':
217             /* 1 line reply
218             +foo\r\n */
219 5           _sv_store(aTHX_ data_sv, &buf[1], first_crlf-1, utf8);
220 5           return first_crlf + 2;
221             case '-':
222             /* error
223             -ERR unknown command 'a' */
224 1           sv_setsv(data_sv, &PL_sv_undef);
225 1           _sv_store(aTHX_ error_sv, &buf[1], first_crlf-1, utf8);
226 1           return first_crlf + 2;
227             case '$':
228             /* bulf
229             C: get mykey
230             S: $3
231             S: foo
232             */
233 1 50         if ( buf[1] == '-' && buf[2] == '1' ) {
    0          
234 0           sv_setsv(data_sv, &PL_sv_undef);
235 0           sv_setsv(error_sv, &PL_sv_undef);
236 0           return first_crlf + 2;
237             }
238             v_size = 0;
239 2 100         for (j=1; j
240 1           v_size = v_size * 10 + (buf[j] - '0');
241             }
242 1 50         if ( buf_len - (first_crlf + 2) < v_size + 2 ) {
243             return -2;
244             }
245 1           _sv_store(aTHX_ data_sv, &buf[first_crlf+2], v_size, utf8);
246 1           sv_setsv(error_sv, &PL_sv_undef);
247 1           return first_crlf+2+v_size+2;
248             case '*':
249             /* multibulk
250             # *3
251             # $3
252             # foo
253             # $-1
254             # $3
255             # baa
256             #
257             ## null list/timeout
258             # *-1
259             #
260             */
261 5 50         if ( buf[1] == '-' && buf[2] == '1' ) {
    0          
262 0           sv_setsv(data_sv, &PL_sv_undef);
263 0           sv_setsv(error_sv, &PL_sv_undef);
264 0           return first_crlf + 2;
265             }
266             m_size = 0;
267 10 100         for (j=1; j
268 5           m_size = m_size * 10 + (buf[j] - '0');
269             }
270 5           av_list = newAV();
271 5 50         if ( m_size == 0 ) {
272 0           sv_setsv(data_sv, sv_2mortal(newRV_noinc((SV *) av_list)));
273 0           sv_setsv(error_sv, &PL_sv_undef);
274 0           return first_crlf + 2;
275             }
276 5           m_buf = &buf[first_crlf + 2];
277 5           m_buf_len = buf_len - (first_crlf + 2);
278             m_read = 0;
279 11 50         while ( m_buf_len > m_read ) {
280 11 50         if (m_buf[0] != '$' ) {
281             return -1;
282             }
283 11 50         if (m_buf[1] == '-' && m_buf[2] == '1' ) {
    0          
284 0           av_push(av_list, &PL_sv_undef);
285 0           m_buf += 5;
286 0           m_read += 5;
287 0           continue;
288             }
289 11           m_first_crlf = _index_crlf(m_buf, m_buf_len - m_read, 0);
290 11 50         if ( m_first_crlf < 0 ) {
291             return -2;
292             }
293             m_v_size = 0;
294 22 100         for (j=1; j
295 11           m_v_size = m_v_size * 10 + (m_buf[j] - '0');
296             }
297 11 50         if ( m_buf_len - m_read - (m_first_crlf + 2) < m_v_size + 2 ) {
298             return -2;
299             }
300 11           _av_push(aTHX_ av_list, &m_buf[m_first_crlf+2], m_v_size, utf8);
301 11           m_buf += m_first_crlf+2+m_v_size+2;
302 11           m_read += m_first_crlf+2+m_v_size+2;
303 11 100         if ( av_len(av_list) + 1 == m_size ) {
304             break;
305             }
306             }
307 5 50         if ( av_len(av_list) + 1 < m_size ) {
308             return -2;
309             }
310 5           sv_setsv(data_sv, sv_2mortal(newRV_noinc((SV *) av_list)));
311 5           sv_setsv(error_sv, &PL_sv_undef);
312 5           return first_crlf + 2 + m_read;
313             default:
314             return -1;
315             }
316             }
317              
318             STATIC_INLINE
319             ssize_t
320 4           _write_timeout(const int fileno, const double timeout, char * write_buf, const int write_len ) {
321             int rv;
322             int nfound;
323             struct pollfd wfds[1];
324             DO_WRITE:
325 2           rv = write(fileno, write_buf, write_len);
326 2 50         if ( rv >= 0 ) {
327 2           return rv;
328             }
329 0 0         if ( rv < 0 && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK ) {
    0          
    0          
    0          
330 0           return rv;
331             }
332             WAIT_WRITE:
333 0           wfds[0].fd = fileno;
334 0           wfds[0].events = POLLOUT;
335             while (1) {
336 0           nfound = poll(wfds, 1, (int)timeout*1000);
337 0 0         if ( nfound == 1 ) {
338             break;
339             }
340 0 0         if ( nfound == 0 && errno != EINTR ) {
    0          
341             return -1;
342             }
343             }
344             goto DO_WRITE;
345             }
346              
347              
348             STATIC_INLINE
349             ssize_t
350 3           _read_timeout(const int fileno, const double timeout, char * read_buf, const int read_len ) {
351             int rv;
352             int nfound;
353             struct pollfd rfds[1];
354 3           goto WAIT_READ;
355             DO_READ:
356 6           rv = read(fileno, read_buf, read_len);
357 3 50         if ( rv >= 0 ) {
358 3           return rv;
359             }
360 0 0         if ( rv < 0 && errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK ) {
    0          
    0          
    0          
361 0           return rv;
362             }
363             WAIT_READ:
364 3           rfds[0].fd = fileno;
365 3           rfds[0].events = POLLIN;
366             while (1) {
367 3           nfound = poll(rfds, 1, (int)timeout*1000);
368 3 50         if ( nfound == 1 ) {
369             break;
370             }
371 0 0         if ( nfound == 0 && errno != EINTR ) {
    0          
372             return -1;
373             }
374             }
375             goto DO_READ;
376             }
377              
378             STATIC_INLINE
379             void
380 8           disconnect_socket (pTHX_ Redis_Jet * self) {
381 4           self->fileno = 0;
382 4 100         if ( hv_exists(self->bucket, "socket", strlen("socket")) ) {
383 2           (void)hv_delete(self->bucket, "socket", strlen("socket"), 0);
384             }
385 4           }
386              
387             MODULE = Redis::Jet PACKAGE = Redis::Jet
388              
389             PROTOTYPES: DISABLE
390              
391             Redis_Jet *
392             _new(class, args)
393             char * class
394             SV * args
395             PREINIT:
396             Redis_Jet * self;
397             STRLEN server_len;
398             char * s;
399             SV **server_ssv;
400             CODE:
401 8           Newxz(self, sizeof(Redis_Jet), Redis_Jet);
402 8 50         if ( SvTYPE(SvRV(args)) == SVt_PVHV) {
403 8           server_ssv = hv_fetch((HV *)SvRV(args), "server", strlen("server"),0);
404 8 50         if ( server_ssv ) {
405 8           self->server = newSVsv(*server_ssv);
406             }
407             else {
408 0           self->server = newSVpvs("127.0.0.1:6379");
409             }
410 15           self->utf8 = hv_fetch_iv_positive_number(aTHX_ (HV *)SvRV(args), "utf8", 0);
411 13           self->connect_timeout = hv_fetch_nv_positive_number(aTHX_ (HV *)SvRV(args), "connect_timeout", 10);
412 11           self->io_timeout = hv_fetch_nv_positive_number(aTHX_ (HV *)SvRV(args), "io_timeout", 10);
413 9           self->noreply = hv_fetch_iv_positive_number(aTHX_ (HV *)SvRV(args), "noreply", 0);
414 7           self->reconnect_attempts = hv_fetch_iv_positive_number(aTHX_ (HV *)SvRV(args), "reconnect_attempts", 0);
415 5           self->reconnect_delay = hv_fetch_nv_positive_number(aTHX_ (HV *)SvRV(args), "reconnect_delay", 10);
416 2           self->bucket = newHV();
417             }
418             else {
419 0           croak("Not a hash reference");
420             }
421 2           self->request_buf_len = 0;
422 2           self->read_buf_len = 0;
423 2           self->response_st_len = 0;
424             RETVAL = self;
425             OUTPUT:
426             RETVAL
427              
428             HV *
429             get_bucket(self)
430             Redis_Jet * self
431             CODE:
432 2           RETVAL = self->bucket;
433             OUTPUT:
434             RETVAL
435              
436             SV *
437             get_server(self)
438             Redis_Jet * self
439             PREINIT:
440             PPCODE:
441 4 50         XPUSHs(self->server);
442              
443              
444             double
445             get_connect_timeout(self)
446             Redis_Jet * self
447             CODE:
448 2           RETVAL = self->connect_timeout;
449             OUTPUT:
450             RETVAL
451              
452             double
453             get_io_timeout(self)
454             Redis_Jet * self
455             CODE:
456 0           RETVAL = self->io_timeout;
457             OUTPUT:
458             RETVAL
459              
460             int
461             get_utf8(self)
462             Redis_Jet * self
463             CODE:
464 0           RETVAL = self->utf8;
465             OUTPUT:
466             RETVAL
467              
468             int
469             get_noreply(self)
470             Redis_Jet * self
471             CODE:
472 0           RETVAL = self->noreply;
473             OUTPUT:
474             RETVAL
475              
476             int
477             set_fileno(self,fileno)
478             Redis_Jet * self
479             int fileno
480             CODE:
481 2           RETVAL = self->fileno = fileno;
482             OUTPUT:
483             RETVAL
484              
485             SV *
486             _destroy(self)
487             Redis_Jet * self
488             CODE:
489 2 50         if ( self->request_buf_len != 0 ) {
490 2           Safefree(self->request_buf);
491             }
492 2 50         if ( self->response_st_len != 0 ) {
493 2           Safefree(self->response_st);
494             }
495 2 50         if ( self->read_buf_len != 0 ) {
496 2           Safefree(self->read_buf);
497             }
498 2           disconnect_socket(aTHX_ self);
499 2           SvREFCNT_dec((SV*)self->server);
500 2           SvREFCNT_dec((SV*)self->bucket);
501 2           Safefree(self);
502              
503             SV *
504             parse_message(buf_sv, res_av)
505             SV * buf_sv
506             AV * res_av
507             ALIAS:
508             Redis::Jet::parse_message = 0
509             Redis::Jet::parse_message_utf8 = 1
510             PREINIT:
511             ssize_t buf_len;
512             char * buf;
513             AV * data_av;
514             SV * data_sv;
515             SV * error_sv;
516             long int ret;
517             long int readed;
518             CODE:
519 7           buf_len = SvCUR(buf_sv);
520 7 50         buf = SvPV_nolen(buf_sv);
521             readed = 0;
522 15 100         while ( buf_len > 0 ) {
523 8           data_sv = newSV(0);
524 8 50         (void)SvUPGRADE(data_sv, SVt_PV);
525 8           error_sv = newSV(0);
526 8 50         (void)SvUPGRADE(error_sv, SVt_PV);
527 8           ret = _parse_message(aTHX_ buf, buf_len, data_sv, error_sv, ix);
528 8 50         if ( ret == -1 ) {
529 0           XSRETURN_UNDEF;
530             }
531 8 50         else if ( ret == -2 ) {
532             break;
533             }
534             else {
535 8           data_av = newAV();
536 8           av_push(data_av, data_sv);
537 8 100         if ( SvOK(error_sv) ) {
    50          
    50          
538 1           av_push(data_av, error_sv);
539             } else {
540 7           sv_2mortal(error_sv);
541             }
542 8           av_push(res_av, newRV_noinc((SV *) data_av));
543 8           readed += ret;
544 8           buf_len -= ret;
545 8           buf = &buf[ret];
546             }
547             }
548 7           RETVAL = newSViv(readed);
549             OUTPUT:
550             RETVAL
551              
552             SV *
553             command(self,...)
554             Redis_Jet * self
555             ALIAS:
556             Redis::Jet::command = 0
557             Redis::Jet::pipeline = 1
558             PREINIT:
559             AV * data_av;
560             SV * data_sv;
561             SV * error_sv;
562             ssize_t i, j;
563             long int ret;
564             /* build */
565             int args_offset = 1;
566             int fig;
567             int connect_retry = 0;
568             ssize_t pipeline_len = 1;
569             ssize_t request_len = 0;
570 2           STRLEN request_arg_len = 0;
571             char * request_arg;
572             AV * request_arg_list;
573             /* send */
574             ssize_t written;
575             char * write_buf;
576             /* response */
577             ssize_t readed;
578             ssize_t parse_offset;
579             ssize_t parsed_response;
580             long int parse_result;
581             PPCODE:
582             /* init */
583 2 50         if ( self->request_buf_len == 0 ) {
584 2           Newx(self->request_buf, REQUEST_BUF_SIZE, char);
585 2           self->request_buf_len = REQUEST_BUF_SIZE;
586             }
587 2 50         if ( self->read_buf_len == 0 ) {
588 2           Newx(self->read_buf, READ_MAX, char);
589 2           self->read_buf_len = READ_MAX;
590             }
591 2 50         if ( self->response_st_len == 0 ) {
592 2           Newx(self->response_st, sizeof(struct jet_response_st)*10, struct jet_response_st);
593 2           self->response_st_len = 30;
594             }
595             DO_CONNECT:
596             /* connect */
597 2 50         if ( self->fileno == 0 ) {
598             {
599 2           dSP;
600 2           ENTER;
601 2           SAVETMPS;
602 2 50         PUSHMARK(SP);
603 2 50         XPUSHs(ST(0));
604 2           PUTBACK;
605 2           call_method("connect", G_DISCARD);
606 2 50         FREETMPS;
607 2           LEAVE;
608             }
609 2 50         if ( self->fileno == 0 ) {
610             /* connection error */
611 0           disconnect_socket(aTHX_ self);
612 0 0         if ( self->reconnect_attempts > connect_retry ) {
613 0           connect_retry++;
614 0           usleep(self->reconnect_delay*1000000); /* micro-sec */
615 0           goto DO_CONNECT;
616             }
617 0 0         if ( PIPELINE(ix) ) {
618 0           pipeline_len = items - args_offset;
619 0 0         EXTEND(SP, pipeline_len);
    0          
620 0 0         for (i=0; i
621 0           data_av = newAV();
622 0           (void)av_push(data_av, &PL_sv_undef);
623 0 0         (void)av_push(data_av, newSVpvf("failed to connect server: %s",
624             ( errno != 0 ) ? strerror(errno) : "timeout"));
625 0           PUSHs( sv_2mortal(newRV_noinc((SV *) data_av)) );
626             }
627             }
628             else {
629 0 0         EXTEND(SP, 2);
630 0           PUSHs(&PL_sv_undef);
631 0 0         PUSHs(sv_2mortal(newSVpvf("failed to connect server: %s",
632             ( errno != 0 ) ? strerror(errno) : "timeout")));
633             }
634             goto COMMAND_DONE;
635             }
636             }
637              
638             /* connection successful */
639             connect_retry = 0;
640              
641             /* char * s = SvPV_nolen(ST(1)); */
642             /* printf("ix:%d,fileno:%d,utf8:%d,timeout:%f,noreply:%d, items:%d %s\n",ix,self->fileno,self->utf8,self->io_timeout,self->noreply,items,&s[0]); */
643              
644              
645             /* build_message */
646 2 50         if ( PIPELINE(ix) ) {
647             /* build_request([qw/set foo bar/],[qw/set bar baz/]) */
648 2           pipeline_len = items - args_offset;
649 10 100         for( i=args_offset; i < items; i++ ) {
650 8 50         if ( SvOK(ST(i)) && SvROK(ST(i)) && SvTYPE(SvRV(ST(i))) == SVt_PVAV ) {
    0          
    0          
    50          
    0          
651             request_arg_list = (AV *)SvRV(ST(i));
652 0 0         fig = FIGURES(av_len(request_arg_list)+1);
653             /* 1(*) + args + 2(crlf) */
654 0           renewmem(aTHX_ &self->request_buf, &self->request_buf_len, 1 + fig + 2);
655 0           self->request_buf[request_len++] = '*';
656 0           memcat_i(self->request_buf, &request_len, av_len(request_arg_list)+1, fig);
657 0           self->request_buf[request_len++] = 13; /* \r */
658 0           self->request_buf[request_len++] = 10; /* \n */
659 0 0         for (j=0; j
660 0           request_arg = svpv2char(aTHX_ *av_fetch(request_arg_list,j,0), &request_arg_len, self->utf8);
661 0 0         fig = FIGURES(request_arg_len);
662             /* 1($) + fig + 2(crlf) + command_arg_len + 2 */
663 0           renewmem(aTHX_ &self->request_buf, &self->request_buf_len, 1 + fig + 2 + request_arg_len + 2);
664 0           self->request_buf[request_len++] = '$';
665 0           memcat_i(self->request_buf, &request_len, request_arg_len, fig);
666 0           self->request_buf[request_len++] = 13; /* \r */
667 0           self->request_buf[request_len++] = 10; /* \n */
668             /* need size: self->request_buf_len + request_arg_len - (self->request_buf_len - request_len) */
669 0           renewmem(aTHX_ &self->request_buf, &self->request_buf_len, request_arg_len + request_len);
670 0           memcat(self->request_buf, &request_len, request_arg, request_arg_len);
671 0           self->request_buf[request_len++] = 13; /* \r */
672 0           self->request_buf[request_len++] = 10; /* \n */
673             }
674             }
675             else {
676             /* 1(*) + 1(args) + 2(crlf) */
677             renewmem(aTHX_ &self->request_buf, &self->request_buf_len, 1 + 1 + 2);
678 8           self->request_buf[request_len++] = '*';
679 8           self->request_buf[request_len++] = '1';
680 8           self->request_buf[request_len++] = 13; /* \r */
681 8           self->request_buf[request_len++] = 10; /* \n */
682 8           request_arg = svpv2char(aTHX_ ST(i), &request_arg_len, self->utf8);
683 8 50         fig = FIGURES(request_arg_len);
684             /* 1($) + fig + 2(crlf) + command_arg_len + 2 */
685 8           renewmem(aTHX_ &self->request_buf, &self->request_buf_len, 1 + fig + 2 + request_arg_len + 2);
686 8           self->request_buf[request_len++] = '$';
687 8           memcat_i(self->request_buf, &request_len, request_arg_len, fig);
688 8           self->request_buf[request_len++] = 13; /* \r */
689 8           self->request_buf[request_len++] = 10; /* \n */
690             /* need size: self->request_buf_len + request_arg_len - (self->request_buf_len - request_len) */
691 8           renewmem(aTHX_ &self->request_buf, &self->request_buf_len, request_arg_len + request_len);
692 8           memcat(self->request_buf, &request_len, request_arg, request_arg_len);
693 8           self->request_buf[request_len++] = 13; /* \r */
694 8           self->request_buf[request_len++] = 10; /* \n */
695             }
696             }
697             }
698             else {
699             /* build_request(qw/set bar baz/) */
700 0 0         fig = FIGURES(items-args_offset);
701             /* 1(*) + fig + 2(crlf) */
702 0           renewmem(aTHX_ &self->request_buf, &self->request_buf_len, 1 + fig + 2);
703 0           self->request_buf[request_len++] = '*';
704 0           memcat_i(self->request_buf, &request_len, items-args_offset, fig);
705 0           self->request_buf[request_len++] = 13; /* \r */
706 0           self->request_buf[request_len++] = 10; /* \n */
707 0 0         for (j=args_offset; j
708 0           request_arg = svpv2char(aTHX_ ST(j), &request_arg_len, self->utf8);
709 0 0         fig = FIGURES(request_arg_len);
710             /* 1($) + fig + 2(crlf) + command_arg_len + 2 */
711 0           renewmem(aTHX_ &self->request_buf, &self->request_buf_len, 1 + fig + 2 + request_arg_len + 2);
712 0           self->request_buf[request_len++] = '$';
713 0           memcat_i(self->request_buf, &request_len, request_arg_len, fig);
714 0           self->request_buf[request_len++] = 13; /* \r */
715 0           self->request_buf[request_len++] = 10; /* \n */
716             /* need size: self->request_buf_len + request_arg_len - (self->request_buf_len - request_len) */
717 0           renewmem(aTHX_ &self->request_buf, &self->request_buf_len, request_arg_len + request_len);
718 0           memcat(self->request_buf, &request_len, request_arg, request_arg_len);
719 0           self->request_buf[request_len++] = 13; /* \r */
720 0           self->request_buf[request_len++] = 10; /* \n */
721             }
722             }
723              
724             /* printf("== %s --\n",&self->request_buf[0]); */
725             /* printf("pipeline_len:%d,%d,%ld\n",args_offset,items,pipeline_len); */
726             /* send request */
727             written = 0;
728 2           write_buf = &self->request_buf[0];
729 4 100         while ( request_len > written ) {
730 2           ret = _write_timeout(self->fileno, self->io_timeout, write_buf, request_len - written);
731 2 50         if ( ret <= 0 ) {
732             break;
733             }
734 2           written += ret;
735 2           write_buf = &self->request_buf[written];
736             }
737              
738             /* request done */
739 2 50         if ( PIPELINE(ix) ) {
740 2 50         EXTEND(SP, pipeline_len);
    50          
741 2 50         if ( pipeline_len > self->response_st_len ) {
742 0 0         Renew(self->response_st, sizeof(struct jet_response_st)*pipeline_len, struct jet_response_st);
743 0           self->response_st_len = pipeline_len;
744             }
745             }
746             else {
747 0 0         EXTEND(SP, 2);
748             }
749              
750             /* request error */
751 2 50         if (ret <= 0) {
752 0           disconnect_socket(aTHX_ self);
753 0 0         if ( ret == 0 && self->reconnect_attempts > connect_retry ) {
    0          
754             connect_retry++;
755 0           usleep(self->reconnect_delay*1000000); /* micro-sec */
756 0           goto DO_CONNECT;
757             }
758 0 0         if ( PIPELINE(ix) ) {
759 0 0         for (i=0; i
760 0           data_av = newAV();
761 0           (void)av_push(data_av, &PL_sv_undef);
762 0 0         (void)av_push(data_av, newSVpvf("failed to send message: %s",
763             ( errno != 0 ) ? strerror(errno) : "timeout or disconnected"));
764 0           PUSHs( sv_2mortal(newRV_noinc((SV *) data_av)) );
765             }
766             }
767             else {
768 0           PUSHs(&PL_sv_undef);
769 0 0         PUSHs(sv_2mortal(newSVpvf("failed to send message: %s",
770             ( errno != 0 ) ? strerror(errno) : "timeout or disconnected")));
771             }
772             goto COMMAND_DONE;
773             }
774              
775             /* noreply */
776 2 50         if ( self->noreply > 0 ) {
777 0           ret = read(self->fileno, &self->read_buf[0], READ_MAX);
778 0 0         if ( PIPELINE(ix) ) {
779 0 0         for (i=0; i
780 0           data_av = newAV();
781 0           (void)av_push(data_av, newSVpvs("0 but true"));
782 0           PUSHs( sv_2mortal(newRV_noinc((SV *) data_av)) );
783             }
784             }
785             else {
786 1           PUSHs(sv_2mortal(newSVpvs("0 but true")));
787             }
788             goto COMMAND_DONE;
789             }
790              
791             /* read response */
792             parsed_response=0;
793             parse_offset=0;
794             readed = 0;
795             while (1) {
796 3           ret = _read_timeout(self->fileno, self->io_timeout, &self->read_buf[readed], READ_MAX);
797 3 100         if ( ret <= 0 ) {
798             /* timeout or error */
799 1           disconnect_socket(aTHX_ self);
800 1 50         if ( PIPELINE(ix) ) {
801 3 100         for (i=parsed_response; i
802 2           data_av = newAV();
803 2           (void)av_push(data_av, &PL_sv_undef);
804 2 50         (void)av_push(data_av, newSVpvf("failed to read message: %s", ( errno != 0 ) ? strerror(errno) : "timeout or disconnected"));
805 2           self->response_st[i].data = newRV_noinc((SV*)data_av);
806             }
807             }
808             else {
809 0           self->response_st[0].data = &PL_sv_undef;
810 0 0         self->response_st[1].data = newSVpvf("failed to read message: %s", ( errno != 0 ) ? strerror(errno) : "timeout or disconnected");
811             }
812             goto PARSER_DONE;
813             }
814 2           readed += ret;
815 6 100         while ( readed > parse_offset ) {
816 5           data_sv = newSV(0);
817 5 50         (void)SvUPGRADE(data_sv, SVt_PV);
818 5           error_sv = newSV(0);
819 5 50         (void)SvUPGRADE(error_sv, SVt_PV);
820 5           parse_result = _parse_message(aTHX_ &self->read_buf[parse_offset],
821             readed - parse_offset, data_sv, error_sv, self->utf8);
822 5 100         if ( parse_result == -1 ) {
823             /* corruption */
824 1           disconnect_socket(aTHX_ self);
825 1 50         if ( PIPELINE(ix) ) {
826 3 100         for (i=parsed_response; i
827 2           data_av = newAV();
828 2           (void)av_push(data_av, &PL_sv_undef);
829 2           (void)av_push(data_av, newSVpvs("failed to parse message: corrupted message found"));
830 2           self->response_st[i].data = newRV_noinc((SV*)data_av);
831             }
832             }
833             else {
834 0           self->response_st[0].data = &PL_sv_undef;
835 0           self->response_st[1].data = newSVpvs("failed to parse message: corrupted message found");
836             }
837             goto PARSER_DONE;
838             }
839 4 50         else if ( parse_result == -2 ) {
840             break;
841             }
842             else {
843 4           parse_offset += parse_result;
844 4 50         if ( PIPELINE(ix) ) {
845 4           data_av = newAV();
846 4           av_push(data_av, data_sv);
847 4 50         if ( SvOK(error_sv) ) {
    50          
    50          
848 0           av_push(data_av, error_sv);
849             }
850             else {
851 4           sv_2mortal(error_sv);
852             }
853 4           self->response_st[parsed_response++].data = newRV_noinc((SV*)data_av);
854 4 50         if ( parsed_response >= pipeline_len ) {
855             goto PARSER_DONE;
856             }
857             }
858             else {
859 0           self->response_st[0].data = data_sv;
860 0           self->response_st[1].data = error_sv;
861 0           goto PARSER_DONE;
862             }
863             }
864             }
865 1           renewmem(aTHX_ &self->read_buf, &self->read_buf_len, readed + READ_MAX);
866             }
867            
868             PARSER_DONE:
869 2 50         if ( PIPELINE(ix) ) {
870 10 100         for (i=0; i
871 8           PUSHs( sv_2mortal((SV *)self->response_st[i].data));
872             }
873             }
874             else {
875 0           PUSHs( sv_2mortal(self->response_st[0].data) );
876 0 0         if ( SvOK(self->response_st[1].data) ) {
    0          
    0          
877 0           PUSHs( sv_2mortal(self->response_st[1].data) );
878             }
879             else {
880 0           sv_2mortal(self->response_st[1].data); /* XXX */
881             }
882             }
883            
884             COMMAND_DONE:
885 2 50         if ( self->request_buf_len > REQUEST_BUF_SIZE * 4 ) {
886 0           Safefree(self->request_buf);
887 0           self->request_buf_len = 0;
888             }
889 2 50         if ( self->response_st_len > 100 ) {
890 0           Safefree(self->response_st);
891 0           self->response_st_len = 0;
892             }
893 2 50         if ( self->read_buf_len > READ_MAX * 4 ) {
894 0           Safefree(self->read_buf);
895 0           self->read_buf_len = 0;
896             }
897