File Coverage

blib/lib/Gearman/ResponseParser.pm
Criterion Covered Total %
statement 54 67 80.6
branch 14 20 70.0
condition 3 3 100.0
subroutine 10 12 83.3
pod 7 8 87.5
total 88 110 80.0


line stmt bran cond sub pod time code
1             package Gearman::ResponseParser;
2 8     8   1059 use version ();
  8         1390  
  8         279  
3             $Gearman::ResponseParser::VERSION = version->declare("2.003_002");
4              
5              
6 8     8   25 use strict;
  8         8  
  8         114  
7 8     8   20 use warnings;
  8         9  
  8         3984  
8              
9             =head1 NAME
10              
11             Gearman::ResponseParser - gearmand abstract response parser implementation
12              
13             =head1 DESCRIPTION
14              
15              
16             I is an abstract base class.
17              
18             See: L
19              
20             Subclasses should call this first, then add their own data in underscore members
21              
22             =head1 METHODS
23              
24             =cut
25              
26             # Gearman::ResponseParser::Danga (for Gearman::Client::Danga, the async version)
27             sub new {
28 4     4 0 1041 my $class = shift;
29 4         10 my %opts = @_;
30 4         6 my $src = delete $opts{'source'};
31 4 100       16 die "unsupported arguments '@{[keys %opts]}'" if %opts;
  1         15  
32              
33 3         10 my $self = bless {
34              
35             # the source object/socket that is primarily feeding this.
36             source => $src,
37             }, $class;
38              
39 3         15 $self->reset;
40 3         8 return $self;
41             } ## end sub new
42              
43             =head2 source()
44              
45             B source. The source is object/socket
46              
47             =cut
48              
49             sub source {
50 1     1 1 282 my $self = shift;
51 1         5 return $self->{source};
52             }
53              
54             =head2 on_packet($packet, $parser)
55              
56             subclasses should override this
57              
58             =cut
59              
60             sub on_packet {
61 1     1 1 298 my ($self, $packet, $parser) = @_;
62 1         7 die "SUBCLASSES SHOULD OVERRIDE THIS";
63             }
64              
65             =head2 on_error($msg, $parser)
66              
67             subclasses should override this
68              
69             =cut
70              
71             sub on_error {
72 2     2 1 326 my ($self, $errmsg, $parser) = @_;
73              
74             # NOTE: this interface will evolve.
75 2         15 die "SUBCLASSES SHOULD OVERRIDE THIS";
76             } ## end sub on_error
77              
78             =head2 reset()
79              
80             =cut
81              
82             sub reset {
83 12     12 1 468 my $self = shift;
84 12         23 $self->{header} = '';
85 12         27 $self->{pkt} = undef;
86             }
87              
88             =head2 parse_data($data)
89              
90             don't override:
91             FUTURE OPTIMIZATION: let caller say "you can own this scalarref", and then we can keep it
92             on the initial setting of $self->{data} and avoid copying into our own. overkill for now.
93              
94             =cut
95              
96             sub parse_data {
97 9     9 1 5106 my ($self, $data) = @_; # where $data is a scalar or scalarref to parse
98 9 50       18 my $dataref = ref $data ? $data : \$data;
99              
100             my $err = sub {
101 0     0   0 my $code = shift;
102 0         0 $self->on_error($code);
103 0         0 return undef;
104 9         25 };
105              
106 9         23 while (my $lendata = length $$data) {
107              
108             # read the header
109 16         15 my $hdr_len = length $self->{header};
110 16 100       21 unless ($hdr_len == 12) {
111 9         9 my $need = 12 - $hdr_len;
112 9         18 $self->{header} .= substr($$dataref, 0, $need, '');
113 9 100       14 next unless length $self->{header} == 12;
114              
115 8         20 my ($magic, $type, $len) = unpack("a4NN", $self->{header});
116 8 50       16 return $err->("malformed_magic") unless $magic eq "\0RES";
117              
118 8         6 my $blob = "";
119             $self->{pkt} = {
120 8         20 type => Gearman::Util::cmd_name($type),
121             len => $len,
122             blobref => \$blob,
123             };
124 8         19 next;
125             } ## end unless ($hdr_len == 12)
126              
127             # how much data haven't we read for the current packet?
128 7         8 my $need = $self->{pkt}{len} - length(${ $self->{pkt}{blobref} });
  7         10  
129              
130             # copy the MAX(need, have)
131 7 100       10 my $to_copy = $lendata > $need ? $need : $lendata;
132              
133 7         5 ${ $self->{pkt}{blobref} } .= substr($$dataref, 0, $to_copy, '');
  7         13  
134              
135 7 100       13 if ($to_copy == $need) {
136 5         12 $self->on_packet($self->{pkt}, $self);
137 5         18 $self->reset;
138             }
139             } ## end while (my $lendata = length...)
140              
141 9 100 100     27 if (defined($self->{pkt})
142 5         19 && length(${ $self->{pkt}{blobref} }) == $self->{pkt}{len})
143             {
144 3         7 $self->on_packet($self->{pkt}, $self);
145 3         9 $self->reset;
146             } ## end if (defined($self->{pkt...}))
147             } ## end sub parse_data
148              
149             =head2 eof()
150              
151             don't override
152              
153             =cut
154              
155             sub eof {
156 1     1 1 791 my $self = shift;
157              
158 1         3 $self->on_error("EOF");
159              
160             # ERROR if in middle of packet
161             } ## end sub eof
162              
163             =head2 parse_sock($sock)
164              
165             don't override
166              
167             C<$sock> is readable, we should sysread it and feed it to L
168              
169             =cut
170              
171             sub parse_sock {
172 0     0 1   my ($self, $sock) = @_;
173 0           my $data;
174 0           my $rv = sysread($sock, $data, 128 * 1024);
175              
176 0 0         if (!defined $rv) {
177 0           $self->on_error("read_error: $!");
178 0           return;
179             }
180              
181             # FIXME: EAGAIN , EWOULDBLOCK
182              
183 0 0         if (!$rv) {
184 0           $self->eof;
185 0           return;
186             }
187              
188 0           $self->parse_data(\$data);
189             } ## end sub parse_sock
190              
191             1;