File Coverage

blib/lib/Net/Async/Webservice/S3.pm
Criterion Covered Total %
statement 370 385 96.1
branch 111 130 85.3
condition 69 109 63.3
subroutine 65 66 98.4
pod 7 7 100.0
total 622 697 89.2


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2013-2021 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::Webservice::S3;
7              
8 10     10   1644818 use strict;
  10         118  
  10         322  
9 10     10   60 use warnings;
  10         20  
  10         315  
10 10     10   57 use base qw( IO::Async::Notifier );
  10         19  
  10         6285  
11 10     10   51090 use 5.010; # //
  10         43  
12              
13             our $VERSION = '0.19';
14              
15 10     10   60 use Carp;
  10         25  
  10         618  
16              
17 10     10   5162 use Digest::HMAC_SHA1;
  10         54425  
  10         532  
18 10     10   83 use Digest::MD5 qw( md5 );
  10         20  
  10         482  
19 10     10   62 use Future 0.26; # ->done, ->fail constructors
  10         212  
  10         288  
20 10     10   5542 use Future::Utils 0.16 qw( repeat try_repeat fmap1 );
  10         23867  
  10         814  
21 10     10   5385 use HTTP::Date qw( time2str );
  10         49110  
  10         727  
22 10     10   595 use HTTP::Request;
  10         17938  
  10         242  
23 10     10   5328 use IO::Async::Timer::Countdown;
  10         18701  
  10         335  
24 10     10   75 use List::Util qw( sum );
  10         19  
  10         695  
25 10     10   5162 use MIME::Base64 qw( encode_base64 );
  10         7130  
  10         671  
26 10     10   82 use Scalar::Util qw( blessed );
  10         21  
  10         484  
27 10     10   76 use URI::Escape qw( uri_escape_utf8 );
  10         25  
  10         521  
28 10     10   7427 use XML::LibXML;
  10         645539  
  10         77  
29 10     10   1791 use XML::LibXML::XPathContext;
  10         28  
  10         393  
30              
31             my $libxml = XML::LibXML->new;
32              
33 10     10   62 use constant DEFAULT_S3_HOST => "s3.amazonaws.com";
  10         24  
  10         55448  
