File Coverage

blib/lib/Net/Async/Webservice/S3.pm
Criterion Covered Total %
statement 46 48 95.8
branch n/a
condition n/a
subroutine 16 16 100.0
pod n/a
total 62 64 96.8


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2013-2014 -- leonerd@leonerd.org.uk
5              
6             package Net::Async::Webservice::S3;
7              
8 10     10   1493652 use strict;
  10         24  
  10         394  
9 10     10   59 use warnings;
  10         20  
  10         352  
10 10     10   67 use base qw( IO::Async::Notifier );
  10         59  
  10         10899  
11              
12             our $VERSION = '0.18';
13              
14 10     10   31847 use Carp;
  10         23  
  10         626  
15              
16 10     10   8367 use Digest::HMAC_SHA1;
  10         351855  
  10         499  
17 10     10   96 use Digest::MD5 qw( md5 );
  10         770  
  10         588  
18 10     10   2462 use Future 0.21; # ->then_with_f
  10         25339  
  10         357  
19 10     10   10075 use Future::Utils 0.16 qw( repeat try_repeat fmap1 );
  10         24155  
  10         831  
20 10     10   316909 use HTTP::Date qw( time2str );
  10         226912  
  10         845  
21 10     10   9288 use HTTP::Request;
  10         72161  
  10         291  
22 10     10   9193 use IO::Async::Timer::Countdown;
  10         22092  
  10         337  
23 10     10   78 use List::Util qw( sum );
  10         31  
  10         1252  
24 10     10   9575 use MIME::Base64 qw( encode_base64 );
  10         8509  
  10         773  
25 10     10   67 use Scalar::Util qw( blessed );
  10         19  
  10         729  
26 10     10   64 use URI::Escape qw( uri_escape_utf8 );
  10         18  
  10         547  
27 10     10   17472 use XML::LibXML;
  0            
  0            
