File Coverage

lib/PAGI/Test/WebSocket.pm
Criterion Covered Total %
statement 116 124 93.5
branch 22 36 61.1
condition 16 28 57.1
subroutine 20 20 100.0
pod 11 11 100.0
total 185 219 84.4


line stmt bran cond sub pod time code
1             package PAGI::Test::WebSocket;
2             $PAGI::Test::WebSocket::VERSION = '0.002000';
3 5     5   26 use strict;
  5         6  
  5         176  
4 5     5   17 use warnings;
  5         7  
  5         213  
5 5     5   20 use Future::AsyncAwait;
  5         6  
  5         34  
6 5     5   203 use Future;
  5         7  
  5         115  
7 5     5   18 use Carp qw(croak);
  5         6  
  5         9218  
8              
9              
10             sub new {
11 14     14 1 28 my ($class, %args) = @_;
12              
13 14 50       83 croak "app is required" unless $args{app};
14 14 50       19 croak "scope is required" unless $args{scope};
15              
16             return bless {
17             app => $args{app},
18             scope => $args{scope},
19 14         80 send_queue => [], # Messages from test -> app
20             recv_queue => [], # Messages from app -> test
21             closed => 0,
22             accepted => 0,
23             close_code => undef,
24             close_reason => '',
25             _pending_receives => [], # Pending receive futures
26             }, $class;
27             }
28              
29             sub _start {
30 14     14   17 my ($self) = @_;
31              
32             # Create receive coderef for the app
33 31     31   523 my $receive = async sub {
34             # First call returns websocket.connect
35 31 100       67 if (!$self->{_connect_sent}) {
36 12         13 $self->{_connect_sent} = 1;
37 12         67 return { type => 'websocket.connect' };
38             }
39              
40             # Return queued message if available
41 19 50       17 if (@{$self->{send_queue}}) {
  19         26  
42 0         0 return shift @{$self->{send_queue}};
  0         0  
43             }
44              
45             # Return disconnect if closed
46 19 50       35 if ($self->{closed}) {
47 0   0     0 return { type => 'websocket.disconnect', code => $self->{close_code} // 1000 };
48             }
49              
50             # Create a future that will be resolved when data arrives
51 19         24 my $future = Future->new;
52 19         130 push @{$self->{_pending_receives}}, $future;
  19         28  
53 19         28 return await $future;
54 14         45 };
55              
56             # Create send coderef for the app
57 28     28   1596 my $send = async sub {
58 28         32 my ($event) = @_;
59              
60 28 100       64 if ($event->{type} eq 'websocket.accept') {
    100          
    100          
61 14         16 $self->{accepted} = 1;
62             }
63             elsif ($event->{type} eq 'websocket.send') {
64 11         12 push @{$self->{recv_queue}}, $event;
  11         69  
65             }
66             elsif ($event->{type} eq 'websocket.close') {
67 2         2 $self->{closed} = 1;
68 2   50     3 $self->{close_code} = $event->{code} // 1000;
69 2   50     10 $self->{close_reason} = $event->{reason} // '';
70             }
71              
72 28         83 return;
73 14         29 };
74              
75             # Start the app future but don't block on it
76 14         53 $self->{app_future} = $self->{app}->($self->{scope}, $receive, $send);
77              
78             # Wait for acceptance (the first two awaits in the app should complete immediately)
79             # This is a bit hacky but works: we need to let the app run until it accepts
80 14         1060 $self->_pump_app;
81              
82 14 50       22 croak "WebSocket connection not accepted" unless $self->{accepted};
83              
84 14         37 return $self;
85             }
86              
87             sub _pump_app {
88 34     34   35 my ($self) = @_;
89              
90             # This pumps the app future by checking if it's waiting on a receive
91             # If there are pending receives and we have data, resolve them
92 34   100     37 while (@{$self->{_pending_receives}} && @{$self->{send_queue}}) {
  53         2609  
  38         83  
93 19         29 my $future = shift @{$self->{_pending_receives}};
  19         25  
94 19         16 my $event = shift @{$self->{send_queue}};
  19         20  
95 19         44 $future->done($event);
96             }
97              
98             # If closed and there are pending receives, resolve them with disconnect
99 34 50 66     124 if ($self->{closed} && @{$self->{_pending_receives}}) {
  14         33  
100 0         0 while (my $future = shift @{$self->{_pending_receives}}) {
  0         0  
101 0   0     0 $future->done({ type => 'websocket.disconnect', code => $self->{close_code} // 1000 });
102             }
103             }
104             }
105              
106             sub send_text {
107 8     8 1 47 my ($self, $text) = @_;
108              
109 8 100       176 croak "Cannot send on closed WebSocket" if $self->{closed};
110              
111 7         6 push @{$self->{send_queue}}, {
  7         18  
112             type => 'websocket.receive',
113             text => $text,
114             };
115              
116             # Pump the app to process this message
117 7         13 $self->_pump_app;
118              
119 7         8 return $self;
120             }
121              
122             sub send_bytes {
123 1     1 1 6 my ($self, $bytes) = @_;
124              
125 1 50       3 croak "Cannot send on closed WebSocket" if $self->{closed};
126              
127 1         2 push @{$self->{send_queue}}, {
  1         2  
128             type => 'websocket.receive',
129             bytes => $bytes,
130             };
131              
132             # Pump the app to process this message
133 1         2 $self->_pump_app;
134              
135 1         2 return $self;
136             }
137              
138             sub send_json {
139 1     1 1 8 my ($self, $data) = @_;
140              
141 1         438 require JSON::MaybeXS;
142 1         7846 my $text = JSON::MaybeXS::encode_json($data);
143              
144 1         4 return $self->send_text($text);
145             }
146              
147             sub receive_text {
148 11     11 1 41 my ($self, $timeout) = @_;
149 11   100     35 $timeout //= 5;
150              
151             # Check if we have a text message already waiting
152 11         9 for my $i (0 .. $#{$self->{recv_queue}}) {
  11         26  
153 10         12 my $event = $self->{recv_queue}[$i];
154 10 50 33     35 if ($event->{type} eq 'websocket.send' && exists $event->{text}) {
155 10         11 splice @{$self->{recv_queue}}, $i, 1;
  10         18  
156 10         47 return $event->{text};
157             }
158             }
159              
160             # Check if connection closed
161 1 50       3 return undef if $self->{closed};
162              
163             # No message available yet
164 1         96 croak "Timeout waiting for WebSocket text message";
165             }
166              
167             sub receive_bytes {
168 1     1 1 3 my ($self, $timeout) = @_;
169 1   50     5 $timeout //= 5;
170              
171             # Check if we have a bytes message waiting
172 1         1 for my $i (0 .. $#{$self->{recv_queue}}) {
  1         3  
173 1         2 my $event = $self->{recv_queue}[$i];
174 1 50 33     4 if ($event->{type} eq 'websocket.send' && exists $event->{bytes}) {
175 1         1 splice @{$self->{recv_queue}}, $i, 1;
  1         2  
176 1         5 return $event->{bytes};
177             }
178             }
179              
180             # Check if connection closed
181 0 0       0 return undef if $self->{closed};
182              
183             # No message available yet
184 0         0 croak "Timeout waiting for WebSocket bytes message";
185             }
186              
187             sub receive_json {
188 2     2 1 10 my ($self, $timeout) = @_;
189              
190 2         6 my $text = $self->receive_text($timeout);
191 2 50       4 return undef unless defined $text;
192              
193 2         11 require JSON::MaybeXS;
194 2         13 return JSON::MaybeXS::decode_json($text);
195             }
196              
197             sub close {
198 12     12 1 343 my ($self, $code, $reason) = @_;
199              
200 12 50       22 return if $self->{closed};
201              
202 12   100     35 $code //= 1000;
203 12   100     29 $reason //= '';
204              
205 12         12 $self->{closed} = 1;
206 12         13 $self->{close_code} = $code;
207 12         16 $self->{close_reason} = $reason;
208              
209             # Push disconnect event
210 12         41 push @{$self->{send_queue}}, {
  12         69  
211             type => 'websocket.disconnect',
212             code => $code,
213             reason => $reason,
214             };
215              
216             # Pump the app to let it process the disconnect
217 12         25 $self->_pump_app;
218              
219 12         31 return $self;
220             }
221              
222             sub close_code {
223 2     2 1 3 my ($self) = @_;
224 2         8 return $self->{close_code};
225             }
226              
227             sub close_reason {
228 1     1 1 2 my ($self) = @_;
229 1         4 return $self->{close_reason};
230             }
231              
232             sub is_closed {
233 14     14 1 19 my ($self) = @_;
234 14         55 return $self->{closed};
235             }
236              
237             1;
238              
239             __END__