File Coverage

blib/lib/AnyEvent/PocketIO/Client.pm
Criterion Covered Total %
statement 138 155 89.0
branch 23 42 54.7
condition 10 18 55.5
subroutine 29 33 87.8
pod 11 15 73.3
total 211 263 80.2


line stmt bran cond sub pod time code
1             package AnyEvent::PocketIO::Client;
2              
3 9     9   1681311 use strict;
  9         25  
  9         334  
4 7     7   55 use warnings;
  7         12  
  7         237  
5 7     7   37 use Carp ();
  7         36  
  7         296  
6 7     7   41 use AnyEvent;
  7         13  
  7         201  
7 7     7   42 use AnyEvent::Handle;
  7         15  
  7         190  
8 7     7   38 use AnyEvent::Socket;
  7         12  
  7         1091  
9 7     7   42 use PocketIO::Handle;
  7         16  
  7         187  
10 7     7   40 use PocketIO::Connection;
  7         20  
  7         16033  
11              
12             my %RESERVED_EVENT = map { $_ => 1 }
13             qw/message connect disconnect open close error retry reconnect/;
14              
15             our $VERSION = '0.01';
16              
17              
18             sub new {
19 9     9 1 351963 my $this = shift;
20 9   33     368 my $class = ref $this || $this;
21 9         195 bless {
22             handshake_timeout => 10,
23             open_timeout => 10,
24             @_,
25             }, $class;
26             }
27              
28 7     7 0 44 sub handle { $_[0]->{ handle }; }
29              
30 30     30 1 155 sub conn { $_[0]->{ conn }; }
31              
32 0     0 0 0 sub socket { $_[0]->conn->socket; }
33              
34             sub _start_timer {
35 17     17   72 my ( $self, $timer_name, $cb ) = @_;
36 17   50     103 my $after = $self->{ "${timer_name}_timeout" } || 0;
37 17         168 $self->{ "${timer_name}_timer" } = AnyEvent->timer( after => $after, cb => $cb );
38             }
39              
40             sub _stop_timer {
41 15     15   46 my ( $self, $timer_name ) = @_;
42 15         271 delete $self->{ "${timer_name}_timer" };
43             }
44              
45             sub handshake {
46 10     10 1 44236 my ( $self, $host, $port, $cb ) = @_;
47              
48 10   50 0   63 $cb ||= sub {};
  0         0  
49              
50             tcp_connect( $host, $port,
51             sub {
52 10 50   10   8931 my ($fh) = @_ or return $cb->( { code => 500, message => $! }, $self );
53              
54 10         39 @{$self}{qw/host port/} = ( $host, $port );
  10         95  
55              
56             my $socket = AnyEvent::Handle->new(
57             fh => $fh,
58             on_error => sub {
59 1         44 $self->disconnect(join(',', "error!:", $_[1], $_[2] ));
60             },
61 10         488 );
62              
63 10         1515 $socket->push_write("GET /socket.io/1/ HTTP/1.1\nHost: $host:$port\n\n");
64              
65 10         1138 my $read = 0; # handshake is finished?
66              
67             $self->_start_timer( 'handshake', sub {
68 1         2001453 $socket->fh->close;
69 1         139 $read++;
70 1         11 $self->_stop_timer( 'handshake' );
71 1         20 $cb->( { code => 500, message => 'Handshake timeout.' }, $self );
72 10         164 } );
73              
74             $socket->on_read( sub {
75 15 50       43925 return unless length $_[0]->rbuf;
76 15 100       234 return if $read;
77              
78 13         99 my ( $status_line ) = $_[0]->rbuf =~ /^(.+)\015\012/;
79 13         6459 my ( $code ) = $status_line =~ m{^HTTP/[.01]+ (\d+) };
80 13         28 my $error;
81              
82 13 100 66     148 if ( $code && $code != 200 ) {
83 2         9 $_[0]->rbuf =~ /\015\012\015\012(.*)/sm;
84 2         58 $error = { code => $code, message => $1 };
85 2         5 $read++;
86 2         11 $cb->( $error, $self );
87 2         7712 return;
88             }
89              
90 11         64 my ( $line ) = $_[0]->rbuf =~ /\015\012\015\012([^:]+:[^:]+:[^:]+:[^:]+)/sm;
91              
92 11 100       202 unless ( defined $line ) {
93 4         113 return;
94             }
95              
96 7         46 $self->_stop_timer( 'handshake' );
97              
98 7         48 my ( $sid, $hb_timeout, $con_timeout, $transports ) = split/:/, $line;
99 7         85 $transports = [split/,/, $transports];
100 7         26 $self->{ acceptable_transports } = $transports;
101 7         45 $self->{ session_id } = $sid;
102 7         107 $socket->destroy;
103 7         278 $read++;
104 7         45 $cb->( $error, $self, $sid, $hb_timeout, $con_timeout, $transports );
105 10         495 } );
106 10         295 } );
107              
108             }
109              
110             sub is_opened {
111 32     32 1 191 $_[0]->{ is_opened };
112             }
113              
114             sub opened {
115 6     6 0 49 $_[0]->{ is_opened } = 1;
116 6         28 $_[0]->_stop_timer( 'open' );
117             }
118              
119             sub reg_event {
120 1     1 1 17 my ( $self, $name, $cb ) = @_;
121 1 50 33     14 return Carp::carp('reg_event() must take a code reference.') if $cb && ref($cb) ne 'CODE';
122 1 50       4 return Carp::carp("$name is reserved event.") if exists $RESERVED_EVENT{ $name };
123              
124 1 50       3 if ( $self->is_opened ) {
125 1         2 $self->conn->socket->on( $name => $cb );
126             }
127             else {
128 0         0 $self->{ not_yet_reg_event }->{ $name } = $cb;
129             }
130             }
131              
132             sub on {
133 39     39 1 4990060 my ( $self, $event ) = @_;
134 39         117 my $name = "on_$event";
135              
136 39 100       226 if ( @_ > 2 ) {
137 18         120 $self->{ $name } = $_[2];
138 18         73 return $self;
139             }
140              
141 21   100 9   249 return $self->{ $name } ||= sub {};
  9         19  
142             }
143              
144             sub disconnect {
145 5     5 1 57 my ( $self ) = @_;
146              
147 5 100       69 return unless $self->is_opened;
148              
149 4         15 $self->{ is_opened } = 0;
150 4         18 $self->on('disconnect')->();
151 4         10061 $self->conn->close;
152 4         1301 $self->conn->disconnected;
153 4         205 delete $self->{ conn };
154             }
155              
156             sub emit {
157 4     4 1 3685 my $self = shift;
158 4 50       15 unless ( $self->is_opened ) {
159 0         0 Carp::carp('Not yet connected.');
160 0         0 return;
161             }
162 4         16 $self->conn->socket->emit( @_ );
163             }
164              
165             sub send {
166 0     0 1 0 my $self = shift;
167 0 0       0 unless ( $self->is_opened ) {
168 0         0 Carp::carp('Not yet connected.');
169 0         0 return;
170             }
171 0         0 $self->conn->socket->send( @_ );
172             }
173              
174             sub connect {
175 1     1 1 89 my ( $self, $endpoint ) = @_;
176 1         4 $self->conn->_stop_timer('close');
177 1         23 my $message = PocketIO::Message->new(type => 'connect');
178 1         33 $self->conn->write($message);
179 1         65 $self->conn->_start_timer('close');
180             #$self->conn->emit('connect');
181 1         27 $self->on('connect')->( $endpoint );
182             }
183              
184             sub transport {
185 7 50   7 0 32 $_[0]->{ transport } = $_[0] if @_ > 1;
186 7         43 $_[0]->{ transport };
187             }
188              
189             sub open {
190 7     7 1 7298 my ( $self, $trans, $cb ) = @_;
191 7         48 my $host = $self->{ host };
192 7         59 my $port = $self->{ port };
193 7         18 my $sid = $self->{ session_id };
194              
195 7 50 66     81 if ( $trans && ref $trans eq 'CODE' ) {
196 0         0 $cb = $trans; $trans = undef;
  0         0  
197             }
198              
199 7 50       27 unless ( $sid ) {
200 0         0 my $message = "Tried open but no session id.";
201 0 0       0 $cb ? return $cb->({ code => 500, message => $message }, $self)
202             : Carp::croak($message)
203             }
204              
205 7         47 $trans = 'websocket'; # TODO ||= $self->{ acceptable_transports }->[0];
206 7         42 $self->{ transport } = $self->_build_transport( $trans );
207              
208             tcp_connect( $host, $port,
209             sub {
210 7 0   7   4496 my ($fh) = @_
    50          
211             or ($cb ? return $cb->({ code => 500, message => $! }, $self)
212             : Carp::croak( $! )
213             );
214              
215             $self->{ handle } = PocketIO::Handle->new(
216             fh => $fh, heartbeat_timeout => $self->{ heartbeat_timeout }
217 7         259 );
218              
219 7         2206 $self->{ conn } = PocketIO::Connection->new();
220              
221             $self->_start_timer( 'open', sub {
222 1         998285 local $Carp::CarpLevel = 3;
223             #$self->handle->fh->close; # cases "Out of memory"?
224 1         16 $self->_stop_timer( 'open' );
225 1 50       60 $cb ? $cb->( { code => 500, message => 'Open timeout.' }, $self )
226             : Carp::croak('Open timeout.');
227 7         1418 } );
228              
229 7         116 $self->on('open')->( $self );
230              
231 7         93 return $self->transport->open( $self, $fh, $host, $port, $sid, $cb );
232             }
233 7         107 );
234             }
235              
236             sub _run_open_cb {
237 5     5   13 my ( $self, $cb ) = @_;
238 5         19 my $conn = $self->conn;
239              
240 5         28 $cb->( undef, $self );
241              
242 5         5261 for my $name ( keys %{ $self->{ not_yet_reg_event } } ) {
  5         63  
243             $conn->socket->on(
244 0         0 $name => delete $self->{ not_yet_reg_event }->{ $name }
245             );
246             }
247              
248             # default setting
249 5         15 for my $name ( qw/connect disconnect error/ ) {
250 15 50   0   292 $conn->socket->on( $name => sub {} ) unless $conn->socket->on( $name );
  0         0  
251             }
252             #$conn->socket->on('connect')->( $conn->socket );
253             }
254              
255              
256             my %Transport = (
257             websocket => 'WebSocket',
258             );
259              
260             sub _build_transport {
261 7     7   17 my ( $self, $transport_id ) = @_;
262 7         35 my $class = 'AnyEvent::PocketIO::Client::Transport::' . $Transport{ lc $transport_id };
263              
264 5     5   4824 eval qq{ use $class };
  5         15  
  5         87  
  7         937  
265 7 50       39 if ($@) { Carp::croak $@; }
  0         0  
266              
267 7         39 $class->new();
268             }
269              
270             1;
271             __END__