File Coverage

blib/lib/WebService/CEPH.pm
Criterion Covered Total %
statement 149 149 100.0
branch 60 66 90.9
condition 22 27 81.4
subroutine 25 25 100.0
pod 10 10 100.0
total 266 277 96.0


line stmt bran cond sub pod time code
1             =encoding utf8
2              
3             =head1 NAME
4              
5             WebService::CEPH
6              
7             =head1 DESCRIPTION
8              
9             CEPH client for simple workflow, supporting multipart uploads.
10              
11             Client for CEPH without the low-level code that communicates with the Amazon S3 library (that code is placed in a separate class).
12              
13             The lower-level library is responsible for error handling (exceptions and retries of failed requests), unless this documentation explicitly states otherwise.
14              
15             Constructor parameters:
16              
17             Required parameters:
18              
19             protocol - http/https
20              
21             host - backend host
22              
23             key - access key
24              
25             secret - access secret
26              
27             Optional parameters:
28              
29             bucket - name of the bucket (not needed just to get the list of buckets)
30              
31             driver_name - at the moment only 'NetAmazonS3' is supported
32              
33             multipart_threshold - the size threshold in bytes used for multipart uploads
34              
35             multisegment_threshold - the size threshold in bytes used for multisegment downloads
36              
37             query_string_authentication_host_replace - protocol and host that replace the ones in the URL returned by query_string_authentication_uri.
38             The value should start with the protocol (http/https), followed by the host, optionally ending with a slash.
39             This option is useful if you need to return a different host to clients (e.g. you have a cluster) or a different protocol (e.g. https for external clients).
40              
41             =cut
42              
43             package WebService::CEPH;
44              
45             our $VERSION = '0.017'; # VERSION
46              
47 2     2   185739 use strict;
  2         14  
  2         58  
48 2     2   11 use warnings;
  2         3  
  2         48  
49 2     2   10 use Carp;
  2         3  
  2         121  
50 2     2   987 use WebService::CEPH::NetAmazonS3;
  2         7  
  2         84  
51 2     2   15 use Digest::MD5 qw/md5_hex/;
  2         4  
  2         118  
52 2     2   14 use Fcntl qw/:seek/;
  2         5  
  2         280  
53              
54 2     2   22 use constant MINIMAL_MULTIPART_PART => 5*1024*1024;
  2         5  
  2         3781  
