File Coverage

blib/lib/Gearman/Objects.pm
Criterion Covered Total %
statement 103 109 94.5
branch 36 46 78.2
condition 4 7 57.1
subroutine 23 24 95.8
pod 6 8 75.0
total 172 194 88.6


line stmt bran cond sub pod time code
1             package Gearman::Objects;
2 18     18   1313 use version ();
  18         1607  
  18         815  
3             $Gearman::Objects::VERSION = version->declare("2.002.002"); #TRIAL
4              
5 18     18   77 use strict;
  18         27  
  18         367  
6 18     18   69 use warnings;
  18         22  
  18         572  
7              
8             =head1 NAME
9              
10             Gearman::Objects - a parent class for L and L
11              
12             =head1 METHODS
13              
14             =cut
15              
16 18     18   69 use constant DEFAULT_PORT => 4730;
  18         19  
  18         1113  
17              
18 18     18   88 use Carp ();
  18         21  
  18         256  
19 18     18   4512 use IO::Socket::IP ();
  18         204131  
  18         361  
20 18     18   15910 use IO::Socket::SSL ();
  18         760961  
  18         482  
21 18     18   116 use Socket ();
  18         27  
  18         407  
22 18         121 use List::MoreUtils qw/
23             first_index
24 18     18   10678 /;
  18         156425  
25 18         1772 use Ref::Util qw/
26             is_plain_arrayref
27             is_plain_hashref
28             is_plain_ref
29             is_ref
30 18     18   20320 /;
  18         10682  
31              
32 18         143 use fields qw/
33             debug
34             job_servers
35             js_count
36             prefix
37             sock_cache
38 18     18   3451 /;
  18         8635  