28             use XML::LibXML::XPathContext;
29              
30             my $libxml = XML::LibXML->new;
31              
32             use constant DEFAULT_S3_HOST => "s3.amazonaws.com";
33              
34             =head1 NAME
35              
36             C<Net::Async::Webservice::S3> - use Amazon's S3 web service with C<IO::Async>
37              
38             =head1 SYNOPSIS
39              
40             use IO::Async::Loop;
41             use Net::Async::Webservice::S3;
42              
43             my $loop = IO::Async::Loop->new;
44              
45             my $s3 = Net::Async::Webservice::S3->new(
46             access_key => ...,
47             secret_key => ...,
48             bucket => "my-bucket-here",
49             );
50             $loop->add( $s3 );
51              
52             my $put_f = $s3->put_object(
53             key => "the-key",
54             value => "A new value for the key\n",
55             );
56              
57             my $get_f = $s3->get_object(
58             key => "another-key",
59             );
60              
61             $loop->await_all( $put_f, $get_f );
62              
63             print "The value is:\n", $get_f->get;
64              
65             =head1 DESCRIPTION
66              
67             This module provides a webservice API around Amazon's S3 web service for use
68             in an L<IO::Async>-based program. Each S3 operation is represented by a method
69             that returns a L<Future>; this future, if successful, will eventually return
70             the result of the operation.
71              
72             =cut
73              
# Notifier initialisation hook. Fills in defaults on the constructor argument
# hash (modifying it in place) and then defers to the parent class.
#
#   $args - HASH reference of constructor arguments
sub _init
{
   my ( $self, @init_args ) = @_;
   my $args = $init_args[0];

   # No user agent supplied: create one and adopt it as a child notifier
   unless( $args->{http} ) {
      require Net::Async::HTTP;
      Net::Async::HTTP->VERSION( '0.33' ); # 'timeout' and 'stall_timeout' failures

      my $agent = Net::Async::HTTP->new;
      $self->add_child( $agent );
      $args->{http} = $agent;
   }

   # S3 docs suggest > 100MB should use multipart. They don't actually
   # document what size of parts to use, but we'll use that again.
   my %default_for = (
      max_retries   => 3,
      list_max_keys => 1000,
      read_size     => 64*1024, # 64 KiB
      part_size     => 100*1024*1024,
      host          => DEFAULT_S3_HOST,
   );

   # Only undefined entries receive a default; explicit false values are kept
   $args->{$_} //= $default_for{$_} for keys %default_for;

   return $self->SUPER::_init( @init_args );
}
99              
100             =head1 PARAMETERS
101              
102             The following named parameters may be passed to C<new> or C<configure>:
103              
104             =over 8
105              
106             =item http => Net::Async::HTTP
107              
108             Optional. Allows the caller to provide a specific asynchronous HTTP user agent
109             object to use. This will be invoked with a single method, as documented by
110             L<Net::Async::HTTP>:
111              
112             $response_f = $http->do_request( request => $request, ... )
113              
114             If absent, a new instance will be created and added as a child notifier of
115             this object. If a value is supplied, it will be used as supplied and I<not>
116             specifically added as a child notifier. In this case, the caller must ensure
117             it gets added to the underlying L<IO::Async::Loop> instance, if required.
118              
119             =item access_key => STRING
120              
121             =item secret_key => STRING
122              
123             The twenty-character Access Key ID and forty-character Secret Key to use for
124             authenticating requests to S3.
125              
126             =item ssl => BOOL
127              
128             Optional. If given a true value, will use C<https> URLs over SSL. Defaults to
129             off. This feature requires the optional L<IO::Async::SSL> module if using
130             L<Net::Async::HTTP>.
131              
132             =item bucket => STRING
133              
134             Optional. If supplied, gives the default bucket name to use, at which point it
135             is optional to supply to the remaining methods.
136              
137             =item prefix => STRING
138              
139             Optional. If supplied, this prefix string is prepended to any key names passed
140             in methods, and stripped from the response from C<list_bucket>. It can be used
141             to keep operations of the object contained within the named key space. If this
142             string is supplied, don't forget that it should end with the path delimiter in
143             use by the key naming scheme (for example C</>).
144              
145             =item host => STRING
146              
147             Optional. Sets the hostname to talk to the S3 service. Usually the default of
148             C<s3.amazonaws.com> is sufficient. This setting allows for communication with
149             other service providers who provide the same API as S3.
150              
151             =item max_retries => INT
152              
153             Optional. Maximum number of times to retry a failed operation. Defaults to 3.
154              
155             =item list_max_keys => INT
156              
157             Optional. Maximum number of keys at a time to request from S3 for the
158             C<list_bucket> method. Larger values may be more efficient as fewer roundtrips
159             will be required per method call. Defaults to 1000.
160              
161             =item part_size => INT
162              
163             Optional. Size in bytes to break content for using multipart upload. If an
164             object key's size is no larger than this value, multipart upload will not be
165             used. Defaults to 100 MiB.
166              
167             =item read_size => INT
168              
169             Optional. Size in bytes to read per call to the C<$gen_value> content
170             generation function in C<put_object>. Defaults to 64 KiB. Be aware that too
171             large a value may lead to the C<PUT> stall timer failing to be invoked on slow
172             enough connections, causing spurious timeouts.
173              
174             =item timeout => NUM
175              
176             Optional. If configured, this is passed into individual requests of the
177             underlying C<Net::Async::HTTP> object, except for the actual content C<GET> or
178             C<PUT> operations. It is therefore used by C<list_bucket>, C<delete_object>,
179             and the multi-part metadata operations used by C<put_object>. To apply an
180             overall timeout to an individual C<get_object> or C<put_object> operation,
181             pass a specific C<timeout> argument to those methods specifically.
182              
183             =item stall_timeout => NUM
184              
185             Optional. If configured, this is passed into the underlying
186             C<Net::Async::HTTP> object and used for all content uploads and downloads.
187              
188             =item put_concurrent => INT
189              
190             Optional. If configured, gives a default value for the C<concurrent> parameter
191             to C<put_object>.
192              
193             =back
194              
195             =cut
196              
# Notifier configuration hook. Stores the parameters this class understands
# directly on the object and forwards everything else to the parent class.
sub configure
{
   my ( $self, %args ) = @_;

   my @own_params = qw(
      http access_key secret_key ssl bucket prefix host max_retries
      list_max_keys part_size read_size timeout stall_timeout
      put_concurrent
   );

   foreach my $name ( @own_params ) {
      # Test for presence rather than definedness, so that passing an explicit
      # undef still overwrites a previously-configured value
      next unless exists $args{$name};
      $self->{$name} = delete $args{$name};
   }

   return $self->SUPER::configure( %args );
}
210              
211             =head1 METHODS
212              
213             The following methods all support the following common arguments:
214              
215             =over 8
216              
217             =item timeout => NUM
218              
219             =item stall_timeout => NUM
220              
221             Optional. Passed directly to the underlying C<< Net::Async::HTTP->request >>
222             method.
223              
224             =back
225              
226             Each method below that yields a C<Future> is documented in the form
227              
228             $s3->METHOD( ARGS ) ==> YIELDS
229              
230             Where the C<YIELDS> part indicates the values that will eventually be returned
231             by the C<get> method on the returned Future object, if it succeeds.
232              
233             =cut
234              
# Builds a signed HTTP::Request for a single S3 operation.
#
# Takes named arguments:
#   method       - HTTP verb
#   content      - request body; required, croaks if undefined (may be "")
#   bucket       - bucket name; defaults to the configured 'bucket'
#   path         - key path, appended to the configured 'prefix'
#   abs_path     - complete path to use instead of prefix + path
#   query_params - HASH ref of query parameters; undefined values are skipped
#                  and underscores in names become hyphens
#   headers      - HASH ref of additional request headers
#   meta         - HASH ref of metadata, sent as X-Amz-Meta-* headers
#
# Returns the HTTP::Request, with Date, Content-Length and Authorization set.
sub _make_request
{
   my $self = shift;
   my %args = @_;

   my $method = $args{method};
   defined $args{content} or croak "Missing 'content'";

   # Parameters are emitted in sorted order of their original (underscored)
   # names so the generated URI is deterministic
   my @params;
   foreach my $key ( sort keys %{ $args{query_params} } ) {
      next unless defined( my $value = $args{query_params}->{$key} );
      $key =~ s/_/-/g;
      push @params, $key . "=" . uri_escape_utf8( $value, "^A-Za-z0-9_-" );
   }

   my $bucket = $args{bucket} // $self->{bucket};
   my $path   = $args{abs_path} // join "", grep { defined } $self->{prefix}, $args{path};

   my $scheme = $self->{ssl} ? "https" : "http";

   # Bucket names that look like a hostname label go into the hostname
   # (virtual-hosted style); anything else is placed in the path
   my $uri;
   if( length( $bucket ) <= 63 and $bucket =~ m{^[A-Z0-9][A-Z0-9.-]+$}i ) {
      $uri = "$scheme://$bucket.$self->{host}/$path";
   }
   else {
      $uri = "$scheme://$self->{host}/$bucket/$path";
   }
   $uri .= "?" . join( "&", @params ) if @params;

   my $meta = $args{meta} || {};

   my @headers = (
      Date => time2str( time ),
      %{ $args{headers} || {} },
      ( map { +"X-Amz-Meta-$_" => $meta->{$_} } sort keys %$meta ),
   );

   my $request = HTTP::Request->new( $method, $uri, \@headers, $args{content} );
   $request->content_length( length $args{content} );

   # Signing has to come last as it covers the headers set above
   $self->_gen_auth_header( $request, $bucket, $path );

   return $request;
}
281              
# Computes the AWS (HMAC-SHA1) signature for $request and stores it in the
# request's Authorization header.
#
#   $request - HTTP::Request to sign; modified in place
#   $bucket  - bucket name, used in the canonical resource
#   $path    - full key path, used in the canonical resource
sub _gen_auth_header
{
   my $self = shift;
   my ( $request, $bucket, $path ) = @_;

   # See also
   #   http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#ConstructingTheAuthenticationHeader

   my $canon_resource = "/$bucket/$path";

   my $query = $request->uri->query;
   if( defined $query and length $query ) {
      my %params = $request->uri->query_form;

      # Only these query parameters take part in the signature. The list is
      # already in sorted order, which is the order they must be signed in.
      my @params_to_sign;
      foreach my $name (qw( partNumber uploadId )) {
         next unless exists $params{$name};
         push @params_to_sign,
            defined $params{$name} ? "$name=$params{$name}" : $name;
      }

      $canon_resource .= "?" . join( "&", @params_to_sign ) if @params_to_sign;
   }

   # Collect the X-Amz-* headers, lowercased, to be signed in sorted order
   my %x_amz_headers;
   $request->scan( sub {
      $x_amz_headers{lc $_[0]} = $_[1] if $_[0] =~ m/^X-Amz-/i;
   });

   my $x_amz_headers = "";
   $x_amz_headers .= "$_:$x_amz_headers{$_}\n" for sort keys %x_amz_headers;

   # The string to sign; absent headers contribute an empty line
   my $buffer = join( "\n",
      $request->method,
      $request->header( "Content-MD5" ) // "",
      $request->header( "Content-Type" ) // "",
      $request->header( "Date" ) // "",
      $x_amz_headers . $canon_resource );

   my $hmac = Digest::HMAC_SHA1->new( $self->{secret_key} );
   $hmac->add( $buffer );

   my $access_key = $self->{access_key};
   my $authkey    = encode_base64( $hmac->digest, "" );

   $request->header( Authorization => "AWS $access_key:$authkey" );
}
331              
# Performs $request through the HTTP user agent, passing any further named
# arguments along. Returns a future yielding the response; responses whose
# status code is not 2xx are turned into failures of the form
#   ( "CODE MESSAGE", http => $response, $request )
sub _do_request
{
   my ( $self, $request, %args ) = @_;

   my $use_ssl = ( $request->uri->scheme eq "https" );

   my $response_f = $self->{http}->do_request(
      request => $request,
      SSL     => $use_ssl,
      %args,
   );

   return $response_f->then_with_f( sub {
      my ( $f, $resp ) = @_;

      my $code = $resp->code;

      # A 2xx response passes through untouched
      return $f if $code =~ m/^2/;

      my $message = $resp->message;
      $message =~ s/\r$//; # HTTP::Response leaves the \r on this

      return Future->new->fail(
         "$code $message", http => $resp, $request
      );
   });
}
358              
# As _do_request, but the returned future yields an XML::LibXML::XPathContext
# over the parsed response body, with the S3 namespace registered under the
# prefix "s3"
sub _do_request_xpc
{
   my ( $self, $request, @args ) = @_;

   my $response_f = $self->_do_request( $request, @args );

   return $response_f->then( sub {
      my ( $resp ) = @_;

      my $doc = $libxml->parse_string( $resp->content );

      my $xpc = XML::LibXML::XPathContext->new( $doc );
      $xpc->registerNs( s3 => "http://s3.amazonaws.com/doc/2006-03-01/" );

      return Future->wrap( $xpc );
   });
}
374              
# Invokes $self->$method( @args ) - $method being a method name or a CODE
# reference - and repeats it while it keeps failing, until the 'max_retries'
# budget is used up. HTTP 4xx failures are never retried. Each retry is
# preceded by a delay which doubles every time (1s, 2s, 4s, ...).
#
# Returns the future of the final attempt.
sub _retry
{
   my ( $self, $method, @args ) = @_;

   my $delay          = 0.5;
   my $attempts_left  = $self->{max_retries};

   return try_repeat {
      my ( $prev_f ) = @_;

      # Add a small delay after failure before retrying; the very first
      # attempt has no previous future and so starts immediately
      my $delay_f;
      if( $prev_f ) {
         $delay_f = $self->loop->delay_future( after => ( $delay *= 2 ) );
      }
      else {
         $delay_f = Future->new->done;
      }

      $delay_f->then( sub { $self->$method( @args ) } );
   } while => sub {
      my ( $f ) = @_;

      my @failure = $f->failure;
      return 0 unless @failure; # success

      my ( undef, $name, $response ) = @failure;

      # don't retry HTTP 4xx
      my $is_http_4xx = defined $name && $name eq "http" &&
         $response && $response->code =~ m/^4/;
      return 0 if $is_http_4xx;

      return --$attempts_left;
   };
}
400              
401             =head2 $s3->list_bucket( %args ) ==> ( $keys, $prefixes )
402              
403             Requests a list of the keys in a bucket, optionally within some prefix.
404              
405             Takes the following named arguments:
406              
407             =over 8
408              
409             =item bucket => STR
410              
411             The name of the S3 bucket to query
412              
413             =item prefix => STR
414              
415             =item delimiter => STR
416              
417             Optional. If supplied, the prefix and delimiter to use to divide up the key
418             namespace. Keys will be divided on the C<delimiter> parameter, and only the
419             key space beginning with the given prefix will be queried.
420              
421             =back
422              
423             The Future will return two ARRAY references. The first provides a list of the
424             keys found within the given prefix, and the second will return a list of the
425             common prefixes of further nested keys.
426              
427             Each key in the C<$keys> list is given in a HASH reference containing
428              
429             =over 8
430              
431             =item key => STRING
432              
433             The key's name
434              
435             =item last_modified => STRING
436              
437             The last modification time of the key given in ISO 8601 format
438              
439             =item etag => STRING
440              
441             The entity tag of the key
442              
443             =item size => INT
444              
445             The size of the key's value, in bytes
446              
447             =item storage_class => STRING
448              
449             The S3 storage class of the key
450              
451             =back
452              
453             Each key in the C<$prefixes> list is given as a plain string.
454              
455             =cut
456              
# Performs one complete bucket listing, fetching as many pages of at most
# 'list_max_keys' entries as S3 reports are needed.
#
# Takes named arguments:
#   bucket        - bucket name; defaults to the configured one
#   prefix        - key prefix, appended to the configured 'prefix'
#   delimiter     - delimiter used to group keys into common prefixes
#   timeout       - per-request timeout; defaults to the configured one
#   stall_timeout - per-request stall timeout; defaults to the configured one
#
# Returns a future yielding ( \@keys, \@prefixes ), where each key is a HASH
# ref of key, last_modified, etag, size and storage_class, and each prefix is
# a plain string. The configured 'prefix' is stripped from both.
sub _list_bucket
{
   my $self = shift;
   my %args = @_;

   my @keys;
   my @prefixes;

   my $f = repeat {
      my ( $prev_f ) = @_;

      # Each page's future yields the marker to continue from, if any
      my $marker = $prev_f ? $prev_f->get : undef;

      my $req = $self->_make_request(
         method       => "GET",
         bucket       => $args{bucket},
         abs_path     => "",
         query_params => {
            prefix    => join( "", grep { defined } $self->{prefix}, $args{prefix} ),
            delimiter => $args{delimiter},
            marker    => $marker,
            max_keys  => $self->{list_max_keys},
         },
         content      => "",
      );

      $self->_do_request_xpc( $req,
         timeout       => $args{timeout} // $self->{timeout},
         stall_timeout => $args{stall_timeout} // $self->{stall_timeout},
      )->then( sub {
         my $xpc = shift;

         my $last_key;
         foreach my $node ( $xpc->findnodes( ".//s3:Contents" ) ) {
            my $key = $xpc->findvalue( ".//s3:Key", $node );
            $last_key = $key; # the marker needs the full, unstripped name

            $key =~ s/^\Q$self->{prefix}\E// if defined $self->{prefix};
            push @keys, {
               key           => $key,
               last_modified => $xpc->findvalue( ".//s3:LastModified", $node ),
               etag          => $xpc->findvalue( ".//s3:ETag", $node ),
               size          => $xpc->findvalue( ".//s3:Size", $node ),
               storage_class => $xpc->findvalue( ".//s3:StorageClass", $node ),
            };
         }

         foreach my $node ( $xpc->findnodes( ".//s3:CommonPrefixes" ) ) {
            my $key = $xpc->findvalue( ".//s3:Prefix", $node );

            $key =~ s/^\Q$self->{prefix}\E// if defined $self->{prefix};
            push @prefixes, $key;
         }

         # An empty result tells the 'while' condition there are no more pages
         return Future->new->done
            unless $xpc->findvalue( ".//s3:IsTruncated" ) eq "true";

         # S3 supplies NextMarker when a delimiter is in use; resuming from the
         # last key instead could list the trailing common prefixes again.
         # Without NextMarker the last key is the documented fallback.
         my $next_marker = $xpc->findvalue( ".//s3:NextMarker" );
         $next_marker = $last_key unless length $next_marker;

         return Future->wrap( $next_marker );
      });
   } while => sub {
      my ( $page_f ) = @_;
      return 0 if $page_f->failure;

      # Test for a non-empty marker rather than a true one, so that a key
      # named "0" does not end the listing early
      my $next_marker = $page_f->get;
      return defined $next_marker && length $next_marker;
   };

   $f->then( sub {
      return Future->wrap( \@keys, \@prefixes );
   });
}
524              
# Public entry point for bucket listing: a single listing run, wrapped in the
# standard retry policy. All arguments pass straight through.
sub list_bucket
{
   my ( $self, @args ) = @_;

   return $self->_retry( "_list_bucket", @args );
}
530              
531             =head2 $s3->get_object( %args ) ==> ( $value, $response, $meta )
532              
533             Requests the value of a key from a bucket.
534              
535             Takes the following named arguments:
536              
537             =over 8
538              
539             =item bucket => STR
540              
541             The name of the S3 bucket to query
542              
543             =item key => STR
544              
545             The name of the key to query
546              
547             =item on_chunk => CODE
548              
549             Optional. If supplied, this code will be invoked repeatedly on receipt of more
550             bytes of the key's value. It will be passed the L<HTTP::Response> object
551             received in reply to the request, and a byte string containing more bytes of
552             the value. Its return value is not important.
553              
554             $on_chunk->( $header, $bytes )
555              
556             If this is supplied then the key's value will not be accumulated, and the
557             final result of the Future will be an empty string.
558              
559             =item byte_range => STRING
560              
561             Optional. If supplied, is used to set the C<Range> request header with
562             C<bytes> as the units. This gives a range of bytes of the object to fetch,
563             rather than fetching the entire content. The value must be as specified by
564             HTTP/1.1; i.e. a comma-separated list of ranges, where each range specifies a
565             start and optionally an inclusive stop byte index, separated by hyphens.
566              
567             =item if_match => STRING
568              
569             Optional. If supplied, is used to set the C<If-Match> request header to the
570             given string, which should be an entity etag. If the requested object no
571             longer has this etag, the request will fail with an C<http> failure whose
572             response code is 412.
573              
574             =back
575              
576             The Future will return a byte string containing the key's value, the
577             L<HTTP::Response> that was received, and a hash reference containing any of
578             the metadata fields, if found in the response. If an C<on_chunk> code
579             reference is passed, the C<$value> string will be empty.
580              
581             If the entire content of the object is requested (i.e. if C<byte_range> is not
582             supplied) then stall timeout failures will be handled specially. If a stall
583             timeout happens while receiving the content, the request will be retried using
584             the C<Range> header to resume from progress so far. This will be repeated
585             while every attempt still makes progress, and such resumes will not be counted
586             as part of the normal retry count. The resume request also uses C<If-Match> to
587             ensure it only resumes the resource with matching ETag. If a resume request
588             fails for some reason (either because the ETag no longer matches or something
589             else) then this error is ignored, and the original stall timeout failure is
590             returned.
591              
592             =cut
593              
594             sub _head_then_get_object
595             {
                # Issues an $args{method} request for $args{key} and returns a "head" future
                # that completes as soon as the first 2xx response header arrives, yielding
                #   ( $value_future, $header, \%meta )
                # $value_future completes once the body has been received, yielding
                #   ( $content, $header, \%meta )
                #
                # Named arguments read here: method, bucket, key, if_match, byte_range,
                # on_chunk, timeout, stall_timeout.
596             my $self = shift;
597             my %args = @_;
598              
599             my $if_match = $args{if_match};
600             my $byte_range = $args{byte_range};
601              
602             # TODO: This doesn't handle retries correctly
603             # But that said neither does the rest of this module, wrt: on_chunk streaming
604              
605             my $on_chunk = delete $args{on_chunk};
606              
                # State shared between the callbacks below:
                #   $header          - header object of the first 2xx response; body chunks are
                #                      appended to it unless an on_chunk callback was given
                #   $head_future     - the future returned to the caller
                #   $value_future    - created when the first 2xx header arrives
                #   $value_len       - number of body bytes received so far, over all attempts
                #   $stall_failure_f - the first attempt that failed with a stall timeout
607             my $header;
608             my $head_future = $self->loop->new_future;
609             my $value_future;
610             my $value_len = 0;
611             my $stall_failure_f;
612              
613             my $resume_on_stall = 1;
614              
615             # TODO: Right now I can't be bothered to write the logic required to update
616             # the user-requested byte_range (which may in complex cases contain multiple
617             # discontinuous ranges) after a stall to resume it. This probably could be
618             # done at some stage.
619             $resume_on_stall = 0 if defined $byte_range;
620              
621             ( try_repeat {
                # On a resume attempt some bytes have already arrived, so request only
                # the remainder of the object
622             if( my $pos = $value_len ) {
623             $byte_range = "$pos-";
624             }
625              
626             my $request = $self->_make_request(
627             method => $args{method},
628             bucket => $args{bucket},
629             path => $args{key},
630             headers => {
631             ( defined $if_match ? ( "If-Match" => $if_match ) : () ),
632             ( defined $byte_range ? ( Range => "bytes=$byte_range" ) : () ),
633             },
634             content => "",
635             );
636              
                # Only the per-call 'timeout' is passed on here, with no fallback to the
                # configured one; 'stall_timeout' does fall back
637             $self->_do_request( $request,
638             timeout => $args{timeout},
639             stall_timeout => $args{stall_timeout} // $self->{stall_timeout},
640             on_header => sub {
641             my ( $this_header ) = @_;
642             my $code = $this_header->code;
643              
644             if( $head_future->is_cancelled or $code !~ m/^2/ ) {
645             # Just eat the body on cancellation or if it's not a 2xx
646             # For failures this will cause ->on_fail to occur and fail the
647             # $head_future
                # NOTE(review): the returned sub is presumably called with each body
                # chunk and finally with no arguments - confirm against the
                # Net::Async::HTTP 'on_header' documentation
648             return sub {
649             return if @_;
650             return $this_header;
651             };
652             }
653              
                # Gather the X-Amz-Meta-* response headers, keyed by the name with that
                # prefix removed
654             my %meta;
655             $this_header->scan( sub {
656             $_[0] =~ m/^X-Amz-Meta-(.*)$/i and $meta{$1} = $_[1];
657             });
658              
659             if( !$value_future ) {
660             # First response
661             $header = $this_header;
662             # If we're going to retry this, ensure we only request this exact ETag
663             $if_match ||= $header->header( "ETag" );
664              
                # The head future completes now, handing the caller the body future
665             $value_future = $head_future->new;
666             $head_future->done( $value_future, $header, \%meta );
667             }
668              
669             return sub {
670             # Code here could be 200 (OK), 206 (Partial Content) or 204 (No Content)
671             return $this_header if $code == 204;
672              
                # Called with arguments: count the chunk towards the resume offset, then
                # either stream it to on_chunk or append it to the first response
673             if( @_ ) {
674             $value_len += length $_[0];
675             if( $on_chunk ) {
676             $on_chunk->( $header, @_ );
677             }
678             else {
679             $header->add_content( $_[0] );
680             }
681             return;
682             }
683             return $header; # with no body content
684             };
685             }
686             );
687             } while => sub {
688             my ( $prev_f ) = @_;
689             # repeat while it keeps failing with a stall timeout
690             return 0 if !$resume_on_stall or !$prev_f->failure;
691              
692             my $op = ( $prev_f->failure )[1] // "";
693             return 0 if $op ne "stall_timeout";
694              
                # Remember only the first stall, for reporting should a resume fail
695             $stall_failure_f ||= $prev_f;
696             return 1;
697             } )->on_done( sub {
698             my ( $response ) = @_;
699             return if $head_future->is_cancelled;
700              
                # Complete the body future with the content plus the header and metadata
                # that the head future already yielded
701             $value_future->done( $response->content, ( $head_future->get )[1,2] );
702             })->on_fail( sub {
                # Fail the body future if a header was already delivered, otherwise the
                # head future
703             my $f = $value_future || $head_future;
704             # If we have a $stall_failure_f it means we must have attempted a resume
705             # after a stall timeout, then failed for a different reason. Ignore that
706             # second reason and just pretend to the caller that we stalled.
707             $f->fail( $stall_failure_f ? $stall_failure_f->failure : @_ ) if !$f->is_cancelled;
708             });
709              
710             return $head_future;
711             }
712              
# Fetches a key's value under the standard retry policy. The resulting future
# yields ( $value, $response, $meta ) once the whole body has arrived.
sub get_object
{
   my ( $self, %args ) = @_;

   return $self->_retry( sub {
      my $head_f = $self->_head_then_get_object( %args, method => "GET" );

      # The head future yields the body future as its first result; returning
      # that from ->then makes the chain wait on the value
      return $head_f->then( sub { return $_[0] } );
   });
}
723              
724             =head2 $s3->head_object( %args ) ==> ( $response, $meta )
725              
726             Requests the value metadata of a key from a bucket. This is similar to the
727             C<get_object> method, but uses the C<HEAD> HTTP verb instead of C<GET>.
728              
729             Takes the same named arguments as C<get_object>, but will ignore an
730             C<on_chunk> callback, if provided.
731              
732             The Future will return the L<HTTP::Response> object and metadata hash
733             reference, without the content string (as no content is returned to a C<HEAD>
734             request).
735              
736             =cut
737              
# Fetches only a key's headers and metadata, using HEAD, under the standard
# retry policy. The resulting future yields ( $response, $meta ).
sub head_object
{
   my ( $self, %args ) = @_;

   return $self->_retry( sub {
      my $head_f = $self->_head_then_get_object( %args, method => "HEAD" );

      # wait on the empty body
      my $body_f = $head_f->then( sub { return $_[0] } );

      # remove (empty) value
      return $body_f->then( sub {
         my ( undef, @rest ) = @_;
         return Future->wrap( @rest );
      });
   });
}
749              
750             =head2 $s3->head_then_get_object( %args ) ==> ( $value_f, $response, $meta )
751              
752             Performs a C<GET> operation similar to C<get_object>, but allows access to the
753             metadata header before the body content is complete.
754              
755             Takes the same named arguments as C<get_object>.
756              
757             The returned Future completes as soon as the metadata header has been received
758             and yields a second future (the body future), the L<HTTP::Response> and a hash
759             reference containing the metadata fields. The body future will eventually
760             yield the actual body, along with another copy of the response and metadata
761             hash reference.
762              
763             $value_f ==> $value, $response, $meta
764              
765             =cut
766              
# Public entry point for the header-first GET.
#
# Forwards the caller's arguments to the private _head_then_get_object
# method by name through _retry. method => "GET" is appended after the
# caller's arguments.
sub head_then_get_object
{
   my ( $self, @caller_args ) = @_;

   return $self->_retry( "_head_then_get_object", @caller_args, method => "GET" );
}
772              
773             =head2 $s3->put_object( %args ) ==> ( $etag, $length )
774              
775             Sets a new value for a key in the bucket.
776              
777             Takes the following named arguments:
778              
779             =over 8
780              
781             =item bucket => STRING
782              
783             The name of the S3 bucket to put the value in
784              
785             =item key => STRING
786              
787             The name of the key to put the value in
788              
789             =item value => STRING
790              
791             =item value => Future giving STRING
792              
793             Optional. If provided, gives a byte string as the new value for the key or a
794             L<Future> which will eventually yield such.
795              
796             =item value => CODE
797              
798             =item value_length => INT
799              
800             Alternative form of C<value>, which is a C<CODE> reference to a generator
801             function. It will be called repeatedly to generate small chunks of content,
802             being passed the position and length it should yield.
803              
804             $chunk = $value->( $pos, $len )
805              
806             Typically this can be provided either by a C<substr> operation on a larger
807             string buffer, or a C<sysseek> and C<sysread> operation on a filehandle.
808              
809             In normal operation the function will just be called in a single sweep in
810             contiguous regions up to the extent given by C<value_length>. If, however, the
811             MD5sum check fails at the end of upload, it will be called again to retry the
812             operation. The function must therefore be prepared to be invoked multiple
813             times over its range.
814              
815             =item value => Future giving ( CODE, INT )
816              
817             Alternative form of C<value>, in which a C<Future> eventually yields the value
818             generation C<CODE> reference and length. The C<CODE> reference is invoked as
819             documented above.
820              
821             ( $gen_value, $value_len ) = $value->get;
822              
823             $chunk = $gen_value->( $pos, $len );
824              
825             =item gen_parts => CODE
826              
827             Alternative to C<value> in the case of larger values, and implies the use of
828             multipart upload. Called repeatedly to generate successive parts of the
829             upload. Each time C<gen_parts> is called it should return one of the forms of
830             C<value> given above; namely, a byte string, a C<CODE> reference and size
831             pair, or a C<Future> which will eventually yield either of these forms.
832              
833             ( $value ) = $gen_parts->()
834              
835             ( $gen_value, $value_length ) = $gen_parts->()
836              
837             ( $value_f ) = $gen_parts->(); $value = $value_f->get
838             ( $gen_value, $value_length ) = $value_f->get
839              
840             Each case is analogous to the types that the C<value> key can take.
841              
842             =item meta => HASH
843              
844             Optional. If provided, gives additional user metadata fields to set on the
845             object, using the C<X-Amz-Meta-> fields.
846              
847             =item timeout => NUM
848              
849             Optional. For single-part uploads, this sets the C<timeout> argument to use
850             for the actual C<PUT> request. For multi-part uploads, this argument is
851             currently ignored.
852              
853             =item meta_timeout => NUM
854              
855             Optional. For multipart uploads, this sets the C<timeout> argument to use for
856             the initiate and complete requests, overriding a configured C<timeout>.
857             Ignored for single-part uploads.
858              
859             =item part_timeout => NUM
860              
861             Optional. For multipart uploads, this sets the C<timeout> argument to use for
862             the individual part C<PUT> requests. Ignored for single-part uploads.
863              
864             =item on_write => CODE
865              
866             Optional. If provided, this code will be invoked after each successful
867             C<syswrite> call on the underlying filehandle when writing actual file
868             content, indicating that the data was at least written as far as the
869             kernel. It will be passed the total byte length that has been written for this
870             call to C<put_object>. By the time the call has completed, this will be the
871             total written length of the object.
872              
873             $on_write->( $bytes_written )
874              
875             Note that because of retries it is possible this count will decrease, if a
876             part has to be retried due to e.g. a failing MD5 checksum.
877              
878             =item concurrent => INT
879              
880             Optional. If present, gives the number of parts to upload concurrently. If
881             absent, a default of 1 will apply (i.e. no concurrency).
882              
883             =back
884              
885             The Future will return a string containing the S3 ETag of the newly-set key,
886             and the length of the value in bytes.
887              
888             For single-part uploads the ETag will be the MD5 sum in hex, surrounded by
889             quote marks. For multi-part uploads this is a string in a different form,
890             though details of its generation are not specified by S3.
891              
892             The returned MD5 sum from S3 during upload will be checked against an
893             internally-generated MD5 sum of the content that was sent, and an error result
894             will be returned if these do not match.
895              
896             =cut
897              
# Wraps a request-body source so that everything passed onwards is also fed
# into a running MD5 digest and counted.
#
#   $wrapped = _md5sum_wrap( $content, $md5ctx, $posref, $content_length, $read_size )
#
# $md5ctx is the digest object to add to, $posref a reference to the running
# byte position, $content_length the total expected length and $read_size
# the largest chunk to ask a generator for.
#
# The result depends on the form of $content:
#
#  * a non-reference string is cut down to the bytes still outstanding
#    (content_length - position), digested, counted and returned;
#  * undef is replaced by that many NUL bytes, then treated the same way;
#  * a CODE reference yields a new CODE reference which, on each call, asks
#    the original for the next chunk (at most $read_size bytes) and wraps
#    that chunk recursively; it returns undef once the position has reached
#    $content_length;
#  * a Future yields a transformed Future whose first result is wrapped
#    recursively.
#
# Any other kind of reference is fatal.
sub _md5sum_wrap
{
   my ( $content, @state ) = @_;
   my ( $md5ctx, $posref, $content_length, $read_size ) = @state;

   unless( defined $content and ref $content ) {
      my $remaining = $content_length - $$posref;

      if( !defined $content ) {
         # Nothing supplied: pad out the remainder with NUL bytes
         $content = "\0" x $remaining;
      }
      elsif( length( $content ) > $remaining ) {
         # Never pass on more than the declared length
         substr( $content, $remaining ) = "";
      }

      $md5ctx->add( $content );
      $$posref += length $content;

      return $content;
   }

   if( ref $content eq "CODE" ) {
      my $generator = $content;

      return sub {
         return undef if $$posref >= $content_length;

         my $want = $content_length - $$posref;
         $want = $read_size if $want > $read_size;

         # Scalar assignment keeps the generator call in scalar context
         my $piece = $generator->( $$posref, $want );
         return _md5sum_wrap( $piece, @state );
      };
   }

   if( blessed $content and $content->isa( "Future" ) ) {
      return $content->transform(
         done => sub {
            my ( $piece ) = @_;
            return _md5sum_wrap( $piece, @state );
         },
      );
   }

   die "TODO: md5sum wrap ref $content";
}
940              
# Performs one PUT request and verifies the MD5 sum S3 reports for it.
#
#   $self->_put_object( %args ) ==> ( $etag, $bytes_sent )
#
# $args{content} may be a value or a Future; it is removed from %args and
# normalised into a Future first. The body length is taken from the second
# value yielded by that Future if there is one, else from
# $args{content_length} when the content is a reference, else from the
# length of the string. The remaining %args are passed on to _make_request.
#
# The body is sent via _md5sum_wrap so a digest and byte count are built up
# as it is generated. The response must carry an ETag of the form
# "<32 hex digits>" equal to that digest, otherwise the returned Future
# fails.
sub _put_object
{
   my ( $self, %args ) = @_;

   my $digest      = Digest::MD5->new;
   my $progress_cb = $args{on_write};
   my $sent        = 0;   # advanced through a reference by _md5sum_wrap

   # Make the content definitely a Future
   my $content_f = Future->wrap( delete $args{content} );

   my $response_f = $content_f->then( sub {
      my ( $content ) = @_;

      my $content_length;
      if( @_ > 1 ) {
         $content_length = $_[1];
      }
      elsif( ref $content ) {
         $content_length = $args{content_length};
      }
      else {
         $content_length = length $content;
      }
      defined $content_length or die "TODO: referential content $content needs length";

      my $request = $self->_make_request(
         %args,
         method  => "PUT",
         content => "", # Doesn't matter, it'll be ignored
      );

      # The real body is supplied as request_body below, so only the length
      # header is set here and the request's own content stays empty
      $request->content_length( $content_length );
      $request->content( "" );

      my $body = _md5sum_wrap( $content, $digest, \$sent, $content_length, $self->{read_size} );

      return $self->_do_request( $request,
         timeout         => $args{timeout},
         stall_timeout   => $args{stall_timeout} // $self->{stall_timeout},
         expect_continue => 1,
         request_body    => $body,
         ( $progress_cb ? ( on_body_write => $progress_cb ) : () ),
      );
   });

   return $response_f->then( sub {
      my ( $resp ) = @_;

      my $etag = $resp->header( "ETag" );
      defined $etag or
         return Future->new->die( "Response did not provide an ETag header" );

      # Amazon S3 currently documents that the returned ETag header will be
      # the MD5 hash of the content, surrounded in quote marks. We'd better
      # hope this continues to be true... :/
      my $got_md5;
      if( lc( $etag ) =~ m/^"([0-9a-f]{32})"$/ ) {
         $got_md5 = $1;
      }
      else {
         return Future->new->die( "Returned ETag ($etag) does not look like an MD5 sum", $resp );
      }

      my $expect_md5 = lc( $digest->hexdigest );

      return Future->new->die( "Returned MD5 hash ($got_md5) did not match expected ($expect_md5)", $resp )
         if $got_md5 ne $expect_md5;

      return Future->wrap( $etag, $sent );
   });
}
995              
# Starts a multipart upload and yields its upload ID.
#
#   $self->_initiate_multipart_upload( %args ) ==> ( $id )
#
# Builds a POST request for "$args{key}?uploads" in $args{bucket}, with
# $args{meta} passed along, and reads the UploadId element out of the parsed
# XML response. $args{timeout} is used as given; $args{stall_timeout} falls
# back to the object's own stall_timeout.
sub _initiate_multipart_upload
{
   my ( $self, %args ) = @_;

   my $request = $self->_make_request(
      method  => "POST",
      bucket  => $args{bucket},
      path    => "$args{key}?uploads",
      content => "",
      meta    => $args{meta},
   );

   my $stall_timeout = $args{stall_timeout} // $self->{stall_timeout};

   my $xpc_f = $self->_do_request_xpc( $request,
      timeout       => $args{timeout},
      stall_timeout => $stall_timeout,
   );

   return $xpc_f->then( sub {
      my ( $xpc ) = @_;

      my $upload_id = $xpc->findvalue( ".//s3:InitiateMultipartUploadResult/s3:UploadId" );
      return Future->wrap( $upload_id );
   });
}
1019              
# Finishes a multipart upload and yields the ETag of the assembled object.
#
#   $self->_complete_multipart_upload( %args ) ==> ( $etag )
#
# POSTs $args{content} (the XML part list) to $args{key} in $args{bucket}
# with the uploadId query parameter set from $args{id}. A base64 Content-MD5
# header is computed over that content. The ETag element is read out of the
# parsed XML response.
sub _complete_multipart_upload
{
   my ( $self, %args ) = @_;

   my $body = $args{content};

   # Empty second argument: no line ending appended to the base64 text
   my $body_md5 = encode_base64( md5( $body ), "" );

   my $request = $self->_make_request(
      method       => "POST",
      bucket       => $args{bucket},
      path         => $args{key},
      content      => $body,
      query_params => {
         uploadId => $args{id},
      },
      headers      => {
         "Content-Type" => "application/xml",
         "Content-MD5"  => $body_md5,
      },
   );

   my $stall_timeout = $args{stall_timeout} // $self->{stall_timeout};

   my $xpc_f = $self->_do_request_xpc( $request,
      timeout       => $args{timeout},
      stall_timeout => $stall_timeout,
   );

   return $xpc_f->then( sub {
      my ( $xpc ) = @_;

      my $final_etag = $xpc->findvalue( ".//s3:CompleteMultipartUploadResult/s3:ETag" );
      return Future->wrap( $final_etag );
   });
}
1049              
# Uploads an object as an S3 multipart upload.
#
#   $self->_put_object_multipart( $gen_parts, %args ) ==> ( $etag, $length )
#
# $gen_parts is called repeatedly to obtain each part. It returns either a
# single value (passed on as that part's content) or a CODE reference and
# length pair, and returns an empty list when there are no more parts. Any
# other result is fatal.
#
# The sequence is: initiate the upload, PUT every part (up to
# $args{concurrent}, or the object's put_concurrent, at a time), then send
# the list of part numbers and ETags to complete it. Each step goes through
# _retry.
#
# If $args{on_write} is given it is called with a running total: the bytes
# of all finished parts plus the bytes written so far in each part that is
# still in flight.
#
# The initiate and complete requests both use $args{meta_timeout}, falling
# back to the object's timeout. Part requests use $args{part_timeout}.
sub _put_object_multipart
{
   my $self = shift;
   my ( $gen_parts, %args ) = @_;

   my $on_write = $args{on_write};

   my $id;

   my $written_committed = 0; # bytes written in committed parts
   my %written_tentative; # {$part_num} => bytes written so far in this part

   # %args comes first so that the explicit timeout always wins. Previously
   # it came last, letting a caller's 'timeout' override meta_timeout here,
   # unlike the complete request below.
   $self->_retry( "_initiate_multipart_upload",
      %args,
      timeout => $args{meta_timeout} // $self->{timeout},
   )->then( sub {
      ( $id ) = @_;

      my $part_num = 0;
      ( fmap1 {
         my ( $part_num, $content, %moreargs ) = @{$_[0]};

         $self->_retry( "_put_object",
            bucket       => $args{bucket},
            path         => $args{key},
            content      => $content,
            query_params => {
               partNumber => $part_num,
               uploadId   => $id,
            },
            timeout      => $args{part_timeout},
            ( $on_write ?
               ( on_write => sub {
                  # Overwritten rather than accumulated, so a retried part
                  # starts its count again
                  $written_tentative{$part_num} = $_[0];
                  $on_write->( $written_committed + sum values %written_tentative );
               } ) : () ),
            %moreargs,
         )->then( sub {
            my ( $etag, $len ) = @_;
            # The part is done: move its bytes from tentative to committed
            $written_committed += $len;
            delete $written_tentative{$part_num};
            return Future->wrap( [ $part_num, $etag ] );
         });
      } generate => sub {
         my @content = $gen_parts->() or return;
         $part_num++;
         return [ $part_num, $content[0] ] if @content == 1;
         return [ $part_num, $content[0], content_length => $content[1] ] if @content == 2 and ref $content[0] eq "CODE";
         # It's possible that $content[0] is a 100MiByte string. Best not to
         # interpolate that into $@ or we'll risk OOM
         die "Unsure what to do with gen_part result " .
            join( ", ", map { ref $_ ? $_ : "<".length($_)." bytes>" } @content );
      }, concurrent => $args{concurrent} // $self->{put_concurrent} );
   })->then( sub {
      my @etags = @_;

      my $doc = XML::LibXML::Document->new( "1.0", "UTF-8" );
      $doc->addChild( my $root = $doc->createElement( "CompleteMultipartUpload" ) );

      # One <Part> element per uploaded part, carrying its number and ETag
      foreach ( @etags ) {
         my ( $part_num, $etag ) = @$_;

         $root->addChild( my $part = $doc->createElement('Part') );
         $part->appendTextChild( PartNumber => $part_num );
         $part->appendTextChild( ETag       => $etag );
      }

      $self->_retry( "_complete_multipart_upload",
         %args,
         id      => $id,
         content => $doc->toString,
         timeout => $args{meta_timeout} // $self->{timeout},
      )->then( sub {
         my ( $etag ) = @_;
         return Future->wrap( $etag, $written_committed );
      });
   });
}
1129              
# Stores a value under a key, choosing single-part or multipart upload.
#
#   $s3->put_object( %args ) ==> ( $etag, $length )
#
# If $args{gen_parts} is given it is used directly as the part generator.
# Otherwise one is built from $args{value}, using $args{value_length} or the
# string length of the value as its size:
#
#  * more than 10,000 parts of the object's part_size is refused with croak;
#  * a CODE value larger than part_size is split into per-part generators,
#    each covering its own byte range of the original;
#  * any other value larger than part_size is split as a string;
#  * anything else becomes a single part.
#
# The first two parts are then fetched (the first is waited for if it is a
# Future) to decide between the three outcomes: two or more parts use
# _put_object_multipart; exactly one part is a plain _put_object; no parts
# at all creates an empty object.
sub put_object
{
   my $self = shift;
   my %args = @_;

   my $gen_parts;
   if( $gen_parts = delete $args{gen_parts} ) {
      # OK - the caller supplied the part generator
   }
   else {
      my $content_length = $args{value_length} // length $args{value};

      my $part_size = $self->{part_size};

      if( $content_length > $part_size * 10_000 ) {
         croak "Cannot upload content in more than 10,000 parts - consider using a larger part_size";
      }
      elsif( $content_length > $part_size and ref $args{value} eq "CODE" ) {
         # A generator can't be cut up with substr() - that would stringify
         # the code reference and upload "CODE(0x...)". Instead hand out one
         # generator per part, each translating its part-relative position
         # into an absolute position in the caller's generator.
         my $gen_value = delete $args{value};
         my $offset = 0;

         $gen_parts = sub {
            return if $offset >= $content_length;

            my $part_offset = $offset;
            my $part_length = $content_length - $offset;
            $part_length = $part_size if $part_length > $part_size;
            $offset += $part_length;

            return ( sub { $gen_value->( $part_offset + $_[0], $_[1] ) }, $part_length );
         };
      }
      elsif( $content_length > $part_size ) {
         $gen_parts = sub {
            return unless length $args{value};
            # 4-argument substr removes the returned part from the value
            return substr( $args{value}, 0, $part_size, "" );
         };
      }
      else {
         my @parts = ( [ delete $args{value}, $content_length ] );
         $gen_parts = sub { return unless @parts; @{ shift @parts } };
      }
   }

   my @parts;
   push @parts, [ $gen_parts->() ];

   # Ensure first part is a Future then unfuture it
   Future->wrap( @{$parts[0]} )->then( sub {
      @{$parts[0]} = @_ if @_;

      # Look one part ahead to find out whether this is a multipart upload
      push @parts, [ $gen_parts->() ];

      if( @{ $parts[1] } ) {
         # There are at least two parts; we'd better use multipart upload.
         # Replay the two parts already fetched before going back to the
         # real generator.
         $self->_put_object_multipart( sub { @parts ? @{ shift @parts } : goto &$gen_parts }, %args );
      }
      elsif( @{ $parts[0] } ) {
         # There is exactly one part
         my ( $content, $content_length ) = @{ shift @parts };
         $self->_retry( "_put_object",
            bucket         => $args{bucket},
            path           => $args{key},
            content        => $content,
            content_length => $content_length,
            meta           => $args{meta},
            %args,
         );
      }
      else {
         # There are no parts at all - create an empty object
         $self->_retry( "_put_object",
            bucket  => $args{bucket},
            path    => $args{key},
            content => "",
            meta    => $args{meta},
            %args,
         );
      }
   });
}
1196              
1197             =head2 $s3->delete_object( %args ) ==> ()
1198              
1199             Deletes a key from the bucket.
1200              
1201             Takes the following named arguments:
1202              
1203             =over 8
1204              
1205             =item bucket => STRING
1206              
1207             The name of the S3 bucket to delete the key from
1208              
1209             =item key => STRING
1210              
1211             The name of the key to delete
1212              
1213             =back
1214              
1215             The Future will return nothing.
1216              
1217             =cut
1218              
# Performs one DELETE request for a key.
#
#   $self->_delete_object( %args ) ==> ()
#
# Uses $args{bucket} and $args{key} to build the request. $args{timeout} and
# $args{stall_timeout} each fall back to the value configured on the object.
# Whatever the request yields is discarded; on success the returned Future
# completes with no values.
sub _delete_object
{
   my ( $self, %args ) = @_;

   my $request = $self->_make_request(
      method  => "DELETE",
      bucket  => $args{bucket},
      path    => $args{key},
      content => "",
   );

   my $timeout       = $args{timeout}       // $self->{timeout};
   my $stall_timeout = $args{stall_timeout} // $self->{stall_timeout};

   my $response_f = $self->_do_request( $request,
      timeout       => $timeout,
      stall_timeout => $stall_timeout,
   );

   # Replace the response with an empty result
   return $response_f->then( sub { return Future->new->done } );
}
1238              
# Public entry point for deleting a key. Forwards the caller's arguments
# unchanged to the private _delete_object method by name through _retry.
sub delete_object
{
   my ( $self, @caller_args ) = @_;

   return $self->_retry( "_delete_object", @caller_args );
}
1244              
1245             =head1 SPONSORS
1246              
1247             Parts of this code were paid for by
1248              
1249             =over 2
1250              
1251             =item *
1252              
1253             SocialFlow L<http://www.socialflow.com>
1254              
1255             =item *
1256              
1257             Shadowcat Systems L<http://www.shadowcat.co.uk>
1258              
1259             =back
1260              
1261             =head1 AUTHOR
1262              
1263             Paul Evans <leonerd@leonerd.org.uk>
1264              
1265             =cut
1266              
1267             0x55AA;