55              
56 42 100   42   747 sub _check_ascii_key { confess "Key should be ASCII-only" unless $_[0] !~ /[^\x00-\x7f]/ }
57              
58             =head2 new
59              
60             Constructor. See the parameters above.
61              
62             =cut
63              
64             sub new {
65 11     11 1 54427 my ($class, %args) = @_;
66              
67 11         42 my $self = bless +{}, $class;
68              
69             # mandatory
70             $self->{$_} = delete $args{$_} // confess "Missing $_"
71 11   66     838 for (qw/protocol host key secret/);
72             # optional
73 7         16 for (qw/bucket driver_name multipart_threshold multisegment_threshold query_string_authentication_host_replace/) {
74 35 100       77 if (defined(my $val = delete $args{$_})) {
75 5         12 $self->{$_} = $val;
76             }
77             }
78              
79 7 100       16 confess "Unused arguments: @{[ %args]}" if %args;
  1         168  
80              
81 6   100     29 $self->{driver_name} ||= "NetAmazonS3";
82 6   100     22 $self->{multipart_threshold} ||= MINIMAL_MULTIPART_PART;
83 6   100     21 $self->{multisegment_threshold} ||= MINIMAL_MULTIPART_PART;
84              
85             confess "multipart_threshold should be greater or eq. MINIMAL_MULTIPART_PART (5Mb) (now multipart_threshold=$self->{multipart_threshold}"
86 6 100       185 if $self->{multipart_threshold} < MINIMAL_MULTIPART_PART;
87              
88 5         13 my $driver_class = __PACKAGE__."::".$self->{driver_name}; # should be loaded via "use" at top of file
89 5         10 $self->{driver} = $driver_class->new(map { $_ => $self->{$_} } qw/protocol host key secret bucket/ );
  25         61  
90              
91 5         225 $self;
92             }
93              
94             =head2 upload
95              
96             Uploads the data into CEPH under the given key. If the object already exists, it is replaced. If the data is larger than multipart_threshold, a multipart upload is used. Returns nothing.
97              
98             Parameters:
99              
100             0th - $self
101              
102             1-st - key name
103              
104             2-nd - scalar, key data
105              
106             3-rd - Content-type. If undef, the default binary / octet-stream is used
107              
108             4-th - ACL. If undef, the default private is used
109              
110             =cut
111              
112             sub upload {
113 7     7 1 25580 my ($self, $key) = (shift, shift); # after these params: $_[0] - data, $_[1] - Content-type, $_[2] - ACL
114 7     10   57 $self->_upload($key, sub { substr($_[0], $_[1], $_[2]) }, length($_[0]), md5_hex($_[0]), $_[1], $_[2], $_[0]);
  10         33  
115             }
116              
117             =head2 upload_from_file
118              
119             Same as upload method, but reads from file.
120              
121             Parameters:
122              
123             0th - $self
124              
125             1-st - key name
126              
127             2-nd - file name (if a plain scalar); otherwise an already opened filehandle
128              
129             3-rd - Content-type. If undef, the default binary / octet-stream is used
130              
131             4-th - ACL. If undef, the default private is used
132              
133             The file is read twice: once to calculate the MD5 and once to upload it. Therefore the file must be seekable (not a pipe) and its size must not change.
134              
135             =cut
136              
137             sub upload_from_file {
138 7     7 1 26739 my ($self, $key, $fh_or_filename, $content_type, $acl ) = @_;
139              
140 7         13 my $fh = do {
141 7 100       18 if (ref $fh_or_filename) {
142 3         9 $fh_or_filename
143             }
144             else {
145 4         148 open my $f, "<", $fh_or_filename;
146 4         21 binmode $f;
147 4         14 $f;
148             }
149             };
150              
151 7         51 my $md5 = Digest::MD5->new;
152 7         133 $md5->addfile($fh);
153 7         67 seek($fh, 0, SEEK_SET);
154              
155             $self->_upload(
156             $key,
157 15   33 15   132 sub { read($_[0], my $data, $_[2]) // confess "Error reading data $!\n"; $data },
  15         113  
158 7         140 -s $fh,
159             $md5->hexdigest,
160             $content_type,
161             $acl,
162             $fh
163             );
164             }
165              
166             =head2 _upload
167              
168             Private method for upload/upload_from_file
169              
170             Parameters
171              
172             1) self
173              
174             2) key
175              
176             3) iterator with interface (data, offset, length). "data" must correspond to the last parameter of this function (i.e. (8))
177              
178             4) data length
179              
180             5) previously calculated md5 from the data
181              
182             6) Content-type. If undef, the default binary / octet-stream is used
183              
184             7) ACL. If undef, the default private is used
185              
186             8) data: either a scalar or a filehandle
187              
188             =cut
189              
190              
191             sub _upload {
192             # after that $_[0] is data (scalar or filehandle)
193 14     14   59 my ($self, $key, $iterator, $length, $md5_hex, $content_type, $acl) = (shift, shift, shift, shift, shift, shift, shift);
194              
195 14 50       46 confess "Bucket name is required" unless $self->{bucket};
196              
197 14         37 _check_ascii_key($key);
198              
199 13 100       38 if ($length > $self->{multipart_threshold}) {
200 8         32 my $multipart = $self->{driver}->initiate_multipart_upload($key, $md5_hex, $content_type, $acl);
201              
202 8         383 my $len = $length;
203 8         11 my $offset = 0;
204 8         13 my $part = 0;
205 8         22 while ($offset < $len) {
206 20         58 my $chunk = $iterator->($_[0], $offset, $self->{multipart_threshold});
207              
208 20         81 $self->{driver}->upload_part($multipart, ++$part, $chunk);
209              
210 20         14419 $offset += $self->{multipart_threshold};
211             }
212 8         32 $self->{driver}->complete_multipart_upload($multipart);
213             }
214             else {
215 5         18 $self->{driver}->upload_single_request($key, $iterator->($_[0], 0, $length), $content_type, $acl);
216             }
217              
218 13         653 return;
219             }
220              
221             =head2 download
222              
223             Downloads data from an object named $key and returns it. If the object does not exist, it returns undef.
224              
225             If the size of the object is greater than multisegment_threshold, the object is downloaded with several requests using the Range header
226             (i.e., a multisegment download).
227              
228             At the moment there is a workaround for the bug http://lists.ceph.com/pipermail/ceph-users-ceph.com/2016-June/010704.html,
229             because of this, an extra HTTP request (for the object size) is always made. A race condition is also possible.
230              
231             =cut
232              
233             sub download {
234 14     14 1 39861 my ($self, $key) = @_;
235 14         28 my $data;
236             # workaround for CEPH bug http://lists.ceph.com/pipermail/ceph-users-ceph.com/2016-June/010704.html
237 14         40 my $cephsize = $self->size($key);
238 14 100 66     624 if (defined($cephsize) && $cephsize == 0) {
239 1         6 return '';
240             } else {
241             # / workaround for CEPH bug
242 13 100   14   152 _download($self, $key, sub { $data .= $_[0] }) or return;
  14         26  
243 6         43 return $data;
244             }
245             }
246              
247             =head2 download_to_file
248              
249             Downloads the data of the object with the name $key to the file $fh_or_filename.
250             If the object does not exist, it returns undef
251             (in this case the output file may be corrupted or partially written because of a race condition; delete that data yourself,
252             or use the download method instead). Otherwise, returns the size of the written data.
253             If a file name is given, the output file is opened in overwrite mode; if a filehandle is given, it is truncated to zero length and written from the beginning.
254              
255             If the size of the object is greater than multisegment_threshold, the object is downloaded with several requests using the Range header
256             (i.e., a multisegment download).
257              
258             At the moment there is a workaround for the bug http://lists.ceph.com/pipermail/ceph-users-ceph.com/2016-June/010704.html,
259             because of this, an extra HTTP request (for the object size) is always made. A race condition is also possible.
260              
261             =cut
262              
263             sub download_to_file {
264 8     8 1 20425 my ($self, $key, $fh_or_filename) = @_;
265              
266 8         13 my $fh = do {
267 8 100       21 if (ref $fh_or_filename) {
268 1         30 seek($fh_or_filename, SEEK_SET, 0);
269 1         42 truncate($fh_or_filename, 0);
270 1         5 $fh_or_filename
271             }
272             else {
273 7         573 open my $f, ">", $fh_or_filename;
274 7         34 binmode $f;
275 7         23 $f;
276             }
277             };
278              
279             # workaround for CEPH bug http://lists.ceph.com/pipermail/ceph-users-ceph.com/2016-June/010704.html
280 8         43 my $cephsize = $self->size($key);
281 8 100 66     374 if (defined($cephsize) && $cephsize == 0) {
282 1         19 return 0;
283             }
284             else {
285             # / workaround for CEPH bug
286 7         13 my $size = 0;
287             _download($self, $key, sub {
288 10     10   13 $size += length($_[0]);
289 10 50       110 print $fh $_[0] or confess "Error writing to file $!"
290 7 100       38 }) or return;
291 6         418 return $size;
292             }
293             }
294              
295             =head2 _download
296              
297             Private method for download/download_to_file
298              
299             Parameters:
300              
301             1) self
302              
303             2) key name
304              
305             3) appender - a closure that receives each chunk of downloaded data. It should accumulate the data itself or write it to a file it knows about.
306              
307             =cut
308              
309             sub _download {
310 20     20   42 my ($self, $key, $appender) = @_;
311              
312 20 50       146 confess "Bucket name is required" unless $self->{bucket};
313              
314 20         54 _check_ascii_key($key);
315              
316 19         31 my $offset = 0;
317 19         28 my $check_md5 = undef;
318 19         92 my $md5 = Digest::MD5->new;
319 19         36 my $got_etag = undef;
320 19         29 while() {
321 29         102 my ($dataref, $bytesleft, $etag, $custom_md5) = $self->{driver}->download_with_range($key, $offset, $offset + $self->{multisegment_threshold});
322              
323             # Return undef if object not found
324             # If the object suddenly disappeared during a multisegment download, someone deleted it, so we return undef in this case.
325             # However, when downloading to file, some data could have been already written to file. Remove this file by yourself.
326 29 100       25067 return unless ($dataref);
327              
328 26 100       63 if (defined $got_etag) {
329             # Someone replaced file during download process. According to HTTP, ETag must be different for different files
330             #(though it does not have to be the same for same files).
331             # Throw an exception in this case...
332             # TODO: retry requests if file changed during download
333 9 100       123 confess "File changed during download. Race condition. Please retry request"
334             unless $got_etag eq $etag;
335             }
336             else {
337 17         31 $got_etag = $etag;
338             }
339              
340             # Check md5 if ETag was normal with md5 (it was not a multipart upload)
341 25 100       52 if (!defined $check_md5) {
342 17         97 my ($etag_md5) = $etag =~ /^([0-9a-f]+)$/;
343              
344 17 100 100     198 confess "ETag looks like valid md5 and x-amz-meta-md5 presents but they do not match"
      100        
345             if ($etag_md5 && $custom_md5 && $etag_md5 ne $custom_md5);
346 16 100       45 if ($etag_md5) {
    100          
347 13         27 $check_md5 = $etag_md5;
348             } elsif ($custom_md5) {
349 1         3 $check_md5 = $custom_md5;
350             } else {
351 2         6 $check_md5 = 0;
352             }
353             }
354 24 100       50 if ($check_md5) {
355 22         86 $md5->add($$dataref);
356             }
357              
358 24         39 $offset += length($$dataref);
359 24         58 $appender->($$dataref);
360 24 100       67 last unless $bytesleft;
361             };
362 14 100       27 if ($check_md5) {
363 13         51 my $got_md5 = $md5->hexdigest;
364 13 100       313 confess "MD5 missmatch, got $got_md5, expected $check_md5" unless $got_md5 eq $check_md5;
365             }
366 12         56 1;
367             }
368              
369             =head2 size
370              
371             Returns the size of the object named $key in bytes, if the key does not exist, returns undef
372              
373             =cut
374              
375             sub size {
376 2     2   5043 my ($self, $key) = @_;
377              
378 2 50       10 confess "Bucket name is required" unless $self->{bucket};
379              
380 2         7 _check_ascii_key($key);
381              
382 1         4 $self->{driver}->size($key);
383             }
384              
385             =head2 delete
386              
387             Removes an object named $key, returns nothing. If the object does not exist, it does not produce an error
388              
389             =cut
390              
391             sub delete {
392 2     2 1 3557 my ($self, $key) = @_;
393              
394 2 50       9 confess "Bucket name is required" unless $self->{bucket};
395              
396 2         6 _check_ascii_key($key);
397              
398 1         5 $self->{driver}->delete($key);
399             }
400              
401             =head2 query_string_authentication_uri
402              
403             Returns Query String Authentication URL for key $key, with expire $expires
404              
405             $expires - epoch time. The low-level library may accept other formats too, so make sure the input is validated and that you pass exactly an epoch time.
406              
407             Replaces the host if there is a query_string_authentication_host_replace option (see the constructor)
408              
409             =cut
410              
411             sub query_string_authentication_uri {
412 4     4 1 6675 my ($self, $key, $expires) = @_;
413              
414 4         12 _check_ascii_key($key);
415 4 50       12 $expires or confess "Missing expires";
416              
417 4         16 my $uri = $self->{driver}->query_string_authentication_uri($key, $expires);
418 4 100       158 if ($self->{query_string_authentication_host_replace}) {
419 3         6 my $replace = $self->{query_string_authentication_host_replace};
420 3 100       15 $replace .= '/' unless $replace =~ m!/$!;
421 3         19 $uri =~ s!^https?://[^/]+/!$replace!;
422             }
423 4         19 $uri;
424             }
425              
426             =head2 get_buckets_list
427              
428             Returns buckets list
429              
430             WARNING
431              
432             The method throws error "Attribute (owner_id) does not pass the type constraint because:
433             Validation failed for 'OwnerId'"
434              
435             Notifications sent to the developers:
436             http://tracker.ceph.com/issues/16806
437             https://github.com/rustyconover/net-amazon-s3/issues/18
438              
439             =cut
440              
441             sub get_buckets_list {
442 1     1 1 2175 my ($self) = @_;
443              
444 1         4 return $self->{driver}->get_buckets_list;
445             }
446              
447             =head2 list_multipart_uploads
448              
449             Returns the list of multipart uploads in the bucket
450              
451             =cut
452              
453             sub list_multipart_uploads {
454 2     2 1 3819 my ($self) = @_;
455              
456 2 100       107 confess "Bucket name is required" unless $self->{bucket};
457              
458 1         5 return $self->{driver}->list_multipart_uploads();
459             }
460              
461             =head2 delete_multipart_upload
462              
463             Deletes a multipart upload in the bucket
464              
465             Positional parameters: $key, $upload_id
466              
467             Returns nothing
468              
469             =cut
470              
471             sub delete_multipart_upload {
472 4     4 1 8642 my ( $self, $key, $upload_id ) = @_;
473              
474 4 100       113 confess "Bucket name is required" unless $self->{bucket};
475 3 100 100     208 confess "key and upload ID is required" unless $key && $upload_id;
476              
477 1         4 $self->{driver}->delete_multipart_upload($key, $upload_id);
478             }
479              
480             1;