File Coverage

blib/lib/Net/WebSocket/Server.pm
Criterion Covered Total %
statement 111 149 74.5
branch 24 56 42.8
condition 6 12 50.0
subroutine 19 26 73.0
pod 12 12 100.0
total 172 255 67.4


line stmt bran cond sub pod time code
1             package Net::WebSocket::Server;
2              
3 2     2   17678 use 5.006;
  2         10  
  2         131  
4 2     2   11 use strict;
  2         4  
  2         99  
5 2     2   13 use warnings FATAL => 'all';
  2         9  
  2         142  
6              
7 2     2   10 use Carp;
  2         2  
  2         237  
8 2     2   2583 use IO::Socket::INET;
  2         17189  
  2         38  
9 2     2   4335 use IO::Select;
  2         3158  
  2         101  
10 2     2   1000 use Net::WebSocket::Server::Connection;
  2         5  
  2         85  
11 2     2   1359 use Time::HiRes qw(time);
  2         3121  
  2         10  
12 2     2   482 use List::Util qw(min);
  2         4  
  2         2959  
13              
14             our $VERSION = '0.003002';
15             $VERSION = eval $VERSION;
16              
17             $SIG{PIPE} = 'IGNORE';
18              
19             sub new {
20 1     1 1 46 my $class = shift;
21              
22 1         52 my %params = @_;
23              
24             my $self = {
25             listen => 80,
26             silence_max => 20,
27             tick_period => 0,
28             watch_readable => [],
29             watch_writable => [],
30 0     0   0 on_connect => sub{},
31 0     0   0 on_tick => sub{},
32 1     1   2 on_shutdown => sub{},
33 1         12 };
34              
35 1         9 while (my ($key, $value) = each %params ) {
36 4 50       8 croak "Invalid $class parameter '$key'" unless exists $self->{$key};
37 4 50 66     37 croak "$class parameter '$key' expected type is ".ref($self->{$key}) if ref $self->{$key} && ref $value ne ref $self->{$key};
38 4         15 $self->{$key} = $value;
39             }
40              
41 1         2 bless $self, $class;
42              
43             # send a ping every silence_max by checking whether data was received in the last silence_max/2
44 1         13 $self->{silence_checkinterval} = $self->{silence_max} / 2;
45              
46 1         2 foreach my $watchtype (qw(readable writable)) {
47 2         11 $self->{"select_$watchtype"} = IO::Select->new();
48 2         20 my $key = "watch_$watchtype";
49 2 50       1 croak "$class parameter '$key' expects an arrayref containing an even number of elements" unless @{$self->{$key}} % 2 == 0;
  2         7  
50 2         2 my @watch = @{$self->{$key}};
  2         3  
51 2         3 $self->{$key} = {};
52 2         6 $self->_watch($watchtype, @watch);
53             }
54              
55 1         5 return $self;
56             }
57              
58             sub watch_readable {
59 0     0 1 0 my $self = shift;
60 0 0       0 croak "watch_readable expects an even number of arguments" unless @_ % 2 == 0;
61 0         0 $self->_watch(readable => @_);
62             }
63              
64             sub watched_readable {
65 0     0 1 0 my $self = shift;
66 0 0       0 return $self->{watch_readable}{$_[0]}{cb} if @_;
67 0         0 return map {$_->{fh}, $_->{cb}} values %{$self->{watch_readable}};
  0         0  
  0         0  
68             }
69              
70             sub watch_writable {
71 0     0 1 0 my $self = shift;
72 0 0       0 croak "watch_writable expects an even number of arguments" unless @_ % 2 == 0;
73 0         0 $self->_watch(writable => @_);
74             }
75              
76             sub watched_writable {
77 0     0 1 0 my $self = shift;
78 0 0       0 return $self->{watch_writable}{$_[0]}{cb} if @_;
79 0         0 return map {$_->{fh}, $_->{cb}} values %{$self->{watch_writable}};
  0         0  
  0         0  
80             }
81              
82             sub _watch {
83 2     2   2 my $self = shift;
84 2         3 my $watchtype = shift;
85 2 50       4 croak "watch_$watchtype expects an even number of arguments after the type" unless @_ % 2 == 0;
86 2         6 for (my $i = 0; $i < @_; $i+=2) {
87 2         4 my ($fh, $cb) = ($_[$i], $_[$i+1]);
88 2 50       5 croak "watch_$watchtype expects the second value of each pair to be a coderef, but element $i was not" unless ref $cb eq 'CODE';
89 2 50       6 if ($self->{"watch_$watchtype"}{$fh}) {
90 0         0 carp "watch_$watchtype was given a filehandle at index $i which is already being watched; ignoring!";
91 0         0 next;
92             }
93 2         7 $self->{"select_$watchtype"}->add($fh);
94 2         81 $self->{"watch_$watchtype"}{$fh} = {fh=>$fh, cb=>$cb};
95             }
96             }
97              
98             sub unwatch_readable {
99 1     1 1 8 my $self = shift;
100 1         5 $self->_unwatch(readable => @_);
101             }
102              
103             sub unwatch_writable {
104 1     1 1 45 my $self = shift;
105 1         5 $self->_unwatch(writable => @_);
106             }
107              
108             sub _unwatch {
109 2     2   4 my $self = shift;
110 2         3 my $watchtype = shift;
111 2         4 foreach my $fh (@_) {
112 2         10 $self->{"select_$watchtype"}->remove($fh);
113 2         83 delete $self->{"watch_$watchtype"}{$fh};
114             }
115             }
116              
117             sub on {
118 0     0 1 0 my $self = shift;
119 0         0 my %params = @_;
120              
121 0         0 while (my ($key, $value) = each %params ) {
122 0 0       0 croak "Invalid event '$key'" unless exists $self->{"on_$key"};
123 0 0       0 croak "Expected a coderef for event '$key'" unless ref $value eq 'CODE';
124 0         0 $self->{"on_$key"} = $value;
125             }
126             }
127              
128             sub start {
129 1     1 1 2 my $self = shift;
130              
131 1 50       13 if (ref $self->{listen}) {
132             # if we got a server, make sure it's valid by clearing errors and checking errors anyway; if there's still an error, it's closed
133 1         19 $self->{listen}->clearerr;
134 1 50       26 croak "failed to start websocket server; the TCP server provided via 'listen' is invalid. (is the listening socket is closed? are you trying to reuse a server that has already shut down?)"
135             if $self->{listen}->error;
136             } else {
137             # if we merely got a port, set up a reasonable default tcp server
138 0   0     0 $self->{listen} = IO::Socket::INET->new(
139             Listen => 5,
140             LocalPort => $self->{listen},
141             Proto => 'tcp',
142             ReuseAddr => 1,
143             ) || croak "failed to listen on port $self->{listen}: $!";
144             }
145              
146 1         4 $self->{select_readable}->add($self->{listen});
147              
148 1         25 $self->{conns} = {};
149 1 50       6 my $silence_nextcheck = $self->{silence_max} ? (time + $self->{silence_checkinterval}) : 0;
150 1 50       3 my $tick_next = $self->{tick_period} ? (time + $self->{tick_period}) : 0;
151              
152 1   100     3 while (%{$self->{conns}} || $self->{listen}->opened) {
  23         135  
153 22 50       84 my $silence_checktimeout = $self->{silence_max} ? ($silence_nextcheck - time) : undef;
154 22 50       54 my $tick_timeout = $self->{tick_period} ? ($tick_next - time) : undef;
155 22         42 my $timeout = min(grep {defined} ($silence_checktimeout, $tick_timeout));
  44         124  
156              
157 22         193 my ($ready_read, $ready_write, undef) = IO::Select->select($self->{select_readable}, $self->{select_writable}, undef, $timeout);
158 22 50       39609 foreach my $fh ($ready_read ? @$ready_read : ()) {
159 21 100       200 if ($fh == $self->{listen}) {
    100          
    50          
160 1         16 my $sock = $self->{listen}->accept;
161 1 50       267 next unless $sock;
162 1         8 my $conn = new Net::WebSocket::Server::Connection(socket => $sock, server => $self);
163 1         8 $self->{conns}{$sock} = {conn=>$conn, lastrecv=>time};
164 1         3 $self->{select_readable}->add($sock);
165 1         29 $self->{on_connect}($self, $conn);
166             } elsif ($self->{watch_readable}{$fh}) {
167 1         7 $self->{watch_readable}{$fh}{cb}($self, $fh);
168             } elsif ($self->{conns}{$fh}) {
169 19         46 my $connmeta = $self->{conns}{$fh};
170 19         70 $connmeta->{lastrecv} = time;
171 19         90 $connmeta->{conn}->recv();
172             } else {
173 0         0 warn "filehandle $fh became readable, but no handler took responsibility for it; removing it";
174 0         0 $self->{select_readable}->remove($fh);
175             }
176             }
177              
178 22 50       96 foreach my $fh ($ready_write ? @$ready_write : ()) {
179 1 50       9 if ($self->{watch_writable}{$fh}) {
180 1         6 $self->{watch_writable}{$fh}{cb}($self, $fh);
181             } else {
182 0         0 warn "filehandle $fh became writable, but no handler took responsibility for it; removing it";
183 0         0 $self->{select_writable}->remove($fh);
184             }
185             }
186              
187 22 50       78 if ($self->{silence_max}) {
188 22         93 my $now = time;
189 22 50       84 if ($silence_nextcheck < $now) {
190 0         0 my $lastcheck = $silence_nextcheck - $self->{silence_checkinterval};
191 0         0 $_->{conn}->send('ping') for grep { $_->{lastrecv} < $lastcheck } values %{$self->{conns}};
  0         0  
  0         0  
192              
193 0         0 $silence_nextcheck = $now + $self->{silence_checkinterval};
194             }
195             }
196              
197 22 50 33     130 if ($self->{tick_period} && $tick_next < time) {
198 0         0 $self->{on_tick}($self);
199 0         0 $tick_next += $self->{tick_period};
200             }
201             }
202             }
203              
204 2     2 1 15 sub connections { map {$_->{conn}} values %{$_[0]{conns}} }
  2         13  
  2         13  
205              
206             sub shutdown {
207 1     1 1 13 my ($self) = @_;
208 1         5 $self->{on_shutdown}($self);
209 1         5 $self->{select_readable}->remove($self->{listen});
210 1         56 $self->{listen}->close();
211 1         41 $_->disconnect(1001) for $self->connections;
212             }
213              
214             sub disconnect {
215 1     1 1 3 my ($self, $fh) = @_;
216 1         4 $self->{select_readable}->remove($fh);
217 1         30 $fh->close();
218 1         84 delete $self->{conns}{$fh};
219             }
220              
221             1; # End of Net::WebSocket::Server
222              
223             __END__