File Coverage

blib/lib/PocketIO/Connection.pm
Criterion Covered Total %
statement 149 220 67.7
branch 12 36 33.3
condition 23 30 76.6
subroutine 32 55 58.1
pod 23 24 95.8
total 239 365 65.4


line stmt bran cond sub pod time code
1             package PocketIO::Connection;
2              
3 14     14   3325 use strict;
  14         39  
  14         486  
4 14     14   74 use warnings;
  14         29  
  14         361  
5              
6 14     14   11241 use AnyEvent;
  14         39905  
  14         615  
7 14     14   105 use Scalar::Util qw(blessed);
  14         38  
  14         1028  
8              
9 14     14   21135 use PocketIO::Message;
  14         78  
  14         481  
10 14     14   9525 use PocketIO::Socket;
  14         46  
  14         448  
11 14     14   8207 use PocketIO::Sockets;
  14         42  
  14         424  
12 14     14   10154 use PocketIO::Broadcast;
  14         42  
  14         484  
13              
14 14     14   86 use constant DEBUG => $ENV{POCKETIO_CONNECTION_DEBUG};
  14         89  
  14         52166  
15              
16             sub new {
17 10     10 1 24270 my $class = shift;
18              
19 10         51 my $self = {@_};
20 10         41 bless $self, $class;
21              
22 10   100     114 $self->{connect_timeout} ||= 30;
23 10   100     66 $self->{reconnect_timeout} ||= 15;
24 10   100     77 $self->{close_timeout} ||= 15;
25              
26 10     1   75 $self->{on_connect_timeout} = sub { $_[0]->emit('connect_failed') };
  1         10  
27 10     1   66 $self->{on_reconnect_timeout} = sub { $_[0]->emit('reconnect_failed') };
  1         10  
28 10     1   56 $self->{on_close_timeout} = sub { $_[0]->close };
  1         8  
29              
30 10   50     69 $self->{max_messages_to_stage} ||= 32;
31 10         23 $self->{messages} = [];
32              
33 10   100 0   60 $self->{on_connect_failed} ||= sub { };
  0         0  
34 10   100 0   78 $self->{on_reconnect} ||= sub { };
  0         0  
35 10   100 0   252 $self->{on_reconnect_failed} ||= sub { };
  0         0  
36 10   50 0   87 $self->{on_message} ||= sub { };
  0         0  
37 10   50 0   97 $self->{on_disconnect} ||= sub { };
  0         0  
38 10   50 0   87 $self->{on_error} ||= sub { };
  0         0  
39 10   100 0   76 $self->{on_close} ||= sub { };
  0         0  
40              
41 10   33     88 $self->{socket} ||= $self->_build_socket;
42 10   100 3   93 my $on_connect = delete $self->{on_connect} || sub { };
  3         4  
43             $self->{on_connect} = sub {
44 4     4   9 my $self = shift;
45              
46             eval {
47 4 50       10 $on_connect->($self->{socket}, @{$self->{on_connect_args} || []});
  4         41  
48 4         28 1;
49 4 50       6 } || do {
50 0         0 warn "Connection error: $_";
51              
52 0         0 $self->close;
53             };
54 10         61 };
55              
56 10         19 DEBUG && $self->_debug('Connection created');
57              
58 10         41 $self->connecting;
59              
60 10         77339 return $self;
61             }
62              
63             sub new_passive {
64 0     0 0 0 my $class = shift;
65 0         0 my $self = {@_};
66 0         0 bless $self, $class;
67 0         0 return $self;
68             }
69              
70 1     1 1 715 sub socket { $_[0]->{socket} }
71              
72 0     0 1 0 sub pool { $_[0]->{pool} }
73              
74 0 0   0 1 0 sub type { @_ > 1 ? $_[0]->{type} = $_[1] : $_[0]->{type} }
75              
76 0     0 1 0 sub is_connected { $_[0]->{is_connected} }
77              
78             sub connecting {
79 10     10 1 20 my $self = shift;
80              
81 10         15 DEBUG && $self->_debug("State 'connecting'");
82              
83 10         41 $self->_start_timer('connect');
84             }
85              
86             sub reconnecting {
87 2     2 1 10 my $self = shift;
88              
89 2         3 DEBUG && $self->_debug("State 'reconnecting'");
90              
91 2         5 $self->_stop_timer('close');
92              
93 2         5 $self->_start_timer('reconnect');
94             }
95              
96             sub connected {
97 4     4 1 25 my $self = shift;
98              
99 4         8 DEBUG && $self->_debug("State 'connected'");
100              
101 4         15 $self->_stop_timer('connect');
102              
103 4         10 $self->{is_connected} = 1;
104              
105 4         53 my $message = PocketIO::Message->new(type => 'connect');
106 4         300 $self->write($message);
107              
108 4         12 $self->_start_timer('close');
109              
110 4         71 $self->emit('connect');
111              
112 4         37 return $self;
113             }
114              
115             sub reconnected {
116 1     1 1 15 my $self = shift;
117              
118 1         1 DEBUG && $self->_debug("State 'reconnected'");
119              
120 1         4 $self->_stop_timer('reconnect');
121              
122 1         2 $self->emit('reconnect');
123              
124 1         7 $self->_start_timer('close');
125              
126 1         12 return $self;
127             }
128              
129             sub disconnected {
130 0     0 1 0 my $self = shift;
131              
132 0         0 DEBUG && $self->_debug("State 'disconnected'");
133              
134 0         0 $self->_stop_timer('connect');
135 0         0 $self->_stop_timer('reconnect');
136 0         0 $self->_stop_timer('close');
137              
138 0         0 $self->{data} = '';
139 0         0 $self->{messages} = [];
140              
141 0         0 $self->{is_connected} = 0;
142              
143             $self->{disconnect_timer} = AnyEvent->timer(
144             after => 0,
145             cb => sub {
146 0 0   0   0 return unless $self;
147              
148 0 0       0 if ($self->{socket}) {
149 0 0       0 if (my $cb = $self->{socket}->on('disconnect')) {
150 0         0 $cb->($self->{socket});
151             }
152 0         0 undef $self->{socket};
153             }
154              
155 0         0 undef $self;
156             }
157 0         0 );
158              
159 0         0 return $self;
160             }
161              
162             sub close {
163 1     1 1 4 my $self = shift;
164              
165 1         11 my $message = PocketIO::Message->new(type => 'disconnect');
166 1         23 $self->write($message);
167              
168 1         5 $self->emit('close');
169              
170             #$self->disconnected;
171              
172 1         12 return $self;
173             }
174              
175             sub id {
176 10     10 1 16 my $self = shift;
177              
178 10   66     42 $self->{id} ||= $self->_generate_id;
179              
180 10         74 return $self->{id};
181             }
182              
183             sub on {
184 0     0 1 0 my $self = shift;
185 0         0 my $event = shift;
186              
187 0         0 my $name = "on_$event";
188              
189 0 0       0 unless (@_) {
190 0         0 DEBUG && $self->_debug("Event 'on_$event'");
191              
192 0         0 return $self->{$name};
193             }
194              
195 0         0 $self->{$name} = $_[0];
196              
197 0         0 return $self;
198             }
199              
200             sub emit {
201 8     8 1 21 my $self = shift;
202 8         19 my $event = shift;
203              
204 8         20 $event = "on_$event";
205              
206 8 50       43 return unless exists $self->{$event};
207              
208 8         12 DEBUG && $self->_debug("Emitting '$event'");
209              
210 8         37 $self->{$event}->($self, @_);
211              
212 8         213 return $self;
213             }
214              
215             sub stage_message {
216 5     5 1 11 my $self = shift;
217 5         10 my ($message) = @_;
218              
219 5 50       8 return if @{$self->{messages}} >= $self->{max_messages_to_stage};
  5         26  
220              
221 5         10 push @{$self->{messages}}, $message;
  5         13  
222              
223 5         12 return $self;
224             }
225              
226             sub has_staged_messages {
227 0     0 1 0 my $self = shift;
228              
229 0         0 return @{$self->{messages}} > 0;
  0         0  
230             }
231              
232             sub staged_message {
233 0     0 1 0 my $self = shift;
234              
235 0         0 return shift @{$self->{messages}};
  0         0  
236             }
237              
238             sub send_heartbeat {
239 0     0 1 0 my $self = shift;
240              
241 0         0 $self->{heartbeat}++;
242              
243 0         0 DEBUG && $self->_debug('Send heartbeat');
244              
245 0         0 my $message = PocketIO::Message->new(type => 'heartbeat');
246              
247 0         0 return $self->write($message);
248             }
249              
250             sub send {
251 0     0 1 0 my $self = shift;
252 0         0 my ($message) = @_;
253              
254 0         0 $message = $self->_build_message($message);
255              
256 0         0 $self->write($message);
257              
258 0         0 return $self;
259             }
260              
261             sub broadcast {
262 0     0 1 0 my $self = shift;
263              
264 0         0 return PocketIO::Broadcast->new(conn => $self, pool => $self->pool);
265             }
266              
267             sub sockets {
268 0     0 1 0 my $self = shift;
269              
270 0         0 return PocketIO::Sockets->new(pool => $self->pool);
271             }
272              
273             sub parse_message {
274 5     5 1 7564 my $self = shift;
275 5         11 my ($message) = @_;
276              
277 5         8 DEBUG && $self->_debug("Received '" . substr($message, 0, 80) . "'");
278              
279 5         44 $message = PocketIO::Message->new->parse($message);
280 5 50       722 return unless $message;
281              
282 5         863 $self->_stop_timer('close');
283              
284 5 100       21 if ($message->is_message) {
    50          
    0          
285 4         61 $self->{socket}->on('message')->($self->{socket}, $message->data);
286             }
287             elsif ($message->type eq 'event') {
288 1         17 my $name = $message->data->{name};
289 1         7 my $args = $message->data->{args};
290              
291 1         12 my $id = $message->id;
292              
293             $self->{socket}->on($name)->(
294             $self->{socket},
295             @$args => sub {
296 0     0   0 my $message = PocketIO::Message->new(
297             type => 'ack',
298             message_id => $id,
299             args => [@_]
300             );
301              
302 0         0 $self->write($message);
303             }
304 1 50       8 ) if defined $self->{socket}->on($name);
305             }
306             elsif ($message->type eq 'heartbeat') {
307              
308             # TODO
309             }
310             else {
311              
312             # TODO
313             }
314              
315 5         36 $self->_start_timer('close');
316              
317 5         98 return $self;
318             }
319              
320             sub write {
321 5     5 1 14 my $self = shift;
322 5         11 my ($bytes) = @_;
323              
324 5         20 $self->_restart_timer('close');
325              
326 5 50       130 $bytes = $bytes->to_bytes if blessed $bytes;
327              
328 5 50       232 if ($self->{on_write}) {
329 0         0 DEBUG && $self->_debug("Writing '" . substr($bytes, 0, 50) . "'");
330 0         0 $self->emit('write', $bytes);
331             }
332             else {
333 5         10 DEBUG && $self->_debug("Staging '" . substr($bytes, 0, 50) . "'");
334 5         30 $self->stage_message($bytes);
335             }
336             }
337              
338             sub _start_timer {
339 27     27   47 my $self = shift;
340 27         44 my ($timer) = @_;
341              
342 27         68 my $timeout = $self->{"${timer}_timeout"};
343 27 50       80 return if (!defined $timeout);
344              
345 27         33 DEBUG && $self->_debug("Start '${timer}_timer' ($timeout)");
346              
347             $self->{"${timer}_timer"} = AnyEvent->timer(
348             after => $timeout,
349             cb => sub {
350 3     3   292182 DEBUG && $self->_debug("Timeout '${timer}_timeout'");
351              
352 3         50 $self->{"on_${timer}_timeout"}->($self);
353             }
354 27         246 );
355             }
356              
357             sub _stop_timer {
358 17     17   30 my $self = shift;
359 17         26 my ($timer) = @_;
360              
361 17         26 DEBUG && $self->_debug("Stop '${timer}_timer'");
362              
363 17         143 delete $self->{"${timer}_timer"};
364             }
365              
366             sub _restart_timer {
367 5     5   10 my $self = shift;
368 5         10 my ($timer) = @_;
369              
370 5         24 $self->_stop_timer($timer);
371 5         27 $self->_start_timer($timer);
372             }
373              
374             sub _build_message {
375 0     0   0 my $self = shift;
376 0         0 my ($message) = @_;
377              
378 0 0       0 return $message if blessed $message;
379              
380 0         0 return PocketIO::Message->new(data => $message);
381             }
382              
383             sub _generate_id {
384 3     3   9 my $self = shift;
385              
386 3         6 my $string = '';
387              
388 3         10 for (1 .. 16) {
389 48         167 $string .= int(rand(10));
390             }
391              
392 3         15 return $string;
393             }
394              
395             sub _debug {
396 0     0   0 my $self = shift;
397 0         0 my ($message) = @_;
398              
399 0         0 warn time . ' (' . $self->id . '): ' . $message . "\n";
400             }
401              
402             sub _build_socket {
403 10     10   20 my $self = shift;
404              
405 10         89 return PocketIO::Socket->new(conn => $self);
406             }
407              
408             1;
409             __END__