39              
40             sub new {
41 23     23 0 9907 my $self = shift;
42 23         50 my (%opts) = @_;
43 23 100       80 unless (is_ref($self)) {
44 10         25 $self = fields::new($self);
45             }
46 23         3557 $self->{job_servers} = [];
47 23         38 $self->{js_count} = 0;
48              
49             $opts{job_servers}
50 23 100       91 && $self->set_job_servers($opts{job_servers});
51              
52 23         123 $self->debug($opts{debug});
53 23         99 $self->prefix($opts{prefix});
54              
55 23         46 $self->{sock_cache} = {};
56              
57 23         60 return $self;
58             } ## end sub new
59              
60             =head2 job_servers([$js])
61              
62             getter/setter
63              
64             C<$js> array reference or scalar
65              
66             =cut
67              
68             sub job_servers {
69 9     9 1 1901 my ($self) = shift;
70 9 100       57 (@_) && $self->set_job_servers(@_);
71              
72 9 100       21 return wantarray ? @{ $self->{job_servers} } : $self->{job_servers};
  8         37  
73             } ## end sub job_servers
74              
75             =head2 set_job_servers($js)
76              
77             set job_servers attribute by canonicalized C<$js>_
78              
79             =cut
80              
81             sub set_job_servers {
82 7     7 1 13 my $self = shift;
83 7         29 my $list = $self->canonicalize_job_servers(@_);
84              
85 7         10 $self->{js_count} = scalar @{$list};
  7         17  
86 7         14 return $self->{job_servers} = $list;
87             } ## end sub set_job_servers
88              
89             =head2 canonicalize_job_servers($js)
90              
91             C<$js> a string, hash reference or array reference of aforementioned.
92              
93             Hash reference should contain at least host key.
94              
95             All keys: host, port (4730 on default), use_ssl, key_file, cert_file,
96             ca_certs, socket_cb
97              
98             B [canonicalized list]
99              
100             =cut
101              
102             sub canonicalize_job_servers {
103 12     12 1 912 my ($self) = shift;
104 12         16 my @in;
105 12 100       42 if (is_plain_ref($_[0])) {
106 8 100       27 if (is_plain_arrayref($_[0])) {
    50          
107 4         8 @in = @{ $_[0] };
  4         17  
108             }
109             elsif (is_plain_hashref($_[0])) {
110 4         8 @in = ($_[0]);
111             }
112             else {
113 0         0 Carp::croak "unsupported argument type ", ref($_[0]);
114             }
115             } ## end if (is_plain_ref($_[0]...))
116             else {
117 4         7 @in = @_;
118             }
119              
120 12         22 my $out = [];
121 12         25 foreach my $i (@in) {
122 11 100       32 if (is_ref($i)) {
    100          
123 5   50     12 $i->{port} ||= Gearman::Objects::DEFAULT_PORT;
124             }
125             elsif ($i !~ /:/) {
126 2         3 $i .= ':' . Gearman::Objects::DEFAULT_PORT;
127             }
128 11         11 push @{$out}, $i;
  11         21  
129             } ## end foreach my $i (@in)
130              
131 12         46 return $out;
132             } ## end sub canonicalize_job_servers
133              
134             sub debug {
135 32     32 0 625 return shift->_property("debug", @_);
136             }
137              
138             =head2 prefix([$prefix])
139              
140             getter/setter
141              
142             =cut
143              
144             sub prefix {
145 31     31 1 570 return shift->_property("prefix", @_);
146             }
147              
148             =head2 socket($js, [$timeout])
149              
150             depends on C
151             prepare L
152             or L
153              
154             =over
155              
156             =item
157              
158             C<$host_port> peer address
159              
160             =item
161              
162             C<$timeout> default: 1
163              
164             =back
165              
166             B depends on C IO::Socket::(IP|SSL) on success
167              
168             =cut
169              
170             sub socket {
171 3     3 1 1909 my ($self, $js, $t) = @_;
172 3 50       14 unless (is_ref($js)) {
173 0         0 my ($h, $p) = ($js =~ /^(.*):(\d+)$/);
174 0         0 $js = { host => $h, port => $p };
175             }
176              
177             my %opts = (
178             PeerPort => $js->{port},
179             PeerHost => $js->{host},
180 3   100     32 Timeout => $t || 1
181             );
182              
183 3         6 my $sc = "IO::Socket::IP";
184 3 100       12 if ($js->{use_ssl}) {
185 2         4 $sc = "IO::Socket::SSL";
186 2         6 for (qw/ key_file cert_file ca_certs /) {
187 6 50       17 $js->{$_} || next;
188 0         0 $opts{ join('_', "SSL", $_) } = $js->{$_};
189             }
190             } ## end if ($js->{use_ssl})
191              
192 3 100       16 $js->{socket_cb} && $js->{socket_cb}->(\%opts);
193              
194 3         43 my $s = $sc->new(%opts);
195 3 100       86069 unless ($s) {
196             $self->debug() && Carp::carp("connection failed error='$@'",
197             $js->{use_ssl}
198 2 0       14 ? ", ssl_error='$IO::Socket::SSL::SSL_ERROR'"
    50          
199             : "");
200             } ## end unless ($s)
201              
202 3         18 return $s;
203             } ## end sub socket
204              
205             =head2 sock_nodelay($sock)
206              
207             set TCP_NODELAY on $sock, die on failure
208              
209             =cut
210              
211             sub sock_nodelay {
212 0     0 1 0 my ($self, $sock) = @_;
213 0 0       0 setsockopt($sock, Socket::IPPROTO_TCP, Socket::TCP_NODELAY, pack("l", 1))
214             or Carp::croak "setsockopt: $!";
215             }
216              
217             # _sock_cache($js, [$sock, $delete])
218             #
219             # B $sock || undef
220             #
221              
222             sub _sock_cache {
223 5     5   894 my ($self, $js, $sock, $delete) = @_;
224 5         13 my $hp = $self->_js_str($js);
225 5 100       14 if ($sock) {
226 3         8 $self->{sock_cache}->{$hp} = $sock;
227             }
228              
229             return $delete
230             ? delete($self->{sock_cache}->{$hp})
231 5 100       30 : $self->{sock_cache}->{$hp};
232             } ## end sub _sock_cache
233              
234             #
235             # _property($name, [$value])
236             # set/get
237             sub _property {
238 63     63   66 my $self = shift;
239 63         70 my $name = shift;
240 63 50       114 $name || return;
241 63 100       121 if (@_) {
242 50         109 $self->{$name} = shift;
243             }
244              
245 63         127 return $self->{$name};
246             } ## end sub _property
247              
248             #
249             #_js_str($js)
250             #
251             # return host:port
252             sub _js_str {
253 10     10   369 my ($self, $js) = @_;
254 10 100       30 return is_plain_hashref($js) ? join(':', @{$js}{qw/host port/}) : $js;
  6         23  
255             }
256              
257             #
258             # _js($js_str)
259             #
260             # return job_servers item || undef
261             #
262             sub _js {
263 1     1   3 my ($self, $js_str) = @_;
264 1         3 my @s = $self->job_servers();
265 1     1   22 my $i = first_index { $js_str eq $self->_js_str($_) } @s;
  1         3  
266 1 50 33     13 return ($i == -1 || $i > $#s) ? undef : $s[$i];
267             } ## end sub _js
268              
269             1;