File Coverage

blib/lib/RedisDB/Parser/PP.pm
Criterion Covered Total %
statement 136 140 97.1
branch 83 92 90.2
condition 7 11 63.6
subroutine 21 21 100.0
pod 0 7 0.0
total 247 271 91.1


line stmt bran cond sub pod time code
1             package RedisDB::Parser::PP;
2              
3 2     2   699 use strict;
  2         3  
  2         43  
4 2     2   7 use warnings;
  2         4  
  2         76  
5             our $VERSION = "2.23";
6             $VERSION = eval $VERSION;
7              
8             =head1 NAME
9              
10             RedisDB::Parser::PP - redis protocol parser for RedisDB
11              
12             =head1 DESCRIPTION
13              
14             Pure Perl implementation of L. You should not use this
15             module directly. See details in L documentation.
16              
17             =cut
18              
19 2     2   892 use Encode qw();
  2         14719  
  2         37  
20 2     2   25 use RedisDB::Parser::Error;
  2         4  
  2         31  
21 2     2   7 use Carp;
  2         3  
  2         70  
22 2     2   8 use Try::Tiny;
  2         3  
  2         64  
23 2     2   8 use Scalar::Util qw(weaken);
  2         4  
  2         319  
24              
25             sub new {
26 4     4 0 3227 my ( $class, %params ) = @_;
27             my $self = {
28             utf8 => $params{utf8},
29             master => $params{master},
30             _error_class => $params{error_class} || "RedisDB::Parser::Error",
31             _default_cb => $params{default_callback},
32 4   50     31 _callbacks => [],
33             _buffer => '',
34             };
35 4         13 weaken( $self->{master} );
36 4         11 return bless $self, $class;
37             }
38              
39             sub build_request {
40 9     9 0 3738 my $self = shift;
41 9         13 my $nargs = @_;
42              
43 9         18 my $req = "*$nargs\015\012";
44 9 100       31 if ( $self->{utf8} ) {
45             $req .= '$' . length($_) . "\015\012" . $_ . "\015\012"
46 2         4 for map { Encode::encode( 'UTF-8', $_, Encode::FB_CROAK | Encode::LEAVE_SRC ) } @_;
  4         209  
47             }
48             else {
49 2     2   11 use bytes;
  2         3  
  2         8  
50 7         36 $req .= '$' . length($_) . "\015\012" . $_ . "\015\012" for @_;
51             }
52 9         108 return $req;
53             }
54              
55             sub push_callback {
56 27     27 0 20512 my ( $self, $cb ) = @_;
57 27         32 push @{ $self->{_callbacks} }, $cb;
  27         54  
58             }
59              
60             sub set_default_callback {
61 1     1 0 7 my ( $self, $cb ) = @_;
62 1         3 $self->{_default_cb} = $cb;
63             }
64              
65             sub callbacks {
66 3     3 0 7 scalar @{ shift->{_callbacks} };
  3         10  
67             }
68              
69             sub propagate_reply {
70 1     1 0 2 my ( $self, $reply ) = @_;
71 1 50       5 $self->{_default_cb}->( $self->{master}, $reply ) if $self->{_default_cb};
72 1         2 while ( my $cb = shift @{ $self->{_callbacks} } ) {
  4         10  
73 3         5 $cb->( $self->{master}, $reply );
74             }
75 1         2 return;
76             }
77              
78             sub parse {
79 30     30 0 15541 my ( $self, $data ) = @_;
80 30         52 $self->{_buffer} .= $data;
81 30         36 my $cnt = 0;
82 30   100     90 $cnt++ while length $self->{_buffer} and $self->_parse_reply;
83 28         55 return $cnt;
84             }
85              
86             # $self->_parse_reply
87             #
88             # checks if buffer contains full reply. Returns 1 if it is,
89             # invokes callback for the reply
90             my ( $READ_LINE, $READ_ERROR, $READ_NUMBER, $READ_BULK_LEN, $READ_BULK, $READ_MBLK_LEN,
91             $WAIT_BUCKS ) = 1 .. 7;
92              
93             sub _parse_reply {
94 38     38   47 my $self = shift;
95 38 50       68 return unless length $self->{_buffer};
96              
97             # if we not yet started parsing reply
98 38 100       60 unless ( $self->{_parse_state} ) {
99 25         49 my $type = substr( $self->{_buffer}, 0, 1, '' );
100 25         33 delete $self->{_parse_mblk_level};
101 25 100       75 if ( $type eq '+' ) {
    100          
    100          
    100          
    50          
102 2         3 $self->{_parse_state} = $READ_LINE;
103             }
104             elsif ( $type eq '-' ) {
105 3         4 $self->{_parse_state} = $READ_ERROR;
106             }
107             elsif ( $type eq ':' ) {
108 4         7 $self->{_parse_state} = $READ_NUMBER;
109             }
110             elsif ( $type eq '$' ) {
111 3         6 $self->{_parse_state} = $READ_BULK_LEN;
112             }
113             elsif ( $type eq '*' ) {
114 13         15 $self->{_parse_state} = $READ_MBLK_LEN;
115 13         23 $self->{_parse_mblk_level} = 1;
116             }
117             else {
118 0         0 die "Got invalid reply: $type$self->{_buffer}";
119             }
120             }
121              
122             # parse data
123 38         44 while (1) {
124 211 100       313 return unless length $self->{_buffer} >= 2;
125 204 100       503 if ( $self->{_parse_state} == $READ_LINE ) {
    100          
    100          
    100          
    100          
    100          
    50          
126 8 100       13 return unless defined( my $line = $self->_read_line );
127 6 100       8 return 1 if $self->_reply_completed($line);
128             }
129             elsif ( $self->{_parse_state} == $READ_ERROR ) {
130 5 100       8 return unless defined( my $line = $self->_read_line );
131 4         12 my $err = $self->{_error_class}->new($line);
132 4 100       8 return 1 if $self->_reply_completed($err);
133             }
134             elsif ( $self->{_parse_state} == $READ_NUMBER ) {
135 25 100       36 return unless defined( my $line = $self->_read_line );
136 24 100       83 die "Received invalid integer reply :$line" unless $line =~ /^-?[0-9]+$/;
137 23 100       47 return 1 if $self->_reply_completed( 0 + $line );
138             }
139             elsif ( $self->{_parse_state} == $READ_BULK_LEN ) {
140 35 50       56 return unless defined( my $len = $self->_read_line );
141 35 100       58 if ( $len >= 0 ) {
    50          
142 33         42 $self->{_parse_state} = $READ_BULK;
143 33         42 $self->{_parse_bulk_len} = $len;
144             }
145             elsif ( $len == -1 ) {
146 2 100       5 return 1 if $self->_reply_completed(undef);
147             }
148             }
149             elsif ( $self->{_parse_state} == $READ_BULK ) {
150 35 100       63 return unless length $self->{_buffer} >= 2 + $self->{_parse_bulk_len};
151 33         50 my $bulk = substr( $self->{_buffer}, 0, $self->{_parse_bulk_len}, '' );
152 33         40 substr $self->{_buffer}, 0, 2, '';
153 33 100       53 if ( $self->{utf8} ) {
154             try {
155 4     4   310 $bulk = Encode::decode( 'UTF-8', $bulk, Encode::FB_CROAK | Encode::LEAVE_SRC );
156             }
157             catch {
158 1     1   59 confess "Couldn't decode reply from the server, invalid UTF-8: '$bulk'";
159 4         19 };
160             }
161 32 100       208 return 1 if $self->_reply_completed($bulk);
162             }
163             elsif ( $self->{_parse_state} == $READ_MBLK_LEN ) {
164 26 50       48 return unless defined( my $len = $self->_read_line );
165 26 100 66     54 if ( $len > 0 ) {
    50          
166 21         25 $self->{_parse_mblk_len} = $len;
167 21         26 $self->{_parse_state} = $WAIT_BUCKS;
168 21         34 $self->{_parse_reply} = [];
169             }
170             elsif ( $len == 0 || $len == -1 ) {
171 5         6 $self->{_parse_mblk_level}--;
172 5 100       10 return 1 if $self->_reply_completed( $len ? undef : [] );
    100          
173             }
174             else {
175 0         0 die "Invalid multi-bulk reply: *$len\015\012$self->{_buffer}";
176             }
177             }
178             elsif ( $self->{_parse_state} == $WAIT_BUCKS ) {
179 70         97 my $char = substr( $self->{_buffer}, 0, 1, '' );
180 70 100       123 if ( $char eq '$' ) {
    100          
    100          
    100          
    50          
181 32         38 $self->{_parse_state} = $READ_BULK_LEN;
182             }
183             elsif ( $char eq ':' ) {
184 20         22 $self->{_parse_state} = $READ_NUMBER;
185             }
186             elsif ( $char eq '+' ) {
187 4         6 $self->{_parse_state} = $READ_LINE;
188             }
189             elsif ( $char eq '-' ) {
190 1         2 $self->{_parse_state} = $READ_ERROR;
191             }
192             elsif ( $char eq '*' ) {
193 13         15 $self->{_parse_state} = $READ_MBLK_LEN;
194 13         14 $self->{_parse_mblk_level}++;
195 13         25 push @{ $self->{_parse_mblk_store} },
196 13         14 [ $self->{_parse_mblk_len}, $self->{_parse_reply} ];
197             }
198             else {
199 0         0 die "Invalid multi-bulk reply. Expected [\$:+-*] but got $char";
200             }
201             }
202             }
203 0         0 return;
204             }
205              
206             sub _read_line {
207 99     99   110 my $self = shift;
208 99         140 my $pos = index $self->{_buffer}, "\015\012";
209 99         101 my $line;
210 99 100       143 if ( $pos >= 0 ) {
211              
212             # Got end of the line, add all stuff before \r\n
213             # to the reply string. Strip \r\n from the buffer
214 95         138 $line = substr( $self->{_buffer}, 0, $pos, '' );
215 95         109 substr $self->{_buffer}, 0, 2, '';
216             }
217 99         206 return $line;
218             }
219              
220             sub _mblk_item {
221 69     69   91 my ( $self, $value ) = @_;
222              
223 69         69 push @{ $self->{_parse_reply} }, $value;
  69         100  
224 69         76 my $repeat;
225 69 100       109 if ( --$self->{_parse_mblk_len} ) {
    100          
226 49         60 $self->{_parse_state} = $WAIT_BUCKS;
227 49         57 $repeat = 1;
228             }
229             elsif ( --$self->{_parse_mblk_level} ) {
230 10         11 my $res = $self->{_parse_reply};
231             ( $self->{_parse_mblk_len}, $self->{_parse_reply} ) =
232 10         11 @{ pop @{ $self->{_parse_mblk_store} } };
  10         10  
  10         19  
233 10         19 $repeat = $self->_mblk_item($res);
234             }
235             else {
236 10         11 $repeat = 0;
237             }
238              
239 69         145 return $repeat;
240             }
241              
242             sub _reply_completed {
243 72     72   110 my ( $self, $reply ) = @_;
244              
245 72 100       119 if ( $self->{_parse_mblk_level} ) {
246 59 100       90 return if $self->_mblk_item($reply);
247 10         20 $reply = delete $self->{_parse_reply};
248             }
249              
250 23         30 $self->{_parse_state} = undef;
251 23   33     36 my $cb = shift( @{ $self->{_callbacks} } ) || $self->{_default_cb};
252 23         49 $cb->( $self->{master}, $reply );
253 23         189 return 1;
254             }
255              
256             1;
257              
258             __END__