34              
35             =head1 NAME
36              
37             C<Net::Async::Webservice::S3> - use Amazon's S3 web service with C<IO::Async>
38              
39             =head1 SYNOPSIS
40              
41             use IO::Async::Loop;
42             use Net::Async::Webservice::S3;
43              
44             my $loop = IO::Async::Loop->new;
45              
46             my $s3 = Net::Async::Webservice::S3->new(
47             access_key => ...,
48             secret_key => ...,
49             bucket => "my-bucket-here",
50             );
51             $loop->add( $s3 );
52              
53             my $put_f = $s3->put_object(
54             key => "the-key",
55             value => "A new value for the key\n",
56             );
57              
58             my $get_f = $s3->get_object(
59             key => "another-key",
60             );
61              
62             $loop->await_all( $put_f, $get_f );
63              
64             print "The value is:\n", $get_f->get;
65              
66             =head1 DESCRIPTION
67              
68             This module provides a webservice API around Amazon's S3 web service for use
69             in an L<IO::Async>-based program. Each S3 operation is represented by a method
70             that returns a L<Future>; this future, if successful, will eventually return
71             the result of the operation.
72              
73             =cut
74              
75             sub _init
76             {
77 9     9   56759 my $self = shift;
78 9         34 my ( $args ) = @_;
79              
80 9   33     67 $args->{http} ||= do {
81 0         0 require Net::Async::HTTP;
82 0         0 Net::Async::HTTP->VERSION( '0.33' ); # 'timeout' and 'stall_timeout' failures
83 0         0 my $http = Net::Async::HTTP->new;
84 0         0 $self->add_child( $http );
85 0         0 $http;
86             };
87              
88 9   100     44 $args->{max_retries} //= 3;
89 9   50     75 $args->{list_max_keys} //= 1000;
90 9   50     55 $args->{read_size} //= 64*1024; # 64 KiB
91              
92             # S3 docs suggest > 100MB should use multipart. They don't actually
93             # document what size of parts to use, but we'll use that again.
94 9   50     49 $args->{part_size} //= 100*1024*1024;
95              
96 9   50     71 $args->{host} //= DEFAULT_S3_HOST;
97              
98 9         73 return $self->SUPER::_init( @_ );
99             }
100              
101             =head1 PARAMETERS
102              
103             The following named parameters may be passed to C<new> or C<configure>:
104              
105             =head2 http => Net::Async::HTTP
106              
107             Optional. Allows the caller to provide a specific asynchronous HTTP user agent
108             object to use. This will be invoked with a single method, as documented by
109             L<Net::Async::HTTP>:
110              
111             $response_f = $http->do_request( request => $request, ... )
112              
113             If absent, a new instance will be created and added as a child notifier of
114             this object. If a value is supplied, it will be used as supplied and I<not>
115             specifically added as a child notifier. In this case, the caller must ensure
116             it gets added to the underlying L<IO::Async::Loop> instance, if required.
117              
118             =head2 access_key => STRING
119              
120             =head2 secret_key => STRING
121              
122             The twenty-character Access Key ID and forty-character Secret Key to use for
123             authenticating requests to S3.
124              
125             =head2 ssl => BOOL
126              
127             Optional. If given a true value, will use C<https> URLs over SSL.
128              
129             This setting defaults on, but can be disabled by passing a defined-but-false
130             value (such as C<0>).
131              
132             =head2 bucket => STRING
133              
134             Optional. If supplied, gives the default bucket name to use, at which point it
135             is optional to supply to the remaining methods.
136              
137             =head2 prefix => STRING
138              
139             Optional. If supplied, this prefix string is prepended to any key names passed
140             in methods, and stripped from the response from C<list_bucket>. It can be used
141             to keep operations of the object contained within the named key space. If this
142             string is supplied, don't forget that it should end with the path delimiter in
143             use by the key naming scheme (for example C</>).
144              
145             =head2 host => STRING
146              
147             Optional. Sets the hostname to talk to the S3 service. Usually the default of
148             C<s3.amazonaws.com> is sufficient. This setting allows for communication with
149             other service providers who provide the same API as S3.
150              
151             =head2 max_retries => INT
152              
153             Optional. Maximum number of times to retry a failed operation. Defaults to 3.
154              
155             =head2 list_max_keys => INT
156              
157             Optional. Maximum number of keys at a time to request from S3 for the
158             C<list_bucket> method. Larger values may be more efficient as fewer roundtrips
159             will be required per method call. Defaults to 1000.
160              
161             =head2 part_size => INT
162              
163             Optional. Size in bytes to break content for using multipart upload. If an
164             object key's size is no larger than this value, multipart upload will not be
165             used. Defaults to 100 MiB.
166              
167             =head2 read_size => INT
168              
169             Optional. Size in bytes to read per call to the C<$gen_value> content
170             generation function in C<put_object>. Defaults to 64 KiB. Be aware that too
171             large a value may lead to the C<PUT> stall timer failing to be invoked on slow
172             enough connections, causing spurious timeouts.
173              
174             =head2 timeout => NUM
175              
176             Optional. If configured, this is passed into individual requests of the
177             underlying C<Net::Async::HTTP> object, except for the actual content C<GET> or
178             C<PUT> operations. It is therefore used by C<list_bucket>, C<delete_object>,
179             and the multi-part metadata operations used by C<put_object>. To apply an
180             overall timeout to an individual C<get_object> or C<put_object> operation,
181             pass a specific C<timeout> argument to those methods specifically.
182              
183             =head2 stall_timeout => NUM
184              
185             Optional. If configured, this is passed into the underlying
186             C<Net::Async::HTTP> object and used for all content uploads and downloads.
187              
188             =head2 put_concurrent => INT
189              
190             Optional. If configured, gives a default value for the C<concurrent> parameter
191             to C<put_object>.
192              
193             =cut
194              
195             sub configure
196             {
197 19     19 1 8773 my $self = shift;
198 19         100 my %args = @_;
199              
200 19         103 foreach (qw( http access_key secret_key ssl bucket prefix host max_retries
201             list_max_keys part_size read_size timeout stall_timeout
202             put_concurrent )) {
203 266 100       732 exists $args{$_} and $self->{$_} = delete $args{$_};
204             }
205              
206 19         137 $self->SUPER::configure( %args );
207             }
208              
209             =head1 METHODS
210              
211             The following methods all support the following common arguments:
212              
213             =over 8
214              
215             =item timeout => NUM
216              
217             =item stall_timeout => NUM
218              
219             Optional. Passed directly to the underlying C<< Net::Async::HTTP->request >>
220             method.
221              
222             =back
223              
224             The following methods documented with a trailing call to C<< ->get >> return
225             L<Future> instances.
226              
227             =cut
228              
229             sub _make_request
230             {
231 84     84   213 my $self = shift;
232 84         498 my %args = @_;
233              
234 84         246 my $method = $args{method};
235 84 50       320 defined $args{content} or croak "Missing 'content'";
236              
237 84         179 my @params;
238 84         167 foreach my $key ( sort keys %{ $args{query_params} } ) {
  84         576  
239 101 100       3946 next unless defined( my $value = $args{query_params}->{$key} );
240 89         254 $key =~ s/_/-/g;
241 89         447 push @params, $key . "=" . uri_escape_utf8( $value, "^A-Za-z0-9_-" );
242             }
243              
244 84   66     1965 my $bucket = $args{bucket} // $self->{bucket};
245 84   66     480 my $path = $args{abs_path} // join "", grep { defined } $self->{prefix}, $args{path};
  142         536  
246              
247 84 50 50     513 my $scheme = ( $self->{ssl} // 1 ) ? "https" : "http";
248              
249 84         154 my $uri;
250 84 50 33     712 if( length $bucket <= 63 and $bucket =~ m{^[A-Z0-9][A-Z0-9.-]+$}i ) {
251 84         461 $uri = "$scheme://$bucket.$self->{host}/$path";
252             }
253             else {
254 0         0 $uri = "$scheme://$self->{host}/$bucket/$path";
255             }
256 84 100       349 $uri .= "?" . join( "&", @params ) if @params;
257              
258 84         229 my $s3 = $self->{s3};
259              
260 84   100     454 my $meta = $args{meta} || {};
261              
262             my @headers = (
263             Date => time2str( time ),
264 84 100       2701 %{ $args{headers} || {} },
265 84         587 ( map { +"X-Amz-Meta-$_" => $meta->{$_} } sort keys %$meta ),
  3         18  
266             );
267              
268 84         811 my $request = HTTP::Request->new( $method, $uri, \@headers, $args{content} );
269 84         99035 $request->content_length( length $args{content} );
270              
271 84         4789 $self->_gen_auth_header( $request, $bucket, $path );
272              
273 84         5151 return $request;
274             }
275              
276             sub _gen_auth_header
277             {
278 84     84   196 my $self = shift;
279 84         280 my ( $request, $bucket, $path ) = @_;
280              
281             # See also
282             # http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader
283              
284 84         344 my $canon_resource = "/$bucket/$path";
285 84 100 66     297 if( defined $request->uri->query and length $request->uri->query ) {
286 51         2510 my %params = $request->uri->query_form;
287 51         3920 my %params_to_sign;
288 51         153 foreach (qw( partNumber uploadId )) {
289 102 100       364 $params_to_sign{$_} = $params{$_} if exists $params{$_};
290             }
291              
292 51         110 my @params_to_sign;
293 51         209 foreach ( sort keys %params_to_sign ) {
294 49 50       222 push @params_to_sign, defined $params{$_} ? "$_=$params{$_}" : $_;
295             }
296              
297 51 100       269 $canon_resource .= "?" . join( "&", @params_to_sign ) if @params_to_sign;
298             }
299              
300 84         1161 my %x_amz_headers;
301             $request->scan( sub {
302 197 100   197   5136 $x_amz_headers{lc $_[0]} = $_[1] if $_[0] =~ m/^X-Amz-/i;
303 84         792 });
304              
305 84         643 my $x_amz_headers = "";
306 84         316 $x_amz_headers .= "$_:$x_amz_headers{$_}\n" for sort keys %x_amz_headers;
307              
308 84   100     314 my $buffer = join( "\n",
      100        
      50        
309             $request->method,
310             $request->header( "Content-MD5" ) // "",
311             $request->header( "Content-Type" ) // "",
312             $request->header( "Date" ) // "",
313             $x_amz_headers . $canon_resource );
314              
315 84         12169 my $s3 = $self->{s3};
316              
317 84         588 my $hmac = Digest::HMAC_SHA1->new( $self->{secret_key} );
318 84         4971 $hmac->add( $buffer );
319              
320 84         701 my $access_key = $self->{access_key};
321 84         333 my $authkey = encode_base64( $hmac->digest, "" );
322              
323 84         2977 $request->header( Authorization => "AWS $access_key:$authkey" );
324             }
325              
326             # Turn non-2xx results into errors
327             sub _do_request
328             {
329 84     84   231 my $self = shift;
330 84         447 my ( $request, %args ) = @_;
331              
332             $self->{http}->do_request(
333             request => $request,
334             SSL => ( $request->uri->scheme eq "https" ),
335             %args
336             )->then_with_f( sub {
337 81     81   281355 my ( $f, $resp ) = @_;
338              
339 81         371 my $code = $resp->code;
340 77 100       1277 if( $code !~ m/^2/ ) {
341 11         34 my $message = $resp->message;
342 11         122 $message =~ s/\r$//; # HTTP::Response leaves the \r on this
343              
344 11         81 return Future->fail(
345             "$code $message", http => $resp, $request
346             );
347             }
348              
349 66         282 return $f;
350 84         368 });
351             }
352              
353             # Convert response into an XML XPathContext tree
354             sub _do_request_xpc
355             {
356 31     31   76 my $self = shift;
357 31         104 my ( $request, @args ) = @_;
358              
359             $self->_do_request( $request, @args )->then( sub {
360 26     26   3607 my $resp = shift;
361              
362 26         291 my $xpc = XML::LibXML::XPathContext->new( $libxml->parse_string( $resp->content ) );
363 26         8096 $xpc->registerNs( s3 => "http://s3.amazonaws.com/doc/2006-03-01/" );
364              
365 26         246 return Future->wrap( $xpc );
366 31         186 });
367             }
368              
369             sub _retry
370             {
371 77     77   927 my $self = shift;
372 77         391 my ( $method, @args ) = @_;
373              
374 77         185 my $delay = 0.5;
375              
376 77         191 my $retries = $self->{max_retries};
377             try_repeat {
378 80     80   3118 my ( $prev_f ) = @_;
379              
380             # Add a small delay after failure before retrying
381 80 100       438 my $delay_f =
382             $prev_f ? $self->loop->delay_future( after => ( $delay *= 2 ) )
383             : Future->done;
384              
385 80         3597 $delay_f->then( sub { $self->$method( @args ) } );
  80         4011657  
386             } while => sub {
387 78     78   12648 my $f = shift;
388 78 100       336 my ( $failure, $name, $response, $request ) = $f->failure or return 0; # success
389 14 100 100     329 return 0 if defined $name and $name eq "http" and
      66        
      100        
390             $response and $response->code =~ m/^4/; # don't retry HTTP 4xx
391 9         88 return --$retries;
392 77         888 };
393             }
394              
395             =head2 list_bucket
396              
397             ( $keys, $prefixes ) = $s3->list_bucket( %args )->get
398              
399             Requests a list of the keys in a bucket, optionally within some prefix.
400              
401             Takes the following named arguments:
402              
403             =over 8
404              
405             =item bucket => STR
406              
407             The name of the S3 bucket to query
408              
409             =item prefix => STR
410              
411             =item delimiter => STR
412              
413             Optional. If supplied, the prefix and delimiter to use to divide up the key
414             namespace. Keys will be divided on the C parameter, and only the
415             key space beginning with the given prefix will be queried.
416              
417             =back
418              
419             The Future will return two ARRAY references. The first provides a list of the
420             keys found within the given prefix, and the second will return a list of the
421             common prefixes of further nested keys.
422              
423             Each key in the C<$keys> list is given in a HASH reference containing
424              
425             =over 8
426              
427             =item key => STRING
428              
429             The key's name
430              
431             =item last_modified => STRING
432              
433             The last modification time of the key given in ISO 8601 format
434              
435             =item etag => STRING
436              
437             The entity tag of the key
438              
439             =item size => INT
440              
441             The size of the key's value, in bytes
442              
443             =item storage_class => STRING
444              
445             The S3 storage class of the key
446              
447             =back
448              
449             Each key in the C<$prefixes> list is given as a plain string.
450              
451             =cut
452              
453             sub _list_bucket
454             {
455 12     12   33 my $self = shift;
456 12         69 my %args = @_;
457              
458 12         36 my @keys;
459             my @prefixes;
460              
461             my $f = repeat {
462 13     13   504 my ( $prev_f ) = @_;
463              
464 13 100       44 my $marker = $prev_f ? $prev_f->get : undef;
465              
466             my $req = $self->_make_request(
467             method => "GET",
468             bucket => $args{bucket},
469             abs_path => "",
470             query_params => {
471 26         157 prefix => join( "", grep { defined } $self->{prefix}, $args{prefix} ),
472             delimiter => $args{delimiter},
473             marker => $marker,
474             max_keys => $self->{list_max_keys},
475             },
476 13         64 content => "",
477             );
478              
479             $self->_do_request_xpc( $req,
480             timeout => $args{timeout} // $self->{timeout},
481             stall_timeout => $args{stall_timeout} // $self->{stall_timeout},
482             )->then( sub {
483 8         1180 my $xpc = shift;
484              
485 8         15 my $last_key;
486 8         37 foreach my $node ( $xpc->findnodes( ".//s3:Contents" ) ) {
487 4         243 my $key = $xpc->findvalue( ".//s3:Key", $node );
488 4         432 $last_key = $key;
489              
490 4 100       195 $key =~ s/^\Q$self->{prefix}\E// if defined $self->{prefix};
491 4         62 push @keys, {
492             key => $key,
493             last_modified => $xpc->findvalue( ".//s3:LastModified", $node ),
494             etag => $xpc->findvalue( ".//s3:ETag", $node ),
495             size => $xpc->findvalue( ".//s3:Size", $node ),
496             storage_class => $xpc->findvalue( ".//s3:StorageClass", $node ),
497             };
498             }
499              
500 8         1370 foreach my $node ( $xpc->findnodes( ".//s3:CommonPrefixes" ) ) {
501 4         141 my $key = $xpc->findvalue( ".//s3:Prefix", $node );
502              
503 4 100       255 $key =~ s/^\Q$self->{prefix}\E// if defined $self->{prefix};
504 4         81 push @prefixes, $key;
505             }
506              
507 8   66     229 $last_key //= $xpc->findvalue(".//s3:NextMarker");
508              
509 8 100       531 if( $xpc->findvalue( ".//s3:IsTruncated" ) eq "true" ) {
510 1         60 return Future->wrap( $last_key );
511             }
512 7         531 return Future->done;
513 13   100     167 });
      33        
514             } while => sub {
515 13     13   2963 my $f = shift;
516 12 100       143 !$f->failure and $f->get };
  13         71  
517              
518             $f->then( sub {
519 7     7   867 return Future->wrap( \@keys, \@prefixes );
520 12         5534 });
521             }
522              
523             sub list_bucket
524             {
525 9     9 1 6735 my $self = shift;
526 9         38 $self->_retry( "_list_bucket", @_ );
527             }
528              
529             =head2 get_object
530              
531             ( $value, $response, $meta ) = $s3->get_object( %args )
532              
533             Requests the value of a key from a bucket.
534              
535             Takes the following named arguments:
536              
537             =over 8
538              
539             =item bucket => STR
540              
541             The name of the S3 bucket to query
542              
543             =item key => STR
544              
545             The name of the key to query
546              
547             =item on_chunk => CODE
548              
549             Optional. If supplied, this code will be invoked repeatedly on receipt of more
550             bytes of the key's value. It will be passed the L object
551             received in reply to the request, and a byte string containing more bytes of
552             the value. Its return value is not important.
553              
554             $on_chunk->( $header, $bytes )
555              
556             If this is supplied then the key's value will not be accumulated, and the
557             final result of the Future will be an empty string.
558              
559             =item byte_range => STRING
560              
561             Optional. If supplied, is used to set the C request header with
562             C as the units. This gives a range of bytes of the object to fetch,
563             rather than fetching the entire content. The value must be as specified by
564             HTTP/1.1; i.e. a comma-separated list of ranges, where each range specifies a
565             start and optionally an inclusive stop byte index, separated by hypens.
566              
567             =item if_match => STRING
568              
569             Optional. If supplied, is used to set the C request header to the
570             given string, which should be an entity etag. If the requested object no
571             longer has this etag, the request will fail with an C failure whose
572             response code is 412.
573              
574             =back
575              
576             The Future will return a byte string containing the key's value, the
577             L that was received, and a hash reference containing any of
578             the metadata fields, if found in the response. If an C code
579             reference is passed, the C<$value> string will be empty.
580              
581             If the entire content of the object is requested (i.e. if C is not
582             supplied) then stall timeout failures will be handled specially. If a stall
583             timeout happens while receiving the content, the request will be retried using
584             the C header to resume from progress so far. This will be repeated
585             while every attempt still makes progress, and such resumes will not be counted
586             as part of the normal retry count. The resume request also uses C to
587             ensure it only resumes the resource with matching ETag. If a resume request
588             fails for some reason (either because the ETag no longer matches or something
589             else) then this error is ignored, and the original stall timeout failure is
590             returned.
591              
592             =cut
593              
594             sub _head_then_get_object
595             {
596 18     18   48 my $self = shift;
597 18         75 my %args = @_;
598              
599 18         72 my $if_match = $args{if_match};
600 18         36 my $byte_range = $args{byte_range};
601              
602             # TODO: This doesn't handle retries correctly
603             # But that said neither does the rest of this module, wrt: on_chunk streaming
604              
605 18         47 my $on_chunk = delete $args{on_chunk};
606              
607 18         27 my $header;
608 18         91 my $head_future = $self->loop->new_future;
609 18         2826 my $value_future;
610 18         34 my $value_len = 0;
611 18         32 my $stall_failure_f;
612              
613 18         36 my $resume_on_stall = 1;
614              
615             # TODO: Right now I can't be bothered to write the logic required to update
616             # the user-requested byte_range (which may in complex cases contain multiple
617             # discontinuous ranges) after a stall to resume it. This probably could be
618             # done at some stage.
619 18 100       57 $resume_on_stall = 0 if defined $byte_range;
620              
621             ( try_repeat {
622 21 100   21   605 if( my $pos = $value_len ) {
623 3         9 $byte_range = "$pos-";
624             }
625              
626             my $request = $self->_make_request(
627             method => $args{method},
628             bucket => $args{bucket},
629             path => $args{key},
630 21 100       161 headers => {
    100          
631             ( defined $if_match ? ( "If-Match" => $if_match ) : () ),
632             ( defined $byte_range ? ( Range => "bytes=$byte_range" ) : () ),
633             },
634             content => "",
635             );
636              
637             $self->_do_request( $request,
638             timeout => $args{timeout},
639             stall_timeout => $args{stall_timeout} // $self->{stall_timeout},
640             on_header => sub {
641 21         51349 my ( $this_header ) = @_;
642 21         63 my $code = $this_header->code;
643              
644 21 100 100     262 if( $head_future->is_cancelled or $code !~ m/^2/ ) {
645             # Just eat the body on cancellation or if it's not a 2xx
646             # For failures this will cause ->on_fail to occur and fail the
647             # $head_future
648             return sub {
649 14 100       163 return if @_;
650 7         29 return $this_header;
651 7         102 };
652             }
653              
654 14         195 my %meta;
655             $this_header->scan( sub {
656 17 100       473 $_[0] =~ m/^X-Amz-Meta-(.*)$/i and $meta{$1} = $_[1];
657 14         110 });
658              
659 14 100       129 if( !$value_future ) {
660             # First response
661 12         29 $header = $this_header;
662             # If we're going to retry this, ensure we only request this exact ETag
663 12   66     68 $if_match ||= $header->header( "ETag" );
664              
665 12         563 $value_future = $head_future->new;
666 12         264 $head_future->done( $value_future, $header, \%meta );
667             }
668              
669             return sub {
670             # Code here could be 200 (OK), 206 (Partial Content) or 204 (No Content)
671 25 50       4178 return $this_header if $code == 204;
672              
673 25 100       70 if( @_ ) {
674 14         31 $value_len += length $_[0];
675 14 100       103 if( $on_chunk ) {
676 3         11 $on_chunk->( $header, @_ );
677             }
678             else {
679 11         63 $header->add_content( $_[0] );
680             }
681 14         281 return;
682             }
683 11         44 return $header; # with no body content
684 14         716 };
685             }
686 21   33     276 );
687             } while => sub {
688 21     21   3130 my ( $prev_f ) = @_;
689             # repeat while it keeps failing with a stall timeout
690 21 100 100     108 return 0 if !$resume_on_stall or !$prev_f->failure;
691              
692 9   50     127 my $op = ( $prev_f->failure )[1] // "";
693 9 100       128 return 0 if $op ne "stall_timeout";
694              
695 3   33     15 $stall_failure_f ||= $prev_f;
696 3         7 return 1;
697             } )->on_done( sub {
698 12     12   1004 my ( $response ) = @_;
699 12 100       40 return if $head_future->is_cancelled;
700              
701 11         86 $value_future->done( $response->content, ( $head_future->get )[1,2] );
702             })->on_fail( sub {
703 6   66 6   627 my $f = $value_future || $head_future;
704             # If we have a $stall_failure_f it means we must have attempted a resume
705             # after a stall timeout, then failed for a different reason. Ignore that
706             # second reason and just pretend to the caller that we stalled.
707 6 100       22 $f->fail( $stall_failure_f ? $stall_failure_f->failure : @_ ) if !$f->is_cancelled;
    100          
708 18         159 });
709              
710 18         5904 return $head_future;
711             }
712              
713             sub get_object
714             {
715 12     12 1 8416 my $self = shift;
716 12         63 my %args = @_;
717              
718             $self->_retry( sub {
719             $self->_head_then_get_object( %args, method => "GET" )
720 12     12   59 ->then( sub { my ( $value_f ) = @_; $value_f }); # wait on the value
  10         877  
  10         24  
721 12         89 });
722             }
723              
724             =head2 head_object
725              
726             ( $response, $meta ) = $s3->head_object( %args )->get
727              
728             Requests the value metadata of a key from a bucket. This is similar to the
729             C method, but uses the C HTTP verb instead of C.
730              
731             Takes the same named arguments as C, but will ignore an
732             C callback, if provided.
733              
734             The Future will return the L object and metadata hash
735             reference, without the content string (as no content is returned to a C
736             request).
737              
738             =cut
739              
740             sub head_object
741             {
742 2     2 1 3121 my $self = shift;
743 2         9 my %args = @_;
744              
745             $self->_retry( sub {
746             $self->_head_then_get_object( %args, method => "HEAD" )
747 1         113 ->then( sub { my ( $value_f ) = @_; $value_f }) # wait on the empty body
  1         4  
748 2     2   10 ->then( sub { shift; Future->wrap( @_ ) }); # remove (empty) value
  1         143  
  1         6  
749 2         19 });
750             }
751              
752             =head2 head_then_get_object
753              
754             ( $value_f, $response, $meta ) = $s3->head_then_get_object( %args )->get
755              
756             ( $value, $response, $meta ) = $value_f->get
757              
758             Performs a C<GET> operation similar to C<get_object>, but allows access to the
759             metadata header before the body content is complete.
760              
761             Takes the same named arguments as C<get_object>.
762              
763             The returned Future completes as soon as the metadata header has been received
764             and yields a second future (the body future), the L<HTTP::Response> and a hash
765             reference containing the metadata fields. The body future will eventually
766             yield the actual body, along with another copy of the response and metadata
767             hash reference.
768              
769             =cut
770              
771             sub head_then_get_object
772             {
773 4     4 1 5777 my $self = shift;
774 4         20 $self->_retry( "_head_then_get_object", @_, method => "GET" );
775             }
776              
777             =head2 put_object
778              
779             ( $etag, $length ) = $s3->put_object( %args )->get
780              
781             Sets a new value for a key in the bucket.
782              
783             Takes the following named arguments:
784              
785             =over 8
786              
787             =item bucket => STRING
788              
789             The name of the S3 bucket to put the value in
790              
791             =item key => STRING
792              
793             The name of the key to put the value in
794              
795             =item value => STRING
796              
797             =item value => Future giving STRING
798              
799             Optional. If provided, gives a byte string as the new value for the key or a
800             L<Future> which will eventually yield such.
801              
802             =item value => CODE
803              
804             =item value_length => INT
805              
806             Alternative form of C<value>, which is a C<CODE> reference to a generator
807             function. It will be called repeatedly to generate small chunks of content,
808             being passed the position and length it should yield.
809              
810             $chunk = $value->( $pos, $len )
811              
812             Typically this can be provided either by a C<substr> operation on a larger
813             string buffer, or a C<sysseek> and C<sysread> operation on a filehandle.
814              
815             In normal operation the function will just be called in a single sweep in
816             contiguous regions up to the extent given by C<value_length>. If, however, the
817             MD5 sum check fails at the end of upload, it will be called again to retry the
818             operation. The function must therefore be prepared to be invoked multiple
819             times over its range.
820              
821             =item value => Future giving ( CODE, INT )
822              
823             Alternative form of C<value>, in which a C<Future> eventually yields the value
824             generation C<CODE> reference and length. The C<CODE> reference is invoked as
825             documented above.
826              
827             ( $gen_value, $value_len ) = $value->get;
828              
829             $chunk = $gen_value->( $pos, $len );
830              
831             =item gen_parts => CODE
832              
833             Alternative to C<value> in the case of larger values, and implies the use of
834             multipart upload. Called repeatedly to generate successive parts of the
835             upload. Each time C<gen_parts> is called it should return one of the forms of
836             C<value> given above; namely, a byte string, a C<CODE> reference and size
837             pair, or a C<Future> which will eventually yield either of these forms.
838              
839             ( $value ) = $gen_parts->()
840              
841             ( $gen_value, $value_length ) = $gen_parts->()
842              
843             ( $value_f ) = $gen_parts->(); $value = $value_f->get
844             ( $gen_value, $value_length ) = $value_f->get
845              
846             Each case is analogous to the types that the C<value> key can take.
847              
848             =item meta => HASH
849              
850             Optional. If provided, gives additional user metadata fields to set on the
851             object, using the C<X-Amz-Meta-*> fields.
852              
853             =item timeout => NUM
854              
855             Optional. For single-part uploads, this sets the C<timeout> argument to use
856             for the actual C<PUT> request. For multi-part uploads, this argument is
857             currently ignored.
858              
859             =item meta_timeout => NUM
860              
861             Optional. For multipart uploads, this sets the C<timeout> argument to use for
862             the initiate and complete requests, overriding a configured C<timeout>.
863             Ignored for single-part uploads.
864              
865             =item part_timeout => NUM
866              
867             Optional. For multipart uploads, this sets the C<timeout> argument to use for
868             the individual part C<PUT> requests. Ignored for single-part uploads.
869              
870             =item on_write => CODE
871              
872             Optional. If provided, this code will be invoked after each successful
873             C<syswrite> call on the underlying filehandle when writing actual file
874             content, indicating that the data was at least written as far as the
875             kernel. It will be passed the total byte length that has been written for this
876             call to C<put_object>. By the time the call has completed, this will be the
877             total written length of the object.
878              
879             $on_write->( $bytes_written )
880              
881             Note that because of retries it is possible this count will decrease, if a
882             part has to be retried due to e.g. a failing MD5 checksum.
883              
884             =item concurrent => INT
885              
886             Optional. If present, gives the number of parts to upload concurrently. If
887             absent, the C<put_concurrent> value configured on the object is used instead.
888              
889             =back
890              
891             The Future will return a string containing the S3 ETag of the newly-set key,
892             and the length of the value in bytes.
893              
894             For single-part uploads the ETag will be the MD5 sum in hex, surrounded by
895             quote marks. For multi-part uploads this is a string in a different form,
896             though details of its generation are not specified by S3.
897              
898             The returned MD5 sum from S3 during upload will be checked against an
899             internally-generated MD5 sum of the content that was sent, and an error result
900             will be returned if these do not match.
901              
902             =cut
903              
904             sub _md5sum_wrap
905             {
906 35     35   85 my $content = shift;
907 35         124 my @args = my ( $md5ctx, $posref, $content_length, $read_size ) = @_;
908              
909 35 100 66     217 if( !defined $content or !ref $content ) {
    50 0        
    0          
910 29         80 my $len = $content_length - $$posref;
911 29 50       76 if( defined $content ) {
912 29 50       88 substr( $content, $len ) = "" if length $content > $len;
913             }
914             else {
915 0         0 $content = "\0" x $len;
916             }
917              
918 29         173 $md5ctx->add( $content );
919 29         52 $$posref += length $content;
920              
921 29         167 return $content;
922             }
923             elsif( ref $content eq "CODE" ) {
924             return sub {
925 12 100   12   4762 return undef if $$posref >= $content_length;
926              
927 6         18 my $len = $content_length - $$posref;
928 6 50       19 $len = $read_size if $len > $read_size;
929              
930 6         24 my $chunk = $content->( $$posref, $len );
931 6         45 return _md5sum_wrap( $chunk, @args );
932             }
933 6         52 }
934             elsif( blessed $content and $content->isa( "Future" ) ) {
935             return $content->transform(
936             done => sub {
937 0     0   0 my ( $chunk ) = @_;
938 0         0 return _md5sum_wrap( $chunk, @args );
939             },
940 0         0 );
941             }
942             else {
943 0         0 die "TODO: md5sum wrap ref $content";
944             }
945             }
946              
947             sub _put_object
948             {
949 29     29   67 my $self = shift;
950 29         194 my %args = @_;
951              
952 29         231 my $md5ctx = Digest::MD5->new;
953 29         71 my $on_write = $args{on_write};
954 29         64 my $pos = 0;
955              
956             # Make $content definitely a Future
957             Future->wrap( delete $args{content} )->then( sub {
958 29     29   3389 my ( $content ) = @_;
959             my $content_length = @_ > 1 ? $_[1] :
960             ref $content ? $args{content_length} :
961 29 100       149 length $content;
    100          
962 29 50       96 defined $content_length or die "TODO: referential content $content needs length";
963              
964 29         174 my $request = $self->_make_request(
965             %args,
966             method => "PUT",
967             content => "", # Doesn't matter, it'll be ignored
968             );
969              
970 29         138 $request->content_length( $content_length );
971 29         1167 $request->content( "" );
972              
973             $self->_do_request( $request,
974             timeout => $args{timeout},
975             stall_timeout => $args{stall_timeout} // $self->{stall_timeout},
976             expect_continue => 1,
977 29 100 33     911 request_body => _md5sum_wrap( $content, $md5ctx, \$pos, $content_length, $self->{read_size} ),
978             ( $on_write ? ( on_body_write => $on_write ) : () ),
979             );
980             })->then( sub {
981 27     27   3555 my $resp = shift;
982              
983 27 50       170 defined( my $etag = $resp->header( "ETag" ) ) or
984             return Future->fail( "Response did not provide an ETag header", s3 => $resp );
985              
986             # Amazon S3 currently documents that the returned ETag header will be
987             # the MD5 hash of the content, surrounded in quote marks. We'd better
988             # hope this continues to be true... :/
989 27 50       1824 my ( $got_md5 ) = lc($etag) =~ m/^"([0-9a-f]{32})"$/ or
990             return Future->fail( "Returned ETag ($etag) does not look like an MD5 sum", s3 => $resp );
991              
992 27         229 my $expect_md5 = lc($md5ctx->hexdigest);
993              
994 27 50       116 if( $got_md5 ne $expect_md5 ) {
995 0         0 return Future->fail( "Returned MD5 hash ($got_md5) did not match expected ($expect_md5)", s3 => $resp );
996             }
997              
998 27         150 return Future->wrap( $etag, $pos );
999 29         101 });
1000             }
1001              
1002             sub _initiate_multipart_upload
1003             {
1004 9     9   22 my $self = shift;
1005 9         54 my %args = @_;
1006              
1007             my $req = $self->_make_request(
1008             method => "POST",
1009             bucket => $args{bucket},
1010             path => "$args{key}?uploads",
1011             content => "",
1012             meta => $args{meta},
1013 9         89 );
1014              
1015             $self->_do_request_xpc( $req,
1016             timeout => $args{timeout},
1017             stall_timeout => $args{stall_timeout} // $self->{stall_timeout},
1018             )->then( sub {
1019 9     9   1516 my $xpc = shift;
1020              
1021 9         75 my $id = $xpc->findvalue( ".//s3:InitiateMultipartUploadResult/s3:UploadId" );
1022 9         1380 return Future->wrap( $id );
1023 9   33     81 });
1024             }
1025              
1026             sub _complete_multipart_upload
1027             {
1028 9     9   34 my $self = shift;
1029 9         59 my %args = @_;
1030              
1031             my $req = $self->_make_request(
1032             method => "POST",
1033             bucket => $args{bucket},
1034             path => $args{key},
1035             content => $args{content},
1036             query_params => {
1037             uploadId => $args{id},
1038             },
1039             headers => {
1040             "Content-Type" => "application/xml",
1041 9         170 "Content-MD5" => encode_base64( md5( $args{content} ), "" ),
1042             },
1043             );
1044              
1045             $self->_do_request_xpc( $req,
1046             timeout => $args{timeout},
1047             stall_timeout => $args{stall_timeout} // $self->{stall_timeout},
1048             )->then( sub {
1049 9     9   1363 my $xpc = shift;
1050              
1051 9         49 my $etag = $xpc->findvalue( ".//s3:CompleteMultipartUploadResult/s3:ETag" );
1052 9         938 return Future->wrap( $etag );
1053 9   33     98 });
1054             }
1055              
1056             sub _put_object_multipart
1057             {
1058 9     9   28 my $self = shift;
1059 9         39 my ( $gen_parts, %args ) = @_;
1060              
1061 9         27 my $on_write = $args{on_write};
1062              
1063 9         24 my $id;
1064              
1065 9         18 my $written_committed = 0; # bytes written in committed parts
1066 9         23 my %written_tentative; # {$part_num} => bytes written so far in this part
1067              
1068             $self->_retry( "_initiate_multipart_upload",
1069             timeout => $args{meta_timeout} // $self->{timeout},
1070             %args
1071             )->then( sub {
1072 9     9   993 ( $id ) = @_;
1073              
1074 9         22 my $part_num = 0;
1075             ( fmap1 {
1076 20         330 my ( $part_num, $content, %moreargs ) = @{$_[0]};
  20         76  
1077              
1078             $self->_retry( "_put_object",
1079             bucket => $args{bucket},
1080             path => $args{key},
1081             content => $content,
1082             query_params => {
1083             partNumber => $part_num,
1084             uploadId => $id,
1085             },
1086             timeout => $args{part_timeout},
1087             ( $on_write ?
1088             ( on_write => sub {
1089 6         3650 $written_tentative{$part_num} = $_[0];
1090 6         157 $on_write->( $written_committed + sum values %written_tentative );
1091             } ) : () ),
1092             %moreargs,
1093             )->then( sub {
1094 20         2092 my ( $etag, $len ) = @_;
1095 20         57 $written_committed += $len;
1096 20         55 delete $written_tentative{$part_num};
1097 20         101 return Future->wrap( [ $part_num, $etag ] );
1098 20 100       278 });
1099             } generate => sub {
1100 31 100       4968 my @content = $gen_parts->() or return;
1101 20         61 $part_num++;
1102 20 100       119 return [ $part_num, $content[0] ] if @content == 1;
1103 3 50 33     35 return [ $part_num, $content[0], content_length => $content[1] ] if @content == 2 and ref $content[0] eq "CODE";
1104             # It's possible that $content[0] is a 100MiByte string. Best not to
1105             # interpolate that into $@ or we'll risk OOM
1106             die "Unsure what to do with gen_part result " .
1107 0 0       0 join( ", ", map { ref $_ ? $_ : "<".length($_)." bytes>" } @content );
  0         0  
1108 9   66     207 }, concurrent => $args{concurrent} // $self->{put_concurrent} );
1109             })->then( sub {
1110 9     9   1279 my @etags = @_;
1111              
1112 9         157 my $doc = XML::LibXML::Document->new( "1.0", "UTF-8" );
1113 9         213 $doc->addChild( my $root = $doc->createElement( "CompleteMultipartUpload" ) );
1114              
1115             #add content
1116 9         70 foreach ( @etags ) {
1117 20         378 my ( $part_num, $etag ) = @$_;
1118              
1119 20         195 $root->addChild( my $part = $doc->createElement('Part') );
1120 20         53 $part->appendTextChild( PartNumber => $part_num );
1121 20         421 $part->appendTextChild( ETag => $etag );
1122             }
1123              
1124             $self->_retry( "_complete_multipart_upload",
1125             %args,
1126             id => $id,
1127             content => $doc->toString,
1128             timeout => $args{meta_timeout} // $self->{timeout},
1129             )->then( sub {
1130 9         966 my ( $etag ) = @_;
1131 9         42 return Future->wrap( $etag, $written_committed );
1132 9   100     159 });
1133 9   100     88 });
1134             }
1135              
1136             sub put_object
1137             {
1138 18     18 1 14049 my $self = shift;
1139 18         104 my %args = @_;
1140              
1141 18         40 my $gen_parts;
1142 18 100       87 if( $gen_parts = delete $args{gen_parts} ) {
1143             # OK
1144             }
1145             else {
1146 9   66     50 my $content_length = $args{value_length} // length $args{value};
1147              
1148 9         20 my $part_size = $self->{part_size};
1149              
1150 9 50       36 if( $content_length > $part_size * 10_000 ) {
    100          
1151 0         0 croak "Cannot upload content in more than 10,000 parts - consider using a larger part_size";
1152             }
1153             elsif( $content_length > $part_size ) {
1154             $gen_parts = sub {
1155 3 100   3   17 return unless length $args{value};
1156 2         11 return substr( $args{value}, 0, $part_size, "" );
1157 1         6 };
1158             }
1159             else {
1160 8         26 my @parts = ( [ delete $args{value}, $content_length ] );
1161 8 100   16   37 $gen_parts = sub { return unless @parts; @{ shift @parts } };
  16         45  
  8         25  
  8         28  
1162             }
1163             }
1164              
1165 18         38 my @parts;
1166 18         53 push @parts, [ $gen_parts->() ];
1167              
1168             # Ensure first part is a Future then unfuture it
1169 18         160 Future->wrap( @{$parts[0]} )->then( sub {
1170 18 100   18   2951 @{$parts[0]} = @_ if @_;
  17         51  
1171              
1172 18         61 push @parts, [ $gen_parts->() ];
1173              
1174 18 100       68 if( @{ $parts[1] } ) {
  18 100       62  
1175             # There are at least two parts; we'd better use multipart upload
1176 9 100       94 $self->_put_object_multipart( sub { @parts ? @{ shift @parts } : goto &$gen_parts }, %args );
  31         166  
  18         118  
1177             }
1178 9         25 elsif( @{ $parts[0] } ) {
1179             # There is exactly one part
1180 8         11 my ( $content, $content_length ) = @{ shift @parts };
  8         20  
1181             $self->_retry( "_put_object",
1182             bucket => $args{bucket},
1183             path => $args{key},
1184             content => $content,
1185             content_length => $content_length,
1186             meta => $args{meta},
1187 8         48 %args,
1188             );
1189             }
1190             else {
1191             # There are no parts at all - create an empty object
1192             $self->_retry( "_put_object",
1193             bucket => $args{bucket},
1194             path => $args{key},
1195             content => "",
1196             meta => $args{meta},
1197 1         9 %args,
1198             );
1199             }
1200 18         81 });
1201             }
1202              
1203             =head2 delete_object
1204              
1205             $s3->delete_object( %args )->get
1206              
1207             Deletes a key from the bucket.
1208              
1209             Takes the following named arguments:
1210              
1211             =over 8
1212              
1213             =item bucket => STRING
1214              
1215             The name of the S3 bucket to delete the key from
1216              
1217             =item key => STRING
1218              
1219             The name of the key to delete
1220              
1221             =back
1222              
1223             The Future will return nothing.
1224              
1225             =cut
1226              
1227             sub _delete_object
1228             {
1229 3     3   6 my $self = shift;
1230 3         14 my %args = @_;
1231              
1232             my $request = $self->_make_request(
1233             method => "DELETE",
1234             bucket => $args{bucket},
1235             path => $args{key},
1236 3         16 content => "",
1237             );
1238              
1239             $self->_do_request( $request,
1240             timeout => $args{timeout} // $self->{timeout},
1241             stall_timeout => $args{stall_timeout} // $self->{stall_timeout},
1242             )->then( sub {
1243 1     1   146 return Future->done;
1244 3   100     39 });
      33        
1245             }
1246              
1247             sub delete_object
1248             {
1249 3     3 1 303 my $self = shift;
1250 3         15 $self->_retry( "_delete_object", @_ );
1251             }
1252              
1253             =head1 SPONSORS
1254              
1255             Parts of this code were paid for by
1256              
1257             =over 2
1258              
1259             =item *
1260              
1261             SocialFlow L<http://www.socialflow.com>
1262              
1263             =item *
1264              
1265             Shadowcat Systems L<http://www.shadowcat.co.uk>
1266              
1267             =item *
1268              
1269             Deriv L<http://deriv.com>
1270              
1271             =back
1272              
1273             =head1 AUTHOR
1274              
1275             Paul Evans <leonerd@leonerd.org.uk>
1276              
1277             =cut
1278              
1279             0x55AA;