File Coverage

blib/lib/Net/WebSocket/Server.pm
Criterion Covered Total %
statement 110 146 75.3
branch 24 58 41.3
condition 3 9 33.3
subroutine 19 26 73.0
pod 12 12 100.0
total 168 251 66.9


line stmt bran cond sub pod time code
1             package Net::WebSocket::Server;
2              
3 2     2   77559 use 5.006;
  2         13  
4 2     2   24 use strict;
  2         14  
  2         104  
5 2     2   13 use warnings FATAL => 'all';
  2         13  
  2         208  
6              
7 2     2   19 use Carp;
  2         5  
  2         337  
8 2     2   544 use IO::Socket::INET;
  2         23627  
  2         83  
9 2     2   3303 use IO::Select;
  2         4298  
  2         133  
10 2     2   1190 use Net::WebSocket::Server::Connection;
  2         8  
  2         246  
11 2     2   1603 use Time::HiRes qw(time);
  2         3984  
  2         8  
12 2     2   520 use List::Util qw(min);
  2         5  
  2         4477  
13              
14             our $VERSION = '0.004000';
15             $VERSION = eval $VERSION;
16              
17             $SIG{PIPE} = 'IGNORE';
18              
19             sub new {
20 1     1 1 69 my $class = shift;
21              
22 1         5 my %params = @_;
23              
24             my $self = {
25             listen => 80,
26             silence_max => 20,
27             tick_period => 0,
28             watch_readable => [],
29             watch_writable => [],
30       0     on_connect => sub{},
31       0     on_tick => sub{},
32       1     on_shutdown => sub{},
33 1         8 };
34              
35 1         12 while (my ($key, $value) = each %params ) {
36 4 50       11 croak "Invalid $class parameter '$key'" unless exists $self->{$key};
37 4 50 66     33 croak "$class parameter '$key' expected type is ".ref($self->{$key}) if ref $self->{$key} && ref $value ne ref $self->{$key};
38 4         14 $self->{$key} = $value;
39             }
40              
41 1         2 bless $self, $class;
42              
43             # send a ping every silence_max by checking whether data was received in the last silence_max/2
44 1         11 $self->{silence_checkinterval} = $self->{silence_max} / 2;
45              
46 1         3 foreach my $watchtype (qw(readable writable)) {
47 2         9 $self->{"select_$watchtype"} = IO::Select->new();
48 2         25 my $key = "watch_$watchtype";
49 2 50       3 croak "$class parameter '$key' expects an arrayref containing an even number of elements" unless @{$self->{$key}} % 2 == 0;
  2         17  
50 2         4 my @watch = @{$self->{$key}};
  2         6  
51 2         4 $self->{$key} = {};
52 2         5 $self->_watch($watchtype, @watch);
53             }
54              
55 1         6 return $self;
56             }
57              
58             sub watch_readable {
59 0     0 1 0 my $self = shift;
60 0 0       0 croak "watch_readable expects an even number of arguments" unless @_ % 2 == 0;
61 0         0 $self->_watch(readable => @_);
62             }
63              
64             sub watched_readable {
65 0     0 1 0 my $self = shift;
66 0 0       0 return $self->{watch_readable}{$_[0]}{cb} if @_;
67 0         0 return map {$_->{fh}, $_->{cb}} values %{$self->{watch_readable}};
  0         0  
  0         0  
68             }
69              
70             sub watch_writable {
71 0     0 1 0 my $self = shift;
72 0 0       0 croak "watch_writable expects an even number of arguments" unless @_ % 2 == 0;
73 0         0 $self->_watch(writable => @_);
74             }
75              
76             sub watched_writable {
77 0     0 1 0 my $self = shift;
78 0 0       0 return $self->{watch_writable}{$_[0]}{cb} if @_;
79 0         0 return map {$_->{fh}, $_->{cb}} values %{$self->{watch_writable}};
  0         0  
  0         0  
80             }
81              
82             sub _watch {
83 2     2   4 my $self = shift;
84 2         3 my $watchtype = shift;
85 2 50       5 croak "watch_$watchtype expects an even number of arguments after the type" unless @_ % 2 == 0;
86 2         11 for (my $i = 0; $i < @_; $i+=2) {
87 2         5 my ($fh, $cb) = ($_[$i], $_[$i+1]);
88 2 50       7 croak "watch_$watchtype expects the second value of each pair to be a coderef, but element $i was not" unless ref $cb eq 'CODE';
89 2 50       6 if ($self->{"watch_$watchtype"}{$fh}) {
90 0         0 carp "watch_$watchtype was given a filehandle at index $i which is already being watched; ignoring!";
91 0         0 next;
92             }
93 2         8 $self->{"select_$watchtype"}->add($fh);
94 2         92 $self->{"watch_$watchtype"}{$fh} = {fh=>$fh, cb=>$cb};
95             }
96             }
97              
98             sub unwatch_readable {
99 1     1 1 75 my $self = shift;
100 1         10 $self->_unwatch(readable => @_);
101             }
102              
103             sub unwatch_writable {
104 1     1 1 69 my $self = shift;
105 1         7 $self->_unwatch(writable => @_);
106             }
107              
108             sub _unwatch {
109 2     2   4 my $self = shift;
110 2         6 my $watchtype = shift;
111 2         7 foreach my $fh (@_) {
112 2         14 $self->{"select_$watchtype"}->remove($fh);
113 2         143 delete $self->{"watch_$watchtype"}{$fh};
114             }
115             }
116              
117             sub on {
118 0     0 1 0 my $self = shift;
119 0         0 my %params = @_;
120              
121 0         0 while (my ($key, $value) = each %params ) {
122 0 0       0 croak "Invalid event '$key'" unless exists $self->{"on_$key"};
123 0 0       0 croak "Expected a coderef for event '$key'" unless ref $value eq 'CODE';
124 0         0 $self->{"on_$key"} = $value;
125             }
126             }
127              
128             sub start {
129 1     1 1 2 my $self = shift;
130              
131 1 50       4 if (ref $self->{listen}) {
132             # if we got a server, make sure it's valid by clearing errors and checking errors anyway; if there's still an error, it's closed
133 1         43 $self->{listen}->clearerr;
134             croak "failed to start websocket server; the TCP server provided via 'listen' is invalid. (is the listening socket is closed? are you trying to reuse a server that has already shut down?)"
135 1 50       23 if $self->{listen}->error;
136             } else {
137             # if we merely got a port, set up a reasonable default tcp server
138             $self->{listen} = IO::Socket::INET->new(
139             Listen => 5,
140             LocalPort => $self->{listen},
141 0   0     0 Proto => 'tcp',
142             ReuseAddr => 1,
143             ) || croak "failed to listen on port $self->{listen}: $!";
144             }
145              
146 1         7 $self->{select_readable}->add($self->{listen});
147              
148 1         39 $self->{conns} = {};
149 1 50       8 my $silence_nextcheck = $self->{silence_max} ? (time + $self->{silence_checkinterval}) : 0;
150 1 50       3 my $tick_next = $self->{tick_period} ? (time + $self->{tick_period}) : 0;
151              
152 1         31 while ($self->{listen}->opened) {
153 22 50       387 my $silence_checktimeout = $self->{silence_max} ? ($silence_nextcheck - time) : undef;
154 22 50       64 my $tick_timeout = $self->{tick_period} ? ($tick_next - time) : undef;
155 22         51 my $timeout = min(grep {defined} ($silence_checktimeout, $tick_timeout));
  44         238  
156              
157 22         163 my ($ready_read, $ready_write, undef) = IO::Select->select($self->{select_readable}, $self->{select_writable}, undef, $timeout);
158 22 50       66414 foreach my $fh ($ready_read ? @$ready_read : ()) {
159 21 100       220 if ($fh == $self->{listen}) {
    100          
    50          
160 1         23 my $sock = $self->{listen}->accept;
161 1 50       407 next unless $sock;
162 1         9 my $conn = new Net::WebSocket::Server::Connection(socket => $sock, server => $self);
163 1         6 $self->{conns}{$sock} = {conn=>$conn, lastrecv=>time};
164 1         5 $self->{select_readable}->add($sock);
165 1         42 $self->{on_connect}($self, $conn);
166             } elsif ($self->{watch_readable}{$fh}) {
167 1         21 $self->{watch_readable}{$fh}{cb}($self, $fh);
168             } elsif ($self->{conns}{$fh}) {
169 19         95 my $connmeta = $self->{conns}{$fh};
170 19         97 $connmeta->{lastrecv} = time;
171 19         107 $connmeta->{conn}->recv();
172             } else {
173 0         0 warn "filehandle $fh became readable, but no handler took responsibility for it; removing it";
174 0         0 $self->{select_readable}->remove($fh);
175             }
176             }
177              
178 22 50       111 foreach my $fh ($ready_write ? @$ready_write : ()) {
179 1 50       7 if ($self->{watch_writable}{$fh}) {
180 1         12 $self->{watch_writable}{$fh}{cb}($self, $fh);
181             } else {
182 0         0 warn "filehandle $fh became writable, but no handler took responsibility for it; removing it";
183 0         0 $self->{select_writable}->remove($fh);
184             }
185             }
186              
187 22 50       80 if ($self->{silence_max}) {
188 22         78 my $now = time;
189 22 50       77 if ($silence_nextcheck < $now) {
190 0         0 my $lastcheck = $silence_nextcheck - $self->{silence_checkinterval};
191 0 0       0 $_->{conn}->send('ping') for grep { $_->{conn}->is_ready && $_->{lastrecv} < $lastcheck } values %{$self->{conns}};
  0         0  
  0         0  
192              
193 0         0 $silence_nextcheck = $now + $self->{silence_checkinterval};
194             }
195             }
196              
197 22 50 33     249 if ($self->{tick_period} && $tick_next < time) {
198 0         0 $self->{on_tick}($self);
199 0         0 $tick_next += $self->{tick_period};
200             }
201             }
202             }
203              
204 2     2 1 63 sub connections { grep {$_->is_ready} map {$_->{conn}} values %{$_[0]{conns}} }
  2         18  
  2         40  
  2         18  
205              
206             sub shutdown {
207 1     1 1 28 my ($self) = @_;
208 1         10 $self->{on_shutdown}($self);
209 1         8 $self->{select_readable}->remove($self->{listen});
210 1         161 $self->{listen}->shutdown(2);
211 1         71 $self->{listen}->close();
212 1         97 $_->disconnect(1001) for $self->connections;
213             }
214              
215             sub disconnect {
216 1     1 1 6 my ($self, $fh) = @_;
217 1         11 $self->{select_readable}->remove($fh);
218 1         52 $fh->close();
219 1         92 delete $self->{conns}{$fh};
220             }
221              
222             1; # End of Net::WebSocket::Server
223              
224             __END__