File Coverage

blib/lib/Gearman/ResponseParser.pm
Criterion Covered Total %
statement 54 67 80.6
branch 14 20 70.0
condition 3 3 100.0
subroutine 10 12 83.3
pod 7 8 87.5
total 88 110 80.0


line stmt bran cond sub pod time code
1             package Gearman::ResponseParser;
2 8     8   804 use version;
  8         1285  
  8         53  
3             $Gearman::ResponseParser::VERSION = qv("2.001_001");
4              
5 8     8   494 use strict;
  8         8  
  8         117  
6 8     8   22 use warnings;
  8         10  
  8         4108  
7              
8             =head1 NAME
9              
10             Gearman::ResponseParser - gearmand abstract response parser implementation
11              
12             =head1 DESCRIPTION
13              
14              
15             I is an abstract base class.
16              
17             See: L
18              
19             Subclasses should call this first, then add their own data in underscore members
20              
21             =head1 METHODS
22              
23             =cut
24              
25             # Gearman::ResponseParser::Danga (for Gearman::Client::Danga, the async version)
26             sub new {
27 4     4 0 982 my $class = shift;
28 4         11 my %opts = @_;
29 4         5 my $src = delete $opts{'source'};
30 4 100       14 die "unsupported arguments '@{[keys %opts]}'" if %opts;
  1         11  
31              
32 3         7 my $self = bless {
33              
34             # the source object/socket that is primarily feeding this.
35             source => $src,
36             }, $class;
37              
38 3         13 $self->reset;
39 3         6 return $self;
40             } ## end sub new
41              
42             =head2 source()
43              
44             B source. The source is object/socket
45              
46             =cut
47              
48             sub source {
49 1     1 1 325 my $self = shift;
50 1         4 return $self->{source};
51             }
52              
53             =head2 on_packet($packet, $parser)
54              
55             subclasses should override this
56              
57             =cut
58              
59             sub on_packet {
60 1     1 1 308 my ($self, $packet, $parser) = @_;
61 1         7 die "SUBCLASSES SHOULD OVERRIDE THIS";
62             }
63              
64             =head2 on_error($msg, $parser)
65              
66             subclasses should override this
67              
68             =cut
69              
70             sub on_error {
71 2     2 1 290 my ($self, $errmsg, $parser) = @_;
72              
73             # NOTE: this interface will evolve.
74 2         19 die "SUBCLASSES SHOULD OVERRIDE THIS";
75             } ## end sub on_error
76              
77             =head2 reset()
78              
79             =cut
80              
81             sub reset {
82 12     12 1 462 my $self = shift;
83 12         17 $self->{header} = '';
84 12         24 $self->{pkt} = undef;
85             }
86              
87             =head2 parse_data($data)
88              
89             don't override:
90             FUTURE OPTIMIZATION: let caller say "you can own this scalarref", and then we can keep it
91             on the initial setting of $self->{data} and avoid copying into our own. overkill for now.
92              
93             =cut
94              
95             sub parse_data {
96 9     9 1 4831 my ($self, $data) = @_; # where $data is a scalar or scalarref to parse
97 9 50       18 my $dataref = ref $data ? $data : \$data;
98              
99             my $err = sub {
100 0     0   0 my $code = shift;
101 0         0 $self->on_error($code);
102 0         0 return undef;
103 9         22 };
104              
105 9         21 while (my $lendata = length $$data) {
106              
107             # read the header
108 16         13 my $hdr_len = length $self->{header};
109 16 100       28 unless ($hdr_len == 12) {
110 9         8 my $need = 12 - $hdr_len;
111 9         15 $self->{header} .= substr($$dataref, 0, $need, '');
112 9 100       14 next unless length $self->{header} == 12;
113              
114 8         23 my ($magic, $type, $len) = unpack("a4NN", $self->{header});
115 8 50       16 return $err->("malformed_magic") unless $magic eq "\0RES";
116              
117 8         7 my $blob = "";
118             $self->{pkt} = {
119 8         16 type => Gearman::Util::cmd_name($type),
120             len => $len,
121             blobref => \$blob,
122             };
123 8         19 next;
124             } ## end unless ($hdr_len == 12)
125              
126             # how much data haven't we read for the current packet?
127 7         7 my $need = $self->{pkt}{len} - length(${ $self->{pkt}{blobref} });
  7         8  
128              
129             # copy the MAX(need, have)
130 7 100       13 my $to_copy = $lendata > $need ? $need : $lendata;
131              
132 7         6 ${ $self->{pkt}{blobref} } .= substr($$dataref, 0, $to_copy, '');
  7         12  
133              
134 7 100       14 if ($to_copy == $need) {
135 5         11 $self->on_packet($self->{pkt}, $self);
136 5         67 $self->reset;
137             }
138             } ## end while (my $lendata = length...)
139              
140 9 100 100     28 if (defined($self->{pkt})
141 5         22 && length(${ $self->{pkt}{blobref} }) == $self->{pkt}{len})
142             {
143 3         7 $self->on_packet($self->{pkt}, $self);
144 3         9 $self->reset;
145             } ## end if (defined($self->{pkt...}))
146             } ## end sub parse_data
147              
148             =head2 eof()
149              
150             don't override
151              
152             =cut
153              
154             sub eof {
155 1     1 1 719 my $self = shift;
156              
157 1         2 $self->on_error("EOF");
158              
159             # ERROR if in middle of packet
160             } ## end sub eof
161              
162             =head2 parse_sock($sock)
163              
164             don't override
165              
166             C<$sock> is readable, we should sysread it and feed it to L
167              
168             =cut
169              
170             sub parse_sock {
171 0     0 1   my ($self, $sock) = @_;
172 0           my $data;
173 0           my $rv = sysread($sock, $data, 128 * 1024);
174              
175 0 0         if (!defined $rv) {
176 0           $self->on_error("read_error: $!");
177 0           return;
178             }
179              
180             # FIXME: EAGAIN , EWOULDBLOCK
181              
182 0 0         if (!$rv) {
183 0           $self->eof;
184 0           return;
185             }
186              
187 0           $self->parse_data(\$data);
188             } ## end sub parse_sock
189              
190             1;