File Coverage

rdb_parser.c
Criterion Covered Total %
statement 223 236 94.4
branch 117 144 81.2
condition n/a
subroutine n/a
pod n/a
total 340 380 89.4


line stmt bran cond sub pod time code
1             #include
2             #include
3             #include
4              
5             #define NEED_newRV_noinc
6             #define NEED_sv_2pv_flags
7             #include "ppport.h"
8              
9             #include "rdb_parser.h"
10              
11 4           RDB_parser* rdb_parser__init(SV* master, SV* error_class, int utf8) {
12             RDB_parser *parser;
13              
14 4           Newx(parser, 1, RDB_parser);
15 4 50         if (parser == NULL) {
16 0           croak("Couldn't allocate memory for parser");
17             }
18              
19 4 50         if (SvROK(master)) {
20 0           parser->master = SvRV(master);
21             }
22             else {
23 4           parser->master = &PL_sv_undef;
24             }
25              
26 4           parser->utf8 = utf8;
27 4           parser->callbacks = newAV();
28 4           parser->default_cb = NULL;
29 4           parser->mblk_reply = NULL;
30 4           parser->mblk_store = NULL;
31 4           parser->buffer = newSVpvn("", 0);
32 4           parser->state = RDBP_CLEAN;
33              
34 4           parser->error_class = newSVsv(error_class);
35 4           parser->error_class_constructor = newSVsv(error_class);
36 4           sv_catpv(parser->error_class_constructor, "::new");
37              
38 4           return parser;
39             }
40              
41 4           void rdb_parser__free(RDB_parser *parser) {
42             struct rdbp_mblk_store *store, *next;
43              
44 4           SvREFCNT_dec(parser->callbacks);
45 4           SvREFCNT_dec(parser->buffer);
46 4           SvREFCNT_dec(parser->error_class);
47 4           SvREFCNT_dec(parser->error_class_constructor);
48 4 50         if (parser->default_cb != NULL)
49 0           SvREFCNT_dec(parser->default_cb);
50 4 100         if (parser->mblk_reply != NULL)
51 1           SvREFCNT_dec(parser->mblk_reply);
52              
53 4           store = parser->mblk_store;
54 4 50         while (store != NULL) {
55 0           next = store->next;
56 0           SvREFCNT_dec(store->mblk_reply);
57 0           Safefree(store);
58 0           store = next;
59             }
60              
61 4           Safefree(parser);
62 4           }
63              
64 1           void rdb_parser__propagate_reply(RDB_parser *parser, SV *reply) {
65             SV *cb;
66              
67             while (1) {
68 5 100         if(av_len(parser->callbacks) >= 0) {
69 3           cb = av_shift(parser->callbacks);
70 3           sv_2mortal(cb);
71             }
72 2 100         else if (parser->default_cb != NULL) {
73 1           cb = parser->default_cb;
74 1           parser->default_cb = NULL;
75             }
76             else {
77 1           break;
78             }
79              
80             {
81 4           dSP;
82 4           ENTER;
83 4           SAVETMPS;
84 4 50         PUSHMARK(SP);
85 4 50         XPUSHs(sv_2mortal(newRV_inc(parser->master)));
86 4 50         XPUSHs(sv_2mortal(newSVsv(reply)));
87 4           PUTBACK;
88 4           call_sv(cb, G_VOID|G_DISCARD);
89 4 50         FREETMPS;
90 4           LEAVE;
91             }
92 4           }
93 1           }
94              
95             static
96 99           long _line_length(char *start, size_t length) {
97             int i;
98 99           char *pos = start;
99              
100 340 100         for (i=0; i < length - 1; i++, pos++) {
101 336 100         if (*pos == '\r' && pos[1] == '\n') {
    50          
102 95           return (long)(pos - start);
103             }
104             }
105              
106 4           return -1;
107             }
108              
109             /*
110             * read line from the buffer and return it as SV
111             */
112             static
113 13           SV* _read_line(SV *buffer) {
114             char *pv;
115             long len;
116 13           SV *line = NULL;
117              
118 13           pv = SvPVX(buffer);
119 13           len = _line_length(pv, sv_len(buffer));
120 13 100         if (len >= 0) {
121 10           line = newSVpvn(pv, len);
122 10           sv_chop(buffer, pv + len + 2);
123             }
124              
125 13           return line;
126             };
127              
128             /*
129             * read line containing integer number
130             */
131             static
132 25           SV* _read_number(SV *buffer) {
133             char *pv;
134             long len;
135 25           SV *num = NULL;
136              
137 25           pv = SvPVX(buffer);
138 25           len = _line_length(pv, sv_len(buffer));
139 25 100         if (len >= 0) {
140 24           pv[len] = 0;
141 24           num = newSViv(atol(pv));
142 24           sv_chop(buffer, pv + len + 2);
143             }
144              
145 25           return num;
146             }
147              
148             /*
149             * read line containing length
150             * returns length >= -1, or -2 if line is not finished
151             */
152             static
153 61           long _read_length(SV *buffer) {
154             char *pv;
155             long len;
156 61           long num = -2;
157              
158 61           pv = SvPVX(buffer);
159 61           len = _line_length(pv, sv_len(buffer));
160 61 50         if (len >= 0) {
161 61           pv[len] = 0;
162 61           num = atol(pv);
163 61           sv_chop(buffer, pv + len + 2);
164             }
165              
166 61           return num;
167             }
168              
169             /*
170             * creates RedisDB::Parser::Error object from the message
171             */
172             static
173 4           SV* _create_rdb_error(RDB_parser *parser, SV *msg) {
174             int count;
175             SV* err;
176              
177 4           dSP;
178 4           ENTER;
179 4           SAVETMPS;
180 4 50         PUSHMARK(SP);
181 4 50         XPUSHs(parser->error_class);
182 4 50         XPUSHs(sv_2mortal(msg));
183 4           PUTBACK;
184 4           count = call_sv(parser->error_class_constructor, G_SCALAR);
185 4 50         if (count != 1)
186 0           croak("Expected single return value from new, but got many");
187 4           SPAGAIN;
188 4           err = newSVsv(POPs);
189 4           PUTBACK;
190 4 50         FREETMPS;
191 4           LEAVE;
192              
193 4           return err;
194             }
195              
196             /*
197             * stores current multibulk reply status in mblk_stack
198             */
199             static
200 13           void _mblk_status_store(RDB_parser *parser) {
201             struct rdbp_mblk_store *store;
202 13           Newx(store, 1, struct rdbp_mblk_store);
203 13           store->mblk_reply = parser->mblk_reply;
204 13           parser->mblk_reply = NULL;
205 13           store->mblk_len = parser->mblk_len;
206 13           store->next = parser->mblk_store;
207 13           parser->mblk_store = store;
208 13           }
209              
210             /*
211             * fetches status of the multibulk reply from the mblk_stack
212             */
213             static
214 13           void _mblk_status_fetch(RDB_parser *parser) {
215             struct rdbp_mblk_store *store;
216              
217 13           store = parser->mblk_store;
218 13 50         if (store == NULL)
219 0           croak("Already at the upper level of multi-bulk reply");
220 13           parser->mblk_len = store->mblk_len;
221 13           parser->mblk_reply = store->mblk_reply;
222 13           parser->mblk_store = store->next;
223 13           Safefree(store);
224 13           }
225              
226             /*
227             * process new value of multi-bulk reply
228             */
229             static
230 69           int _mblk_item(RDB_parser *parser, SV *value) {
231             SV *tmp;
232 69           int repeat = 0;
233              
234 69           av_push(parser->mblk_reply, value);
235 69 100         if (parser->mblk_len > 1) {
236 49           parser->mblk_len--;
237 49           parser->state = RDBP_WAIT_BUCKS;
238 49           repeat = 1;
239             }
240 20 100         else if (parser->mblk_level > 1) {
241 10           parser->mblk_level--;
242 10           tmp = newRV_noinc((SV *)(parser->mblk_reply));
243 10           _mblk_status_fetch(parser);
244 10           repeat = _mblk_item(parser, tmp);
245             }
246              
247 69           return repeat;
248             }
249              
250             /*
251             * check if we finished with this reply, and invoke callback if needed.
252             * returns 1 if reply completed or 0 otherwise
253             */
254             static
255 73           int _reply_completed(RDB_parser *parser, SV *value) {
256             SV *reply, *cb;
257              
258 73 100         if (parser->mblk_level) {
259 59 100         if (_mblk_item(parser, value))
260 49           return 0;
261 10           reply = newRV_noinc((SV *)(parser->mblk_reply));
262 10           parser->mblk_reply = NULL;
263             }
264             else
265 14           reply = value;
266              
267 24           parser->state = RDBP_CLEAN;
268              
269             {
270 24           dSP;
271 24           ENTER;
272 24           SAVETMPS;
273 24 100         if (av_len(parser->callbacks) >= 0) {
274 23           cb = av_shift(parser->callbacks);
275 23           sv_2mortal(cb);
276             }
277 1 50         else if (parser->default_cb != NULL) {
278 0           cb = parser->default_cb;
279             }
280 1           else croak("No callbacks in the queue and no default callback set");
281 23 50         PUSHMARK(SP);
282 23 50         XPUSHs(sv_2mortal(newRV_inc(parser->master)));
283 23 50         XPUSHs(sv_2mortal(reply));
284 23           PUTBACK;
285 23           call_sv(cb, G_VOID|G_DISCARD);
286 23 50         FREETMPS;
287 23           LEAVE;
288             }
289              
290 23           return 1;
291             }
292              
293 38           int rdb_parser__parse_reply(RDB_parser *parser) {
294             char op;
295             char *pv;
296             SV *line, *err, *bulk, *mblk;
297             long length;
298              
299 38 50         if (sv_len(parser->buffer) == 0) return 0;
300              
301 38 100         if (parser->state == RDBP_CLEAN) {
302 25           parser->mblk_level = 0;
303              
304             /* remove first character from the buffer */
305 25           pv = SvPVX(parser->buffer);
306 25           op = *pv;
307 25           sv_chop(parser->buffer, pv + 1);
308              
309 25 100         if (op == '+')
310 2           parser->state = RDBP_READ_LINE;
311 23 100         else if (op == '-')
312 3           parser->state = RDBP_READ_ERROR;
313 20 100         else if (op == ':')
314 4           parser->state = RDBP_READ_NUMBER;
315 16 100         else if (op == '$')
316 3           parser->state = RDBP_READ_BULK_LEN;
317 13 50         else if (op == '*') {
318 13           parser->state = RDBP_READ_MBLK_LEN;
319 13           parser->mblk_level = 1;
320             }
321             else {
322 0           croak("Got invalid reply");
323             }
324             }
325              
326             while (1) {
327 211 100         if (sv_len(parser->buffer) < 2) return 0;
328              
329 204 100         if (parser->state == RDBP_READ_LINE) {
330 8           line = _read_line(parser->buffer);
331 8 100         if (line == NULL) return 0;
332 6 100         if (_reply_completed(parser, line)) return 1;
333             }
334 196 100         else if (parser->state == RDBP_READ_ERROR) {
335 5           line = _read_line(parser->buffer);
336 5 100         if (line == NULL) return 0;
337 4           err = _create_rdb_error(parser, line);
338 4 100         if (_reply_completed(parser, err)) return 1;
339             }
340 191 100         else if (parser->state == RDBP_READ_NUMBER) {
341 25           line = _read_number(parser->buffer);
342 25 100         if (line == NULL) return 0;
343 24 100         if (_reply_completed(parser, line)) return 1;
344             }
345 166 100         else if (parser->state == RDBP_READ_BULK_LEN) {
346 35           length = _read_length(parser->buffer);
347 35 100         if (length >= 0) {
348 33           parser->state = RDBP_READ_BULK;
349 33           parser->bulk_len = length;
350             }
351 2 50         else if (length == -1) {
352 2 100         if (_reply_completed(parser, newSVpvn(NULL, 0))) return 1;
353             }
354 0           else return 0;
355             }
356 131 100         else if (parser->state == RDBP_READ_BULK) {
357 35 100         if (sv_len(parser->buffer) < 2 + parser->bulk_len) return 0;
358 33           pv = SvPVX(parser->buffer);
359 33           bulk = newSVpvn(pv, parser->bulk_len);
360 33           sv_chop(parser->buffer, pv + parser->bulk_len + 2);
361 33 100         if (parser->utf8) {
362 4 100         if (!sv_utf8_decode(bulk))
363 1           croak("Received invalid UTF-8 string from the server");
364             }
365 32 100         if (_reply_completed(parser, bulk)) return 1;
366             }
367 96 100         else if (parser->state == RDBP_READ_MBLK_LEN) {
368 26           length = _read_length(parser->buffer);
369 26 100         if (length > 0) {
370 21           parser->mblk_len = length;
371 21           parser->state = RDBP_WAIT_BUCKS;
372 21           parser->mblk_reply = newAV();
373             }
374 5 100         else if (length == 0 || length == -1) {
    50          
375 5 100         mblk = (length == 0) ? newRV_noinc((SV *)newAV()) : newSVpvn(NULL, 0);
376 5           parser->mblk_level--;
377 5 100         if (parser->mblk_level > 0)
378 3           _mblk_status_fetch(parser);
379 5 100         if (_reply_completed(parser, mblk)) return 1;
380             }
381 24           else return 0;
382             }
383 70 50         else if (parser->state == RDBP_WAIT_BUCKS) {
384             /* remove first character from the buffer */
385 70           pv = SvPVX(parser->buffer);
386 70           op = *pv;
387 70           sv_chop(parser->buffer, pv + 1);
388              
389 70 100         if (op == '$') parser->state = RDBP_READ_BULK_LEN;
390 38 100         else if (op == ':') parser->state = RDBP_READ_NUMBER;
391 18 100         else if (op == '+') parser->state = RDBP_READ_LINE;
392 14 100         else if (op == '-') parser->state = RDBP_READ_ERROR;
393 13 50         else if (op == '*') {
394 13           parser->state = RDBP_READ_MBLK_LEN;
395 13           parser->mblk_level++;
396 13           _mblk_status_store(parser);
397             }
398 0           else croak("Invalid multi-bulk reply. Expected [$:+-*] but got something else");
399             }
400 173           }
401              
402             return 0;
403             }
404