File Coverage

blib/lib/PAGI/Request/MultipartStream.pm
Criterion Covered Total %
statement 195 201 97.0
branch 82 94 87.2
condition 33 60 55.0
subroutine 32 34 94.1
pod 2 2 100.0
total 344 391 87.9


line stmt bran cond sub pod time code
1             package PAGI::Request::MultipartStream;
2             $PAGI::Request::MultipartStream::VERSION = '0.002000';
3 4     4   471268 use strict;
  4         6  
  4         130  
4 4     4   14 use warnings;
  4         9  
  4         155  
5              
6 4     4   1275 use Future::AsyncAwait;
  4         9567  
  4         22  
7 4     4   214 use Carp qw(croak);
  4         9  
  4         206  
8 4     4   1237 use HTTP::MultiPartParser;
  4         7380  
  4         9584  
9              
10             =head1 NAME
11              
12             PAGI::Request::MultipartStream - Pull-based streaming multipart/form-data engine
13              
14             =head1 SYNOPSIS
15              
16             use PAGI::Request::MultipartStream;
17             use Future::AsyncAwait;
18              
19             # Usually obtained via $req->multipart_stream, not constructed directly:
20             my $stream = $req->multipart_stream;
21              
22             while (defined(my $part = await $stream->next)) {
23             if ($part->is_file) {
24             await $part->stream_to_file($path);
25             }
26             else {
27             my $value = await $part->value; # raw bytes; you decode
28             }
29             }
30              
31             =head1 DESCRIPTION
32              
33             A pull-based streaming parser for C<multipart/form-data> request bodies. Each
34             part of the body is exposed in turn as a L<PAGI::Request::Part> via C<next>,
35             and B: you choose its sink (a
36             file, an object store, an async transform) per part, rather than accepting the
37             buffered, spool-each-upload-to-a-temp-file behaviour of C and
38             C in L.
39              
40             Because you own the sink, it can be fully asynchronous: C<stream_to> awaits a
41             sink that returns a Future, so a slow downstream naturally backpressures the
42             read. This is what the buffered multipart path cannot offer -- its spool to a
43             temp file is blocking.
44              
45             Internally this drives L<HTTP::MultiPartParser> on demand, bridging its
46             push-based callbacks onto an internal event queue that C<next> and the part
47             methods consume.
48              
49             B An HTTP request body can
50             only be consumed once. Once you create a multipart stream you cannot also call
51             C/C/C/C/C, and a stream cannot be
52             created if the body was already read; see L.
53              
54             =cut
55              
56             our $MAX_FILES = 1000;
57             our $MAX_FIELDS = 1000;
58             our $MAX_FIELD_SIZE = 1024 * 1024; # buffered per-field cap
59             our $MAX_FILE_SIZE = 100 * 1024 * 1024;
60             our $MAX_REQUEST_BODY = 1024 * 1024 * 1024; # defense-in-depth; server max_body_size is primary
61              
62             =head1 CONSTRUCTOR
63              
64             =head2 new
65              
66             my $stream = PAGI::Request::MultipartStream->new(
67             receive => $receive, # required: PAGI receive callback
68             boundary => $boundary, # required: multipart boundary
69             max_files => 1000, # optional limits (defaults shown)
70             max_fields => 1000,
71             max_field_size => 1024 * 1024,
72             max_file_size => 100 * 1024 * 1024,
73             max_request_body => 1024 * 1024 * 1024,
74             );
75              
76             Creates a new streaming multipart engine. Most applications do not call this
77             directly -- they obtain a ready-built stream from
78             L<PAGI::Request/multipart_stream>, which extracts the boundary from the
79             request's C<Content-Type> and passes through the same limit options.
80              
81             C<receive> and C<boundary> are required. The remaining options cap the body to
82             bound memory and resource use:
83              
84             =over 4
85              
86             =item * C<max_files> - Maximum number of file parts. Default: 1000.
87              
88             =item * C<max_fields> - Maximum number of non-file (field) parts. Default: 1000.
89              
90             =item * C<max_field_size> - Maximum size, in bytes, of any single field part.
91             Default: 1 MiB (1024 * 1024).
92              
93             =item * C<max_file_size> - Maximum size, in bytes, of any single file part.
94             Default: 100 MiB (100 * 1024 * 1024).
95              
96             =item * C<max_request_body> - Maximum total bytes read from the request body.
97             Default: 1 GiB (1024 * 1024 * 1024). This is a per-stream defence-in-depth
98             cap; the PAGI server's C<max_body_size> is the primary aggregate limit on the
99             request body.
100              
101             =back
102              
103             =cut
104              
105             sub new {
106 21     21 1 554971 my ($class, %args) = @_;
107 21 50       85 croak "receive is required" unless $args{receive};
108 21 100 66     307 croak "boundary is required" unless defined $args{boundary} && length $args{boundary};
109             my $self = bless {
110             receive => $args{receive},
111             boundary => $args{boundary},
112             max_files => $args{max_files} // $MAX_FILES,
113             max_fields => $args{max_fields} // $MAX_FIELDS,
114             max_field_size => $args{max_field_size} // $MAX_FIELD_SIZE,
115             max_file_size => $args{max_file_size} // $MAX_FILE_SIZE,
116 20   33     419 max_request_body => $args{max_request_body} // $MAX_REQUEST_BODY,
      66        
      66        
      66        
      66        
117             _queue => [], # FIFO: ['part',\%meta] | ['body',$chunk]
118             _file_count => 0,
119             _field_count => 0,
120             _bytes_total => 0,
121             _cur_is_file => 0,
122             _cur_bytes => 0,
123             _cur_name => undef,
124             _current => undef, # current Part
125             _exhausted => 0,
126             _parser_finished => 0, # guard: finish() is called at most once
127             _failed => undef, # sticky failure message (poisons the stream)
128             }, $class;
129 20         59 $self->{_parser} = $self->_build_parser;
130 20         1510 return $self;
131             }
132              
133             # Parse the on_header arrayref of header lines into
134             # {name,filename,content_type,encoding,headers}. is_file := defined(filename).
135             sub _disposition {
136 24     24   30 my ($lines) = @_;
137              
138             # $lines is an arrayref of raw header lines, e.g.
139             # 'Content-Disposition: form-data; name="x"'
140 24         25 my %headers;
141 24         36 for my $line (@$lines) {
142 35 50       113 if ($line =~ /^([^:]+):\s*(.*)$/) {
143 35         122 $headers{lc($1)} = $2;
144             }
145             }
146              
147 24         40 my $disposition = _parse_content_disposition(\%headers);
148              
149             return {
150             name => $disposition->{name},
151             filename => $disposition->{filename},
152             content_type => $headers{'content-type'} // 'text/plain',
153 24   100     151 encoding => $headers{'content-transfer-encoding'},
154             headers => \%headers,
155             };
156             }
157              
158             sub _parse_content_disposition {
159 24     24   29 my ($headers) = @_;
160 24   50     122 my $cd = $headers->{'content-disposition'} // '';
161              
162 24         25 my %result;
163              
164             # Parse name="value" pairs
165 24         109 while ($cd =~ /(\w+)="([^"]*)"/g) {
166 37         130 $result{$1} = $2;
167             }
168             # Also handle unquoted values
169 24         99 while ($cd =~ /(\w+)=([^;\s"]+)/g) {
170 0   0     0 $result{$1} //= $2;
171             }
172              
173 24         48 return \%result;
174             }
175              
176             sub _build_parser {
177 20     20   33 my ($self) = @_;
178             return HTTP::MultiPartParser->new(
179             boundary => $self->{boundary},
180             on_header => sub {
181 26     26   791 my ($headers) = @_;
182 26 100       50 return if $self->{_failed};
183 24         41 my $meta = _disposition($headers);
184 24 100       71 my $is_file = defined $meta->{filename} ? 1 : 0;
185 24 100       34 if ($is_file) {
186             $self->{_failed} = "Too many file parts (max $self->{max_files})"
187 13 50       35 if ++$self->{_file_count} > $self->{max_files};
188             } else {
189             $self->{_failed} = "Too many field parts (max $self->{max_fields})"
190 11 100       24 if ++$self->{_field_count} > $self->{max_fields};
191             }
192 24 100       35 return if $self->{_failed};
193 23         34 $self->{_cur_is_file} = $is_file;
194 23         25 $self->{_cur_bytes} = 0;
195 23         33 $self->{_cur_name} = $meta->{name};
196 23         26 push @{$self->{_queue}}, ['part', $meta];
  23         99  
197             },
198             on_body => sub {
199 24     24   274 my ($chunk, $final) = @_;
200 24 100       41 return if $self->{_failed};
201 21         30 $self->{_cur_bytes} += length $chunk;
202 21 100       39 my $max = $self->{_cur_is_file} ? $self->{max_file_size} : $self->{max_field_size};
203 21 100       37 if ($self->{_cur_bytes} > $max) {
204             $self->{_failed} = sprintf("%s part '%s' too large (max %d bytes)",
205 3 100 50     23 ($self->{_cur_is_file} ? 'File' : 'Field'), ($self->{_cur_name} // ''), $max);
206 3         7 return; # stop enqueuing -- bounds the queue
207             }
208 18         19 push @{$self->{_queue}}, ['body', $chunk];
  18         47  
209             },
210 2   33 2   44 on_error => sub { $self->{_failed} //= "Multipart parse error: $_[0]" },
211 20         202 );
212             }
213              
214             # Feed exactly one network message. Returns true if it fed data, false at exhaustion.
215 32     32   36 async sub _pump {
216 32         40 my ($self) = @_;
217 32 100       64 return 0 if $self->{_exhausted};
218 25         48 my $msg = await $self->{receive}->();
219 25 100 33     1117 if (!$msg || !$msg->{type} || $msg->{type} eq 'http.disconnect') {
      66        
220 4         7 $self->{_exhausted} = 1;
221 4 100       26 $self->_finish_parser if $self->{_bytes_total} > 0; # 0 bytes => empty stream, clean EOF
222 4         18 return 0;
223             }
224 21 100 66     65 if (defined $msg->{body} && length $msg->{body}) {
225 20         33 $self->{_bytes_total} += length $msg->{body};
226 20 100       41 if ($self->{_bytes_total} > $self->{max_request_body}) {
227 1         3 $self->{_failed} = "Request body exceeded max_request_body ($self->{max_request_body} bytes)";
228 1         3 $self->{_exhausted} = 1;
229 1         3 return 0;
230             }
231 19         49 $self->{_parser}->parse($msg->{body}); # fires callbacks (enqueue + bookkeep)
232             }
233 20 100       306 unless ($msg->{more}) { $self->{_exhausted} = 1; $self->_finish_parser if $self->{_bytes_total} > 0; }
  13 100       25  
  13         37  
234 20         132 return 1;
235             }
236              
237             # Finalize the parser once bytes have been fed and the stream has ended.
238             # HTTP::MultiPartParser->finish on a complete stream (closing boundary already
239             # parsed) is a clean no-op; called mid-part it signals truncation. The parser
240             # routes that end-of-stream condition through on_error (which records into the
241             # sticky _failed) rather than dying, so the eval guard is defence-in-depth in
242             # case finish ever throws. When finish is what introduces the failure we reword
243             # it to a clear "incomplete upload" message; a pre-existing failure (e.g. a
244             # size-limit hit) is preserved untouched.
245             sub _finish_parser {
246 15     15   49 my ($self) = @_;
247 15 50       28 return if $self->{_parser_finished};
248 15         17 $self->{_parser_finished} = 1;
249 15         30 my $had_failure = defined $self->{_failed};
250 15 50 0     21 eval { $self->{_parser}->finish; 1 } or $self->{_failed} //= "Multipart parse error (finish): $@";
  15         43  
  15         169  
251 15 100 100     54 if (!$had_failure && defined $self->{_failed}) { # finish introduced it => truncation
252 2         36 $self->{_failed} = "Incomplete multipart upload: client disconnected or stream ended before the closing boundary";
253             }
254 15         24 return;
255             }
256              
257             =head1 METHODS
258              
259             =head2 next
260              
261             my $part = await $stream->next;
262              
263             Returns a Future resolving to the next L<PAGI::Request::Part>, or C<undef> when
264             the stream is exhausted (end of body).
265              
266             Advancing past a part whose body you have not fully consumed auto-drains the
267             remainder of that part first, so you can always loop on C<next> without
268             reading every part. To discard a part deliberately (and signal that intent),
269             call C<< $part->skip >>.
270              
271             Croaks if a size or count limit is breached, or if the upload is truncated
272             (see L</LIMITS AND ERRORS>).
273              
274             =cut
275              
276 27     27 1 805 async sub next {
277 27         51 my ($self) = @_;
278 27 100       159 croak $self->{_failed} if $self->{_failed};
279 26 100 100     106 if ($self->{_current} && !$self->{_current}{_done}) { await $self->{_current}->skip; } # auto-drain
  7         16  
280 26         160 while (1) {
281 44 100       925 croak $self->{_failed} if $self->{_failed};
282 41   66     43 shift @{$self->{_queue}} while @{$self->{_queue}} && $self->{_queue}[0][0] eq 'body'; # defensive
  41         118  
  0         0  
283 41 100 66     44 if (@{$self->{_queue}} && $self->{_queue}[0][0] eq 'part') {
  41         86  
284 18         19 my (undef, $meta) = @{ shift @{$self->{_queue}} };
  18         20  
  18         33  
285 18         57 $self->{_current} = PAGI::Request::Part->new(stream => $self, meta => $meta);
286 18         73 return $self->{_current};
287             }
288 23 100       47 last unless await $self->_pump;
289             }
290 5 100       202 croak $self->{_failed} if $self->{_failed}; # truncation surfaces via _failed (set by finish)
291 4         9 return undef;
292             }
293              
294             # Next body chunk for the current part: the chunk, or undef when the part ends.
295 28     28   225 async sub _next_chunk {
296 28         35 my ($self) = @_;
297 28         29 while (1) {
298 30 100       222 croak $self->{_failed} if $self->{_failed};
299 29 100       42 if (@{$self->{_queue}}) {
  29         48  
300 20         31 my $kind = $self->{_queue}[0][0];
301 20 100       54 if ($kind eq 'body') { my $ev = shift @{$self->{_queue}}; return $ev->[1]; }
  13         14  
  13         21  
  13         70  
302 7 50       23 return undef if $kind eq 'part'; # next part began -> current done
303             }
304 9 100       23 if (!(await $self->_pump)) {
305 7 100       444 croak $self->{_failed} if $self->{_failed}; # truncation surfaces via _failed (set by finish)
306 5         14 return undef; # clean EOF (complete body, then disconnect)
307             }
308             }
309             }
310              
311             package PAGI::Request::Part;
312 4     4   30 use strict;
  4         6  
  4         78  
313 4     4   12 use warnings;
  4         11  
  4         193  
314              
315 4     4   18 use Future::AsyncAwait;
  4         6  
  4         18  
316 4     4   181 use Carp qw(croak);
  4         5  
  4         224  
317 4     4   19 use Fcntl qw(O_WRONLY O_CREAT O_EXCL O_NOFOLLOW);
  4         5  
  4         4887  
318              
319             =head1 NAME
320              
321             PAGI::Request::Part - A single part of a streaming multipart request
322              
323             =head1 DESCRIPTION
324              
325             A value object representing one part yielded by
326             L<PAGI::Request::MultipartStream/next>. It carries the part's metadata (name,
327             filename, headers) and provides the methods that consume the part's body: pull
328             it chunk by chunk, buffer it whole, or drain it to a sink of your choosing.
329              
330             A part's body must be consumed before the next part is fetched. Calling
331             C<< $stream->next >> while a part is only partially read drains the rest of
332             the current part automatically.
333              
334             =head1 CONSTRUCTOR
335              
336             =head2 new
337              
338             my $part = PAGI::Request::Part->new(stream => $stream, meta => \%meta);
339              
340             Constructs a part bound to its owning stream. Parts are normally created by
341             L<PAGI::Request::MultipartStream/next>, not by application code.
342              
343             =head1 METHODS
344              
345             =head2 name
346              
347             my $name = $part->name;
348              
349             The part's form field name, taken from its C<Content-Disposition> header.
350              
351             =head2 filename
352              
353             my $filename = $part->filename;
354              
355             The part's filename from C<Content-Disposition>, or C<undef> for non-file
356             (field) parts.
357              
358             =head2 content_type
359              
360             my $type = $part->content_type;
361              
362             The part's C<Content-Type> header. Defaults to C<text/plain> if the part sent
363             no C<Content-Type>.
364              
365             =head2 encoding
366              
367             my $encoding = $part->encoding;
368              
369             The part's C<Content-Transfer-Encoding> header, or C<undef> if not present.
370              
371             =head2 headers
372              
373             my $headers = $part->headers;
374              
375             A hashref of all the part's headers, keyed by lower-cased header name.
376              
377             =head2 is_file
378              
379             if ($part->is_file) { ... }
380              
381             True if the part has a filename (i.e. is a file upload), false otherwise.
382              
383             =head2 type
384              
385             my $type = $part->type; # 'file' or 'field'
386              
387             Returns the string C<'file'> for file parts and C<'field'> for non-file parts.
388              
389             =head2 next_chunk
390              
391             my $chunk = await $part->next_chunk;
392              
393             Returns this part's next body chunk as raw bytes, or C<undef> once the part's
394             body is exhausted. This is the low-level primitive; C<value>, C<stream_to>,
395             and C<stream_to_file> are built on it.
396              
397             =head2 value
398              
399             my $bytes = await $part->value;
400              
401             Buffers and returns the part's entire body as B<raw bytes> -- no decoding is
402             applied, so a text field encoded as UTF-8 (or any other charset) must be
403             decoded by the caller. Intended for small field parts; the buffered size is
404             bounded by the relevant per-part size limit (C<max_field_size> for fields,
405             C<max_file_size> for files), which croaks if exceeded.
406              
407             =head2 stream_to
408              
409             my $count = await $part->stream_to($cb);
410             my $count = await $part->stream_to(async sub ($chunk) { await $sink->write($chunk) });
411              
412             Drains the rest of the part to a sink callback, returning the number of bytes
413             processed. The callback is invoked with each chunk of raw bytes and may be
414             asynchronous: if it returns a Future, C<stream_to> awaits it before reading
415             the next chunk, giving the sink natural backpressure over the network read.
416              
417             If the sink callback throws, the error poisons the stream -- a later
418             C<< $stream->next >> will croak -- and the failed part is B<not> auto-drained,
419             since the application has signalled it is aborting. The exception is re-thrown
420             to the caller.
421              
422             =head2 stream_to_file
423              
424             my $count = await $part->stream_to_file($path);
425              
426             Writes the part's body to a B<new> file at C<$path>, returning the number of
427             bytes written. The file is opened with C<O_CREAT|O_EXCL|O_NOFOLLOW>: the call
428             B<refuses to overwrite an existing file or follow a symlink>, croaking
429             instead. The result of C<close> is checked. On any error -- a write failure, a
430             limit breach, a truncated upload, or a failed C<close> -- the partially
431             written file is unlinked before the method croaks.
432              
433             =head2 skip
434              
435             await $part->skip;
436              
437             Drains and discards any remaining body of this part. Use this to deliberately
438             ignore a part; C<< $stream->next >> would otherwise drain it for you anyway.
439              
440             =head1 LIMITS AND ERRORS
441              
442             Size and count limits are enforced as bytes arrive from the network, before
443             your sink ever sees them, so an oversized part cannot stream partway into your
444             sink before being rejected. A per-part size overflow names the offending part
445             in its error message (for example, C<< File part 'avatar' too large >>).
446             Exceeding C<max_files>, C<max_fields>, or C<max_request_body> likewise causes
447             the next C<next>/consume call to croak.
448              
449             A client disconnect that truncates a part mid-stream croaks with an
450             C<"Incomplete multipart upload"> message (the closing boundary was never
451             seen). By contrast, a complete body -- or an entirely empty one -- followed by
452             a disconnect ends cleanly, with C<next> simply returning C<undef>.
453              
454             =head1 SEE ALSO
455              
456             L, L
457              
458             =cut
459              
460 18     18   54 sub new { my ($c,%a)=@_; bless { stream=>$a{stream}, meta=>$a{meta}, _done=>0 }, $c }
  18         78  
461 1     1   4 sub name { $_[0]{meta}{name} }
462 3     3   46 sub filename { $_[0]{meta}{filename} }
463 1     1   3 sub content_type { $_[0]{meta}{content_type} }
464 0     0   0 sub encoding { $_[0]{meta}{encoding} }
465 0     0   0 sub headers { $_[0]{meta}{headers} }
466 4 100   4   87 sub is_file { defined $_[0]{meta}{filename} ? 1 : 0 }
467 2 100   2   47 sub type { $_[0]->is_file ? 'file' : 'field' }
468 7     7   8 async sub skip { my $s=shift; 1 while defined(await $s->{stream}->_next_chunk); $s->{_done}=1; return; }
  7         10  
  7         16  
  7         184  
  7         15  
469              
470 14     14   29 async sub next_chunk {
471 14         19 my ($self) = @_;
472 14 50       31 return undef if $self->{_done};
473 14         34 my $chunk = await $self->{stream}->_next_chunk;
474 11 100       268 $self->{_done} = 1 unless defined $chunk;
475 11         26 return $chunk;
476             }
477              
478 3     3   49 async sub value { # buffer the whole part (small fields). RAW BYTES.
479 3         5 my ($self) = @_;
480 3         6 my $buf = '';
481 3         7 while (defined(my $c = await $self->next_chunk)) { $buf .= $c }
  3         141  
482 3         62 return $buf;
483             }
484              
485 2     2   82 async sub stream_to { # drain to a (possibly async) sink callback
486 2         5 my ($self, $cb) = @_;
487 2 50       6 croak "callback is required" unless $cb;
488 2         3 my $n = 0;
489 2         3 my $ok = eval {
490 2         5 while (defined(my $c = await $self->next_chunk)) {
491 2         43 my $r = $cb->($c);
492 1 50 33     25 await $r if ref $r && $r->can('get'); # allow an async sink (returns a Future)
493 1         12 $n += length $c;
494             }
495 1         22 1;
496             };
497 2 100       10 if (!$ok) {
498 1         1 my $err = $@;
499             # poison the stream so a later ->next croaks; do NOT auto-drain (the app aborted)
500 1   33     8 $self->{stream}{_failed} //= "sink error: $err";
501 1         4 die $err;
502             }
503 1         3 return $n;
504             }
505              
506 4     4   305 async sub stream_to_file {
507 4         9 my ($self, $path) = @_;
508 4 50       43 croak "path is required" unless defined $path;
509 4 100       887 sysopen(my $fh, $path, O_WRONLY|O_CREAT|O_EXCL|O_NOFOLLOW, 0600)
510             or croak "Cannot create $path: $!";
511 3         14 binmode $fh;
512 3         6 my $written = 0;
513 3         5 my $ok = eval {
514 3         12 while (defined(my $c = await $self->next_chunk)) {
515 1 50       38 print $fh $c or die "write to $path failed: $!\n";
516 1         4 $written += length $c;
517             }
518 1         21 1;
519             };
520 3         193 my $err = $@;
521 3         75 my $close_ok = close $fh;
522 3 100       12 if (!$ok) { unlink $path; croak $err; } # write/limit/disconnect error wins
  2         191  
  2         238  
523 1 50       3 unless ($close_ok) { unlink $path; croak "Cannot close $path: $!"; }
  0         0  
  0         0  
524 1         7 return $written;
525             }
526              
527             1;