File Coverage

blib/lib/PAGI/Server/Connection.pm
Criterion Covered Total %
statement 1620 1967 82.3
branch 682 1164 58.5
condition 328 575 57.0
subroutine 136 148 91.8
pod 0 2 0.0
total 2766 3856 71.7


line stmt bran cond sub pod time code
1             package PAGI::Server::Connection;
2 109     109   4749864 use strict;
  109         222  
  109         3659  
3 109     109   401 use warnings;
  109         160  
  109         6644  
4              
5             our $VERSION = '0.002002';
6              
7 109     109   1537 use Future;
  109         21937  
  109         1877  
8 109     109   2536 use Future::AsyncAwait;
  109         14277  
  109         540  
9 109     109   5015 use Scalar::Util qw(weaken refaddr);
  109         201  
  109         5129  
10 109     109   43362 use Protocol::WebSocket::Handshake::Server;
  109         2500770  
  109         4757  
11 109     109   901 use Protocol::WebSocket::Frame;
  109         191  
  109         2204  
12 109     109   495 use Digest::SHA qw(sha1_base64);
  109         209  
  109         8838  
13 109     109   563 use Encode;
  109         159  
  109         7825  
14 109     109   25877 use URI::Escape qw(uri_unescape);
  109         110117  
  109         6076  
15 109     109   27491 use IO::Async::Timer::Countdown;
  109         87661  
  109         3951  
16 109     109   8451 use IO::Async::Timer::Periodic;
  109         23862  
  109         3261  
17 109     109   479 use Time::HiRes qw(gettimeofday tv_interval);
  109         149  
  109         1430  
18 109     109   55385 use PAGI::Server::AsyncFile;
  109         356  
  109         7102  
19 109     109   49532 use PAGI::Server::ConnectionState;
  109         570  
  109         5990  
20 109     109   43786 use PAGI::Server::TransportState;
  109         230  
  109         5359  
21              
22              
23 109     109   720 use constant FILE_CHUNK_SIZE => 65536; # 64KB chunks for file streaming
  109         163  
  109         109405  
24              
25             # Per-second cache for CLF timestamp in access log (same pattern as HTTP1::format_date)
26             my $_cached_log_timestamp;
27             my $_cached_log_time = 0;
28              
29             # =============================================================================
30             # Unrecognized Event Type Handler (PAGI spec compliance)
31             # =============================================================================
32             # Per main.mkdn: "Servers must raise exceptions if... The type field is unrecognized"
33              
34             sub _unrecognized_event_type {
35 0     0   0 my ($type, $protocol) = @_;
36 0         0 die "Unrecognized event type '$type' for $protocol protocol\n";
37             }
38              
39             # =============================================================================
40             # Header Validation (CRLF Injection Prevention)
41             # =============================================================================
42             # RFC 7230 Section 3.2.6: Field values MUST NOT contain CR or LF
43              
44             sub _validate_header_value {
45 40     40   82 my ($value) = @_;
46              
47 40 100       130 if ($value =~ /[\r\n\0]/) {
48 1         10 die "Invalid header value: contains CR, LF, or null byte\n";
49             }
50 39         161 return $value;
51             }
52              
53             sub _validate_header_name {
54 40     40   83 my ($name) = @_;
55              
56 40 50       267 if ($name =~ /[\r\n\0]/) {
57 0         0 die "Invalid header name: contains CR, LF, or null byte\n";
58             }
59 40 50       157 if ($name =~ /[[:cntrl:]]/) {
60 0         0 die "Invalid header name: contains control characters\n";
61             }
62 40         175 return $name;
63             }
64              
65             # RFC 6455 Section 11.3.4: Subprotocol must be a token (no whitespace, separators)
66             sub _validate_subprotocol {
67 1     1   3 my ($value) = @_;
68              
69 1 50       5 if ($value =~ /[\r\n\0\s]/) {
70 1         22 die "Invalid subprotocol: contains CR, LF, null, or whitespace\n";
71             }
72             # Token characters only (roughly)
73 0 0       0 if ($value !~ /^[\w\-\.]+$/) {
74 0         0 die "Invalid subprotocol: must be alphanumeric, dash, underscore, or dot\n";
75             }
76 0         0 return $value;
77             }
78              
79             =head1 NAME
80              
81             PAGI::Server::Connection - Per-connection state machine
82              
83             =head1 SYNOPSIS
84              
85             # Internal use by PAGI::Server
86             my $conn = PAGI::Server::Connection->new(
87             stream => $stream,
88             app => $app,
89             protocol => $protocol,
90             server => $server,
91             extensions => {},
92             );
93             $conn->start;
94              
95             =head1 DESCRIPTION
96              
97             PAGI::Server::Connection manages the state machine for a single client
98             connection. It handles:
99              
100             =over 4
101              
102             =item * Request parsing via Protocol::HTTP1
103              
104             =item * Scope creation for the application
105              
106             =item * Event queue management for $receive and $send
107              
108             =item * Protocol upgrades (WebSocket, SSE)
109              
110             =item * SSE over HTTP/1.1 and HTTP/2
111              
112             =item * Connection lifecycle and cleanup
113              
114             =back
115              
116             =cut
117              
118             sub new {
119 275     275 0 26761 my ($class, %args) = @_;
120              
121             my $self = bless {
122             stream => $args{stream},
123             app => $args{app},
124             protocol => $args{protocol},
125             server => $args{server},
126             extensions => $args{extensions} // {},
127             state => $args{state} // {},
128             tls_enabled => $args{tls_enabled} // 0,
129             timeout => $args{timeout} // 60, # Idle timeout in seconds
130             request_timeout => $args{request_timeout} // 0, # Request stall timeout in seconds (0 = disabled, default for performance)
131             ws_idle_timeout => $args{ws_idle_timeout} // 0, # WebSocket idle timeout (0 = disabled)
132             sse_idle_timeout => $args{sse_idle_timeout} // 0, # SSE idle timeout (0 = disabled)
133             max_body_size => $args{max_body_size}, # 0 = unlimited
134             access_log => $args{access_log}, # Filehandle for access logging
135             _access_log_formatter => $args{_access_log_formatter}, # Pre-compiled format closure
136             max_receive_queue => $args{max_receive_queue} // 1000, # Max WebSocket receive queue size
137             max_ws_frame_size => $args{max_ws_frame_size} // 65536, # Max WebSocket frame size in bytes
138             sync_file_threshold => $args{sync_file_threshold} // 65536, # Threshold for sync file reads (default 64KB)
139             validate_events => $args{validate_events} // 0, # Dev-mode event validation (0 = disabled)
140             # Send-side backpressure (watermarks in bytes)
141             # Defaults match Python asyncio: 64KB high, 16KB low (high/4)
142             write_high_watermark => $args{write_high_watermark} // 65536, # 64KB - pause sending above this
143             write_low_watermark => $args{write_low_watermark} // 16384, # 16KB - resume sending below this
144             _drain_waiters => [], # Pending Futures waiting for buffer drain
145             _drain_check_active => 0, # Flag to prevent redundant on_outgoing_empty setup
146             tls_info => undef, # Populated on first request if TLS
147             buffer => '',
148             closed => 0,
149             response_started => 0,
150             response_status => undef, # Track response status for logging
151             _response_size => 0, # Track response body bytes for logging
152             request_start => undef, # Track request start time for logging
153             idle_timer => undef, # IO::Async::Timer for idle timeout
154             stall_timer => undef, # IO::Async::Timer for request stall timeout
155             ws_idle_timer => undef, # IO::Async::Timer for WebSocket idle timeout
156             sse_idle_timer => undef, # IO::Async::Timer for SSE idle timeout
157             # Event queue for $receive
158             receive_queue => [],
159             receive_pending => undef,
160             # Track all pending receive Futures to cancel on close
161             receive_futures => [],
162             # Track request handling Future to prevent "lost future" warning
163             request_future => undef,
164             # Idempotency guard for disconnect handling
165             _disconnect_handled => 0,
166             # WebSocket state
167             websocket_mode => 0,
168             websocket_frame => undef, # Protocol::WebSocket::Frame for parsing
169             websocket_accepted => 0,
170             # SSE state
171             sse_mode => 0,
172             sse_started => 0,
173             sse_disconnect_reason => undef, # Reason for SSE disconnect (client_closed, write_error, etc.)
174             ws_disconnect_reason => undef, # Standard reason token for the app-facing websocket.disconnect event
175             ws_disconnect_code => undef, # Wire close code for that event (defaults to 1006, abnormal closure)
176             # Keepalive state (protocol-level ping/pong for WebSocket, comments for SSE)
177             ws_keepalive_timer => undef, # Periodic timer for sending WebSocket pings
178             ws_pong_timeout => undef, # Timeout timer for pong response
179             ws_waiting_pong => 0, # Flag: are we waiting for a pong?
180             ws_keepalive_interval => 0, # Current keepalive interval (0 = disabled)
181             ws_keepalive_timeout => 0, # Current pong timeout (0 = no timeout check)
182             sse_keepalive_timer => undef, # Periodic timer for sending SSE keepalive comments
183             sse_keepalive_comment => '', # Comment text to send
184             # HTTP/2 state
185             alpn_protocol => $args{alpn_protocol}, # ALPN-negotiated protocol (e.g. 'h2', 'http/1.1')
186             h2_protocol => $args{h2_protocol}, # PAGI::Server::Protocol::HTTP2 instance
187             h2c_enabled => $args{h2c_enabled} // 0, # Allow h2c preface detection on cleartext
188             is_h2 => 0, # Set during start() if HTTP/2 detected
189             h2_session => undef, # PAGI::Server::Protocol::HTTP2::Session
190             h2_streams => {}, # Per-stream state for HTTP/2
191             # Transport info (tcp or unix)
192             transport_type => $args{transport_type} // 'tcp',
193             transport_path => $args{transport_path}, # socket path for unix
194             # Cached connection info (populated in start(), used by _create_scope)
195 275   100     24276 client_host => '127.0.0.1',
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
      100        
196             client_port => 0,
197             server_host => '127.0.0.1',
198             server_port => 5000,
199             }, $class;
200              
201             # Extract TLS info if this is a TLS connection
202 275 100       1220 if ($self->{tls_enabled}) {
203 45         239 $self->_extract_tls_info;
204             }
205              
206 275         1271 return $self;
207             }
208              
209 109     109   867 use Socket qw(IPPROTO_TCP TCP_NODELAY);
  109         196  
  109         2986207  
