File Coverage

blib/lib/Plack/App/WebSocket/Connection.pm
Criterion Covered Total %
statement 18 69 26.0
branch 0 14 0.0
condition n/a
subroutine 6 18 33.3
pod 3 4 75.0
total 27 105 25.7


line stmt bran cond sub pod time code
1             package Plack::App::WebSocket::Connection;
2 3     3   16 use strict;
  3         6  
  3         69  
3 3     3   16 use warnings;
  3         4  
  3         58  
4 3     3   12 use Carp;
  3         3  
  3         135  
5 3     3   15 use Scalar::Util qw(weaken refaddr);
  3         4  
  3         141  
6 3     3   15 use Devel::GlobalDestruction ();
  3         5  
  3         55  
7 3     3   19 use AnyEvent;
  3         5  
  3         1896  
8              
9             our $VERSION = "0.08";
10              
11             sub new {
12 0     0 0   my ($class, $conn, $responder) = @_;
13 0           my $self = bless {
14             connection => $conn,
15             responder => $responder,
16             handlers => {
17             message => [],
18             finish => [],
19             },
20             }, $class;
21 0           $self->_setup_internal_event_handlers();
22 0           return $self;
23             }
24              
25             sub _setup_internal_event_handlers {
26 0     0     my ($self) = @_;
27 0           weaken $self;
28             $self->{connection}->on(each_message => sub {
29 0 0   0     return if !defined($self);
30 0           my $strong_self = $self; ## make sure $self is alive during callback execution
31             $_->($self, $_[1]->body, $self->_cancel_for('message',$_))
32 0           foreach @{$self->{handlers}{message}};
  0            
33 0           });
34             $self->{connection}->on(finish => sub {
35 0 0   0     return if !defined($self);
36 0           my $strong_self = $self; ## make sure $self is alive during callback execution
37 0           $_->($self) foreach @{$self->{handlers}{finish}};
  0            
38 0           });
39             }
40              
41             sub _clear_event_handlers {
42 0     0     my ($self) = @_;
43 0           foreach my $handler_list (values %{$self->{handlers}}) {
  0            
44 0           @$handler_list = ();
45             }
46             }
47              
48             sub _cancel_for {
49 0     0     my( $self, $event, $handler ) = @_;
50             return sub {
51             $self->{handlers}{$event} = [
52 0           grep { refaddr($_) != refaddr($handler) }
53 0     0     @{ $self->{handlers}{$event} }
  0            
54             ];
55 0           };
56             }
57              
58             sub on {
59 0     0 1   my ($self, @handlers) = @_;
60              
61 0           my @cancel;
62              
63 0           while( my( $event, $handler ) = splice @handlers, 0, 2 ) {
64 0 0         croak "handler for event $event must be a code-ref" if ref($handler) ne "CODE";
65 0 0         $event = "finish" if $event eq "close";
66 0           my $handler_list = $self->{handlers}{$event};
67 0 0         croak "Unknown event: $event" if not defined $handler_list;
68 0           push(@$handler_list, $handler);
69 0           push @cancel, $self->_cancel_for($event,$handler);
70             }
71              
72 0 0         return wantarray ? @cancel : $cancel[0];
73             }
74              
75             sub send {
76 0     0 1   my ($self, $message) = @_;
77 0           $self->{connection}->send($message);
78             }
79              
80             sub close {
81 0     0 1   my ($self) = @_;
82 0           $self->{connection}->close;
83             }
84              
85             our $WAIT_FOR_FLUSHING_SEC = 5;
86              
87             sub DESTROY {
88 0     0     my ($self) = @_;
89 0 0         return if Devel::GlobalDestruction::in_global_destruction;
90 0           $self->_clear_event_handlers();
91 0           my $connection = $self->{connection};
92 0           $connection->close(); ## explicit close because $responder may keep the socket.
93 0           my $responder = $self->{responder};
94 0           my $w; $w = AnyEvent->timer(after => $WAIT_FOR_FLUSHING_SEC, cb => sub {
95 0     0     $responder->([200, ["Content-Type", "text/plain"], ["WebSocket finished"]]);
96 0           undef $w;
97 0           undef $responder;
98              
99             ## Prolong $connection's life as long as $responder. This is
100             ## necessary to make sure $connection actively shuts down the
101             ## socket. If $connection is destroyed immediately and the
102             ## kernel's write buffer is full, $connection may fail to shut
103             ## down the socket (because $connection delays the active
104             ## shutdown after sending all the buffered data). If that
105             ## happens, the socket stays open, which is bad.
106 0           undef $connection;
107 0           });
108             }
109              
110             1;
111              
112             __END__