File Coverage

blib/lib/RedisDB/Parser/PP.pm
Criterion Covered Total %
statement 134 138 97.1
branch 83 92 90.2
condition 7 11 63.6
subroutine 21 21 100.0
pod 0 7 0.0
total 245 269 91.0


line stmt bran cond sub pod time code
1             package RedisDB::Parser::PP;
2              
3 2     2   767 use strict;
  2         2  
  2         60  
4 2     2   8 use warnings;
  2         2  
  2         80  
5             our $VERSION = "2.21";
6             $VERSION = eval $VERSION;
7              
8             =head1 NAME
9              
10             RedisDB::Parser::PP - redis protocol parser for RedisDB
11              
12             =head1 DESCRIPTION
13              
14             Pure Perl implementation of L. You should not use this
15             module directly. See details in L documentation.
16              
17             =cut
18              
19 2     2   1041 use Encode qw();
  2         16505  
  2         50  
20 2     2   14 use RedisDB::Parser::Error;
  2         2  
  2         35  
21 2     2   7 use Carp;
  2         2  
  2         100  
22 2     2   8 use Try::Tiny;
  2         2  
  2         75  
23 2     2   7 use Scalar::Util qw(weaken);
  2         3  
  2         418  
24              
25             sub new {
26 4     4 0 1923 my ( $class, %params ) = @_;
27 4   50     45 my $self = {
28             utf8 => $params{utf8},
29             master => $params{master},
30             _error_class => $params{error_class} || "RedisDB::Parser::Error",
31             _default_cb => $params{default_callback},
32             _callbacks => [],
33             _buffer => '',
34             };
35 4         16 weaken( $self->{master} );
36 4         14 return bless $self, $class;
37             }
38              
39             sub build_request {
40 9     9 0 2740 my $self = shift;
41 9         13 my $nargs = @_;
42              
43 9         17 my $req = "*$nargs\015\012";
44 9 100       27 if ( $self->{utf8} ) {
45 4         193 $req .= '$' . length($_) . "\015\012" . $_ . "\015\012"
46 2         4 for map { Encode::encode( 'UTF-8', $_, Encode::FB_CROAK | Encode::LEAVE_SRC ) } @_;
47             }
48             else {
49 2     2   11 use bytes;
  2         2  
  2         10  
50 7         123 $req .= '$' . length($_) . "\015\012" . $_ . "\015\012" for @_;
51             }
52 9         98 return $req;
53             }
54              
55             sub push_callback {
56 27     27 0 12489 my ( $self, $cb ) = @_;
57 27         31 push @{ $self->{_callbacks} }, $cb;
  27         53  
58             }
59              
60             sub set_default_callback {
61 1     1 0 11 my ( $self, $cb ) = @_;
62 1         3 $self->{_default_cb} = $cb;
63             }
64              
65             sub callbacks {
66 3     3 0 8 scalar @{ shift->{_callbacks} };
  3         15  
67             }
68              
69             sub propagate_reply {
70 1     1 0 1 my ( $self, $reply ) = @_;
71 1 50       7 $self->{_default_cb}->( $self->{master}, $reply ) if $self->{_default_cb};
72 1         2 while ( my $cb = shift @{ $self->{_callbacks} } ) {
  4         8  
73 3         7 $cb->( $self->{master}, $reply );
74             }
75 1         2 return;
76             }
77              
78             sub parse {
79 30     30 0 11186 my ( $self, $data ) = @_;
80 30         55 $self->{_buffer} .= $data;
81 30   100     118 1 while length $self->{_buffer} and $self->_parse_reply;
82             }
83              
84             # $self->_parse_reply
85             #
86             # checks if buffer contains full reply. Returns 1 if it is,
87             # invokes callback for the reply
88             my ( $READ_LINE, $READ_ERROR, $READ_NUMBER, $READ_BULK_LEN, $READ_BULK, $READ_MBLK_LEN,
89             $WAIT_BUCKS ) = 1 .. 7;
90              
91             sub _parse_reply {
92 38     38   36 my $self = shift;
93 38 50       84 return unless length $self->{_buffer};
94              
95             # if we not yet started parsing reply
96 38 100       68 unless ( $self->{_parse_state} ) {
97 25         48 my $type = substr( $self->{_buffer}, 0, 1, '' );
98 25         35 delete $self->{_parse_mblk_level};
99 25 100       101 if ( $type eq '+' ) {
    100          
    100          
    100          
    50          
100 2         4 $self->{_parse_state} = $READ_LINE;
101             }
102             elsif ( $type eq '-' ) {
103 3         5 $self->{_parse_state} = $READ_ERROR;
104             }
105             elsif ( $type eq ':' ) {
106 4         7 $self->{_parse_state} = $READ_NUMBER;
107             }
108             elsif ( $type eq '$' ) {
109 3         4 $self->{_parse_state} = $READ_BULK_LEN;
110             }
111             elsif ( $type eq '*' ) {
112 13         50 $self->{_parse_state} = $READ_MBLK_LEN;
113 13         23 $self->{_parse_mblk_level} = 1;
114             }
115             else {
116 0         0 die "Got invalid reply: $type$self->{_buffer}";
117             }
118             }
119              
120             # parse data
121 38         36 while (1) {
122 211 100       367 return unless length $self->{_buffer} >= 2;
123 204 100       642 if ( $self->{_parse_state} == $READ_LINE ) {
    100          
    100          
    100          
    100          
    100          
    50          
124 8 100       11 return unless defined( my $line = $self->_read_line );
125 6 100       13 return 1 if $self->_reply_completed($line);
126             }
127             elsif ( $self->{_parse_state} == $READ_ERROR ) {
128 5 100       15 return unless defined( my $line = $self->_read_line );
129 4         19 my $err = $self->{_error_class}->new($line);
130 4 100       13 return 1 if $self->_reply_completed($err);
131             }
132             elsif ( $self->{_parse_state} == $READ_NUMBER ) {
133 25 100       32 return unless defined( my $line = $self->_read_line );
134 24 100       94 die "Received invalid integer reply :$line" unless $line =~ /^-?[0-9]+$/;
135 23 100       46 return 1 if $self->_reply_completed( 0 + $line );
136             }
137             elsif ( $self->{_parse_state} == $READ_BULK_LEN ) {
138 35 50       45 return unless defined( my $len = $self->_read_line );
139 35 100       67 if ( $len >= 0 ) {
    50          
140 33         31 $self->{_parse_state} = $READ_BULK;
141 33         40 $self->{_parse_bulk_len} = $len;
142             }
143             elsif ( $len == -1 ) {
144 2 100       4 return 1 if $self->_reply_completed(undef);
145             }
146             }
147             elsif ( $self->{_parse_state} == $READ_BULK ) {
148 35 100       72 return unless length $self->{_buffer} >= 2 + $self->{_parse_bulk_len};
149 33         48 my $bulk = substr( $self->{_buffer}, 0, $self->{_parse_bulk_len}, '' );
150 33         33 substr $self->{_buffer}, 0, 2, '';
151 33 100       54 if ( $self->{utf8} ) {
152             try {
153 4     4   194 $bulk = Encode::decode( 'UTF-8', $bulk, Encode::FB_CROAK | Encode::LEAVE_SRC );
154             }
155             catch {
156 1     1   58 confess "Couldn't decode reply from the server, invalid UTF-8: '$bulk'";
157 4         22 };
158             }
159 32 100       169 return 1 if $self->_reply_completed($bulk);
160             }
161             elsif ( $self->{_parse_state} == $READ_MBLK_LEN ) {
162 26 50       34 return unless defined( my $len = $self->_read_line );
163 26 100 66     64 if ( $len > 0 ) {
    50          
164 21         26 $self->{_parse_mblk_len} = $len;
165 21         43 $self->{_parse_state} = $WAIT_BUCKS;
166 21         32 $self->{_parse_reply} = [];
167             }
168             elsif ( $len == 0 || $len == -1 ) {
169 5         6 $self->{_parse_mblk_level}--;
170 5 100       11 return 1 if $self->_reply_completed( $len ? undef : [] );
    100          
171             }
172             else {
173 0         0 die "Invalid multi-bulk reply: *$len\015\012$self->{_buffer}";
174             }
175             }
176             elsif ( $self->{_parse_state} == $WAIT_BUCKS ) {
177 70         84 my $char = substr( $self->{_buffer}, 0, 1, '' );
178 70 100       177 if ( $char eq '$' ) {
    100          
    100          
    100          
    50          
179 32         31 $self->{_parse_state} = $READ_BULK_LEN;
180             }
181             elsif ( $char eq ':' ) {
182 20         21 $self->{_parse_state} = $READ_NUMBER;
183             }
184             elsif ( $char eq '+' ) {
185 4         5 $self->{_parse_state} = $READ_LINE;
186             }
187             elsif ( $char eq '-' ) {
188 1         2 $self->{_parse_state} = $READ_ERROR;
189             }
190             elsif ( $char eq '*' ) {
191 13         12 $self->{_parse_state} = $READ_MBLK_LEN;
192 13         12 $self->{_parse_mblk_level}++;
193 13         9 push @{ $self->{_parse_mblk_store} },
  13         30  
194             [ $self->{_parse_mblk_len}, $self->{_parse_reply} ];
195             }
196             else {
197 0         0 die "Invalid multi-bulk reply. Expected [\$:+-*] but got $char";
198             }
199             }
200             }
201 0         0 return;
202             }
203              
204             sub _read_line {
205 99     99   89 my $self = shift;
206 99         127 my $pos = index $self->{_buffer}, "\015\012";
207 99         63 my $line;
208 99 100       156 if ( $pos >= 0 ) {
209              
210             # Got end of the line, add all stuff before \r\n
211             # to the reply string. Strip \r\n from the buffer
212 95         122 $line = substr( $self->{_buffer}, 0, $pos, '' );
213 95         97 substr $self->{_buffer}, 0, 2, '';
214             }
215 99         230 return $line;
216             }
217              
218             sub _mblk_item {
219 69     69   93 my ( $self, $value ) = @_;
220              
221 69         48 push @{ $self->{_parse_reply} }, $value;
  69         95  
222 69         50 my $repeat;
223 69 100       120 if ( --$self->{_parse_mblk_len} ) {
    100          
224 49         42 $self->{_parse_state} = $WAIT_BUCKS;
225 49         45 $repeat = 1;
226             }
227             elsif ( --$self->{_parse_mblk_level} ) {
228 10         11 my $res = $self->{_parse_reply};
229 10         20 ( $self->{_parse_mblk_len}, $self->{_parse_reply} ) =
230 10         7 @{ pop @{ $self->{_parse_mblk_store} } };
  10         7  
231 10         20 $repeat = $self->_mblk_item($res);
232             }
233             else {
234 10         10 $repeat = 0;
235             }
236              
237 69         166 return $repeat;
238             }
239              
240             sub _reply_completed {
241 72     72   73 my ( $self, $reply ) = @_;
242              
243 72 100       121 if ( $self->{_parse_mblk_level} ) {
244 59 100       72 return if $self->_mblk_item($reply);
245 10         20 $reply = delete $self->{_parse_reply};
246             }
247              
248 23         28 $self->{_parse_state} = undef;
249 23   33     19 my $cb = shift( @{ $self->{_callbacks} } ) || $self->{_default_cb};
250 23         53 $cb->( $self->{master}, $reply );
251 23         202 return 1;
252             }
253              
254             1;
255              
256             __END__