210              
211             sub start {
212 273     273 0 15736 my ($self) = @_;
213              
214 273         563 my $stream = $self->{stream};
215 273         540 weaken(my $weak_self = $self);
216              
217             # Enable TCP_NODELAY to reduce latency for small responses (TCP only)
218 273   33     991 my $handle = $stream->write_handle // $stream->read_handle;
219 273 100 66     3888 if ($self->{transport_type} eq 'tcp' && $handle && $handle->can('setsockopt')) {
      100        
220 201         331 eval {
221 201         1039 $handle->setsockopt(IPPROTO_TCP, TCP_NODELAY, 1);
222             };
223             # Ignore errors - not all sockets support this
224             }
225              
226             # Cache connection info once (avoids per-request socket method calls)
227 273 100 66     5621 if ($self->{transport_type} eq 'unix') {
    100          
228             # Unix socket: no peer IP/port, server is identified by path
229 2         33 $self->{client_host} = undef;
230 2         30 $self->{client_port} = undef;
231 2         12 $self->{server_host} = $self->{transport_path};
232 2         18 $self->{server_port} = undef;
233             } elsif ($handle && $handle->can('peerhost')) {
234 200         377 eval {
235 200   100     803 $self->{client_host} = $handle->peerhost // '127.0.0.1';
236 200   100     10617 $self->{client_port} = $handle->peerport // 0;
237 200   50     5841 $self->{server_host} = $handle->sockhost // '127.0.0.1';
238 200   50     5675 $self->{server_port} = $handle->sockport // 5000;
239             };
240             # Ignore errors - keep defaults if extraction fails
241             }
242              
243             # Detect HTTP/2 via ALPN negotiation
244 273 50 100     5487 if ($self->{alpn_protocol} && $self->{alpn_protocol} eq 'h2' && $self->{h2_protocol}) {
      66        
245 25         132 $self->_init_h2_session;
246             }
247              
248             # Set up idle timeout timer
249 273 50 33     2160 if ($self->{timeout} && $self->{timeout} > 0 && $self->{server}) {
      33        
250             my $timer = IO::Async::Timer::Countdown->new(
251             delay => $self->{timeout},
252             on_expire => sub {
253 0 0   0   0 return unless $weak_self;
254 0 0       0 return if $weak_self->{closed};
255             # Close idle connection
256 0         0 $weak_self->_handle_disconnect_and_close('idle_timeout');
257             },
258 273         4298 );
259 273         19819 $self->{idle_timer} = $timer;
260 273         1062 $self->{server}->add_child($timer);
261 273         22018 $timer->start;
262             }
263              
264             # Set up read handler
265             $stream->configure(
266             on_read => sub {
267 504     504   38247620 my ($s, $buffref, $eof) = @_;
268 504 100       1732 return 0 unless $weak_self;
269              
270             # Reset idle timer on any read activity
271 502         2486 $weak_self->_reset_idle_timer;
272              
273             # Reset stall timer on read activity (if handling a request)
274 502 100       3028 $weak_self->_reset_stall_timer if $weak_self->{handling_request};
275              
276 502         1391 $weak_self->{buffer} .= $$buffref;
277 502         956 $$buffref = '';
278              
279 502 100       1564 if ($eof) {
280             # EOF means client closed - handle disconnect and cleanup
281 12         106 $weak_self->_handle_disconnect_and_close('client_closed');
282 12         896 return 0;
283             }
284              
285             # h2c detection: check if cleartext connection starts with HTTP/2 preface
286 490 100 66     1743 if ($weak_self->{h2c_enabled} && !$weak_self->{is_h2}) {
287 43 50       148 if (length($weak_self->{buffer}) >= 24) { # HTTP/2 preface is 24 bytes
288 43 100 66     383 if ($weak_self->{h2_protocol} && PAGI::Server::Protocol::HTTP2->detect_preface($weak_self->{buffer})) {
289 42         180 $weak_self->_init_h2_session;
290 42         124 $weak_self->{h2c_enabled} = 0; # Detection done
291             } else {
292 1         2 $weak_self->{h2c_enabled} = 0; # Not h2c, stop checking
293             }
294             } else {
295             # Not enough data yet to determine protocol, wait for more
296 0         0 return 0;
297             }
298             }
299              
300             # Wrap processing in eval to prevent exceptions from crashing the event loop
301             # This is critical - Protocol::WebSocket::Frame can throw exceptions for
302             # oversized payloads, and other parsing code may throw as well
303 490         853 eval {
304             # HTTP/2: feed data to session for frame processing
305 490 100       1305 if ($weak_self->{is_h2}) {
306 220         770 $weak_self->_h2_process_data;
307 220         510 return;
308             }
309              
310             # If in WebSocket mode, process WebSocket frames
311 270 100       681 if ($weak_self->{websocket_mode}) {
312 43         140 $weak_self->_process_websocket_frames;
313 42         465 return;
314             }
315              
316             # If we're waiting for body data, notify the receive handler
317 227 100 66     750 if ($weak_self->{receive_pending} && !$weak_self->{receive_pending}->is_ready) {
318 12         49 my $f = $weak_self->{receive_pending};
319 12         13 $weak_self->{receive_pending} = undef;
320 12         25 $f->done;
321             }
322              
323 227         1868 $weak_self->_try_handle_request;
324             };
325 490 100       14482 if (my $error = $@) {
326             # Log the error and close the connection gracefully
327 1         59 warn "PAGI connection error: $error";
328 1 50       4 return 0 unless $weak_self;
329 1         4 $weak_self->_handle_disconnect_and_close('server_error');
330             }
331 490         1758 return 0;
332             },
333             on_closed => sub {
334 270 100   270   74174152 return unless $weak_self;
335             # Stream closed - handle disconnect and remove from connections hash
336 215         1204 $weak_self->_handle_disconnect_and_close('client_closed');
337             },
338 273         67429 );
339             }
340              
341             sub _reset_idle_timer {
342 502     502   1431 my ($self) = @_;
343              
344 502 100       1634 return unless $self->{idle_timer};
345              
346             # Debounce: rescheduling the IO::Async countdown on every read is costly
347             # under keep-alive load -- it re-enqueues a loop timer each time. Reset at
348             # most ~20x/second; this coarsens the idle timeout by at most ~50ms, which is
349             # immaterial for a multi-second idle timeout but cuts the per-request timer
350             # churn dramatically under load.
351 456         1189 my $now = Time::HiRes::time();
352 456 100 100     2333 return if defined $self->{_idle_reset_at} && ($now - $self->{_idle_reset_at}) < 0.05;
353 321         717 $self->{_idle_reset_at} = $now;
354              
355 321         1932 $self->{idle_timer}->reset;
356 321 50       37059 $self->{idle_timer}->start unless $self->{idle_timer}->is_running;
357             }
358              
359             sub _stop_idle_timer {
360 310     310   603 my ($self) = @_;
361              
362 310 100       983 return unless $self->{idle_timer};
363 273 50       1508 $self->{idle_timer}->stop if $self->{idle_timer}->is_running;
364             # Remove timer completely so _reset_idle_timer won't restart it
365             # This is important for long-lived connections (WebSocket, SSE)
366 273 50       11759 if ($self->{server}) {
367 273         1474 $self->{server}->remove_child($self->{idle_timer});
368             }
369 273         25650 $self->{idle_timer} = undef;
370             }
371              
372             # =============================================================================
373             # HTTP/2 Session Initialization
374             # =============================================================================
375              
376             sub _init_h2_session {
377 67     67   139 my ($self) = @_;
378              
379 67         137 $self->{is_h2} = 1;
380              
381 67         159 weaken(my $weak_self = $self);
382              
383             $self->{h2_session} = $self->{h2_protocol}->create_session(
384             on_request => sub {
385 66     66   222 my ($stream_id, $pseudo, $headers, $has_body) = @_;
386 66 50       199 return unless $weak_self;
387 66         333 $weak_self->_h2_on_request($stream_id, $pseudo, $headers, $has_body);
388             },
389             on_body => sub {
390 22     22   69 my ($stream_id, $data, $eof) = @_;
391 22 50       48 return unless $weak_self;
392 22         121 $weak_self->_h2_on_body($stream_id, $data, $eof);
393             },
394             on_close => sub {
395 39     39   89 my ($stream_id, $error_code) = @_;
396 39 50       93 return unless $weak_self;
397 39         148 $weak_self->_h2_on_close($stream_id, $error_code);
398             },
399 67         924 );
400              
401             # Send initial SETTINGS to client
402 67         349 $self->_h2_write_pending;
403             }
404              
405             sub _h2_process_data {
406 221     221   403 my ($self) = @_;
407 221 50       536 return unless $self->{h2_session};
408              
409 221 100       638 if (length($self->{buffer}) > 0) {
410 220         1209 $self->{h2_session}->feed($self->{buffer});
411 220         492 $self->{buffer} = '';
412             }
413              
414 221         616 $self->_h2_write_pending;
415              
416             # Close connection when session is done (GOAWAY received or sent)
417 221 100 66     1014 if ($self->{h2_session} && !$self->{h2_session}->want_read) {
418 2         7 $self->_h2_write_pending; # Flush any remaining output
419 2         9 $self->_handle_disconnect_and_close;
420             }
421             }
422              
423             sub _h2_write_pending {
424 471     471   858 my ($self) = @_;
425 471 50       1060 return unless $self->{h2_session};
426 471         701 while (1) {
427 756         53000 my $data = $self->{h2_session}->extract;
428 756 100 66     3870 last unless defined $data && length($data) > 0;
429 285         1443 $self->{stream}->write($data);
430             }
431             }
432              
433             # =============================================================================
434             # HTTP/2 Stream Callbacks
435             # =============================================================================
436              
437             sub _h2_on_request {
438 67     67   804601 my ($self, $stream_id, $pseudo, $headers, $has_body) = @_;
439              
440             # Detect CONNECT method
441 67         136 my $is_websocket = 0;
442 67 100 50     355 if (($pseudo->{':method'} // '') eq 'CONNECT') {
443 17 100 100     82 if (($pseudo->{':protocol'} // '') eq 'websocket') {
444             # Extended CONNECT for WebSocket (RFC 8441)
445 16         29 $is_websocket = 1;
446             } else {
447             # Plain CONNECT not supported — defer response to avoid
448             # re-entrant nghttp2 calls (we're inside feed/mem_recv)
449 1         5 weaken(my $ws = $self);
450             $self->{server}->loop->later(sub {
451 1 50   1   107 return unless $ws;
452 1 50       3 return if $ws->{closed};
453 1         12 $ws->{h2_session}->submit_response($stream_id,
454             status => 501,
455             headers => [['content-type', 'text/plain']],
456             body => "CONNECT method not supported\n",
457             );
458 1         42 $ws->_h2_write_pending;
459 1         9 });
460 1         38 return;
461             }
462             }
463              
464             # Detect SSE (Accept: text/event-stream)
465 66         122 my $is_sse = 0;
466 66 100       216 if (!$is_websocket) {
467 50         195 for my $h (@$headers) {
468 58 100 100     322 if ($h->[0] eq 'accept' && $h->[1] =~ m{text/event-stream}) {
469 12         24 $is_sse = 1;
470 12         32 last;
471             }
472             }
473             }
474              
475             # Initialize per-stream state
476 66         1192 $self->{h2_streams}{$stream_id} = {
477             pseudo => $pseudo,
478             headers => $headers,
479             has_body => $has_body,
480             body => '',
481             body_complete => !$has_body,
482             body_pending => undef, # Future for body availability
483             receive_queue => [],
484             response_started => 0,
485             is_websocket => $is_websocket,
486             is_sse => $is_sse,
487             ws_accepted => 0,
488             ws_frame => undef, # Protocol::WebSocket::Frame for parsing
489             ws_connect_sent => 0,
490             };
491              
492             # Check Content-Length against max_body_size limit before dispatching
493             # (after stream init so _h2_on_body/_h2_on_close can find the stream)
494 66 100 100     341 if ($self->{max_body_size} && $has_body) {
495 6         16 for my $h (@$headers) {
496 12 100       27 if ($h->[0] eq 'content-length') {
497 1 50       4 if ($h->[1] > $self->{max_body_size}) {
498             # Defer response to avoid re-entrant nghttp2 calls
499 1         3 weaken(my $ws = $self);
500             $self->{server}->loop->later(sub {
501 1 50   1   68 return unless $ws;
502 1 50       4 return if $ws->{closed};
503 1         8 $ws->{h2_session}->submit_response($stream_id,
504             status => 413,
505             headers => [['content-type', 'text/plain']],
506             body => "Payload Too Large\n",
507             );
508 1         35 $ws->_h2_write_pending;
509 1         5 });
510 1         35 return;
511             }
512 0         0 last;
513             }
514             }
515             }
516              
517             # Defer dispatch to next event loop tick to prevent re-entrant nghttp2 calls
518 65         190 weaken(my $weak_self = $self);
519             $self->{server}->loop->later(sub {
520 65 50   65   3876 return unless $weak_self;
521 65 50       234 return if $weak_self->{closed};
522 65         366 $weak_self->_h2_dispatch_stream($stream_id);
523 65         363 });
524             }
525              
526             sub _h2_on_body {
527 22     22   52 my ($self, $stream_id, $data, $eof) = @_;
528              
529 22         47 my $stream = $self->{h2_streams}{$stream_id};
530 22 100       149 return unless $stream;
531              
532 21 50 66     79 if ($stream->{is_websocket} && $stream->{ws_accepted}) {
533             # WebSocket: DATA frames contain raw WebSocket frames
534 16 50       81 $self->_h2_process_ws_frames($stream_id, $stream, $data) if length($data);
535              
536 16 50       675 if ($eof) {
537             # END_STREAM = client closing the WebSocket stream
538 0         0 push @{$stream->{receive_queue}}, {
  0         0  
539             type => 'websocket.disconnect',
540             code => 1005,
541             reason => '',
542             };
543 0         0 $self->_h2_wake_pending($stream);
544             }
545 16         49 return;
546             }
547              
548 5 100       12 if (length($data) > 0) {
549 3         10 $stream->{body} .= $data;
550              
551             # Enforce max_body_size (0 = unlimited)
552 3 100 66     13 if ($self->{max_body_size} && length($stream->{body}) > $self->{max_body_size}) {
553 1         9 $self->{h2_session}->submit_response($stream_id,
554             status => 413,
555             headers => [['content-type', 'text/plain']],
556             body => 'Payload Too Large',
557             );
558             # No _h2_write_pending here — we're inside feed(); _h2_process_data
559             # flushes after feed() returns
560 1         39 $self->_h2_resolve_stream_drain_waiters($stream);
561             # Drop (don't fire) the app's on_drain fires: the stream is closing,
562             # not draining, and the transport handle is going away. Also break
563             # the $stream <-> transport_state cycle (the handle's measure/arm
564             # closures hold $stream strongly), or the stream state leaks for the
565             # life of the process once h2_streams drops its external ref.
566 1         2 $stream->{transport_drain_fires} = [];
567 1         2 delete $stream->{transport_state};
568 1         3 delete $self->{h2_streams}{$stream_id};
569 1         4 return;
570             }
571             }
572              
573 4 100       8 if ($eof) {
574 2         4 $stream->{body_complete} = 1;
575             }
576              
577 4         11 $self->_h2_wake_pending($stream);
578             }
579              
580             sub _h2_wake_pending {
581 56     56   120 my ($self, $stream) = @_;
582 56 100 66     272 if ($stream->{body_pending} && !$stream->{body_pending}->is_ready) {
583 15         108 my $f = $stream->{body_pending};
584 15         25 $stream->{body_pending} = undef;
585 15         43 $f->done;
586             }
587             }
588              
589             sub _h2_on_close {
590 39     39   100 my ($self, $stream_id, $error_code) = @_;
591              
592 39         97 my $stream = $self->{h2_streams}{$stream_id};
593 39 100       98 return unless $stream;
594              
595             # Mark body complete to unblock any pending receive
596 37         98 $stream->{body_complete} = 1;
597              
598             # Enqueue disconnect event
599 37 50       168 if ($stream->{is_websocket}) {
    100          
600 0         0 push @{$stream->{receive_queue}}, {
  0         0  
601             type => 'websocket.disconnect',
602             code => 1006,
603             reason => '',
604             };
605             } elsif ($stream->{is_sse}) {
606 1         2 push @{$stream->{receive_queue}}, {
  1         5  
607             type => 'sse.disconnect',
608             reason => 'client_closed',
609             };
610             } else {
611 36         56 push @{$stream->{receive_queue}}, { type => 'http.disconnect' };
  36         158  
612             }
613              
614 37         155 $self->_h2_wake_pending($stream);
615              
616             # Release any producer blocked on this stream's backpressure drain — the
617             # stream is closing, so it must not hang waiting for a queue that will
618             # never drain.
619 37         183 $self->_h2_resolve_stream_drain_waiters($stream);
620             # Drop (don't fire) the app's on_drain fires: this is a close, not a drain.
621             # Also break the $stream <-> transport_state cycle so the stream state can be
622             # collected once the deferred delete below drops h2_streams' external ref.
623 37         85 $stream->{transport_drain_fires} = [];
624 37         137 delete $stream->{transport_state};
625              
626             # Clean up after a delay (let any pending futures resolve)
627 37         182 weaken(my $weak_self = $self);
628             $self->{server}->loop->later(sub {
629 37 50   37   22724 return unless $weak_self;
630 37         400 delete $weak_self->{h2_streams}{$stream_id};
631 37         196 });
632             }
633              
634             # =============================================================================
635             # HTTP/2 Stream Dispatch (scope/receive/send creation)
636             # =============================================================================
637              
638             sub _h2_dispatch_stream {
639 65     65   182 my ($self, $stream_id) = @_;
640              
641 65         179 my $stream_state = $self->{h2_streams}{$stream_id};
642 65 100       191 return unless $stream_state;
643              
644 64         127 my ($scope, $receive, $send);
645              
646 64 100       310 if ($stream_state->{is_websocket}) {
    100          
647 16         71 $scope = $self->_h2_create_websocket_scope($stream_id, $stream_state);
648 16         62 $receive = $self->_h2_create_websocket_receive($stream_id, $stream_state);
649 16         61 $send = $self->_h2_create_websocket_send($stream_id, $stream_state);
650             } elsif ($stream_state->{is_sse}) {
651 12         65 $scope = $self->_h2_create_sse_scope($stream_id, $stream_state);
652 12         57 $receive = $self->_h2_create_sse_receive($stream_id, $stream_state);
653 12         55 $send = $self->_h2_create_sse_send($stream_id, $stream_state);
654             } else {
655 36         167 $scope = $self->_h2_create_scope($stream_id, $stream_state);
656 36         163 $receive = $self->_h2_create_receive($stream_id, $stream_state);
657 36         139 $send = $self->_h2_create_send($stream_id, $stream_state);
658             }
659              
660 64         158 weaken(my $weak_self = $self);
661              
662 64     64   129 my $future = (async sub {
663 64         106 eval {
664 64         335 await $weak_self->{app}->($scope, $receive, $send);
665             };
666 64         5288 my $error = $@;
667              
668             # If the application failed, OR returned without starting a response,
669             # synthesize a 500 (only possible while no response has begun). A clean
670             # return that produced no response is a protocol error, same as a throw.
671 64 100 66     474 if ($weak_self && !$stream_state->{response_started}) {
    50          
672 5 100       441 warn $error
673             ? "PAGI application error (HTTP/2 stream $stream_id): $error\n"
674             : "PAGI application returned without starting a response (HTTP/2 stream $stream_id)\n";
675 5         19 eval {
676 5         40 $weak_self->{h2_session}->submit_response($stream_id,
677             status => 500,
678             headers => [['content-type', 'text/plain']],
679             body => "Internal Server Error\n",
680             );
681 5         204 $weak_self->_h2_write_pending;
682             };
683             # The synthesized 500 is this stream's response — mark it started.
684             $stream_state->{connection_state}->_mark_response_started
685 5 100       26 if $stream_state->{connection_state};
686             }
687             elsif ($error) {
688             # Response already started; cannot send a 500. Log only.
689 0         0 warn "PAGI application error after response started (HTTP/2 stream $stream_id): $error\n";
690             }
691              
692             # Notify server that request completed (for max_requests tracking)
693 64 50 33     710 $weak_self->{server}->_on_request_complete if $weak_self && $weak_self->{server};
694 64         395 })->();
695              
696 64         4632 $self->{server}->adopt_future($future);
697             }
698              
699             sub _h2_create_scope {
700 39     39   205566 my ($self, $stream_id, $stream_state) = @_;
701              
702 39         83 my $pseudo = $stream_state->{pseudo};
703 39         71 my $headers = $stream_state->{headers};
704              
705             # Parse path and query string from :path pseudo-header
706 39   50     134 my $full_path = $pseudo->{':path'} // '/';
707 39         187 my ($path, $query_string) = split(/\?/, $full_path, 2);
708 39   100     219 $query_string //= '';
709              
710             # Decode percent-encoded path for scope (keep raw_path as-is)
711             # Match HTTP/1.1 pipeline: URI::Escape + UTF-8 decode with fallback
712 39         265 my $unescaped = uri_unescape($path);
713 39   33     440 my $decoded_path = eval { decode('UTF-8', $unescaped, Encode::FB_CROAK) }
  39         718  
714             // $unescaped;
715              
716 39         4941 my $connection_state = PAGI::Server::ConnectionState->new(
717             connection => $self,
718             );
719             # Store on the stream-state so the send path can mark response_started on
720             # this stream's own connection object (h2 multiplexes many streams).
721 39         93 $stream_state->{connection_state} = $connection_state;
722              
723             return {
724             type => 'http',
725             pagi => {
726             version => '0.3',
727             spec_version => '0.3',
728             },
729             http_version => '2',
730             method => $pseudo->{':method'} // 'GET',
731             scheme => $pseudo->{':scheme'} // $self->_get_scheme,
732             path => $decoded_path,
733             raw_path => $path,
734             query_string => $query_string,
735             root_path => '',
736             headers => $headers,
737             (defined $self->{client_host}
738             ? (client => [$self->{client_host}, $self->{client_port}])
739             : ()
740             ),
741             server => [$self->{server_host}, $self->{server_port}],
742 39         375 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  0         0  
743             extensions => $self->_get_extensions_for_scope,
744             'pagi.connection' => $connection_state,
745             # h2 transport handle measures THIS stream's send queue (per-stream),
746             # stored on the stream state rather than $self->{current_transport_state}
747             # because h2 multiplexes many concurrent streams over one connection.
748 39 50 50     686 'pagi.transport' => ($stream_state->{transport_state} = $self->_h2_transport_state($stream_state)),
    50 33        
749             };
750             }
751              
752             sub _h2_create_receive {
753 36     36   88 my ($self, $stream_id, $stream_state) = @_;
754              
755 36         84 weaken(my $weak_self = $self);
756              
757             return sub {
758 35 50   35   358 return Future->done({ type => 'http.disconnect' }) unless $weak_self;
759 35 50       91 return Future->done({ type => 'http.disconnect' }) if $weak_self->{closed};
760              
761 35         73 my $ss = $weak_self->{h2_streams}{$stream_id};
762 35 50       93 return Future->done({ type => 'http.disconnect' }) unless $ss;
763              
764 35         58 my $future = (async sub {
765 35 50       85 return { type => 'http.disconnect' } unless $weak_self;
766              
767 35         66 my $ss = $weak_self->{h2_streams}{$stream_id};
768 35 50       104 return { type => 'http.disconnect' } unless $ss;
769              
770             # Check queue first
771 35 50       59 if (@{$ss->{receive_queue}}) {
  35         106  
772 0         0 return shift @{$ss->{receive_queue}};
  0         0  
773             }
774              
775             # If body is already complete, return final body event
776 35 50       93 if ($ss->{body_complete}) {
777 35         96 my $body = $ss->{body};
778 35         85 $ss->{body} = '';
779             return {
780 35         673 type => 'http.request',
781             body => $body,
782             more => 0,
783             };
784             }
785              
786             # Wait for body data
787 0 0       0 if (!$ss->{body_pending}) {
788 0         0 $ss->{body_pending} = Future->new;
789             }
790 0         0 await $ss->{body_pending};
791              
792             # Re-fetch stream state (may have changed)
793 0         0 $ss = $weak_self->{h2_streams}{$stream_id};
794 0 0       0 return { type => 'http.disconnect' } unless $ss;
795              
796             # Check queue after waking
797 0 0       0 if (@{$ss->{receive_queue}}) {
  0         0  
798 0         0 return shift @{$ss->{receive_queue}};
  0         0  
799             }
800              
801 0         0 my $body = $ss->{body};
802 0         0 $ss->{body} = '';
803             return {
804             type => 'http.request',
805             body => $body,
806 0 0       0 more => $ss->{body_complete} ? 0 : 1,
807             };
808 35         252 })->();
809              
810 35         1870 return $future;
811 36         215 };
812             }
813              
814             sub _h2_create_send {
815 36     36   135 my ($self, $stream_id, $stream_state) = @_;
816              
817 36         90 weaken(my $weak_self = $self);
818              
819 36         71 my $status;
820             my @response_headers;
821              
822             # Streaming state for deferred data provider pattern.
823             # The send queue lives on per-stream state ($ss->{send_queue} /
824             # $ss->{send_queue_bytes}) so the h2 transport handle can measure it;
825             # $eof_pending / $streaming_started stay closure-local.
826 36         54 my $eof_pending = 0;
827 36         53 my $streaming_started = 0;
828              
829             # Data callback for nghttp2's streaming response.
830             # Returns ($data, $eof) when data is available, or undef to defer.
831             my $data_callback = sub {
832 132     132   225 my ($cb_stream_id, $max_len) = @_;
833              
834 132   33     410 my $ss = $weak_self && $weak_self->{h2_streams}{$stream_id};
835 132 50       227 return undef unless $ss;
836 132   50     269 my $q = $ss->{send_queue} ||= [];
837              
838 132 100       232 if (@$q) {
839 98         168 my $chunk = shift @$q;
840             # Respect max_len — XS truncates without preserving remainder
841 98 100       223 if (length($chunk) > $max_len) {
842 15         255 unshift @$q, substr($chunk, $max_len);
843 15         100 $chunk = substr($chunk, 0, $max_len);
844             }
845 98         176 $ss->{send_queue_bytes} -= length($chunk);
846              
847             # Per-stream backpressure: once this stream's queue falls below the
848             # low watermark, release any producer blocked in
849             # _h2_wait_for_stream_drain. This callback runs inside nghttp2's
850             # extract(), so resolve on the next loop tick — completing the Future
851             # resumes the awaiting producer synchronously, and it must not call
852             # resume_stream/_h2_write_pending re-entrantly into nghttp2.
853 98 100 50     380 if (($ss->{send_queue_bytes} // 0) < $weak_self->{write_low_watermark}
      100        
      100        
854 12         37 && $ss->{stream_drain_waiters} && @{$ss->{stream_drain_waiters}}) {
855 5         8 my @waiters = splice @{$ss->{stream_drain_waiters}};
  5         18  
856             $weak_self->{server}->loop->later(sub {
857 5         2133 $_->done for grep { !$_->is_ready } @waiters;
  5         21  
858 5         21 });
859             }
860              
861             # Fire the app's on_drain hysteresis callbacks once this stream's
862             # queue falls below the low watermark. Like the waiters above, this
863             # runs inside nghttp2's extract(), and an on_drain callback may call
864             # $send to resume its source — which would re-enter nghttp2. Splice
865             # the fires out first (so they can't double-fire), then invoke them on
866             # the next loop tick.
867 98 100 50     476 if (($ss->{send_queue_bytes} // 0) < $weak_self->{write_low_watermark}
      100        
      100        
868 15         41 && $ss->{transport_drain_fires} && @{$ss->{transport_drain_fires}}) {
869 4         8 my @fires = splice @{$ss->{transport_drain_fires}};
  4         10  
870             $weak_self->{server}->loop->later(sub {
871 4         903 $_->() for @fires;
872 4         14 });
873             }
874              
875 98 100 100     347 my $eof = (!@$q && $eof_pending) ? 1 : 0;
876 98         802 return ($chunk, $eof);
877             }
878              
879             # Queue empty but EOF pending — signal end of stream
880 34 100       77 if ($eof_pending) {
881 2         17 return ('', 1);
882             }
883              
884             # Queue empty, more data expected — defer (NGHTTP2_ERR_DEFERRED in C layer)
885 32         139 return undef;
886 36         324 };
887              
888 141     141   5982 return async sub {
889 141         230 my ($event) = @_;
890 141 50       286 return unless $weak_self;
891 141 50       295 return if $weak_self->{closed};
892              
893 141   50     321 my $type = $event->{type} // '';
894              
895 141 100       379 if ($type eq 'http.response.start') {
    50          
896 33 50       119 my $ss = $weak_self->{h2_streams}{$stream_id} or return;
897 33 50       89 return if $ss->{response_started};
898 33         74 $ss->{response_started} = 1;
899 33 50       363 $ss->{connection_state}->_mark_response_started if $ss->{connection_state};
900              
901 33   50     123 $status = $event->{status} // 200;
902             @response_headers = map {
903 32         188 [_validate_header_name($_->[0]), _validate_header_value($_->[1])]
904 33   50     58 } @{$event->{headers} // []};
  33         130  
905             # Server-supplied Date header (HTTP/1.1 parity) — add if the app didn't.
906 33 50       75 unless (grep { lc($_->[0]) eq 'date' } @response_headers) {
  32         423  
907 33         448 push @response_headers, ['date', $weak_self->{protocol}->format_date];
908             }
909             }
910             elsif ($type eq 'http.response.body') {
911 108 50       285 my $ss = $weak_self->{h2_streams}{$stream_id} or return;
912 108 50       262 return unless $ss->{response_started};
913              
914 108   50     249 my $body = $event->{body} // '';
915 108   100     276 my $more = $event->{more} // 0;
916              
917 108 100       192 if ($more) {
918 75 100       130 if (!$streaming_started) {
919             # First streaming chunk — submit with data callback
920 10         16 $streaming_started = 1;
921 10   50     60 $ss->{send_queue} //= [];
922 10   50     60 $ss->{send_queue_bytes} //= 0;
923 10 50       26 if (length $body) {
924 10         23 push @{$ss->{send_queue}}, $body;
  10         30  
925 10         25 $ss->{send_queue_bytes} += length $body;
926             }
927             # Synchronous: we're in the app's send path (not nghttp2's
928             # extract), so on_high_water can fire here to tell the app to
929             # pause its source.
930 10 50       80 $ss->{transport_state}->_check_watermarks if $ss->{transport_state};
931             $weak_self->{h2_session}->submit_response_streaming(
932 10         77 $stream_id,
933             status => $status,
934             headers => \@response_headers,
935             data_callback => $data_callback,
936             );
937 10         392 $weak_self->_h2_write_pending;
938             } else {
939             # Subsequent chunk — backpressure check then push and resume.
940             # Bound on THIS stream's send queue (per-stream), not the
941             # shared TCP buffer which is meaningless across multiplexed
942             # streams.
943 65 100 50     203 if (($ss->{send_queue_bytes} // 0) >= $weak_self->{write_high_watermark}) {
944 5         17 await $weak_self->_h2_wait_for_stream_drain($stream_id);
945 5 50       597 return unless $weak_self;
946 5 50       13 return if $weak_self->{closed};
947 5 50       18 return unless $weak_self->{h2_streams}{$stream_id};
948             }
949 65 50       160 if (length $body) {
950 65         102 push @{$ss->{send_queue}}, $body;
  65         148  
951 65         111 $ss->{send_queue_bytes} += length $body;
952             }
953             # Synchronous — app send path, not nghttp2 extract.
954 65 50       271 $ss->{transport_state}->_check_watermarks if $ss->{transport_state};
955 65         269 $weak_self->{h2_session}->resume_stream($stream_id);
956 65         576 $weak_self->_h2_write_pending;
957             }
958             } else {
959 33 100       75 if ($streaming_started) {
960             # Final chunk on an already-streaming response. Bound on THIS
961             # stream's send queue (per-stream), not the shared TCP buffer.
962 10 50 50     45 if (($ss->{send_queue_bytes} // 0) >= $weak_self->{write_high_watermark}) {
963 0         0 await $weak_self->_h2_wait_for_stream_drain($stream_id);
964 0 0       0 return unless $weak_self;
965 0 0       0 return if $weak_self->{closed};
966 0 0       0 return unless $weak_self->{h2_streams}{$stream_id};
967             }
968 10         31 $eof_pending = 1;
969 10 100       26 if (length $body) {
970 8         14 push @{$ss->{send_queue}}, $body;
  8         19  
971 8         18 $ss->{send_queue_bytes} += length $body;
972             }
973             # Synchronous — app send path, not nghttp2 extract.
974 10 50       56 $ss->{transport_state}->_check_watermarks if $ss->{transport_state};
975 10         43 $weak_self->{h2_session}->resume_stream($stream_id);
976 10         95 $weak_self->_h2_write_pending;
977             } else {
978             # Non-streaming: single response (unchanged one-shot path)
979 23         165 $weak_self->{h2_session}->submit_response($stream_id,
980             status => $status,
981             headers => \@response_headers,
982             body => $body,
983             );
984 23         897 $weak_self->_h2_write_pending;
985             }
986             }
987             }
988             else {
989 0         0 _unrecognized_event_type($type, 'http');
990             }
991 36         458 };
992             }
993              
994             # =============================================================================
995             # HTTP/2 WebSocket over HTTP/2 (RFC 8441)
996             # =============================================================================
997              
998             sub _h2_create_websocket_scope {
999 17     17   190689 my ($self, $stream_id, $stream_state) = @_;
1000              
1001 17         39 my $pseudo = $stream_state->{pseudo};
1002 17         43 my $headers = $stream_state->{headers};
1003              
1004 17   50     55 my $full_path = $pseudo->{':path'} // '/';
1005 17         80 my ($path, $query_string) = split(/\?/, $full_path, 2);
1006 17   50     96 $query_string //= '';
1007              
1008             # Match HTTP/1.1 pipeline: URI::Escape + UTF-8 decode with fallback
1009 17         123 my $unescaped = uri_unescape($path);
1010 17   33     164 my $decoded_path = eval { decode('UTF-8', $unescaped, Encode::FB_CROAK) }
  17         271  
1011             // $unescaped;
1012              
1013             # Extract subprotocols from headers
1014 17         973 my @subprotocols;
1015 17         38 for my $header (@$headers) {
1016 34         63 my ($name, $value) = @$header;
1017 34 100       80 if ($name eq 'sec-websocket-protocol') {
1018 1         65 push @subprotocols, map { s/^\s+|\s+$//gr } split /,/, $value;
  2         13  
1019             }
1020             }
1021              
1022             return {
1023             type => 'websocket',
1024             pagi => {
1025             version => '0.3',
1026             spec_version => '0.3',
1027             },
1028             http_version => '2',
1029             scheme => $self->_get_ws_scheme,
1030             path => $decoded_path,
1031             raw_path => $path,
1032             query_string => $query_string,
1033             root_path => '',
1034             headers => $headers,
1035             (defined $self->{client_host}
1036             ? (client => [$self->{client_host}, $self->{client_port}])
1037             : ()
1038             ),
1039             server => [$self->{server_host}, $self->{server_port}],
1040             subprotocols => \@subprotocols,
1041 17         189 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  0         0  
1042 17 50       123 extensions => { %{$self->_get_extensions_for_scope}, 'websocket.http.response' => {} },
  17 50       60  
1043             };
1044             }
1045              
1046             sub _h2_create_websocket_receive {
1047 16     16   38 my ($self, $stream_id, $stream_state) = @_;
1048              
1049 16         30 weaken(my $weak_self = $self);
1050              
1051             return sub {
1052 31 50   31   1160 return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
1053             unless $weak_self;
1054             return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
1055 31 50       88 if $weak_self->{closed};
1056              
1057 31         53 my $ss = $weak_self->{h2_streams}{$stream_id};
1058 31 50       54 return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
1059             unless $ss;
1060              
1061 31         43 my $future = (async sub {
1062 31 50       53 return { type => 'websocket.disconnect', code => 1006, reason => '' }
1063             unless $weak_self;
1064              
1065 31         59 my $ss = $weak_self->{h2_streams}{$stream_id};
1066 31 50       75 return { type => 'websocket.disconnect', code => 1006, reason => '' }
1067             unless $ss;
1068              
1069             # Check queue first
1070 31 50       37 if (@{$ss->{receive_queue}}) {
  31         97  
1071 0         0 return shift @{$ss->{receive_queue}};
  0         0  
1072             }
1073              
1074             # First call returns websocket.connect
1075 31 100       79 if (!$ss->{ws_connect_sent}) {
1076 15         91 $ss->{ws_connect_sent} = 1;
1077 15         155 return { type => 'websocket.connect' };
1078             }
1079              
1080             # Wait for events
1081 16         21 while (1) {
1082 38 100       49 if (@{$ss->{receive_queue}}) {
  38         91  
1083 9         36 return shift @{$ss->{receive_queue}};
  9         85  
1084             }
1085              
1086             return { type => 'websocket.disconnect', code => 1006, reason => '' }
1087 29 100       104 if $weak_self->{closed};
1088              
1089 22 50       63 if (!$ss->{body_pending}) {
1090 22         49 $ss->{body_pending} = Future->new;
1091             }
1092 22         156 await $ss->{body_pending};
1093              
1094 22         1929 $ss = $weak_self->{h2_streams}{$stream_id};
1095 22 50       52 return { type => 'websocket.disconnect', code => 1006, reason => '' }
1096             unless $ss;
1097             }
1098 31         137 })->();
1099              
1100 31         1381 return $future;
1101 16         102 };
1102             }
1103              
1104             sub _h2_create_websocket_send {
1105 16     16   40 my ($self, $stream_id, $stream_state) = @_;
1106              
1107 16         33 weaken(my $weak_self = $self);
1108              
1109 25     25   680 return async sub {
1110 25         49 my ($event) = @_;
1111 25 50       54 return unless $weak_self;
1112 25 50       56 return if $weak_self->{closed};
1113              
1114 25         47 my $ss = $weak_self->{h2_streams}{$stream_id};
1115 25 50       48 return unless $ss;
1116              
1117 25   50     61 my $type = $event->{type} // '';
1118              
1119 25 100       84 if ($type eq 'websocket.accept') {
    100          
    100          
    100          
    50          
1120 12 50       56 return if $ss->{ws_accepted};
1121              
1122             # HTTP/2 WebSocket: respond with 200 (not 101)
1123 12         14 my @headers;
1124 12 50       33 if (my $subprotocol = $event->{subprotocol}) {
1125 0         0 $subprotocol = _validate_subprotocol($subprotocol);
1126 0         0 push @headers, ['sec-websocket-protocol', $subprotocol];
1127             }
1128 12 50       33 if (my $extra = $event->{headers}) {
1129             push @headers, map {
1130 0         0 [_validate_header_name($_->[0]), _validate_header_value($_->[1])]
  0         0  
1131             } @$extra;
1132             }
1133              
1134 12         22 $ss->{ws_accepted} = 1;
1135 12         19 $ss->{response_started} = 1;
1136             $ss->{ws_frame} = Protocol::WebSocket::Frame->new(
1137             max_payload_size => $weak_self->{max_ws_frame_size},
1138 12         147 );
1139              
1140             # Submit 200 response with streaming body that defers
1141             $weak_self->{h2_session}->submit_response($stream_id,
1142             status => 200,
1143             headers => \@headers,
1144 12         40 body => sub { return undef }, # defer until submit_data
1145 12         484 );
1146 12         388 $weak_self->_h2_write_pending;
1147              
1148             # Process any data that arrived before accept
1149 12 50       43 if (length($ss->{body}) > 0) {
1150 0         0 my $buffered = $ss->{body};
1151 0         0 $ss->{body} = '';
1152 0         0 $weak_self->_h2_process_ws_frames($stream_id, $ss, $buffered);
1153             }
1154             }
1155             elsif ($type eq 'websocket.send') {
1156 5 50       14 return unless $ss->{ws_accepted};
1157              
1158 5         9 my $frame;
1159 5 100       12 if (defined $event->{text}) {
    50          
1160             $frame = Protocol::WebSocket::Frame->new(
1161             buffer => $event->{text},
1162 4         16 type => 'text',
1163             );
1164             }
1165             elsif (defined $event->{bytes}) {
1166             $frame = Protocol::WebSocket::Frame->new(
1167             buffer => $event->{bytes},
1168 1         5 type => 'binary',
1169             );
1170             }
1171             else {
1172 0         0 return;
1173             }
1174              
1175 5         302 my $bytes = $frame->to_bytes;
1176 5         179 $weak_self->{h2_session}->submit_data($stream_id, $bytes, 0);
1177 5         16 $weak_self->_h2_write_pending;
1178             }
1179             elsif ($type eq 'websocket.http.response.start') {
1180 2 50       7 return if $ss->{ws_accepted};
1181 2 50       5 return if $ss->{ws_denial_started};
1182 2         5 $ss->{ws_denial_started} = 1;
1183 2   50     6 $ss->{ws_denial_status} = $event->{status} // 403;
1184             $ss->{ws_denial_headers} = [
1185 2         10 map { [_validate_header_name($_->[0]), _validate_header_value($_->[1])] }
1186 2   50     2 @{$event->{headers} // []}
  2         5  
1187             ];
1188 2         4 $ss->{ws_denial_body} = '';
1189             }
1190             elsif ($type eq 'websocket.http.response.body') {
1191 3 50       6 return unless $ss->{ws_denial_started};
1192 3 50       6 return if $ss->{response_started};
1193 3   50     6 $ss->{ws_denial_body} .= $event->{body} // '';
1194 3 100       14 return if $event->{more}; # more chunks coming — keep buffering
1195              
1196 2         4 $ss->{response_started} = 1;
1197             $weak_self->{h2_session}->submit_response($stream_id,
1198             status => $ss->{ws_denial_status},
1199             headers => $ss->{ws_denial_headers},
1200             body => $ss->{ws_denial_body},
1201 2         11 );
1202 2         82 $weak_self->_h2_write_pending;
1203             }
1204             elsif ($type eq 'websocket.close') {
1205 3 100       10 if (!$ss->{ws_accepted}) {
1206             # Reject: send 403
1207 2         12 $weak_self->{h2_session}->submit_response($stream_id,
1208             status => 403,
1209             headers => [['content-type', 'text/plain']],
1210             body => 'Forbidden',
1211             );
1212 2         67 $weak_self->_h2_write_pending;
1213 2         14 return;
1214             }
1215              
1216 1   50     3 my $code = $event->{code} // 1000;
1217 1   50     2 my $reason = $event->{reason} // '';
1218              
1219 1         8 my $frame = Protocol::WebSocket::Frame->new(
1220             type => 'close',
1221             buffer => pack('n', $code) . $reason,
1222             );
1223              
1224             # Send close frame + END_STREAM
1225 1         34 $weak_self->{h2_session}->submit_data($stream_id, $frame->to_bytes, 1);
1226 1         3 $weak_self->_h2_write_pending;
1227             }
1228              
1229 22         219 return;
1230 16         178 };
1231             }
1232              
1233             # =============================================================================
1234             # HTTP/2 SSE (Server-Sent Events over HTTP/2)
1235             # =============================================================================
1236              
1237             sub _h2_create_sse_scope {
1238 13     13   2063 my ($self, $stream_id, $stream_state) = @_;
1239              
1240 13         30 my $pseudo = $stream_state->{pseudo};
1241 13         37 my $headers = $stream_state->{headers};
1242              
1243 13   50     64 my $full_path = $pseudo->{':path'} // '/';
1244 13         104 my ($path, $query_string) = split(/\?/, $full_path, 2);
1245 13   50     206 $query_string //= '';
1246              
1247             # Match HTTP/1.1 pipeline: URI::Escape + UTF-8 decode with fallback
1248 13         107 my $unescaped = uri_unescape($path);
1249 13   33     147 my $decoded_path = eval { decode('UTF-8', $unescaped, Encode::FB_CROAK) }
  13         261  
1250             // $unescaped;
1251              
1252             return {
1253             type => 'sse',
1254             pagi => {
1255             version => '0.3',
1256             spec_version => '0.3',
1257             },
1258             http_version => '2',
1259             method => $pseudo->{':method'} // 'GET',
1260             scheme => $pseudo->{':scheme'} // $self->_get_scheme,
1261             path => $decoded_path,
1262             raw_path => $path,
1263             query_string => $query_string,
1264             root_path => '',
1265             headers => $headers,
1266             (defined $self->{client_host}
1267             ? (client => [$self->{client_host}, $self->{client_port}])
1268             : ()
1269             ),
1270             server => [$self->{server_host}, $self->{server_port}],
1271 13         128 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  0         0  
1272             extensions => $self->_get_extensions_for_scope,
1273             # Per-stream outbound flow-control handle. Like the h2 streaming scope,
1274             # it measures THIS stream's send queue (h2 multiplexes many streams over
1275             # one connection, so the shared TCP buffer is meaningless per stream).
1276 13 50 50     1432 'pagi.transport' => ($stream_state->{transport_state} = $self->_h2_transport_state($stream_state)),
    50 33        
1277             };
1278             }
1279              
1280             sub _h2_create_sse_receive {
1281 12     12   34 my ($self, $stream_id, $stream_state) = @_;
1282              
1283 12         31 weaken(my $weak_self = $self);
1284              
1285             my $sse_disconnect = sub {
1286             return {
1287 2     2   19 type => 'sse.disconnect',
1288             reason => 'client_closed',
1289             };
1290 12         64 };
1291              
1292             return sub {
1293 13 50   13   179 return Future->done($sse_disconnect->()) unless $weak_self;
1294 13 50       62 return Future->done($sse_disconnect->()) if $weak_self->{closed};
1295              
1296 13         30 my $ss = $weak_self->{h2_streams}{$stream_id};
1297 13 50       37 return Future->done($sse_disconnect->()) unless $ss;
1298              
1299 13         35 my $future = (async sub {
1300 13 50       30 return $sse_disconnect->() unless $weak_self;
1301              
1302 13         44 my $ss = $weak_self->{h2_streams}{$stream_id};
1303 13 50       32 return $sse_disconnect->() unless $ss;
1304              
1305             # Check queue first
1306 13 50       20 if (@{$ss->{receive_queue}}) {
  13         60  
1307 0         0 return shift @{$ss->{receive_queue}};
  0         0  
1308             }
1309              
1310             # First call returns sse.request with body
1311 13 100       42 if (!$ss->{sse_request_sent}) {
1312 11         29 $ss->{sse_request_sent} = 1;
1313             return {
1314             type => 'sse.request',
1315             body => $ss->{body},
1316 11         284 more => 0,
1317             };
1318             }
1319              
1320             # Wait for disconnect
1321 2         3 while (1) {
1322 4 50       6 if (@{$ss->{receive_queue}}) {
  4         12  
1323 0         0 return shift @{$ss->{receive_queue}};
  0         0  
1324             }
1325              
1326             return $sse_disconnect->()
1327 4 100       15 if $weak_self->{closed};
1328              
1329 2 50       5 if (!$ss->{body_pending}) {
1330 2         7 $ss->{body_pending} = Future->new;
1331             }
1332 2         21 await $ss->{body_pending};
1333              
1334 2         247 $ss = $weak_self->{h2_streams}{$stream_id};
1335 2 50       6 return $sse_disconnect->() unless $ss;
1336             }
1337 13         81 })->();
1338              
1339 13         1016 return $future;
1340 12         109 };
1341             }
1342              
1343             sub _h2_create_sse_send {
1344 12     12   34 my ($self, $stream_id, $stream_state) = @_;
1345              
1346 12         36 weaken(my $weak_self = $self);
1347              
1348             # Streaming state for the data-provider pull pattern. The send queue lives on
1349             # per-stream state ($ss->{send_queue} / $ss->{send_queue_bytes}) so the
1350             # pagi.transport handle can measure THIS stream's backlog. $streaming_started
1351             # stays closure-local.
1352 12         23 my $streaming_started = 0;
1353              
1354             # Data callback for nghttp2's streaming response. Pulls from the per-stream
1355             # queue; SSE responses stay open, so this never signals EOF (returns eof=0),
1356             # or undef to defer when the queue is empty.
1357             my $data_callback = sub {
1358 74     74   133 my ($cb_stream_id, $max_len) = @_;
1359              
1360 74   33     467 my $ss = $weak_self && $weak_self->{h2_streams}{$stream_id};
1361 74 50       132 return undef unless $ss;
1362 74   50     160 my $q = $ss->{send_queue} ||= [];
1363              
1364 74 100       161 if (@$q) {
1365 33         69 my $chunk = shift @$q;
1366             # Respect max_len — XS truncates without preserving remainder
1367 33 100       138 if (length($chunk) > $max_len) {
1368 4         310 unshift @$q, substr($chunk, $max_len);
1369 4         50 $chunk = substr($chunk, 0, $max_len);
1370             }
1371 33         148 $ss->{send_queue_bytes} -= length($chunk);
1372              
1373             # Per-stream backpressure: once this stream's queue falls below the
1374             # low watermark, release any producer blocked in
1375             # _h2_wait_for_stream_drain. This runs inside nghttp2's extract(), so
1376             # resolve on the next loop tick — completing the Future resumes the
1377             # awaiting producer synchronously, and it must not re-enter nghttp2.
1378 33 50 50     216 if (($ss->{send_queue_bytes} // 0) < $weak_self->{write_low_watermark}
      66        
      33        
1379 0         0 && $ss->{stream_drain_waiters} && @{$ss->{stream_drain_waiters}}) {
1380 0         0 my @waiters = splice @{$ss->{stream_drain_waiters}};
  0         0  
1381             $weak_self->{server}->loop->later(sub {
1382 0         0 $_->done for grep { !$_->is_ready } @waiters;
  0         0  
1383 0         0 });
1384             }
1385              
1386             # Fire the app's on_drain hysteresis callbacks once this stream's
1387             # queue falls below the low watermark. Deferred for the same reason:
1388             # an on_drain callback may call $send, which would re-enter nghttp2.
1389 33 100 50     184 if (($ss->{send_queue_bytes} // 0) < $weak_self->{write_low_watermark}
      100        
      66        
1390 1         4 && $ss->{transport_drain_fires} && @{$ss->{transport_drain_fires}}) {
1391 1         2 my @fires = splice @{$ss->{transport_drain_fires}};
  1         6  
1392             $weak_self->{server}->loop->later(sub {
1393 1         829 $_->() for @fires;
1394 1         15 });
1395             }
1396              
1397 33         502 return ($chunk, 0); # SSE streams never EOF via data_callback
1398             }
1399              
1400             # Queue empty — defer (NGHTTP2_ERR_DEFERRED in the C layer)
1401 41         173 return undef;
1402 12         186 };
1403              
1404 38     38   103043 return async sub {
1405 38         64 my ($event) = @_;
1406 38 50       106 return unless $weak_self;
1407 38 50       92 return if $weak_self->{closed};
1408              
1409 38         74 my $ss = $weak_self->{h2_streams}{$stream_id};
1410 38 50       210 return unless $ss;
1411              
1412             # Reset SSE idle timer on send activity
1413 38         150 $weak_self->_reset_sse_idle_timer;
1414              
1415 38   50     108 my $type = $event->{type} // '';
1416              
1417             # Dev-mode event validation (PAGI spec compliance)
1418 38 50       74 if ($weak_self->{validate_events}) {
1419 0         0 require PAGI::Server::EventValidator;
1420 0         0 PAGI::Server::EventValidator::validate_sse_send($event);
1421             }
1422              
1423 38 100       142 if ($type eq 'sse.start') {
    100          
    100          
    50          
1424 12 50       72 return if $ss->{response_started};
1425 12         34 $ss->{response_started} = 1;
1426              
1427 12   50     37 my $status = $event->{status} // 200;
1428 12   100     56 my $headers = $event->{headers} // [];
1429              
1430             # Ensure Content-Type is text/event-stream
1431 12         32 my $has_content_type = 0;
1432 12         32 for my $h (@$headers) {
1433 2 100       8 if (lc($h->[0]) eq 'content-type') {
1434 1         2 $has_content_type = 1;
1435 1         2 last;
1436             }
1437             }
1438              
1439 12         22 my @final_headers;
1440 12         23 for my $h (@$headers) {
1441 2         7 push @final_headers, [_validate_header_name($h->[0]), _validate_header_value($h->[1])];
1442             }
1443 12 100       39 if (!$has_content_type) {
1444 11         51 push @final_headers, ['content-type', 'text/event-stream'];
1445             }
1446 12         35 push @final_headers, ['cache-control', 'no-cache'];
1447             # Server-supplied Date header (HTTP/1.1 parity) — the h1 SSE path adds
1448             # this too; add it unless the app supplied one.
1449 12 50       28 unless (grep { lc($_->[0]) eq 'date' } @final_headers) {
  25         83  
1450 12         127 push @final_headers, ['date', $weak_self->{protocol}->format_date];
1451             }
1452              
1453 12         26 $streaming_started = 1;
1454 12   50     70 $ss->{send_queue} //= [];
1455 12   50     58 $ss->{send_queue_bytes} //= 0;
1456             $weak_self->{h2_session}->submit_response_streaming(
1457 12         86 $stream_id,
1458             status => $status,
1459             headers => \@final_headers,
1460             data_callback => $data_callback,
1461             );
1462 12         606 $weak_self->_h2_write_pending;
1463              
1464             # Protocol-specific keepalive writer (HTTP/2 DATA frames). Keepalive
1465             # bytes are counted in the per-stream backlog so buffered_amount stays
1466             # accurate, but they do not poke the watermark callbacks — a server
1467             # heartbeat is not an application send.
1468             $weak_self->{sse_keepalive_writer} = sub {
1469 6         21 my ($text) = @_;
1470 6 50       19 return unless $weak_self;
1471 6 50       36 return if $weak_self->{closed};
1472 6 50       35 my $ss = $weak_self->{h2_streams}{$stream_id} or return;
1473 6   50     13 push @{$ss->{send_queue} ||= []}, $text;
  6         39  
1474 6   50     36 $ss->{send_queue_bytes} = ($ss->{send_queue_bytes} // 0) + length $text;
1475 6         96 $weak_self->{h2_session}->resume_stream($stream_id);
1476 6         155 $weak_self->_h2_write_pending;
1477 12         95 };
1478              
1479             # Start SSE idle timer if configured
1480 12         58 $weak_self->_start_sse_idle_timer;
1481             }
1482             elsif ($type eq 'sse.send') {
1483 23 50       63 return unless $ss->{response_started};
1484              
1485             # Per-stream backpressure: bound on THIS stream's queue, not the
1486             # shared TCP buffer (meaningless across multiplexed h2 streams).
1487 23 50 50     119 if (($ss->{send_queue_bytes} // 0) >= $weak_self->{write_high_watermark}) {
1488 0         0 await $weak_self->_h2_wait_for_stream_drain($stream_id);
1489 0 0       0 return unless $weak_self;
1490 0 0       0 return if $weak_self->{closed};
1491 0 0       0 return unless $weak_self->{h2_streams}{$stream_id};
1492             }
1493              
1494 23         93 my $sse_data = _format_sse_event($event);
1495 23   50     66 push @{$ss->{send_queue} ||= []}, $sse_data;
  23         285  
1496 23   50     122 $ss->{send_queue_bytes} = ($ss->{send_queue_bytes} // 0) + length $sse_data;
1497             # Synchronous — app send path, not nghttp2 extract — so on_high_water
1498             # may fire here to tell the app to pause its source.
1499 23 50       176 $ss->{transport_state}->_check_watermarks if $ss->{transport_state};
1500 23         120 $weak_self->{h2_session}->resume_stream($stream_id);
1501 23         278 $weak_self->_h2_write_pending;
1502             }
1503             elsif ($type eq 'sse.comment') {
1504 1 50       4 return unless $ss->{response_started};
1505              
1506 1         3 my $comment = _format_sse_comment($event);
1507 1   50     2 push @{$ss->{send_queue} ||= []}, $comment;
  1         3  
1508 1   50     3 $ss->{send_queue_bytes} = ($ss->{send_queue_bytes} // 0) + length $comment;
1509 1 50       19 $ss->{transport_state}->_check_watermarks if $ss->{transport_state};
1510 1         5 $weak_self->{h2_session}->resume_stream($stream_id);
1511 1         8 $weak_self->_h2_write_pending;
1512             }
1513             elsif ($type eq 'sse.keepalive') {
1514 2   50     9 my $interval = $event->{interval} // 0;
1515 2         5 my $comment = $event->{comment};
1516              
1517 2 50       9 if ($interval > 0) {
1518 2         11 $weak_self->_start_sse_keepalive($interval, $comment);
1519             }
1520             else {
1521 0         0 $weak_self->_stop_sse_keepalive;
1522             }
1523             }
1524             else {
1525 0         0 _unrecognized_event_type($type, 'sse');
1526             }
1527              
1528 38         545 return;
1529 12         214 };
1530             }
1531              
1532             sub _h2_process_ws_frames {
1533 16     16   36 my ($self, $stream_id, $stream, $data) = @_;
1534              
1535 16         37 my $frame = $stream->{ws_frame};
1536 16 50       33 return unless $frame;
1537              
1538 16         85 $frame->append($data);
1539              
1540 16         224 while (defined(my $bytes = $frame->next_bytes)) {
1541 10         1093 my $opcode = $frame->opcode;
1542              
1543 10 100       68 if ($opcode == 1) {
    100          
    50          
    0          
1544             # Text frame
1545 5         7 my $text = eval { Encode::decode('UTF-8', $bytes, Encode::FB_CROAK) };
  5         46  
1546 5 100       347 unless (defined $text) {
1547 1         5 $self->_h2_ws_close($stream_id, 1007, 'Invalid UTF-8');
1548 1         2 return;
1549             }
1550 4         7 push @{$stream->{receive_queue}}, {
  4         30  
1551             type => 'websocket.receive',
1552             text => $text,
1553             };
1554             }
1555             elsif ($opcode == 2) {
1556             # Binary frame
1557 1         1 push @{$stream->{receive_queue}}, {
  1         7  
1558             type => 'websocket.receive',
1559             bytes => $bytes,
1560             };
1561             }
1562             elsif ($opcode == 8) {
1563             # Close frame
1564 4         22 my ($code, $reason) = (1005, '');
1565              
1566             # RFC 6455 Section 5.5.1: Close frame payload is 0 or >=2 bytes
1567 4 100       14 if (length($bytes) == 1) {
1568 1         5 $self->_h2_ws_close($stream_id, 1002, 'Invalid close frame');
1569 1         2 push @{$stream->{receive_queue}}, {
  1         6  
1570             type => 'websocket.disconnect',
1571             code => 1002,
1572             reason => 'Invalid close frame',
1573             };
1574 1         6 $self->_h2_wake_pending($stream);
1575 1         138 return;
1576             }
1577              
1578 3 50       11 if (length($bytes) >= 2) {
1579 3         11 $code = unpack('n', substr($bytes, 0, 2));
1580 3   50     10 $reason = substr($bytes, 2) // '';
1581              
1582             # RFC 6455 Section 7.4.1: Validate close code
1583 3         6 my $valid_code = 0;
1584 3 100 66     22 if ($code == 1000 || $code == 1001 || $code == 1002 || $code == 1003) {
    50 66        
    50 33        
      33        
      33        
1585 2         4 $valid_code = 1;
1586             }
1587             elsif ($code >= 1007 && $code <= 1011) {
1588 0         0 $valid_code = 1;
1589             }
1590             elsif ($code >= 3000 && $code <= 4999) {
1591 0         0 $valid_code = 1;
1592             }
1593 3 100       12 unless ($valid_code) {
1594 1         4 $self->_h2_ws_close($stream_id, 1002, 'Invalid close code');
1595 1         2 push @{$stream->{receive_queue}}, {
  1         5  
1596             type => 'websocket.disconnect',
1597             code => 1002,
1598             reason => 'Invalid close code',
1599             };
1600 1         4 $self->_h2_wake_pending($stream);
1601 1         115 return;
1602             }
1603              
1604             # RFC 6455: Close reason must be valid UTF-8
1605 2 50       7 if (length($reason) > 0) {
1606 2         3 my $reason_copy = $reason;
1607 2         4 my $decoded = eval { Encode::decode('UTF-8', $reason_copy, Encode::FB_CROAK) };
  2         20  
1608 2 100       128 unless (defined $decoded) {
1609 1         4 $self->_h2_ws_close($stream_id, 1007, 'Invalid UTF-8 in close reason');
1610 1         1 push @{$stream->{receive_queue}}, {
  1         5  
1611             type => 'websocket.disconnect',
1612             code => 1007,
1613             reason => 'Invalid UTF-8 in close reason',
1614             };
1615 1         4 $self->_h2_wake_pending($stream);
1616 1         117 return;
1617             }
1618             }
1619             }
1620              
1621             # Send close frame back + END_STREAM
1622 1         7 my $close_frame = Protocol::WebSocket::Frame->new(
1623             type => 'close',
1624             buffer => pack('n', $code) . $reason,
1625             );
1626 1         31 $self->{h2_session}->submit_data($stream_id, $close_frame->to_bytes, 1);
1627             # No _h2_write_pending — inside feed(); flushed by _h2_process_data
1628              
1629 1         2 push @{$stream->{receive_queue}}, {
  1         8  
1630             type => 'websocket.disconnect',
1631             code => $code,
1632             reason => $reason,
1633             };
1634             }
1635             elsif ($opcode == 9) {
1636             # Ping — respond with pong
1637 0         0 my $pong = Protocol::WebSocket::Frame->new(
1638             type => 'pong',
1639             buffer => $bytes,
1640             );
1641 0         0 $self->{h2_session}->submit_data($stream_id, $pong->to_bytes, 0);
1642             # No _h2_write_pending — inside feed(); flushed by _h2_process_data
1643             }
1644             # Opcode 10 (pong) — ignore
1645             }
1646              
1647 12         390 $self->_h2_wake_pending($stream);
1648             }
1649              
1650             sub _h2_ws_close {
1651 4     4   13 my ($self, $stream_id, $code, $reason) = @_;
1652              
1653 4   50     27 my $frame = Protocol::WebSocket::Frame->new(
1654             type => 'close',
1655             buffer => pack('n', $code) . ($reason // ''),
1656             );
1657 4         126 $self->{h2_session}->submit_data($stream_id, $frame->to_bytes, 1);
1658             # No _h2_write_pending — called from inside feed(); flushed by _h2_process_data
1659             }
1660              
1661             # Request stall timeout - closes connection if no I/O activity during request processing
1662             sub _start_stall_timer {
1663 172     172   318 my ($self) = @_;
1664              
1665 172 50 33     585 return unless $self->{request_timeout} && $self->{request_timeout} > 0;
1666 0 0       0 return unless $self->{server};
1667 0 0       0 return if $self->{stall_timer}; # Already running
1668              
1669 0         0 weaken(my $weak_self = $self);
1670              
1671             my $timer = IO::Async::Timer::Countdown->new(
1672             delay => $self->{request_timeout},
1673             on_expire => sub {
1674 0 0   0   0 return unless $weak_self;
1675 0 0       0 return if $weak_self->{closed};
1676             # Log the timeout
1677 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
1678 0         0 $weak_self->{server}->_log(warn =>
1679             "Request stall timeout ($weak_self->{request_timeout}s) - closing connection");
1680             }
1681 0         0 $weak_self->_handle_disconnect_and_close('client_timeout');
1682             },
1683 0         0 );
1684 0         0 $self->{stall_timer} = $timer;
1685 0         0 $self->{server}->add_child($timer);
1686 0         0 $timer->start;
1687             }
1688              
1689             sub _reset_stall_timer {
1690 387     387   613 my ($self) = @_;
1691              
1692 387 50       840 return unless $self->{stall_timer};
1693 0         0 $self->{stall_timer}->reset;
1694 0 0       0 $self->{stall_timer}->start unless $self->{stall_timer}->is_running;
1695             }
1696              
1697             sub _stop_stall_timer {
1698 428     428   807 my ($self) = @_;
1699              
1700 428 50       1176 return unless $self->{stall_timer};
1701 0 0       0 $self->{stall_timer}->stop if $self->{stall_timer}->is_running;
1702 0 0       0 if ($self->{server}) {
1703 0         0 $self->{server}->remove_child($self->{stall_timer});
1704             }
1705 0         0 $self->{stall_timer} = undef;
1706             }
1707              
1708             # WebSocket idle timeout - closes connection if no activity
1709             sub _start_ws_idle_timer {
1710 22     22   43 my ($self) = @_;
1711              
1712 22 50 33     76 return unless $self->{ws_idle_timeout} && $self->{ws_idle_timeout} > 0;
1713 0 0       0 return unless $self->{server};
1714 0 0       0 return if $self->{ws_idle_timer};
1715              
1716 0         0 weaken(my $weak_self = $self);
1717              
1718             my $timer = IO::Async::Timer::Countdown->new(
1719             delay => $self->{ws_idle_timeout},
1720             on_expire => sub {
1721 0 0   0   0 return unless $weak_self;
1722 0 0       0 return if $weak_self->{closed};
1723 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
1724 0         0 $weak_self->{server}->_log(warn =>
1725             "WebSocket idle timeout ($weak_self->{ws_idle_timeout}s) - closing connection");
1726             }
1727 0         0 $weak_self->_handle_disconnect_and_close('idle_timeout');
1728             },
1729 0         0 );
1730 0         0 $self->{ws_idle_timer} = $timer;
1731 0         0 $self->{server}->add_child($timer);
1732 0         0 $timer->start;
1733             }
1734              
1735             sub _reset_ws_idle_timer {
1736 85     85   132 my ($self) = @_;
1737              
1738 85 50       176 return unless $self->{ws_idle_timer};
1739 0         0 $self->{ws_idle_timer}->reset;
1740 0 0       0 $self->{ws_idle_timer}->start unless $self->{ws_idle_timer}->is_running;
1741             }
1742              
1743             sub _stop_ws_idle_timer {
1744 273     273   599 my ($self) = @_;
1745              
1746 273 50       709 return unless $self->{ws_idle_timer};
1747 0 0       0 $self->{ws_idle_timer}->stop if $self->{ws_idle_timer}->is_running;
1748 0 0       0 if ($self->{server}) {
1749 0         0 $self->{server}->remove_child($self->{ws_idle_timer});
1750             }
1751 0         0 $self->{ws_idle_timer} = undef;
1752             }
1753              
1754             # SSE idle timeout - closes connection if no activity
1755             sub _start_sse_idle_timer {
1756 27     27   118 my ($self) = @_;
1757              
1758 27 50 33     210 return unless $self->{sse_idle_timeout} && $self->{sse_idle_timeout} > 0;
1759 0 0       0 return unless $self->{server};
1760 0 0       0 return if $self->{sse_idle_timer};
1761              
1762 0         0 weaken(my $weak_self = $self);
1763              
1764             my $timer = IO::Async::Timer::Countdown->new(
1765             delay => $self->{sse_idle_timeout},
1766             on_expire => sub {
1767 0 0   0   0 return unless $weak_self;
1768 0 0       0 return if $weak_self->{closed};
1769 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
1770 0         0 $weak_self->{server}->_log(warn =>
1771             "SSE idle timeout ($weak_self->{sse_idle_timeout}s) - closing connection");
1772             }
1773 0         0 $weak_self->{sse_disconnect_reason} = 'idle_timeout';
1774 0         0 $weak_self->_handle_disconnect_and_close('idle_timeout');
1775             },
1776 0         0 );
1777 0         0 $self->{sse_idle_timer} = $timer;
1778 0         0 $self->{server}->add_child($timer);
1779 0         0 $timer->start;
1780             }
1781              
1782             sub _reset_sse_idle_timer {
1783 72     72   122 my ($self) = @_;
1784              
1785 72 50       193 return unless $self->{sse_idle_timer};
1786 0         0 $self->{sse_idle_timer}->reset;
1787 0 0       0 $self->{sse_idle_timer}->start unless $self->{sse_idle_timer}->is_running;
1788             }
1789              
1790             sub _stop_sse_idle_timer {
1791 273     273   486 my ($self) = @_;
1792              
1793 273 50       764 return unless $self->{sse_idle_timer};
1794 0 0       0 $self->{sse_idle_timer}->stop if $self->{sse_idle_timer}->is_running;
1795 0 0       0 if ($self->{server}) {
1796 0         0 $self->{server}->remove_child($self->{sse_idle_timer});
1797             }
1798 0         0 $self->{sse_idle_timer} = undef;
1799             }
1800              
1801             # ============================================================================
1802             # Send-side backpressure support
1803             # ============================================================================
1804             #
1805             # Prevents unbounded memory growth when apps send faster than slow clients
1806             # can receive. Uses watermark-based flow control:
1807             # - High watermark (default 1MB): pause sending when buffer exceeds this
1808             # - Low watermark (default 256KB): resume sending when buffer drops below this
1809             #
1810             # The $send->() Future will block (await) when high watermark is exceeded,
1811             # and resolve when buffer drains below low watermark.
1812              
1813             sub _get_write_buffer_size {
1814 384     384   605 my ($self) = @_;
1815              
1816 384 50       883 return 0 unless $self->{stream};
1817              
1818             # Access IO::Async::Stream's internal write queue
1819             # IO::Async doesn't expose a public API for buffer size, so we access internals
1820 384   50     896 my $queue = $self->{stream}{writequeue} // [];
1821 384         512 my $total = 0;
1822              
1823 384         841 for my $writer (@$queue) {
1824 255         5177 my $data = $writer->data;
1825 255 50 33     1892 if (defined $data && !ref $data) {
1826 255         487 $total += length($data);
1827             }
1828             }
1829              
1830 384         1552 return $total;
1831             }
1832              
1833             # HTTP/1.1: the transport handle reads the shared TCP write buffer. One
1834             # connection is one stream here, so the IO::Async write queue is the per-stream
1835             # backlog. The connection is held weakly so the handle never keeps it alive.
1836             sub _h1_transport_state {
1837 209     209   537 my ($self) = @_;
1838 209         579 weaken(my $w = $self);
1839             return PAGI::Server::TransportState->new(
1840 185 50   185   566 measure => sub { $w ? $w->_get_write_buffer_size : 0 },
1841 185 50   185   608 high => sub { $w ? $w->{write_high_watermark} : undef },
1842 3 50   3   11 low => sub { $w ? $w->{write_low_watermark} : undef },
1843 1 50   1   2 arm_drain => sub { my $fire = shift; $w->_wait_for_drain->on_ready($fire) if $w },
  1         5  
1844 209         3741 );
1845             }
1846              
1847             # HTTP/2: the transport handle reads this stream's send queue, not the shared
1848             # TCP write buffer — under h2, N streams multiplex one connection, so that
1849             # buffer is the whole connection's backlog (meaningless per stream). $ss is the
1850             # per-stream state hashref; it's held directly (it is the stream's own state),
1851             # while $self is weakened so the handle never keeps the connection alive.
1852             # arm_drain parks the $fire callback on the stream; the data_callback pull fires
1853             # it (deferred) when the queue crosses below the low watermark. Kept separate
1854             # from stream_drain_waiters: those are Futures for blocking backpressure, these
1855             # are the on_drain hysteresis fires.
1856             sub _h2_transport_state {
1857 50     50   125 my ($self, $ss) = @_;
1858 50         131 weaken(my $w = $self);
1859             return PAGI::Server::TransportState->new(
1860 92   100 92   405 measure => sub { $ss->{send_queue_bytes} // 0 },
1861 92 50   92   325 high => sub { $w ? $w->{write_high_watermark} : undef },
1862 0 0   0   0 low => sub { $w ? $w->{write_low_watermark} : undef },
1863 5     5   9 arm_drain => sub { my $fire = shift; push @{$ss->{transport_drain_fires}}, $fire },
  5         11  
  5         82  
1864 50         1217 );
1865             }
1866              
1867             # Notify the current transport-state handle after an application write so its
1868             # backpressure callbacks (on_high_water/on_drain) can fire on a watermark cross.
1869             sub _notify_transport_write {
1870 181     181   371 my ($self) = @_;
1871 181         328 my $ts = $self->{current_transport_state};
1872 181 50       1263 $ts->_check_watermarks if $ts;
1873             }
1874              
1875             sub _check_drain_waiters {
1876 1     1   2 my ($self) = @_;
1877              
1878 1 50       2 return unless @{$self->{_drain_waiters}};
  1         3  
1879 1 50       3 return unless $self->{stream};
1880              
1881 1         3 my $buffered = $self->_get_write_buffer_size;
1882              
1883             # Resolve all waiters if we've drained below low watermark
1884 1 50       4 if ($buffered < $self->{write_low_watermark}) {
1885 1         1 my @waiters = splice @{$self->{_drain_waiters}};
  1         3  
1886 1         2 for my $f (@waiters) {
1887 1 50       3 $f->done unless $f->is_ready;
1888             }
1889             # Disable drain checking until next high watermark hit
1890 1         10 $self->{_drain_check_active} = 0;
1891             }
1892             }
1893              
1894             sub _setup_drain_detection {
1895 1     1   2 my ($self) = @_;
1896              
1897             # Avoid redundant setup
1898 1 50       3 return if $self->{_drain_check_active};
1899 1         2 $self->{_drain_check_active} = 1;
1900              
1901 1         2 weaken(my $weak_self = $self);
1902              
1903             # Primary mechanism: check when write queue empties
1904             # This guarantees we notice drain even for fast-draining connections
1905             # Store previous handler to chain if needed
1906 1         2 my $prev_on_empty = $self->{_prev_on_outgoing_empty};
1907              
1908             $self->{stream}->configure(
1909             on_outgoing_empty => sub {
1910 1 50   1   4192 return unless $weak_self;
1911 1         15 $weak_self->_check_drain_waiters;
1912             # Call previous handler if any
1913 1 50       4 $prev_on_empty->(@_) if $prev_on_empty;
1914             },
1915 1         6 );
1916             }
1917              
1918             sub _wait_for_drain {
1919 1     1   2 my ($self) = @_;
1920              
1921             # Fast path: already below low watermark
1922 1         2 my $buffered = $self->_get_write_buffer_size;
1923 1 50       3 if ($buffered < $self->{write_low_watermark}) {
1924 0         0 return Future->done;
1925             }
1926              
1927             # Create Future to be resolved when drained
1928 1         5 my $f = $self->{server}->loop->new_future;
1929 1         31 push @{$self->{_drain_waiters}}, $f;
  1         2  
1930              
1931             # Ensure drain detection is active
1932 1         4 $self->_setup_drain_detection;
1933              
1934 1         84 return $f;
1935             }
1936              
1937             sub _cancel_drain_waiters {
1938 546     546   987 my ($self, $reason) = @_;
1939 546   100     1096 $reason //= 'connection closed';
1940              
1941 546         751 my @waiters = splice @{$self->{_drain_waiters}};
  546         1226  
1942 546         1278 for my $f (@waiters) {
1943             # Resolve (not fail) - app should check connection state after await
1944 0 0       0 $f->done unless $f->is_ready;
1945             }
1946 546         1007 $self->{_drain_check_active} = 0;
1947             }
1948              
1949             # HTTP/2 per-stream backpressure: the h2 analogue of _wait_for_drain. Resolves
1950             # when this stream's send queue falls below the low watermark. Each multiplexed
1951             # stream is bounded independently, so a quiet TCP buffer can't let one stream's
1952             # queue grow without limit.
1953             sub _h2_wait_for_stream_drain {
1954 5     5   26 my ($self, $stream_id) = @_;
1955              
1956 5 50       15 my $ss = $self->{h2_streams}{$stream_id} or return Future->done;
1957              
1958             # Fast path: already below low watermark
1959 5 50 50     18 if (($ss->{send_queue_bytes} // 0) < $self->{write_low_watermark}) {
1960 0         0 return Future->done;
1961             }
1962              
1963             # Create Future to be resolved when this stream's queue drains (in the
1964             # data_callback pull) or when the stream is torn down.
1965 5         23 my $f = $self->{server}->loop->new_future;
1966 5   100     2018 push @{$ss->{stream_drain_waiters} //= []}, $f;
  5         21  
1967              
1968 5         25 return $f;
1969             }
1970              
1971             # Release any producer blocked on _h2_wait_for_stream_drain for a stream that
1972             # is being torn down (close/RST/connection shutdown). Resolve, never fail - the
1973             # producer rechecks connection/stream state after the await. Some teardown
1974             # sites run inside nghttp2's feed() (e.g. the oversize-body 413 path); completing
1975             # a waiter resumes the producer synchronously, so defer to the next loop tick to
1976             # keep the resumed producer out of a re-entrant nghttp2 call.
1977             sub _h2_resolve_stream_drain_waiters {
1978 67     67   179 my ($self, $ss) = @_;
1979 67 100 66     376 return unless $ss && $ss->{stream_drain_waiters};
1980 1         2 my @waiters = splice @{$ss->{stream_drain_waiters}};
  1         5  
1981 1 50       4 return unless @waiters;
1982             $self->{server}->loop->later(sub {
1983 0     0   0 $_->done for grep { !$_->is_ready } @waiters;
  0         0  
1984 0         0 });
1985             }
1986              
1987             # ============================================================================
1988              
1989             # WebSocket keepalive - sends protocol-level ping frames (RFC 6455)
1990             sub _start_ws_keepalive {
1991 0     0   0 my ($self, $interval, $timeout) = @_;
1992              
1993             # Stop existing timers first
1994 0         0 $self->_stop_ws_keepalive;
1995              
1996 0 0 0     0 return unless $interval && $interval > 0;
1997 0 0       0 return unless $self->{server};
1998              
1999 0         0 $self->{ws_keepalive_interval} = $interval;
2000 0   0     0 $self->{ws_keepalive_timeout} = $timeout // 0;
2001              
2002 0         0 weaken(my $weak_self = $self);
2003              
2004             my $timer = IO::Async::Timer::Periodic->new(
2005             interval => $interval,
2006             on_tick => sub {
2007 0 0   0   0 return unless $weak_self;
2008 0 0       0 return if $weak_self->{closed};
2009 0 0       0 return unless $weak_self->{websocket_mode};
2010              
2011             # Send ping frame
2012 0         0 my $ping = Protocol::WebSocket::Frame->new(
2013             type => 'ping',
2014             buffer => '',
2015             );
2016 0         0 $weak_self->{stream}->write($ping->to_bytes);
2017              
2018             # Start pong timeout if configured
2019 0 0       0 if ($weak_self->{ws_keepalive_timeout} > 0) {
2020 0         0 $weak_self->{ws_waiting_pong} = 1;
2021 0         0 $weak_self->_start_ws_pong_timeout;
2022             }
2023             },
2024 0         0 );
2025              
2026 0         0 $self->{ws_keepalive_timer} = $timer;
2027 0         0 $self->{server}->add_child($timer);
2028 0         0 $timer->start;
2029             }
2030              
2031             sub _start_ws_pong_timeout {
2032 0     0   0 my ($self) = @_;
2033              
2034             # Don't start another timeout if one is running
2035 0 0       0 return if $self->{ws_pong_timeout};
2036 0 0       0 return unless $self->{ws_keepalive_timeout} > 0;
2037 0 0       0 return unless $self->{server};
2038              
2039 0         0 weaken(my $weak_self = $self);
2040              
2041             my $timer = IO::Async::Timer::Countdown->new(
2042             delay => $self->{ws_keepalive_timeout},
2043             on_expire => sub {
2044 0 0   0   0 return unless $weak_self;
2045 0 0       0 return if $weak_self->{closed};
2046              
2047 0 0       0 if ($weak_self->{ws_waiting_pong}) {
2048             # No pong received within timeout - close connection
2049 0 0 0     0 if ($weak_self->{server} && $weak_self->{server}->can('_log')) {
2050 0         0 $weak_self->{server}->_log(warn =>
2051             "WebSocket keepalive timeout - no pong received within $weak_self->{ws_keepalive_timeout}s");
2052             }
2053 0         0 $weak_self->_handle_disconnect_and_close('keepalive_timeout');
2054             }
2055             },
2056 0         0 );
2057              
2058 0         0 $self->{ws_pong_timeout} = $timer;
2059 0         0 $self->{server}->add_child($timer);
2060 0         0 $timer->start;
2061             }
2062              
2063             sub _cancel_ws_pong_timeout {
2064 273     273   505 my ($self) = @_;
2065              
2066 273         556 $self->{ws_waiting_pong} = 0;
2067              
2068 273 50       828 return unless $self->{ws_pong_timeout};
2069 0 0       0 $self->{ws_pong_timeout}->stop if $self->{ws_pong_timeout}->is_running;
2070 0 0       0 if ($self->{server}) {
2071 0         0 $self->{server}->remove_child($self->{ws_pong_timeout});
2072             }
2073 0         0 $self->{ws_pong_timeout} = undef;
2074             }
2075              
2076             sub _stop_ws_keepalive {
2077 273     273   680 my ($self) = @_;
2078              
2079             # Stop pong timeout first
2080 273         1266 $self->_cancel_ws_pong_timeout;
2081              
2082 273 50       783 return unless $self->{ws_keepalive_timer};
2083 0 0       0 $self->{ws_keepalive_timer}->stop if $self->{ws_keepalive_timer}->is_running;
2084 0 0       0 if ($self->{server}) {
2085 0         0 $self->{server}->remove_child($self->{ws_keepalive_timer});
2086             }
2087 0         0 $self->{ws_keepalive_timer} = undef;
2088 0         0 $self->{ws_keepalive_interval} = 0;
2089 0         0 $self->{ws_keepalive_timeout} = 0;
2090             }
2091              
2092             # SSE keepalive - sends comment lines to prevent proxy timeouts
2093             sub _start_sse_keepalive {
2094 2     2   6 my ($self, $interval, $comment) = @_;
2095              
2096             # Stop existing timer first
2097 2         67 $self->_stop_sse_keepalive;
2098              
2099 2 50 33     19 return unless $interval && $interval > 0;
2100 2 50       7 return unless $self->{server};
2101              
2102 2   50     22 $self->{sse_keepalive_comment} = $comment // '';
2103              
2104 2         6 weaken(my $weak_self = $self);
2105              
2106             my $timer = IO::Async::Timer::Periodic->new(
2107             interval => $interval,
2108             on_tick => sub {
2109 6 50   6   893938 return unless $weak_self;
2110 6 50       33 return if $weak_self->{closed};
2111              
2112 6         24 my $text = $weak_self->{sse_keepalive_comment};
2113 6 50       85 $text = ":$text" unless $text =~ /^:/;
2114 6         23 my $formatted = "$text\n\n";
2115              
2116 6 50       30 if (my $writer = $weak_self->{sse_keepalive_writer}) {
2117 6         22 $writer->($formatted);
2118             }
2119             },
2120 2         74 );
2121              
2122 2         234 $self->{sse_keepalive_timer} = $timer;
2123 2         11 $self->{server}->add_child($timer);
2124 2         267 $timer->start;
2125             }
2126              
2127             sub _stop_sse_keepalive {
2128 275     275   619 my ($self) = @_;
2129              
2130 275 100       798 return unless $self->{sse_keepalive_timer};
2131 2 50       38 $self->{sse_keepalive_timer}->stop if $self->{sse_keepalive_timer}->is_running;
2132 2 50       155 if ($self->{server}) {
2133 2         12 $self->{server}->remove_child($self->{sse_keepalive_timer});
2134             }
2135 2         254 $self->{sse_keepalive_timer} = undef;
2136 2         11 $self->{sse_keepalive_comment} = '';
2137             }
2138              
2139             sub _try_handle_request {
2140 228     228   547 my ($self) = @_;
2141              
2142 228 50       655 return if $self->{closed};
2143 228 100       553 return if $self->{handling_request};
2144              
2145             # Try to parse a request from the buffer
2146 217         1656 my ($request, $consumed) = $self->{protocol}->parse_request($self->{buffer});
2147              
2148 217 100       694 return unless $request;
2149              
2150             # Remove consumed bytes from buffer
2151 215         791 substr($self->{buffer}, 0, $consumed) = '';
2152              
2153             # Handle parse errors (malformed request, header too large)
2154 215 100       564 if ($request->{error}) {
2155             # Mark connection as disconnected with protocol_error reason (PAGI spec compliance)
2156 5         22 $self->_handle_disconnect('protocol_error');
2157 5         37 $self->_send_error_response($request->{error}, $request->{message});
2158 5         23 $self->_close;
2159 5         99 return;
2160             }
2161              
2162             # Check Content-Length against max_body_size limit (0 = unlimited)
2163 210 100 100     1037 if ($self->{max_body_size} && defined $request->{content_length}) {
2164 6 100       41 if ($request->{content_length} > $self->{max_body_size}) {
2165 1         4 $self->_handle_disconnect('body_too_large');
2166 1         3 $self->_send_error_response(413, 'Payload Too Large');
2167 1         5 $self->_close;
2168 1         18 return;
2169             }
2170             }
2171              
2172             # Check if this is a WebSocket upgrade request
2173 209         937 my $is_websocket = $self->_is_websocket_upgrade($request);
2174              
2175             # Check if this is an SSE request
2176 209   100     952 my $is_sse = !$is_websocket && $self->_is_sse_request($request);
2177              
2178             # Handle the request - store the Future to prevent "lost future" warning
2179 209         573 $self->{handling_request} = 1;
2180 209         882 $self->{request_start} = [gettimeofday];
2181 209         487 $self->{current_request} = $request; # Store for access logging
2182              
2183 209 100       573 if ($is_websocket) {
    100          
2184 22         81 $self->{request_future} = $self->_handle_websocket_request($request);
2185             } elsif ($is_sse) {
2186 15         56 $self->{request_future} = $self->_handle_sse_request($request);
2187             } else {
2188             # Start stall timer for HTTP requests (WebSocket/SSE have their own handling)
2189 172         880 $self->_start_stall_timer;
2190 172         626 $self->{request_future} = $self->_handle_request($request);
2191             }
2192              
2193             # Use adopt_future for proper error tracking instead of retain
2194             # This ensures errors are propagated to the server's error handling
2195 209         13715 $self->{server}->adopt_future($self->{request_future});
2196             }
2197              
2198             sub _is_websocket_upgrade {
2199 209     209   438 my ($self, $request) = @_;
2200              
2201             # Check for WebSocket upgrade headers
2202 209         301 my $has_upgrade = 0;
2203 209         374 my $has_connection_upgrade = 0;
2204 209         302 my $has_ws_key = 0;
2205              
2206 209         318 for my $header (@{$request->{headers}}) {
  209         636  
2207 646         1069 my ($name, $value) = @$header;
2208 646 100 66     1956 if ($name eq 'upgrade' && lc($value) eq 'websocket') {
    100          
    100          
2209 22         35 $has_upgrade = 1;
2210             }
2211             elsif ($name eq 'connection') {
2212             # Connection header can have multiple values
2213 193 100       965 $has_connection_upgrade = 1 if lc($value) =~ /upgrade/;
2214             }
2215             elsif ($name eq 'sec-websocket-key') {
2216 22         44 $has_ws_key = 1;
2217             }
2218             }
2219              
2220 209   66     770 return $has_upgrade && $has_connection_upgrade && $has_ws_key;
2221             }
2222              
2223             sub _is_sse_request {
2224 187     187   400 my ($self, $request) = @_;
2225              
2226             # SSE detection per spec:
2227             # - Accept header includes text/event-stream
2228             # - Request has not been upgraded to WebSocket (already checked)
2229             # Note: SSE works with any HTTP method (GET, POST, etc.) to support
2230             # modern patterns like htmx 4 and datastar using fetch-event-source
2231              
2232 187         324 for my $header (@{$request->{headers}}) {
  187         461  
2233 506         787 my ($name, $value) = @$header;
2234 506 100       942 if ($name eq 'accept') {
2235             # Check if Accept header includes text/event-stream
2236 15 50       88 return 1 if $value =~ m{text/event-stream};
2237             }
2238             }
2239              
2240 172         388 return 0;
2241             }
2242              
2243 172     172   288 async sub _handle_request {
2244 172         334 my ($self, $request) = @_;
2245              
2246 172         681 my $scope = $self->_create_scope($request);
2247 172         578 my $receive = $self->_create_receive($request);
2248 172         600 my $send = $self->_create_send($request);
2249              
2250 172         280 eval {
2251 172         862 await $self->{app}->($scope, $receive, $send);
2252             };
2253              
2254 171 100       12357 if (my $error = $@) {
2255             # Handle application error - always close connection after exception
2256             # If response already started, we can't send error page (3.17)
2257 15 100       67 if ($self->{response_started}) {
2258 2         10 $self->_flush_pending_headers; # don't lose a started response's headers
2259 2         462 warn "PAGI application error (after response started): $error\n";
2260             } else {
2261 13         62 $self->_send_error_response(500, "Internal Server Error");
2262 13         849 warn "PAGI application error: $error\n";
2263             }
2264             # Write access log before closing
2265 15         114 $self->_write_access_log;
2266             # Notify server that request completed (for max_requests tracking)
2267 15 50       158 $self->{server}->_on_request_complete if $self->{server};
2268             # Always close connection after exception (3.2) - don't try keep-alive
2269 15         68 $self->_handle_disconnect('server_error');
2270 15         68 $self->_close;
2271 15         1214 return;
2272             }
2273              
2274             # The application returned without starting a response. An incomplete
2275             # response is a protocol error: if the client is still connected, synthesize
2276             # a 500; either way do not keep-alive a connection on which no response was
2277             # written (that would hang the client, which is waiting for a response). A
2278             # response that was started but not completed is handled by the body-framing
2279             # and keep-alive logic below.
2280 156 100       455 if (!$self->{response_started}) {
2281 1 50       3 unless ($self->{closed}) {
2282 1         12 warn "PAGI application returned without starting a response\n";
2283 1         9 $self->_send_error_response(500, "Internal Server Error");
2284             }
2285 1         3 $self->_write_access_log;
2286 1 50       8 $self->{server}->_on_request_complete if $self->{server};
2287 1         5 $self->_handle_disconnect('server_error');
2288 1         4 $self->_close;
2289 1         48 return;
2290             }
2291              
2292             # Flush any headers buffered by response.start that were never paired with a
2293             # body write (a started-but-bodyless response).
2294 155         594 $self->_flush_pending_headers;
2295              
2296             # Write access log entry
2297 155         675 $self->_write_access_log;
2298              
2299             # Notify server that request completed (for max_requests tracking)
2300 155 50       1596 $self->{server}->_on_request_complete if $self->{server};
2301              
2302             # Stop stall timer - request completed successfully
2303 155         611 $self->_stop_stall_timer;
2304              
2305             # Request finished cleanly: fire on_complete (not on_disconnect) on the
2306             # HTTP connection-state object. Must happen on BOTH the keep-alive and
2307             # close paths, and before the keep-alive branch clears the state below.
2308             # Once marked complete, the non-keep-alive _handle_disconnect_and_close
2309             # call below no-ops the state transition, so on_disconnect never fires for
2310             # a completed request.
2311 155 50       567 if (my $conn_state = $self->{current_connection_state}) {
2312 155         794 $conn_state->_mark_complete;
2313             }
2314              
2315             # Determine if we should keep the connection alive
2316 155         497 my $keep_alive = $self->_should_keep_alive($request);
2317              
2318 155 100       406 if ($keep_alive) {
2319             # Reset for next request
2320 142         262 $self->{handling_request} = 0;
2321 142         264 $self->{response_started} = 0;
2322 142         234 $self->{_resp_pending} = undef;
2323 142         243 $self->{response_status} = undef;
2324 142         249 $self->{_response_size} = 0;
2325 142         301 $self->{request_start} = undef;
2326 142         246 $self->{current_request} = undef;
2327 142         260 $self->{request_future} = undef;
2328 142         236 $self->{current_connection_state} = undef; # Clear for next request
2329 142         222 $self->{current_transport_state} = undef; # New request gets a fresh handle
2330              
2331             # Check if there's more data in the buffer (pipelining)
2332 142 100       8611 if (length($self->{buffer}) > 0) {
2333 1         9 $self->_try_handle_request;
2334             }
2335             } else {
2336             # Not keeping alive - close connection
2337 13         82 $self->_handle_disconnect_and_close('request_complete');
2338             }
2339             }
2340              
2341             sub _should_keep_alive {
2342 155     155   317 my ($self, $request) = @_;
2343              
2344 155   50     436 my $http_version = $request->{http_version} // '1.1';
2345              
2346             # Check for Connection header
2347 155         260 my $connection_header;
2348 155         200 for my $header (@{$request->{headers}}) {
  155         472  
2349 302 100       758 if ($header->[0] eq 'connection') {
2350 146         319 $connection_header = lc($header->[1]);
2351 146         232 last;
2352             }
2353             }
2354              
2355             # HTTP/1.1: keep-alive by default unless Connection: close
2356 155 100       408 if ($http_version eq '1.1') {
2357 152 100 100     1176 return 0 if $connection_header && $connection_header =~ /close/;
2358 141         323 return 1;
2359             }
2360              
2361             # HTTP/1.0: close by default unless Connection: keep-alive
2362 3 50       8 if ($http_version eq '1.0') {
2363 3 100 66     13 return 1 if $connection_header && $connection_header =~ /keep-alive/;
2364 2         4 return 0;
2365             }
2366              
2367             # Unknown version: close connection
2368 0         0 return 0;
2369             }
2370              
2371             sub _create_scope {
2372 172     172   512 my ($self, $request) = @_;
2373              
2374             # Create connection state object for disconnect tracking
2375             # Uses lazy Future creation - Future only allocated if disconnect_future() is called
2376 172         2327 my $connection_state = PAGI::Server::ConnectionState->new(
2377             connection => $self,
2378             );
2379 172         443 $self->{current_connection_state} = $connection_state;
2380              
2381             my $scope = {
2382             type => 'http',
2383             pagi => {
2384             version => '0.3',
2385             spec_version => '0.3',
2386             },
2387             http_version => $request->{http_version},
2388             method => $request->{method},
2389             scheme => $self->_get_scheme,
2390             path => $request->{path},
2391             raw_path => $request->{raw_path},
2392             query_string => $request->{query_string},
2393             root_path => '',
2394             headers => $request->{headers},
2395             (defined $self->{client_host}
2396             ? (client => [$self->{client_host}, $self->{client_port}])
2397             : ()
2398             ),
2399             server => [$self->{server_host}, $self->{server_port}],
2400             # Optimized: avoid hash copy when state is empty (common case)
2401 172         1180 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  3         15  
2402             extensions => $self->_get_extensions_for_scope,
2403             # Connection state for non-destructive disconnect detection
2404             'pagi.connection' => $connection_state,
2405             # Outbound flow-control introspection (buffered_amount, watermarks,
2406             # on_high_water/on_drain). Stashed on the connection too, so the send
2407             # path can poke _check_watermarks after each write.
2408 172 100       1420 'pagi.transport' => ($self->{current_transport_state} = $self->_h1_transport_state),
    100          
2409             };
2410              
2411 172         604 return $scope;
2412             }
2413              
2414             sub _create_receive {
2415 172     172   375 my ($self, $request) = @_;
2416              
2417 172         290 my $content_length = $request->{content_length};
2418 172   50     495 my $is_chunked = $request->{chunked} // 0;
2419 172   50     420 my $expect_continue = $request->{expect_continue} // 0;
2420 172         285 my $continue_sent = 0;
2421 172         250 my $body_complete = 0;
2422 172         241 my $bytes_read = 0;
2423 172         240 my $chunk_size = 65536; # 64KB chunks for large bodies
2424              
2425             # For requests without Content-Length and not chunked, treat as no body
2426 172   100     817 my $has_body = defined($content_length) && $content_length > 0 || $is_chunked;
2427              
2428 172         305 weaken(my $weak_self = $self);
2429              
2430             # Return a wrapper that tracks the Future from the async receive
2431             return sub {
2432 57 50   57   1270 return Future->done({ type => 'http.disconnect' }) unless $weak_self;
2433 57 50       143 return Future->done({ type => 'http.disconnect' }) if $weak_self->{closed};
2434              
2435             # The actual async implementation
2436 57         136 my $future = (async sub {
2437 57 50       152 return { type => 'http.disconnect' } unless $weak_self;
2438 57 50       124 return { type => 'http.disconnect' } if $weak_self->{closed};
2439              
2440             # Check queue first - events from disconnect handler
2441 57 50       85 if (@{$weak_self->{receive_queue}}) {
  57         152  
2442 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2443             }
2444              
2445             # If body is already complete, wait for disconnect
2446 57 100       152 if ($body_complete) {
2447 1 50       3 if (!$weak_self->{receive_pending}) {
2448 1         3 $weak_self->{receive_pending} = Future->new;
2449             }
2450              
2451 1 50       6 if ($weak_self->{closed}) {
2452 0         0 $weak_self->{receive_pending} = undef;
2453 0         0 return { type => 'http.disconnect' };
2454             }
2455              
2456 1         2 my $result = await $weak_self->{receive_pending};
2457             # receive_pending may be completed with a value (disconnect event)
2458             # or just done() as a signal
2459 1 50       86 return $result if ref $result eq 'HASH';
2460             # If no value, check queue
2461 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
2462 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2463             }
2464 0         0 return { type => 'http.disconnect' };
2465             }
2466              
2467             # For requests without body, return empty body immediately
2468 56 100       183 if (!$has_body) {
2469 39         80 $body_complete = 1;
2470             return {
2471 39         554 type => 'http.request',
2472             body => '',
2473             more => 0,
2474             };
2475             }
2476              
2477             # Send 100 Continue if client expects it (before reading body)
2478 17 50 33     38 if ($expect_continue && !$continue_sent) {
2479 0         0 $continue_sent = 1;
2480 0         0 $weak_self->{stream}->write($weak_self->{protocol}->serialize_continue);
2481             }
2482              
2483             # Handle chunked Transfer-Encoding
2484 17 100       26 if ($is_chunked) {
2485             # Wait for data if buffer is empty
2486 1   33     5 while (length($weak_self->{buffer}) == 0 && !$weak_self->{closed}) {
2487 0 0       0 if (!$weak_self->{receive_pending}) {
2488 0         0 $weak_self->{receive_pending} = Future->new;
2489             }
2490 0         0 await $weak_self->{receive_pending};
2491 0         0 $weak_self->{receive_pending} = undef;
2492              
2493             # Check queue after waiting
2494 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
2495 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2496             }
2497             }
2498              
2499             # Try to parse chunked data
2500 1         5 my ($data, $consumed, $complete) = $weak_self->{protocol}->parse_chunked_body($weak_self->{buffer});
2501              
2502             # Check for parse error (invalid chunk size)
2503 1 0 33     3 if (ref($data) eq 'HASH' && $data->{error}) {
2504 0         0 $weak_self->_handle_disconnect('protocol_error');
2505 0   0     0 $weak_self->_send_error_response($data->{error}, $data->{message} // 'Bad Request');
2506 0         0 $weak_self->_close;
2507 0         0 return { type => 'http.disconnect' };
2508             }
2509              
2510 1 50       4 if ($consumed > 0) {
2511 1         2 substr($weak_self->{buffer}, 0, $consumed) = '';
2512              
2513             # Track total bytes read for max_body_size check
2514 1   50     3 $bytes_read += length($data // '');
2515              
2516             # Check max_body_size for chunked requests (0 = unlimited)
2517 1 50 33     6 if ($weak_self->{max_body_size} && $bytes_read > $weak_self->{max_body_size}) {
2518             # Body too large - close connection
2519 0         0 $weak_self->_send_error_response(413, 'Payload Too Large');
2520 0         0 $weak_self->_handle_disconnect('body_too_large');
2521 0         0 $weak_self->_close;
2522 0         0 return { type => 'http.disconnect' };
2523             }
2524              
2525 1 50       2 if ($complete) {
2526 1         1 $body_complete = 1;
2527             }
2528              
2529             return {
2530 1 50 50     13 type => 'http.request',
2531             body => $data // '',
2532             more => $complete ? 0 : 1,
2533             };
2534             }
2535              
2536             # Need more data - wait for it
2537 0 0       0 if (!$weak_self->{receive_pending}) {
2538 0         0 $weak_self->{receive_pending} = Future->new;
2539             }
2540 0         0 await $weak_self->{receive_pending};
2541 0         0 $weak_self->{receive_pending} = undef;
2542              
2543             # Recursive call to re-process - but we can't use __SUB__ in nested async
2544             # Just return disconnect if closed
2545 0 0       0 return { type => 'http.disconnect' } if $weak_self->{closed};
2546             # This shouldn't happen often - caller should retry
2547 0         0 return { type => 'http.request', body => '', more => 1 };
2548             }
2549              
2550             # Handle Content-Length based body reading
2551 16         25 my $remaining = $content_length - $bytes_read;
2552              
2553 16 50       27 if ($remaining <= 0) {
2554 0         0 $body_complete = 1;
2555             return {
2556 0         0 type => 'http.request',
2557             body => '',
2558             more => 0,
2559             };
2560             }
2561              
2562             # Wait for data if buffer is empty
2563 16   66     75 while (length($weak_self->{buffer}) == 0 && !$weak_self->{closed}) {
2564 12 50       19 if (!$weak_self->{receive_pending}) {
2565 12         30 $weak_self->{receive_pending} = Future->new;
2566             }
2567 12         80 await $weak_self->{receive_pending};
2568 12         719 $weak_self->{receive_pending} = undef;
2569              
2570             # Check queue after waiting
2571 12 50       12 if (@{$weak_self->{receive_queue}}) {
  12         38  
2572 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
2573             }
2574             }
2575              
2576             # Return disconnect if closed while waiting
2577 16 50 33     36 if ($weak_self->{closed} && length($weak_self->{buffer}) == 0) {
2578 0         0 return { type => 'http.disconnect' };
2579             }
2580              
2581             # Read up to chunk_size or remaining bytes, whichever is smaller
2582 16 100       30 my $to_read = $remaining < $chunk_size ? $remaining : $chunk_size;
2583 16 100       42 $to_read = length($weak_self->{buffer}) if length($weak_self->{buffer}) < $to_read;
2584              
2585 16         58 my $body = substr($weak_self->{buffer}, 0, $to_read, '');
2586 16         21 $bytes_read += length($body);
2587              
2588             # Check if we've read all the body
2589 16 100       33 my $more = ($bytes_read < $content_length) ? 1 : 0;
2590              
2591 16 100       24 if (!$more) {
2592 4         5 $body_complete = 1;
2593             }
2594              
2595             return {
2596 16         98 type => 'http.request',
2597             body => $body,
2598             more => $more,
2599             };
2600 57         499 })->();
2601              
2602             # Track this Future so we can cancel it on close
2603 57         3139 push @{$weak_self->{receive_futures}}, $future;
  57         148  
2604              
2605             # Clean up completed futures from the list
2606 57         96 @{$weak_self->{receive_futures}} = grep { !$_->is_ready } @{$weak_self->{receive_futures}};
  57         306  
  68         195  
  57         125  
2607              
2608 57         273 return $future;
2609 172         1171 };
2610             }
2611              
2612             sub _create_send {
2613 172     172   439 my ($self, $request) = @_;
2614              
2615 172         274 my $chunked = 0;
2616 172         256 my $response_started = 0;
2617 172         268 my $expects_trailers = 0;
2618 172         243 my $body_complete = 0;
2619 172   50     466 my $is_head_request = ($request->{method} // '') eq 'HEAD';
2620 172   50     410 my $http_version = $request->{http_version} // '1.1';
2621 172         361 my $is_http10 = ($http_version eq '1.0');
2622              
2623             # Check if HTTP/1.0 client requested keep-alive
2624 172         217 my $client_wants_keepalive = 0;
2625 172 100       529 if ($is_http10) {
2626 3         5 for my $h (@{$request->{headers}}) {
  3         7  
2627 3 100 66     11 if ($h->[0] eq 'connection' && lc($h->[1]) =~ /keep-alive/) {
2628 1         2 $client_wants_keepalive = 1;
2629 1         6 last;
2630             }
2631             }
2632             }
2633              
2634 172         307 weaken(my $weak_self = $self);
2635              
2636 328     328   709600 return async sub {
2637 328         497 my ($event) = @_;
2638 328 50       650 return Future->done unless $weak_self;
2639 328 50       683 return Future->done if $weak_self->{closed};
2640              
2641             # Reset stall timer on write activity
2642 328         1079 $weak_self->_reset_stall_timer;
2643              
2644 328   50     675 my $type = $event->{type} // '';
2645              
2646             # Dev-mode event validation (PAGI spec compliance)
2647 328 50       711 if ($weak_self->{validate_events}) {
2648 0         0 require PAGI::Server::EventValidator;
2649 0         0 PAGI::Server::EventValidator::validate_http_send($event);
2650             }
2651              
2652 328 100       949 if ($type eq 'http.response.start') {
    100          
    100          
    50          
2653 158 50       328 return if $response_started;
2654 158         210 $response_started = 1;
2655 158         335 $weak_self->{response_started} = 1;
2656             $weak_self->{current_connection_state}->_mark_response_started
2657 158 50       926 if $weak_self->{current_connection_state};
2658 158   50     438 $weak_self->{response_status} = $event->{status} // 200; # Track for logging
2659 158   100     627 $expects_trailers = $event->{trailers} // 0;
2660              
2661 158   50     320 my $status = $event->{status} // 200;
2662 158   50     322 my $headers = $event->{headers} // [];
2663              
2664             # Check if we need chunked encoding (no Content-Length)
2665 158         349 my $has_content_length = 0;
2666 158         330 for my $h (@$headers) {
2667 162 100       561 if (lc($h->[0]) eq 'content-length') {
2668 16         21 $has_content_length = 1;
2669 16         30 last;
2670             }
2671             }
2672              
2673             # Add Date header
2674 158         366 my @final_headers = @$headers;
2675 158         1115 push @final_headers, ['date', $weak_self->{protocol}->format_date];
2676              
2677             # For HEAD requests, don't use chunked encoding (no body will be sent)
2678             # For HTTP/1.0, don't use chunked encoding - use Connection: close instead
2679 158 100 100     753 if ($is_head_request || $is_http10) {
2680 4         6 $chunked = 0;
2681 4 100       7 if ($is_http10) {
2682 3 100       7 if (!$has_content_length) {
    100          
2683             # No Content-Length means we can't do keep-alive
2684 1         4 push @final_headers, ['connection', 'close'];
2685             } elsif ($client_wants_keepalive) {
2686             # HTTP/1.0 client requested keep-alive and we can honor it
2687             # Must explicitly acknowledge with Connection: keep-alive
2688 1         3 push @final_headers, ['connection', 'keep-alive'];
2689             }
2690             }
2691             } else {
2692 154         338 $chunked = !$has_content_length;
2693             }
2694              
2695             my $response = $weak_self->{protocol}->serialize_response_start(
2696 158         930 $status, \@final_headers, $chunked, $http_version
2697             );
2698              
2699             # Buffer the headers instead of writing them now; they are flushed
2700             # together with the first body write (or at finalization). This
2701             # coalesces the common "start + complete body" case into a single
2702             # stream write instead of one per headers/chunk/terminator.
2703 157         559 $weak_self->{_resp_pending} = $response;
2704             }
2705             elsif ($type eq 'http.response.body') {
2706 166 50       362 return unless $response_started;
2707 166 100       337 return if $body_complete;
2708              
2709             # For HEAD requests, suppress the body but track completion
2710 165 100       410 if ($is_head_request) {
2711 1   50     3 my $more = $event->{more} // 0;
2712             # HEAD has headers but no body, so flush the buffered headers now.
2713 1         4 $weak_self->_flush_pending_headers;
2714 1 50       194 if (!$more) {
2715 1         2 $body_complete = 1;
2716             }
2717 1         5 return; # Don't send any body for HEAD
2718             }
2719              
2720             # --- BACKPRESSURE CHECK ---
2721             # Wait for buffer to drain if we're above high watermark
2722             # This prevents unbounded memory growth with slow clients
2723 164 50       541 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
2724 0         0 await $weak_self->_wait_for_drain;
2725             # Re-check connection state after await
2726 0 0       0 return Future->done unless $weak_self;
2727 0 0       0 return Future->done if $weak_self->{closed};
2728             }
2729             # --- END BACKPRESSURE CHECK ---
2730              
2731             # Determine body source: body, file, or fh (mutually exclusive)
2732 164         269 my $body = $event->{body};
2733 164         330 my $file = $event->{file};
2734 164         260 my $fh = $event->{fh};
2735 164   100     702 my $offset = $event->{offset} // 0;
2736 164         316 my $length = $event->{length};
2737              
2738 164 100       550 if (defined $file) {
    100          
2739             # File path response - stream from file (async, non-blocking)
2740             # File responses are implicitly complete (more is ignored)
2741 10         31 $weak_self->_flush_pending_headers; # headers before the file body
2742 10         1918 await $weak_self->_send_file_response($file, $offset, $length, $chunked);
2743 10         1612 $body_complete = 1;
2744             }
2745             elsif (defined $fh) {
2746             # Filehandle response - stream from handle (async, non-blocking)
2747             # Filehandle responses are implicitly complete (more is ignored)
2748 6         32 $weak_self->_flush_pending_headers; # headers before the fh body
2749 6         1171 await $weak_self->_send_fh_response($fh, $offset, $length, $chunked);
2750 6         588 $body_complete = 1;
2751             }
2752             else {
2753             # Traditional body response
2754 148   50     497 $body //= '';
2755 148   100     469 my $more = $event->{more} // 0;
2756              
2757 148         322 $weak_self->{_response_size} += length($body);
2758              
2759             # Coalesce any buffered headers, the body (chunk framing if
2760             # chunked), and the final terminator into a single stream write.
2761             # The common start + complete-body response becomes one write
2762             # rather than three.
2763 148         244 my $out = $weak_self->{_resp_pending};
2764 148 100       344 $out = '' unless defined $out;
2765 148         274 $weak_self->{_resp_pending} = undef;
2766              
2767 148 100       372 if ($chunked) {
2768 142 50       284 if (length $body) {
2769 142         571 my $len = sprintf("%x", length($body));
2770 142         347 $out .= "$len\r\n$body\r\n";
2771             }
2772 142 100 100     619 if (!$more && !$expects_trailers) {
2773 131         230 $out .= "0\r\n\r\n";
2774             }
2775             }
2776             else {
2777 6         12 $out .= $body;
2778             }
2779              
2780 148 50       847 $weak_self->{stream}->write($out) if length $out;
2781 148         33592 $weak_self->_notify_transport_write;
2782              
2783             # Handle completion for body responses
2784 148 100       337 if (!$more) {
2785 138         288 $body_complete = 1;
2786             }
2787             }
2788             }
2789             elsif ($type eq 'http.response.trailers') {
2790 1 50       3 return unless $response_started;
2791 1 50       2 return unless $expects_trailers;
2792 1 50       3 return unless $chunked; # Trailers only work with chunked encoding
2793              
2794 1   50     3 my $trailer_headers = $event->{headers} // [];
2795              
2796             # Send final chunk + trailers (prepend any still-buffered headers).
2797 1   50     5 my $trailers = $weak_self->{_resp_pending} // '';
2798 1         2 $weak_self->{_resp_pending} = undef;
2799 1         2 $trailers .= "0\r\n";
2800 1         3 for my $header (@$trailer_headers) {
2801 1         2 my ($name, $value) = @$header;
2802 1         4 $name = _validate_header_name($name);
2803 1         27 $value = _validate_header_value($value);
2804 1         4 $trailers .= "$name: $value\r\n";
2805             }
2806 1         2 $trailers .= "\r\n";
2807              
2808 1         4 $weak_self->{stream}->write($trailers);
2809 1         108 $body_complete = 1;
2810             }
2811             elsif ($type eq 'http.fullflush') {
2812             # Fullflush extension - force immediate TCP buffer flush
2813             # Per spec: servers that don't advertise the extension must reject
2814 3 100       9 unless (exists $weak_self->{extensions}{fullflush}) {
2815 1         12 warn "PAGI: http.fullflush event rejected - extension not enabled\n";
2816 1         13 die "Extension not enabled: fullflush\n";
2817             }
2818              
2819             # Force flush by ensuring TCP_NODELAY and flushing any pending writes
2820 2         9 my $handle = $weak_self->{stream}->write_handle;
2821 2 50 33     23 if ($handle && $handle->can('setsockopt')) {
2822             # Ensure TCP_NODELAY is set to disable Nagle buffering
2823 2         16 require Socket;
2824 2         10 $handle->setsockopt(Socket::IPPROTO_TCP(), Socket::TCP_NODELAY(), 1);
2825             }
2826              
2827             # In IO::Async, writes are queued and sent when the event loop allows.
2828             # The above TCP_NODELAY ensures no Nagle buffering delays.
2829             # For this reference implementation, we return immediately as the
2830             # write buffer will be flushed by the event loop.
2831             }
2832             else {
2833             # Per PAGI spec: servers must raise exceptions for unrecognized event types
2834 0         0 _unrecognized_event_type($type, 'http');
2835             }
2836              
2837 324         2345 return;
2838 172         2473 };
2839             }
2840              
2841             # Flush any response headers buffered by http.response.start that were not yet
2842             # paired with a body write (HEAD/file/fh paths, started-but-incomplete responses).
2843             sub _flush_pending_headers {
2844 174     174   344 my ($self) = @_;
2845 174         352 my $pending = $self->{_resp_pending};
2846 174 100 66     631 return unless defined $pending && length $pending;
2847 18         32 $self->{_resp_pending} = undef;
2848 18         96 $self->{stream}->write($pending);
2849             }
2850              
2851             sub _send_error_response {
2852 23     23   59 my ($self, $status, $message) = @_;
2853              
2854 23 50       78 return if $self->{closed};
2855 23 50       75 return if $self->{response_started};
2856              
2857 23         38 my $body = $message;
2858             my $headers = [
2859             ['content-type', 'text/plain'],
2860             ['content-length', length($body)],
2861 23         207 ['date', $self->{protocol}->format_date],
2862             ];
2863              
2864 23         124 my $response = $self->{protocol}->serialize_response_start($status, $headers, 0);
2865 23         54 $response .= $body;
2866              
2867 23         136 $self->{stream}->write($response);
2868 23         5747 $self->{response_started} = 1;
2869             # A server-synthesized response is still "this request's response started".
2870             $self->{current_connection_state}->_mark_response_started
2871 23 100       170 if $self->{current_connection_state};
2872 23         103 $self->{response_status} = $status; # Track for logging
2873             }
2874              
2875             sub _write_access_log {
2876 207     207   412 my ($self) = @_;
2877              
2878 207 100       621 return unless $self->{access_log};
2879 199 50       541 return unless $self->{current_request};
2880              
2881 199         361 my $request = $self->{current_request};
2882              
2883             # Calculate request duration
2884 199         362 my $duration = 0;
2885 199 50       604 if ($self->{request_start}) {
2886 199         1477 $duration = tv_interval($self->{request_start});
2887             }
2888              
2889             # Per-second cached CLF timestamp
2890 199         3104 my $now = time();
2891 199 100       539 if ($now != $_cached_log_time) {
2892 61         108 $_cached_log_time = $now;
2893 61         329 my @gmt = gmtime($now);
2894 61         458 my @months = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec);
2895 61         639 $_cached_log_timestamp = sprintf("%02d/%s/%04d:%02d:%02d:%02d +0000",
2896             $gmt[3], $months[$gmt[4]], $gmt[5] + 1900,
2897             $gmt[2], $gmt[1], $gmt[0]);
2898             }
2899              
2900             my $info = {
2901             client_ip => $self->{client_host} // ($self->{transport_type} eq 'unix' ? 'unix' : '-'),
2902             timestamp => $_cached_log_timestamp,
2903             method => $request->{method} // '-',
2904             path => $request->{raw_path} // '/',
2905             query => $request->{query_string},
2906             http_version => $request->{http_version} // '1.1',
2907             status => $self->{response_status} // '-',
2908             size => $self->{_response_size} // 0,
2909             duration => $duration,
2910 199 50 66     4104 request_headers => $request->{headers} // [],
      50        
      50        
      50        
      50        
      50        
      50        
2911             };
2912              
2913 199         430 my $formatter = $self->{_access_log_formatter};
2914 199 50       420 if ($formatter) {
2915 199         258 print {$self->{access_log}} $formatter->($info), "\n";
  199         871  
2916             }
2917             else {
2918             # Fallback (should not happen with properly initialized server)
2919 0         0 my $path = $info->{path};
2920 0         0 my $query = $info->{query};
2921 0 0 0     0 $path .= "?$query" if defined $query && length $query;
2922 0         0 print {$self->{access_log}} "$info->{client_ip} - - [$info->{timestamp}] \"$info->{method} $path\" $info->{status} $info->{duration}s\n";
  0         0  
2923             }
2924             }
2925              
2926             # Reasons passed to _handle_disconnect only for teardown after a clean finish
2927             # (the app has already returned). They are completions, not abnormal disconnects,
2928             # and must not surface as a disconnect reason to the application.
2929             my %COMPLETION_REASON = map { ($_ => 1) } qw(
2930             request_complete
2931             stream_complete
2932             session_complete
2933             );
2934              
2935             # Build the app-facing websocket.disconnect event for a server-detected close.
2936             # The code and reason come from the close the server initiated; the defaults are
2937             # the RFC 6455 "abnormal closure, no status received" pair (1006 / empty), used
2938             # when the connection dropped with no close handshake (timeout, TCP FIN).
2939             sub _ws_disconnect_event {
2940 36     36   57 my ($self) = @_;
2941             return {
2942             type => 'websocket.disconnect',
2943             code => $self->{ws_disconnect_code} // 1006,
2944 36   100     325 reason => $self->{ws_disconnect_reason} // '',
      100        
2945             };
2946             }
2947              
2948             sub _handle_disconnect {
2949 442     442   880 my ($self, $reason) = @_;
2950              
2951             # Idempotency guard - prevent duplicate disconnect handling
2952             # Multiple paths can trigger disconnect (timeout, protocol error, session end)
2953 442 100       1561 return if $self->{_disconnect_handled};
2954 273         647 $self->{_disconnect_handled} = 1;
2955              
2956             # Cancel any pending drain waiters (backpressure)
2957 273         1589 $self->_cancel_drain_waiters($reason);
2958              
2959             # Auto-detect server shutdown (PAGI spec compliance)
2960             # If no explicit reason and server is shutting down, use server_shutdown
2961 273 50 66     911 if (!$reason && $self->{server} && $self->{server}{shutting_down}) {
      33        
2962 0         0 $reason = 'server_shutdown';
2963             }
2964              
2965             # Default reason is client_closed (TCP FIN received)
2966 273   100     700 $reason //= 'client_closed';
2967              
2968             # A clean completion is not an abnormal disconnect: don't surface its reason.
2969 273         715 my $is_completion = $COMPLETION_REASON{$reason};
2970              
2971             # Mark HTTP connection state as disconnected (abnormal only).
2972             # Only for HTTP - WebSocket/SSE have their own patterns.
2973 273 50 66     2903 if ($self->{current_connection_state} && !$self->{websocket_mode} && !$self->{sse_mode}) {
      66        
2974 30 100       205 $self->{current_connection_state}->_mark_disconnected($reason)
2975             unless $is_completion;
2976             }
2977              
2978             # Record the abnormal reason so the WebSocket disconnect event reports it
2979             # (instead of the old empty string). SSE tracks its own reason at the
2980             # detection sites via sse_disconnect_reason.
2981 273 100 100     977 if ($self->{websocket_mode} && !$is_completion) {
2982 16         36 $self->{ws_disconnect_reason} = $reason;
2983             }
2984              
2985             # Determine disconnect event type based on mode
2986 273         575 my $disconnect_event;
2987 273 100       1164 if ($self->{websocket_mode}) {
    100          
2988 18         92 $disconnect_event = $self->_ws_disconnect_event;
2989             } elsif ($self->{sse_mode}) {
2990             $disconnect_event = {
2991             type => 'sse.disconnect',
2992 15   50     85 reason => $self->{sse_disconnect_reason} // 'client_closed',
2993             };
2994             } else {
2995 240         832 $disconnect_event = { type => 'http.disconnect' };
2996             }
2997              
2998             # Queue disconnect event (do this even if already closed)
2999 273         479 push @{$self->{receive_queue}}, $disconnect_event;
  273         799  
3000              
3001             # Complete any pending receive
3002 273 100 100     1183 if ($self->{receive_pending} && !$self->{receive_pending}->is_ready) {
3003 16         144 $self->{receive_pending}->done($disconnect_event);
3004 16         3635 $self->{receive_pending} = undef;
3005             }
3006             }
3007              
3008             # Send a WebSocket close frame with status code and optional reason
3009             # Per RFC 6455 Section 7.4, common codes:
3010             # 1000 - Normal closure
3011             # 1007 - Invalid frame payload data (e.g., invalid UTF-8)
3012             # 1009 - Message too big
3013             # 1011 - Unexpected condition
3014             sub _send_close_frame {
3015 4     4   17 my ($self, $code, $reason) = @_;
3016 4   50     13 $reason //= '';
3017              
3018 4 50       22 return unless $self->{stream};
3019 4 50       14 return if $self->{close_sent};
3020              
3021             # Remember the wire code so the app-facing websocket.disconnect event reports
3022             # the same code the peer received, rather than the 1006 abnormal-close default.
3023 4         11 $self->{ws_disconnect_code} = $code;
3024              
3025 4         44 my $frame = Protocol::WebSocket::Frame->new(
3026             type => 'close',
3027             buffer => pack('n', $code) . $reason,
3028             );
3029              
3030 4         168 $self->{stream}->write($frame->to_bytes);
3031 4         1193 $self->{close_sent} = 1;
3032             }
3033              
3034             sub _close {
3035 444     444   2113468 my ($self) = @_;
3036              
3037 444 100       2278 return if $self->{closed};
3038 273         527 $self->{closed} = 1;
3039              
3040             # Cancel pending drain waiters early (before other cleanup)
3041 273         758 $self->_cancel_drain_waiters('connection closing');
3042              
3043             # Clean up HTTP/2 per-stream state
3044 273 50       913 if ($self->{h2_streams}) {
3045 273         457 for my $stream (values %{$self->{h2_streams}}) {
  273         1047  
3046 29 100 66     179 if ($stream->{body_pending} && !$stream->{body_pending}->is_ready) {
3047             my $event = $stream->{is_sse}
3048 9 100       120 ? { type => 'sse.disconnect', reason => 'client_closed' }
3049             : { type => 'http.disconnect' };
3050 9         47 $stream->{body_pending}->done($event);
3051             }
3052             # Release producers blocked on per-stream backpressure so they
3053             # don't hang on a connection that is going away.
3054 29         1584 $self->_h2_resolve_stream_drain_waiters($stream);
3055             # Drop (don't fire) the app's on_drain fires: the connection is going
3056             # away, not draining. Also break the $stream <-> transport_state cycle
3057             # so the stream state is freed when h2_streams is deleted below.
3058 29         88 $stream->{transport_drain_fires} = [];
3059 29         270 delete $stream->{transport_state};
3060             }
3061 273         975 delete $self->{h2_streams};
3062             }
3063 273 100       897 if ($self->{h2_session}) {
3064 67         174 eval { $self->{h2_session}->terminate(0) };
  67         507  
3065 67         8105 delete $self->{h2_session};
3066             }
3067              
3068             # Clean up WebSocket frame parser to free memory immediately
3069 273         754 delete $self->{websocket_frame};
3070              
3071             # Remove from server's connection list (O(1) hash delete)
3072 273 50       910 if ($self->{server}) {
3073 273         1147 delete $self->{server}{connections}{refaddr($self)};
3074              
3075             # Signal drain complete if this was the last connection during shutdown
3076 273 100 100     1072 if ($self->{server}{shutting_down} &&
      100        
      66        
3077 137         908 keys %{$self->{server}{connections}} == 0 &&
3078             $self->{server}{drain_complete} &&
3079             !$self->{server}{drain_complete}->is_ready) {
3080 1         16 $self->{server}{drain_complete}->done;
3081             }
3082             }
3083              
3084             # Stop idle timer
3085 273         1530 $self->_stop_idle_timer;
3086              
3087             # Stop stall timer
3088 273         1119 $self->_stop_stall_timer;
3089              
3090             # Stop WS/SSE idle timers
3091 273         1247 $self->_stop_ws_idle_timer;
3092 273         1255 $self->_stop_sse_idle_timer;
3093              
3094             # Stop keepalive timers
3095 273         1249 $self->_stop_ws_keepalive;
3096 273         1129 $self->_stop_sse_keepalive;
3097              
3098             # Note: _close is resource cleanup ONLY. Callers should use
3099             # _handle_disconnect_and_close() which handles both protocol
3100             # notification and cleanup.
3101              
3102             # Determine disconnect event type based on mode
3103 273         522 my $disconnect_event;
3104 273 100       1060 if ($self->{websocket_mode}) {
    100          
3105 18         73 $disconnect_event = $self->_ws_disconnect_event;
3106             } elsif ($self->{sse_mode}) {
3107             $disconnect_event = {
3108             type => 'sse.disconnect',
3109 15   50     71 reason => $self->{sse_disconnect_reason} // 'client_closed',
3110             };
3111             } else {
3112 240         721 $disconnect_event = { type => 'http.disconnect' };
3113             }
3114              
3115             # Cancel any tracked receive Futures that are still pending
3116 273         426 for my $future (@{$self->{receive_futures}}) {
  273         802  
3117 18 50       89 if (!$future->is_ready) {
3118             # Complete with disconnect event instead of cancelling
3119             # This allows the async sub to complete cleanly
3120 0         0 $future->done($disconnect_event);
3121             }
3122             }
3123 273         702 $self->{receive_futures} = [];
3124              
3125 273 50       1012 if ($self->{stream}) {
3126 273         1320 $self->{stream}->close_when_empty;
3127             }
3128             }
3129              
3130             # Combined disconnect and close - use this from callbacks where $weak_self may
3131             # become undefined after _handle_disconnect completes its Future callbacks.
3132             # This method holds a strong reference to $self throughout the operation.
3133             sub _handle_disconnect_and_close {
3134 420     420   1261 my ($self, $reason) = @_;
3135 420         1779 $self->_handle_disconnect($reason);
3136 420         1753 $self->_close;
3137             }
3138              
3139             #
3140             # TLS Support Methods
3141             #
3142              
3143             sub _extract_tls_info {
3144 45     45   92 my ($self) = @_;
3145              
3146 45         91 my $stream = $self->{stream};
3147 45         169 my $handle = $stream->read_handle;
3148              
3149             # Check if handle is an IO::Socket::SSL
3150 45 50 33     608 return unless $handle && $handle->isa('IO::Socket::SSL');
3151              
3152 45         453 my $tls_info = {
3153             server_cert => undef,
3154             client_cert_chain => [],
3155             client_cert_name => undef,
3156             client_cert_error => undef,
3157             tls_version => undef,
3158             cipher_suite => undef,
3159             };
3160              
3161             # Get TLS version - IO::Socket::SSL returns something like 'TLSv1_3'
3162 45 50       308 if (my $version_str = $handle->get_sslversion) {
3163             # Map version string to numeric value per TLS spec
3164 45         1128 my %version_map = (
3165             'SSLv3' => 0x0300,
3166             'TLSv1' => 0x0301,
3167             'TLSv1_1' => 0x0302,
3168             'TLSv1_2' => 0x0303,
3169             'TLSv1_3' => 0x0304,
3170             );
3171 45         148 $tls_info->{tls_version} = $version_map{$version_str};
3172             }
3173              
3174             # Cipher suite (numeric IANA id). Net::SSLeay/IO::Socket::SSL expose only the
3175             # cipher *name*, not the 16-bit id the spec asks for. For TLS 1.3 the OpenSSL
3176             # name IS the IANA name and the registry is frozen at five suites, so we map
3177             # those exactly. For TLS 1.2 the names are OpenSSL-specific (a large, shifting
3178             # set), so we leave cipher_suite undef -- the spec permits undef when the
3179             # server cannot determine the value.
3180 45 50       135 if (my $cipher_name = $handle->get_cipher) {
3181 45         758 my %tls13_cipher_suites = (
3182             'TLS_AES_128_GCM_SHA256' => 0x1301,
3183             'TLS_AES_256_GCM_SHA384' => 0x1302,
3184             'TLS_CHACHA20_POLY1305_SHA256' => 0x1303,
3185             'TLS_AES_128_CCM_SHA256' => 0x1304,
3186             'TLS_AES_128_CCM_8_SHA256' => 0x1305,
3187             );
3188             $tls_info->{cipher_suite} = $tls13_cipher_suites{$cipher_name}
3189 45 50       192 if exists $tls13_cipher_suites{$cipher_name};
3190             }
3191              
3192             # Get server certificate (our certificate)
3193             # IO::Socket::SSL uses sock_certificate() for the server's own cert
3194 45         88 eval {
3195 45         223 my $cert = $handle->sock_certificate;
3196 45 50       597 if ($cert) {
3197 45         427 require Net::SSLeay;
3198 45         1161 $tls_info->{server_cert} = Net::SSLeay::PEM_get_string_X509($cert);
3199             }
3200             };
3201 45 50       136 if ($@) {
3202 0         0 warn "TLS server certificate extraction error: $@\n";
3203             }
3204              
3205             # Get client certificate if provided
3206 45         79 eval {
3207 45         193 my $client_cert = $handle->peer_certificate;
3208 45 100       2139 if ($client_cert) {
3209 8         144 require Net::SSLeay;
3210              
3211             # Get client cert chain
3212 8         24 my @chain;
3213 8         104 push @chain, Net::SSLeay::PEM_get_string_X509($client_cert);
3214              
3215             # Try to get additional certs in chain
3216 8 50       24 if (my $ssl = $handle->_get_ssl_object) {
3217 8         132 my $chain_obj = Net::SSLeay::get_peer_cert_chain($ssl);
3218 8 50       40 if ($chain_obj) {
3219 0         0 for my $i (0 .. Net::SSLeay::sk_X509_num($chain_obj) - 1) {
3220 0         0 my $cert = Net::SSLeay::sk_X509_value($chain_obj, $i);
3221 0 0       0 push @chain, Net::SSLeay::PEM_get_string_X509($cert) if $cert;
3222             }
3223             }
3224             }
3225 8         20 $tls_info->{client_cert_chain} = \@chain;
3226              
3227             # Get client cert DN (Subject)
3228 8         52 my $subject = Net::SSLeay::X509_NAME_oneline(
3229             Net::SSLeay::X509_get_subject_name($client_cert)
3230             );
3231 8 50       36 $tls_info->{client_cert_name} = $subject if $subject;
3232              
3233             # Check for verification errors
3234 8         32 my $verify_result = $handle->get_sslversion_int;
3235             # Actually, use verify_result
3236 8 50       80 if (my $ssl = $handle->_get_ssl_object) {
3237 8         68 my $result = Net::SSLeay::get_verify_result($ssl);
3238 8 50       24 if ($result != 0) { # X509_V_OK = 0
3239 0         0 $tls_info->{client_cert_error} = Net::SSLeay::X509_verify_cert_error_string($result);
3240             }
3241             }
3242             }
3243             };
3244 45 50       145 if ($@) {
3245 0         0 warn "TLS client certificate extraction error: $@\n";
3246             }
3247              
3248 45         115 $self->{tls_info} = $tls_info;
3249             }
3250              
3251             sub _get_scheme {
3252 187     187   376 my ($self) = @_;
3253              
3254 187 100       1725 return $self->{tls_enabled} ? 'https' : 'http';
3255             }
3256              
3257             sub _get_ws_scheme {
3258 39     39   96 my ($self) = @_;
3259              
3260 39 50       405 return $self->{tls_enabled} ? 'wss' : 'ws';
3261             }
3262              
3263             sub _get_extensions_for_scope {
3264 276     276   518 my ($self) = @_;
3265              
3266 276         452 my %extensions = %{$self->{extensions}};
  276         782  
3267              
3268             # Add TLS info to extensions if this is a TLS connection
3269 276 100 66     1416 if ($self->{tls_enabled} && $self->{tls_info}) {
    50          
3270 45         117 $extensions{tls} = $self->{tls_info};
3271             }
3272             # Remove tls extension if not a TLS connection (per spec)
3273             elsif (!$self->{tls_enabled}) {
3274 231         428 delete $extensions{tls};
3275             }
3276              
3277 276         1759 return \%extensions;
3278             }
3279              
3280             #
3281             # SSE (Server-Sent Events) Support Methods
3282             #
3283              
3284 15     15   31 async sub _handle_sse_request {
3285 15         29 my ($self, $request) = @_;
3286              
3287 15         27 $self->{sse_mode} = 1;
3288 15         49 $self->_stop_idle_timer; # SSE connections are long-lived
3289 15         62 $self->_start_sse_idle_timer; # Start SSE-specific idle timer if configured
3290              
3291 15         51 my $scope = $self->_create_sse_scope($request);
3292 15         54 my $receive = $self->_create_sse_receive($request);
3293 15         60 my $send = $self->_create_sse_send($request);
3294              
3295 15         29 eval {
3296 15         71 await $self->{app}->($scope, $receive, $send);
3297             };
3298              
3299 15 50       1308 if (my $error = $@) {
3300             # If SSE not yet started, send HTTP error
3301 0 0       0 if (!$self->{sse_started}) {
3302 0         0 $self->_send_error_response(500, "Internal Server Error");
3303             }
3304 0         0 warn "PAGI application error (SSE): $error\n";
3305             }
3306              
3307             # Send chunked terminator if SSE was started (uses chunked encoding)
3308             # Check both closed flag and that stream is still writable
3309 15 100 33     183 if ($self->{sse_started} && !$self->{closed} &&
      33        
      66        
3310             $self->{stream} && $self->{stream}->write_handle) {
3311 14         103 $self->{stream}->write("0\r\n\r\n");
3312             }
3313              
3314             # Write access log entry (logs at connection close with total duration)
3315 15         1170 $self->_write_access_log;
3316              
3317             # Close connection after SSE stream ends
3318 15         61 $self->_handle_disconnect_and_close('stream_complete');
3319             }
3320              
3321             sub _create_sse_scope {
3322 15     15   32 my ($self, $request) = @_;
3323              
3324             my $scope = {
3325             type => 'sse',
3326             pagi => {
3327             version => '0.3',
3328             spec_version => '0.3',
3329             },
3330             http_version => $request->{http_version},
3331             method => $request->{method},
3332             scheme => $self->_get_scheme,
3333             path => $request->{path},
3334             raw_path => $request->{raw_path},
3335             query_string => $request->{query_string},
3336             root_path => '',
3337             headers => $request->{headers},
3338             (defined $self->{client_host}
3339             ? (client => [$self->{client_host}, $self->{client_port}])
3340             : ()
3341             ),
3342             server => [$self->{server_host}, $self->{server_port}],
3343             # Optimized: avoid hash copy when state is empty (common case)
3344 15         77 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  0         0  
3345             extensions => $self->_get_extensions_for_scope,
3346             # Outbound flow-control introspection (buffered_amount, watermarks,
3347             # on_high_water/on_drain). Stashed on the connection too, so the send
3348             # path can poke _check_watermarks after each write.
3349 15 50       123 'pagi.transport' => ($self->{current_transport_state} = $self->_h1_transport_state),
    50          
3350             };
3351              
3352 15         41 return $scope;
3353             }
3354              
3355             sub _create_sse_receive {
3356 15     15   31 my ($self, $request) = @_;
3357              
3358 15         27 my $content_length = $request->{content_length};
3359 15   66     45 my $has_body = defined($content_length) && $content_length > 0;
3360 15         26 my $body_complete = 0;
3361 15         20 my $bytes_read = 0;
3362              
3363 15         25 weaken(my $weak_self = $self);
3364              
3365             # Helper to create SSE disconnect event with reason
3366             my $sse_disconnect = sub {
3367             return {
3368             type => 'sse.disconnect',
3369 0 0 0 0   0 reason => ($weak_self ? $weak_self->{sse_disconnect_reason} : undef) // 'client_closed',
3370             };
3371 15         45 };
3372              
3373             return sub {
3374 7 50   7   108 return Future->done($sse_disconnect->())
3375             unless $weak_self;
3376             return Future->done($sse_disconnect->())
3377 7 50       17 if $weak_self->{closed};
3378              
3379 7         8 my $future = (async sub {
3380 7 50       13 return $sse_disconnect->()
3381             unless $weak_self;
3382             return $sse_disconnect->()
3383 7 50       47 if $weak_self->{closed};
3384              
3385             # Check queue first
3386 7 50       10 if (@{$weak_self->{receive_queue}}) {
  7         17  
3387 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3388             }
3389              
3390             # Handle request body for POST/PUT SSE requests
3391 7 100 66     21 if ($has_body && !$body_complete) {
3392 1         2 my $remaining = $content_length - $bytes_read;
3393              
3394             # Wait for data if buffer is empty
3395 1   33     6 while (length($weak_self->{buffer}) == 0 && !$weak_self->{closed} && $remaining > 0) {
      33        
3396 0 0       0 if (!$weak_self->{receive_pending}) {
3397 0         0 $weak_self->{receive_pending} = Future->new;
3398             }
3399 0         0 await $weak_self->{receive_pending};
3400 0         0 $weak_self->{receive_pending} = undef;
3401              
3402 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
3403 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3404             }
3405             }
3406              
3407 1 50       2 return $sse_disconnect->() if $weak_self->{closed};
3408              
3409             # Read available data up to remaining
3410             my $to_read = $remaining < length($weak_self->{buffer})
3411             ? $remaining
3412 1 50       3 : length($weak_self->{buffer});
3413              
3414 1         4 my $chunk = substr($weak_self->{buffer}, 0, $to_read, '');
3415 1         1 $bytes_read += length($chunk);
3416              
3417 1 50       3 my $more = ($bytes_read < $content_length) ? 1 : 0;
3418 1 50       7 $body_complete = 1 if !$more;
3419              
3420             return {
3421 1         10 type => 'sse.request',
3422             body => $chunk,
3423             more => $more,
3424             };
3425             }
3426              
3427             # No body or body complete - return empty body if not yet returned
3428 6 100       13 if (!$body_complete) {
3429 3         10 $body_complete = 1;
3430             return {
3431 3         37 type => 'sse.request',
3432             body => '',
3433             more => 0,
3434             };
3435             }
3436              
3437             # Wait for disconnect
3438 3         6 while (1) {
3439 5 100       6 if (@{$weak_self->{receive_queue}}) {
  5         27  
3440 2         3 return shift @{$weak_self->{receive_queue}};
  2         12  
3441             }
3442              
3443             return $sse_disconnect->()
3444 3 50       7 if $weak_self->{closed};
3445              
3446 3 50       10 if (!$weak_self->{receive_pending}) {
3447 3         10 $weak_self->{receive_pending} = Future->new;
3448             }
3449 3         25 await $weak_self->{receive_pending};
3450 2         197 $weak_self->{receive_pending} = undef;
3451             }
3452 7         74 })->();
3453              
3454             # Track this Future
3455 7         338 push @{$weak_self->{receive_futures}}, $future;
  7         20  
3456 7         10 @{$weak_self->{receive_futures}} = grep { !$_->is_ready } @{$weak_self->{receive_futures}};
  7         42  
  7         20  
  7         18  
3457              
3458 7         27 return $future;
3459 15         111 };
3460             }
3461              
3462             sub _format_sse_event {
3463 49     49   197526 my ($event) = @_;
3464 49         94 my $sse_data = '';
3465              
3466 49 100 66     245 if (defined $event->{event} && length $event->{event}) {
3467             die "Invalid SSE event name: contains newline\n"
3468 17 100       74 if $event->{event} =~ /[\r\n]/;
3469 16         49 $sse_data .= "event: $event->{event}\n";
3470             }
3471              
3472 48   50     129 my $data = $event->{data} // '';
3473 48         16099 for my $line (split /\r?\n|\r/, $data, -1) {
3474 63         260 $sse_data .= "data: $line\n";
3475             }
3476              
3477 48 100 66     247 if (defined $event->{id} && length $event->{id}) {
3478             die "Invalid SSE id: contains newline\n"
3479 7 100       28 if $event->{id} =~ /[\r\n]/;
3480 6         78 $sse_data .= "id: $event->{id}\n";
3481             }
3482              
3483 47 100       125 if (defined $event->{retry}) {
3484             die "Invalid SSE retry: must be a non-negative integer\n"
3485 7 100       48 unless $event->{retry} =~ /\A[0-9]+\z/;
3486 5         27 $sse_data .= "retry: $event->{retry}\n";
3487             }
3488              
3489 45         72 $sse_data .= "\n";
3490 45         113 return $sse_data;
3491             }
3492              
3493             sub _format_sse_comment {
3494 5     5   3793 my ($event) = @_;
3495 5   50     15 my $text = $event->{comment} // '';
3496 5         10 my $formatted = '';
3497 5         54 for my $line (split /\r?\n|\r/, $text, -1) {
3498 9 100       25 $line = ":$line" unless $line =~ /^:/;
3499 9         17 $formatted .= "$line\n";
3500             }
3501 5         9 $formatted .= "\n";
3502 5         11 return $formatted;
3503             }
3504              
3505             sub _create_sse_send {
3506 15     15   31 my ($self, $request) = @_;
3507              
3508 15         23 weaken(my $weak_self = $self);
3509              
3510 34     34   947 return async sub {
3511 34         64 my ($event) = @_;
3512 34 50       82 return Future->done unless $weak_self;
3513 34 50       97 return Future->done if $weak_self->{closed};
3514              
3515             # Reset SSE idle timer on send activity
3516 34         133 $weak_self->_reset_sse_idle_timer;
3517              
3518 34   50     83 my $type = $event->{type} // '';
3519              
3520             # Dev-mode event validation (PAGI spec compliance)
3521 34 50       76 if ($weak_self->{validate_events}) {
3522 0         0 require PAGI::Server::EventValidator;
3523 0         0 PAGI::Server::EventValidator::validate_sse_send($event);
3524             }
3525              
3526 34 100       157 if ($type eq 'sse.start') {
    100          
    100          
    50          
    50          
3527 15 50       57 return if $weak_self->{sse_started};
3528 15         31 $weak_self->{sse_started} = 1;
3529 15         29 $weak_self->{response_started} = 1;
3530              
3531 15   100     38 my $status = $event->{status} // 200;
3532 15         27 $weak_self->{response_status} = $status; # Track for access logging
3533 15   100     42 my $headers = $event->{headers} // [];
3534              
3535             # Ensure Content-Type is text/event-stream
3536 15         21 my $has_content_type = 0;
3537 15         43 for my $h (@$headers) {
3538 7 50       22 if (lc($h->[0]) eq 'content-type') {
3539 7         10 $has_content_type = 1;
3540 7         15 last;
3541             }
3542             }
3543              
3544 15         29 my @final_headers = @$headers;
3545 15 100       36 if (!$has_content_type) {
3546 8         22 push @final_headers, ['content-type', 'text/event-stream'];
3547             }
3548              
3549             # Add Cache-Control and Connection headers for SSE
3550 15         47 push @final_headers, ['cache-control', 'no-cache'];
3551 15         38 push @final_headers, ['connection', 'keep-alive'];
3552 15         74 push @final_headers, ['date', $weak_self->{protocol}->format_date];
3553              
3554             # SSE uses chunked encoding implicitly (no Content-Length)
3555             my $response = $weak_self->{protocol}->serialize_response_start(
3556 15         68 $status, \@final_headers, 1 # chunked = 1
3557             );
3558              
3559 15         95 $weak_self->{stream}->write($response);
3560              
3561             # Set protocol-specific keepalive writer (HTTP/1.1 chunked)
3562             $weak_self->{sse_keepalive_writer} = sub {
3563 0         0 my ($text) = @_;
3564 0 0       0 return unless $weak_self;
3565 0 0       0 return if $weak_self->{closed};
3566 0         0 my $len = sprintf("%x", length($text));
3567 0         0 $weak_self->{stream}->write("$len\r\n$text\r\n");
3568 15         3611 };
3569             }
3570             elsif ($type eq 'sse.send') {
3571 16 50       54 return unless $weak_self->{sse_started};
3572              
3573             # --- BACKPRESSURE CHECK ---
3574 16 50       57 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
3575 0         0 await $weak_self->_wait_for_drain;
3576 0 0       0 return Future->done unless $weak_self;
3577 0 0       0 return Future->done if $weak_self->{closed};
3578             }
3579             # --- END BACKPRESSURE CHECK ---
3580              
3581 16         59 my $sse_data = _format_sse_event($event);
3582              
3583             # Send as chunked data
3584 16         77 my $len = sprintf("%x", length($sse_data));
3585 16         99 $weak_self->{stream}->write("$len\r\n$sse_data\r\n");
3586 16         2008 $weak_self->_notify_transport_write;
3587             }
3588             elsif ($type eq 'sse.comment') {
3589 2 50       4 return unless $weak_self->{sse_started};
3590              
3591 2         7 my $comment = _format_sse_comment($event);
3592              
3593 2         35 my $len = sprintf("%x", length($comment));
3594 2         8 $weak_self->{stream}->write("$len\r\n$comment\r\n");
3595             }
3596             elsif ($type eq 'sse.keepalive') {
3597             # SSE keepalive - starts/stops periodic comment timer
3598 0   0     0 my $interval = $event->{interval} // 0;
3599 0         0 my $comment = $event->{comment};
3600              
3601 0 0       0 if ($interval > 0) {
3602 0         0 $weak_self->_start_sse_keepalive($interval, $comment);
3603             }
3604             else {
3605 0         0 $weak_self->_stop_sse_keepalive;
3606             }
3607             }
3608             elsif ($type eq 'http.fullflush') {
3609             # Fullflush extension - force immediate TCP buffer flush
3610             # Per spec: servers that don't advertise the extension must reject
3611 1 50       3 unless (exists $weak_self->{extensions}{fullflush}) {
3612 0         0 warn "PAGI: http.fullflush event rejected - extension not enabled\n";
3613 0         0 die "Extension not enabled: fullflush\n";
3614             }
3615              
3616             # Force flush by ensuring TCP_NODELAY
3617 1         3 my $handle = $weak_self->{stream}->write_handle;
3618 1 50 33     9 if ($handle && $handle->can('setsockopt')) {
3619 1         8 require Socket;
3620 1         4 $handle->setsockopt(Socket::IPPROTO_TCP(), Socket::TCP_NODELAY(), 1);
3621             }
3622             }
3623             else {
3624             # Per PAGI spec: servers must raise exceptions for unrecognized event types
3625 0         0 _unrecognized_event_type($type, 'sse');
3626             }
3627              
3628 34         436 return;
3629 15         171 };
3630             }
3631              
3632             #
3633             # WebSocket Support Methods
3634             #
3635              
3636             # WebSocket handshake magic GUID per RFC 6455
3637 109     109   1743 use constant WS_GUID => '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';
  109         212  
  109         574039  
3638              
3639 22     22   99 async sub _handle_websocket_request {
3640 22         46 my ($self, $request) = @_;
3641              
3642 22         71 $self->_stop_idle_timer; # WebSocket connections are long-lived
3643 22         114 $self->_start_ws_idle_timer; # Start WebSocket-specific idle timer if configured
3644              
3645 22         108 my $scope = $self->_create_websocket_scope($request);
3646 22         72 my $receive = $self->_create_websocket_receive($request);
3647 22         81 my $send = $self->_create_websocket_send($request);
3648              
3649 22         44 eval {
3650 22         98 await $self->{app}->($scope, $receive, $send);
3651             };
3652              
3653 21 100       1944 if (my $error = $@) {
3654             # If handshake not yet done, send HTTP error
3655 2 50       8 if (!$self->{websocket_accepted}) {
3656 2         8 $self->_send_error_response(500, "Internal Server Error");
3657             }
3658 2         65 warn "PAGI application error (WebSocket): $error\n";
3659             }
3660              
3661             # Write access log entry (logs at connection close with total duration)
3662 21         140 $self->_write_access_log;
3663              
3664             # Close connection after WebSocket session ends
3665 21         128 $self->_handle_disconnect_and_close('session_complete');
3666             }
3667              
3668             sub _create_websocket_scope {
3669 22     22   58 my ($self, $request) = @_;
3670              
3671             # Extract WebSocket key and subprotocols from headers
3672 22         72 my $ws_key;
3673             my @subprotocols;
3674              
3675 22         29 for my $header (@{$request->{headers}}) {
  22         55  
3676 123         177 my ($name, $value) = @$header;
3677 123 100       245 if ($name eq 'sec-websocket-key') {
    100          
3678 22         93 $ws_key = $value;
3679             }
3680             elsif ($name eq 'sec-websocket-protocol') {
3681             # Parse comma-separated list of subprotocols
3682 2         7 push @subprotocols, map { s/^\s+|\s+$//gr } split /,/, $value;
  3         21  
3683             }
3684             }
3685              
3686             # Store ws_key for handshake response
3687 22         49 $self->{ws_key} = $ws_key;
3688              
3689             my $scope = {
3690             type => 'websocket',
3691             pagi => {
3692             version => '0.3',
3693             spec_version => '0.3',
3694             },
3695             http_version => $request->{http_version},
3696             scheme => $self->_get_ws_scheme,
3697             path => $request->{path},
3698             raw_path => $request->{raw_path},
3699             query_string => $request->{query_string},
3700             root_path => '',
3701             headers => $request->{headers},
3702             (defined $self->{client_host}
3703             ? (client => [$self->{client_host}, $self->{client_port}])
3704             : ()
3705             ),
3706             server => [$self->{server_host}, $self->{server_port}],
3707             subprotocols => \@subprotocols,
3708             # Optimized: avoid hash copy when state is empty (common case)
3709 22         98 state => keys %{$self->{state}} ? { %{$self->{state}} } : {},
  0         0  
3710 22         75 extensions => { %{$self->_get_extensions_for_scope}, 'websocket.http.response' => {} },
3711             # Outbound flow-control introspection (buffered_amount, watermarks,
3712             # on_high_water/on_drain). Stashed on the connection too, so the send
3713             # path can poke _check_watermarks after each write.
3714 22 50       150 'pagi.transport' => ($self->{current_transport_state} = $self->_h1_transport_state),
    50          
3715             };
3716              
3717 22         86 return $scope;
3718             }
3719              
3720             sub _create_websocket_receive {
3721 22     22   42 my ($self, $request) = @_;
3722              
3723 22         36 my $connect_sent = 0;
3724 22         35 weaken(my $weak_self = $self);
3725              
3726             return sub {
3727 155 50   155   683259 return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
3728             unless $weak_self;
3729              
3730             # Check queue first - drain queued messages even if closed
3731 155 100       216 if (@{$weak_self->{receive_queue}}) {
  155         344  
3732 113         162 return Future->done(shift @{$weak_self->{receive_queue}});
  113         315  
3733             }
3734              
3735             return Future->done({ type => 'websocket.disconnect', code => 1006, reason => '' })
3736 42 50       92 if $weak_self->{closed};
3737              
3738 42         61 my $future = (async sub {
3739 42 50       68 return { type => 'websocket.disconnect', code => 1006, reason => '' }
3740             unless $weak_self;
3741              
3742             # Check queue first - drain queued messages even if closed
3743 42 50       50 if (@{$weak_self->{receive_queue}}) {
  42         79  
3744 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3745             }
3746              
3747             return { type => 'websocket.disconnect', code => 1006, reason => '' }
3748 42 50       82 if $weak_self->{closed};
3749              
3750             # First call returns websocket.connect
3751 42 100       88 if (!$connect_sent) {
3752 19         25 $connect_sent = 1;
3753 19         153 return { type => 'websocket.connect' };
3754             }
3755              
3756             # If not in WebSocket mode yet (waiting for accept), wait
3757 23   33     82 while (!$weak_self->{websocket_mode} && !$weak_self->{closed}) {
3758 0 0       0 if (!$weak_self->{receive_pending}) {
3759 0         0 $weak_self->{receive_pending} = Future->new;
3760             }
3761 0         0 await $weak_self->{receive_pending};
3762 0         0 $weak_self->{receive_pending} = undef;
3763              
3764 0 0       0 if (@{$weak_self->{receive_queue}}) {
  0         0  
3765 0         0 return shift @{$weak_self->{receive_queue}};
  0         0  
3766             }
3767             }
3768              
3769             return { type => 'websocket.disconnect', code => 1006, reason => '' }
3770 23 50       45 if $weak_self->{closed};
3771              
3772             # Wait for events from frame processing
3773 23         25 while (1) {
3774 46 100       75 if (@{$weak_self->{receive_queue}}) {
  46         140  
3775 23         37 return shift @{$weak_self->{receive_queue}};
  23         117  
3776             }
3777              
3778             return { type => 'websocket.disconnect', code => 1006, reason => '' }
3779 23 50       43 if $weak_self->{closed};
3780              
3781 23 50       50 if (!$weak_self->{receive_pending}) {
3782 23         53 $weak_self->{receive_pending} = Future->new;
3783             }
3784 23         145 await $weak_self->{receive_pending};
3785 23         1791 $weak_self->{receive_pending} = undef;
3786             }
3787 42         243 })->();
3788              
3789             # Track this Future
3790 42         1808 push @{$weak_self->{receive_futures}}, $future;
  42         87  
3791 42         72 @{$weak_self->{receive_futures}} = grep { !$_->is_ready } @{$weak_self->{receive_futures}};
  42         198  
  52         139  
  42         81  
3792              
3793 42         134 return $future;
3794 22         96 };
3795             }
3796              
3797             sub _create_websocket_send {
3798 22     22   61 my ($self, $request) = @_;
3799              
3800 22         34 weaken(my $weak_self = $self);
3801              
3802 42     42   1061 return async sub {
3803 42         64 my ($event) = @_;
3804 42 50       82 return Future->done unless $weak_self;
3805 42 50       128 return Future->done if $weak_self->{closed};
3806              
3807             # Reset WebSocket idle timer on send activity
3808 42         124 $weak_self->_reset_ws_idle_timer;
3809              
3810 42   50     95 my $type = $event->{type} // '';
3811              
3812             # Dev-mode event validation (PAGI spec compliance)
3813 42 50       106 if ($weak_self->{validate_events}) {
3814 0         0 require PAGI::Server::EventValidator;
3815 0         0 PAGI::Server::EventValidator::validate_websocket_send($event);
3816             }
3817              
3818 42 100       118 if ($type eq 'websocket.accept') {
    100          
    100          
    100          
    50          
    0          
3819 20 50       51 return if $weak_self->{websocket_accepted};
3820              
3821             # Complete the WebSocket handshake
3822 20         38 my $ws_key = $weak_self->{ws_key};
3823 20         256 my $accept_key = sha1_base64($ws_key . WS_GUID);
3824             # sha1_base64 doesn't add padding, but WebSocket requires it
3825 20         102 $accept_key .= '=' while length($accept_key) % 4;
3826              
3827 20         73 my @headers = (
3828             "HTTP/1.1 101 Switching Protocols\r\n",
3829             "Upgrade: websocket\r\n",
3830             "Connection: Upgrade\r\n",
3831             "Sec-WebSocket-Accept: $accept_key\r\n",
3832             );
3833              
3834             # Add subprotocol if specified (with validation)
3835 20 100       65 if (my $subprotocol = $event->{subprotocol}) {
3836 1         4 $subprotocol = _validate_subprotocol($subprotocol);
3837 0         0 push @headers, "Sec-WebSocket-Protocol: $subprotocol\r\n";
3838             }
3839              
3840             # Add custom headers if specified (with CRLF injection validation)
3841 19 100       54 if (my $extra_headers = $event->{headers}) {
3842 1         2 for my $h (@$extra_headers) {
3843 1         3 my ($name, $value) = @$h;
3844 1         13 $name = _validate_header_name($name);
3845 1         4 $value = _validate_header_value($value);
3846 0         0 push @headers, "$name: $value\r\n";
3847             }
3848             }
3849              
3850 18         43 push @headers, "\r\n";
3851              
3852 18         121 $weak_self->{stream}->write(join('', @headers));
3853              
3854             # Switch to WebSocket mode
3855 18         3841 $weak_self->{websocket_mode} = 1;
3856 18         36 $weak_self->{websocket_accepted} = 1;
3857             $weak_self->{websocket_frame} = Protocol::WebSocket::Frame->new(
3858             max_payload_size => $weak_self->{max_ws_frame_size},
3859 18         166 );
3860 18         551 $weak_self->{response_status} = 101; # Track for access logging
3861              
3862             # Notify any waiting receive
3863 18 50 33     134 if ($weak_self->{receive_pending} && !$weak_self->{receive_pending}->is_ready) {
3864 0         0 my $f = $weak_self->{receive_pending};
3865 0         0 $weak_self->{receive_pending} = undef;
3866 0         0 $f->done;
3867             }
3868              
3869             # Process any data that arrived before accept
3870 18 50       71 if (length($weak_self->{buffer}) > 0) {
3871 0         0 $weak_self->_process_websocket_frames;
3872             }
3873             }
3874             elsif ($type eq 'websocket.send') {
3875 17 50       70 return unless $weak_self->{websocket_mode};
3876              
3877             # --- BACKPRESSURE CHECK ---
3878 17 50       44 if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
3879 0         0 await $weak_self->_wait_for_drain;
3880 0 0       0 return Future->done unless $weak_self;
3881 0 0       0 return Future->done if $weak_self->{closed};
3882             }
3883             # --- END BACKPRESSURE CHECK ---
3884              
3885 17         18 my $frame;
3886 17 100       32 if (defined $event->{text}) {
    50          
3887             $frame = Protocol::WebSocket::Frame->new(
3888             buffer => $event->{text},
3889 15         43 type => 'text',
3890             );
3891             }
3892             elsif (defined $event->{bytes}) {
3893             $frame = Protocol::WebSocket::Frame->new(
3894             buffer => $event->{bytes},
3895 2         9 type => 'binary',
3896             );
3897             }
3898             else {
3899 0         0 return; # Nothing to send
3900             }
3901              
3902 17         645 my $bytes = $frame->to_bytes;
3903 17         691 $weak_self->{stream}->write($bytes);
3904 17         2402 $weak_self->_notify_transport_write;
3905             }
3906             elsif ($type eq 'websocket.http.response.start') {
3907             # Custom handshake denial (websocket.http.response extension). Only
3908             # valid before accept; buffer the response head until the body
3909             # arrives.
3910 1 50       9 return if $weak_self->{websocket_accepted};
3911 1 50       3 return if $weak_self->{ws_denial_started};
3912 1         2 $weak_self->{ws_denial_started} = 1;
3913 1   50     3 $weak_self->{ws_denial_status} = $event->{status} // 403;
3914             $weak_self->{ws_denial_headers} = [
3915 2         6 map { [_validate_header_name($_->[0]), _validate_header_value($_->[1])] }
3916 1   50     1 @{$event->{headers} // []}
  1         3  
3917             ];
3918 1         2 $weak_self->{ws_denial_body} = '';
3919             }
3920             elsif ($type eq 'websocket.http.response.body') {
3921 1 50       3 return unless $weak_self->{ws_denial_started};
3922 1 50       8 return if $weak_self->{response_started};
3923 1   50     4 $weak_self->{ws_denial_body} .= $event->{body} // '';
3924 1 50       2 return if $event->{more}; # more chunks coming — keep buffering
3925              
3926 1         2 my $status = $weak_self->{ws_denial_status};
3927 1         2 my $body = $weak_self->{ws_denial_body};
3928             my @headers = (
3929 1         7 @{$weak_self->{ws_denial_headers}},
3930             ['content-length', length $body],
3931 1         1 ['date', $weak_self->{protocol}->format_date],
3932             );
3933 1         5 my $response = $weak_self->{protocol}->serialize_response_start($status, \@headers, 0);
3934 1         2 $response .= $body;
3935 1         7 $weak_self->{stream}->write($response);
3936 1         231 $weak_self->{response_started} = 1;
3937 1         2 $weak_self->{response_status} = $status; # access log
3938             # Handshake rejected: close like the bare-403 path (no upgrade).
3939 1         4 $weak_self->_handle_disconnect_and_close('client_closed');
3940             }
3941             elsif ($type eq 'websocket.close') {
3942             # If not accepted yet, send 403 Forbidden
3943 3 100       9 if (!$weak_self->{websocket_accepted}) {
3944 1         5 $weak_self->_send_error_response(403, 'Forbidden');
3945 1         6 return;
3946             }
3947              
3948             # Send close frame
3949 2   50     27 my $code = $event->{code} // 1000;
3950 2   50     11 my $reason = $event->{reason} // '';
3951              
3952 2         24 my $frame = Protocol::WebSocket::Frame->new(
3953             type => 'close',
3954             buffer => pack('n', $code) . $reason,
3955             );
3956              
3957 2         77 $weak_self->{stream}->write($frame->to_bytes);
3958 2         276 $weak_self->{close_sent} = 1;
3959              
3960             # If we received a close frame, close immediately
3961             # Otherwise wait for close from client (handled in frame processing)
3962 2 50       9 if ($weak_self->{close_received}) {
3963 0         0 $weak_self->_handle_disconnect_and_close('client_closed');
3964             }
3965             }
3966             elsif ($type eq 'websocket.keepalive') {
3967 0 0       0 return unless $weak_self->{websocket_mode};
3968              
3969 0   0     0 my $interval = $event->{interval} // 0;
3970 0         0 my $timeout = $event->{timeout};
3971              
3972 0 0       0 if ($interval > 0) {
3973 0         0 $weak_self->_start_ws_keepalive($interval, $timeout);
3974             }
3975             else {
3976 0         0 $weak_self->_stop_ws_keepalive;
3977             }
3978             }
3979             else {
3980             # Per PAGI spec: servers must raise exceptions for unrecognized event types
3981 0         0 _unrecognized_event_type($type, 'websocket');
3982             }
3983              
3984 39         245 return;
3985 22         339 };
3986             }
3987              
3988             sub _process_websocket_frames {
3989 43     43   80 my ($self) = @_;
3990              
3991 43 50       96 return unless $self->{websocket_mode};
3992 43 50       92 return if $self->{closed};
3993              
3994             # Reset WebSocket idle timer on receive activity
3995 43         119 $self->_reset_ws_idle_timer;
3996              
3997 43         60 my $frame = $self->{websocket_frame};
3998              
3999             # Append buffer to frame parser
4000 43         177 $frame->append($self->{buffer});
4001 43         423 $self->{buffer} = '';
4002              
4003             # Process all complete frames - use next_bytes to get raw bytes
4004             # Protocol::WebSocket::Frame->next() decodes as UTF-8, which corrupts binary data
4005 43         117 while (defined(my $bytes = $frame->next_bytes)) {
4006 130         9104 my $opcode = $frame->opcode;
4007              
4008             # RFC 6455 Section 5.2: RSV1-3 MUST be 0 unless extension defines meaning
4009             # PAGI doesn't support compression extensions, so RSV must always be 0
4010 130         525 my $rsv = $frame->rsv;
4011 130 50 33     629 if ($rsv && ref($rsv) eq 'ARRAY') {
4012 130 50       206 if (grep { $_ } @$rsv) {
  390         584  
4013 0         0 $self->_send_close_frame(1002, 'RSV bits must be 0');
4014 0         0 $self->_handle_disconnect_and_close('protocol_error');
4015 0         0 return;
4016             }
4017             }
4018              
4019             # RFC 6455 Section 5.2: Opcodes 3-7 and 11-15 (0xB-0xF) are reserved
4020             # Must fail connection with 1002 Protocol Error
4021 130 50 33     445 if (($opcode >= 3 && $opcode <= 7) || ($opcode >= 11 && $opcode <= 15)) {
      33        
      33        
4022 0         0 $self->_send_close_frame(1002, 'Reserved opcode');
4023 0         0 $self->_handle_disconnect_and_close('protocol_error');
4024 0         0 return;
4025             }
4026              
4027             # RFC 6455 Section 5.5: Control frames (close/ping/pong) MUST have
4028             # payload length <= 125 bytes
4029 130 50 33     515 if (($opcode == 8 || $opcode == 9 || $opcode == 10) && length($bytes) > 125) {
      33        
4030 0         0 $self->_send_close_frame(1002, 'Control frame too large');
4031 0         0 $self->_handle_disconnect_and_close('protocol_error');
4032 0         0 return;
4033             }
4034              
4035 130 100       200 if ($opcode == 1) {
    50          
    0          
    0          
    0          
4036             # Text frame - decode as UTF-8
4037 129         155 my $text = eval { Encode::decode('UTF-8', $bytes, Encode::FB_CROAK) };
  129         520  
4038 129 100       3689 unless (defined $text) {
4039             # Invalid UTF-8 - close with 1007 per RFC 6455
4040 2         9 $self->_send_close_frame(1007, 'Invalid UTF-8');
4041 2         10 $self->_handle_disconnect_and_close('protocol_error');
4042 2         72 return;
4043             }
4044             # Check queue limit before adding (DoS protection)
4045 127 100       135 if (@{$self->{receive_queue}} >= $self->{max_receive_queue}) {
  127         247  
4046 2         14 $self->_send_close_frame(1008, 'Message queue overflow');
4047 2         12 $self->_handle_disconnect_and_close('queue_overflow');
4048 2         46 return;
4049             }
4050 125         141 push @{$self->{receive_queue}}, {
  125         532  
4051             type => 'websocket.receive',
4052             text => $text,
4053             };
4054             }
4055             elsif ($opcode == 2) {
4056             # Binary frame - keep as raw bytes
4057             # Check queue limit before adding (DoS protection)
4058 1 50       2 if (@{$self->{receive_queue}} >= $self->{max_receive_queue}) {
  1         4  
4059 0         0 $self->_send_close_frame(1008, 'Message queue overflow');
4060 0         0 $self->_handle_disconnect_and_close('queue_overflow');
4061 0         0 return;
4062             }
4063 1         2 push @{$self->{receive_queue}}, {
  1         5  
4064             type => 'websocket.receive',
4065             bytes => $bytes,
4066             };
4067             }
4068             elsif ($opcode == 8) {
4069             # Close frame
4070 0         0 $self->{close_received} = 1;
4071 0         0 my ($code, $reason) = (1005, '');
4072              
4073             # RFC 6455 Section 5.5.1: Close frame payload is 0 or >=2 bytes
4074             # 1 byte is invalid
4075 0 0       0 if (length($bytes) == 1) {
4076 0         0 $self->_send_close_frame(1002, 'Invalid close frame');
4077 0         0 $self->_handle_disconnect_and_close('protocol_error');
4078 0         0 return;
4079             }
4080              
4081 0 0       0 if (length($bytes) >= 2) {
4082 0         0 $code = unpack('n', substr($bytes, 0, 2));
4083 0   0     0 $reason = substr($bytes, 2) // '';
4084              
4085             # RFC 6455 Section 7.4.1: Validate close code
4086             # Valid codes: 1000-1003, 1007-1011, 3000-4999
4087             # Invalid: 0-999, 1004-1006, 1012-2999, 5000+
4088 0         0 my $valid_code = 0;
4089 0 0 0     0 if ($code == 1000 || $code == 1001 || $code == 1002 || $code == 1003) {
    0 0        
    0 0        
      0        
      0        
4090 0         0 $valid_code = 1;
4091             }
4092             elsif ($code >= 1007 && $code <= 1011) {
4093 0         0 $valid_code = 1;
4094             }
4095             elsif ($code >= 3000 && $code <= 4999) {
4096 0         0 $valid_code = 1;
4097             }
4098 0 0       0 unless ($valid_code) {
4099 0         0 $self->_send_close_frame(1002, 'Invalid close code');
4100 0         0 $self->_handle_disconnect_and_close('protocol_error');
4101 0         0 return;
4102             }
4103              
4104             # RFC 6455: Close reason must be valid UTF-8
4105 0 0       0 if (length($reason) > 0) {
4106 0         0 my $reason_copy = $reason;
4107 0         0 my $decoded = eval { Encode::decode('UTF-8', $reason_copy, Encode::FB_CROAK) };
  0         0  
4108 0 0       0 unless (defined $decoded) {
4109 0         0 $self->_send_close_frame(1007, 'Invalid UTF-8 in close reason');
4110 0         0 $self->_handle_disconnect_and_close('protocol_error');
4111 0         0 return;
4112             }
4113             }
4114             }
4115              
4116             # If we haven't sent close yet, send it now
4117 0 0       0 if (!$self->{close_sent}) {
4118 0         0 my $close_frame = Protocol::WebSocket::Frame->new(
4119             type => 'close',
4120             buffer => pack('n', $code) . $reason,
4121             );
4122 0         0 $self->{stream}->write($close_frame->to_bytes);
4123 0         0 $self->{close_sent} = 1;
4124             }
4125              
4126 0         0 push @{$self->{receive_queue}}, {
  0         0  
4127             type => 'websocket.disconnect',
4128             code => $code,
4129             reason => $reason,
4130             };
4131             }
4132             elsif ($opcode == 9) {
4133             # Ping - respond with pong (transparent to app)
4134 0         0 my $pong = Protocol::WebSocket::Frame->new(
4135             type => 'pong',
4136             buffer => $bytes,
4137             );
4138 0         0 $self->{stream}->write($pong->to_bytes);
4139             }
4140             elsif ($opcode == 10) {
4141             # Pong - cancel any pending timeout (response to our ping)
4142 0         0 $self->_cancel_ws_pong_timeout;
4143             }
4144             }
4145              
4146             # Notify any waiting receive
4147 38 50 66     473 if ($self->{receive_pending} && !$self->{receive_pending}->is_ready && @{$self->{receive_queue}}) {
  10   66     75  
4148 10         18 my $f = $self->{receive_pending};
4149 10         15 $self->{receive_pending} = undef;
4150 10         25 $f->done;
4151             }
4152             }
4153              
4154             # Async file response - prioritizes speed based on file size:
4155             # 1. Small files (<=64KB): direct in-process read (fastest for small files)
4156             # 2. Large files: async chunked reads via worker pool (non-blocking)
4157 10     10   19 async sub _send_file_response {
4158 10         25 my ($self, $file, $offset, $length, $chunked) = @_;
4159              
4160             # Get file size if length not specified
4161 10         363 my $file_size = -s $file;
4162 10 50       29 die "Cannot stat file $file: $!" unless defined $file_size;
4163 10   66     36 $length //= $file_size - $offset;
4164              
4165 10         25 $self->{_response_size} += $length;
4166              
4167 10         19 my $stream = $self->{stream};
4168              
4169 10 100 66     59 if ($self->{sync_file_threshold} > 0 && $length <= $self->{sync_file_threshold}) {
4170             # Small file fast path: read directly in-process
4171             # For files <= 64KB, a simple read() is fast and avoids async overhead
4172 7 50       308 open my $fh, '<:raw', $file or die "Cannot open file $file: $!";
4173 7 100       31 seek($fh, $offset, 0) if $offset;
4174 7         222 my $bytes_read = read($fh, my $data, $length);
4175 7         66 close $fh;
4176              
4177 7 50       15 die "Failed to read file $file: $!" unless defined $bytes_read;
4178              
4179 7 100       13 if ($chunked) {
4180 1         6 my $len = sprintf("%x", length($data));
4181 1         7 $stream->write("$len\r\n$data\r\n");
4182 1         114 $stream->write("0\r\n\r\n");
4183             }
4184             else {
4185 6         25 $stream->write($data);
4186             }
4187             }
4188             else {
4189             # Large file path: async chunked reads via worker pool
4190 3 50       17 my $loop = $self->{server} ? $self->{server}->loop : undef;
4191 3 50       15 die "No event loop available for async file I/O" unless $loop;
4192              
4193             await PAGI::Server::AsyncFile->read_file_chunked(
4194             $loop, $file,
4195             sub {
4196 6     6   21 my ($chunk) = @_;
4197 6 100       22 if ($chunked) {
4198 2         25 my $len = sprintf("%x", length($chunk));
4199 2         20 $stream->write("$len\r\n$chunk\r\n");
4200             }
4201             else {
4202 4         22 $stream->write($chunk);
4203             }
4204 6         996 return; # Sync callback
4205             },
4206 3         51 offset => $offset,
4207             length => $length,
4208             chunk_size => FILE_CHUNK_SIZE,
4209             );
4210              
4211             # Send final chunk terminator if chunked
4212 3 100       267 if ($chunked) {
4213 1         26 $stream->write("0\r\n\r\n");
4214             }
4215             }
4216             }
4217              
4218             # Async filehandle response - uses worker pool for non-blocking reads
4219             # Note: Can't easily use sendfile for arbitrary filehandles (may not have fd,
4220             # may be pipes, may be in-memory). Falls back to chunked reads.
4221 6     6   10 async sub _send_fh_response {
4222 6         18 my ($self, $fh, $offset, $length, $chunked) = @_;
4223              
4224             # Seek to offset if specified
4225 6 100 66     34 if ($offset && $offset > 0) {
4226 1 50       10 seek($fh, $offset, 0) or die "Cannot seek: $!";
4227             }
4228              
4229             # For filehandles, we can't easily use the worker pool (can't pass fh across fork).
4230             # Use blocking reads in small chunks - not ideal but practical.
4231             # TODO: Consider IO::Async::FileStream for better event loop integration.
4232              
4233 6         10 my $remaining = $length; # undef means read to EOF
4234 6         10 my $stream = $self->{stream};
4235              
4236 6         11 while (1) {
4237 12         14 my $to_read = FILE_CHUNK_SIZE;
4238 12 100       21 if (defined $remaining) {
4239 4 50       9 $to_read = $remaining if $remaining < $to_read;
4240 4 100       8 last if $to_read <= 0;
4241             }
4242              
4243 10         170 my $bytes_read = read($fh, my $chunk, $to_read);
4244              
4245 10 50       41 last if !defined $bytes_read; # Error
4246 10 100       20 last if $bytes_read == 0; # EOF
4247              
4248 6         11 $self->{_response_size} += $bytes_read;
4249              
4250 6 100       14 if ($chunked) {
4251 3         14 my $len = sprintf("%x", length($chunk));
4252 3         48 $stream->write("$len\r\n$chunk\r\n");
4253             }
4254             else {
4255 3         8 $stream->write($chunk);
4256             }
4257              
4258 6 100       537 if (defined $remaining) {
4259 2         5 $remaining -= $bytes_read;
4260             }
4261             }
4262              
4263             # Send final chunk if chunked encoding
4264 6 100       27 if ($chunked) {
4265 3         7 $stream->write("0\r\n\r\n");
4266             }
4267             }
4268              
4269             1;
4270              
4271             __END__