File Coverage

blib/lib/WebService/CEPH.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             =encoding utf8
2              
3             =head1 NAME
4              
5             WebService::CEPH
6              
7             =head1 DESCRIPTION
8              
9             CEPH client for simple workflow, supporting multipart uploads. Most docs are in Russian.
10              
11             Клинт для CEPH, без низкоуровневого кода для общения с библиотекой Amazon S3
12             (она вынесена в отдельный класс).
13              
14             Обработка ошибок (исключения их тип итп; повторы неудачных запросов) - на совести более низкоуровневой библиотеки,
15             если иное не гарантируется в этой документации.
16              
17             Параметры конструктора:
18              
19             Обязательные параметры:
20              
21             protocol - http/https
22              
23             host - хост бэкэнда
24              
25             key - ключ для входа
26              
27             secret - secret для входа
28              
29             Необязательные параметры:
30              
31             bucket - имя бакета (не нужен только для получения списка бакетов)
32              
33             driver_name - в данный момент только 'NetAmazonS3'
34              
35             multipart_threshold - после какого размера файла (в байтах) начинать multipart upload
36              
37             multisegment_threshold - после какого размера файла (в байтах) будет multisegment download
38              
39             query_string_authentication_host_replace - протокол-хост на который заменять URL в query_string_authentication_uri
40             должен начинаться с протокола (http/https), затем хост, на конце может быть, а может не быть слэша.
41             нужен если вы хотите сменить хост для отдачи клиентам (у вас кластер) или протокол (https внешним клиентам)
42              
43             =cut
44              
45             package WebService::CEPH;
46              
47             our $VERSION = '0.015'; # VERSION
48              
49 2     2   114044 use strict;
  2         10  
  2         52  
50 2     2   8 use warnings;
  2         4  
  2         40  
51 2     2   8 use Carp;
  2         3  
  2         90  
52 2     2   538 use WebService::CEPH::NetAmazonS3;
  0            
  0            
53             use Digest::MD5 qw/md5_hex/;
54             use Fcntl qw/:seek/;
55              
56             use constant MINIMAL_MULTIPART_PART => 5*1024*1024;
57              
58             sub _check_ascii_key { confess "Key should be ASCII-only" unless $_[0] !~ /[^\x00-\x7f]/ }
59              
60             =head2 new
61              
62             Конструктор. Параметры см. выше.
63              
64             =cut
65              
66             sub new {
67             my ($class, %args) = @_;
68              
69             my $self = bless +{}, $class;
70              
71             # mandatory
72             $self->{$_} = delete $args{$_} // confess "Missing $_"
73             for (qw/protocol host key secret/);
74             # optional
75             for (qw/bucket driver_name multipart_threshold multisegment_threshold query_string_authentication_host_replace/) {
76             if (defined(my $val = delete $args{$_})) {
77             $self->{$_} = $val;
78             }
79             }
80              
81             confess "Unused arguments: @{[ %args]}" if %args;
82              
83             $self->{driver_name} ||= "NetAmazonS3";
84             $self->{multipart_threshold} ||= MINIMAL_MULTIPART_PART;
85             $self->{multisegment_threshold} ||= MINIMAL_MULTIPART_PART;
86              
87             confess "multipart_threshold should be greater or eq. MINIMAL_MULTIPART_PART (5Mb) (now multipart_threshold=$self->{multipart_threshold}"
88             if $self->{multipart_threshold} < MINIMAL_MULTIPART_PART;
89              
90             my $driver_class = __PACKAGE__."::".$self->{driver_name}; # should be loaded via "use" at top of file
91             $self->{driver} = $driver_class->new(map { $_ => $self->{$_} } qw/protocol host key secret bucket/ );
92              
93             $self;
94             }
95              
96              
97              
98              
99             =head2 upload
100              
101             Загружает файл в CEPH. Если файл уже существует - он заменяется.
102             Если данные больше определённого размера, происходим multipart upload
103             Ничего не возвращает
104              
105             Параметры:
106              
107             0-й - $self
108              
109             1-й - имя ключа
110              
111             2-й - скаляр, данные ключа
112              
113             3-й - Content-type. Если undef, используется дефолтный binary/octet-stream
114              
115             =cut
116              
117             sub upload {
118             my ($self, $key) = (shift, shift); # после этого $_[0] - данные, $_[1] - Content-type
119             $self->_upload($key, sub { substr($_[0], $_[1], $_[2]) }, length($_[0]), md5_hex($_[0]), $_[1], $_[0]);
120             }
121              
122             =head2 upload_from_file
123              
124             То же, что upload, но происходит чтение из файла.
125              
126             Параметры:
127              
128             0-й - $self
129              
130             1-й - имя ключа
131              
132             2-й - имя файла (если скаляр), иначе открытый filehandle
133              
134             3-й - Content-type. Если undef, используется дефолтный binary/octet-stream
135              
136             Дваждый проходит по файлу, высчитывая md5. Файл не должен быть пайпом, его размер не должен меняться.
137              
138             =cut
139              
140             sub upload_from_file {
141             my ($self, $key, $fh_or_filename, $content_type) = @_;
142             my $fh = do {
143             if (ref $fh_or_filename) {
144             $fh_or_filename
145             }
146             else {
147             open my $f, "<", $fh_or_filename;
148             binmode $f;
149             $f;
150             }
151             };
152              
153             my $md5 = Digest::MD5->new;
154             $md5->addfile($fh);
155             seek($fh, 0, SEEK_SET);
156              
157             $self->_upload(
158             $key,
159             sub { read($_[0], my $data, $_[2]) // confess "Error reading data $!\n"; $data },
160             -s $fh, $md5->hexdigest, $content_type, $fh
161             );
162             }
163              
164             =head2 _upload
165              
166             Приватный метод для upload/upload_from_file
167              
168             Параметры
169              
170             1) self
171              
172             2) ключ
173              
174             3) итератор с интерфейсом (данные, оффсет, длина). "данные" должны соответствовать последнему
175             параметру этой функции (Ñ‚.е. (6))
176              
177             4) длина данных
178              
179             5) заранее высчитанный md5 от данных
180              
181             6) Content-type. Если undef, используется дефолтный binary/octet-stream
182              
183             7) данные. или скаляр. или filehandle
184              
185             =cut
186              
187              
188             sub _upload {
189             # after that $_[0] is data (scalar or filehandle)
190             my ($self, $key, $iterator, $length, $md5_hex, $content_type) = (shift, shift, shift, shift, shift, shift);
191              
192             confess "Bucket name is required" unless $self->{bucket};
193              
194             _check_ascii_key($key);
195              
196             if ($length > $self->{multipart_threshold}) {
197             my $multipart = $self->{driver}->initiate_multipart_upload($key, $md5_hex, $content_type);
198              
199             my $len = $length;
200             my $offset = 0;
201             my $part = 0;
202             while ($offset < $len) {
203             my $chunk = $iterator->($_[0], $offset, $self->{multipart_threshold});
204              
205             $self->{driver}->upload_part($multipart, ++$part, $chunk);
206              
207             $offset += $self->{multipart_threshold};
208             }
209             $self->{driver}->complete_multipart_upload($multipart);
210             }
211             else {
212             $self->{driver}->upload_single_request($key, $iterator->($_[0], 0, $length), $content_type);
213             }
214              
215             return;
216             }
217              
218             =head2 download
219              
220             Скачивает данные объекта с именем $key и возвращает их.
221             Если объект не существует, возвращает undef.
222              
223             Если размер объекта по факту окажется больше multisegment_threshold,
224             объект будет скачан несколькими запросами с заголовком Range (Ñ‚.е. multi segment download).
225              
226             Ð’ данный момент есть workaround для бага http://lists.ceph.com/pipermail/ceph-users-ceph.com/2016-June/010704.html,
227             в связи с ним всегда делается лишний HTTP запрос - запрос длины файла. Плюс не исключён Race condition.
228              
229             =cut
230              
231             sub download {
232             my ($self, $key) = @_;
233             my $data;
234             # workaround for CEPH bug http://lists.ceph.com/pipermail/ceph-users-ceph.com/2016-June/010704.html
235             my $cephsize = $self->size($key);
236             if (defined($cephsize) && $cephsize == 0) {
237             return '';
238             } else {
239             # / workaround for CEPH bug
240             _download($self, $key, sub { $data .= $_[0] }) or return;
241             return $data;
242             }
243             }
244              
245             =head2 download_to_file
246              
247             Скачивает данные объекта с именем $key в файл $fh_or_filename.
248             Если объект не существует, возвращает undef (при этом выходной файл всё равно будет испорчен и, возможно,
249             частично записан в случае race condition - удаляйте эти данные сами; если удалять тяжело - пользуйтесь
250             методом download)
251             Иначе возвращает размер записанных данных.
252              
253             Выходной файл открывается в режиме перезаписи, если это имя файла, если это filehandle,
254             то образается на нулевую длину и пишется с начала.
255              
256             Если размер объекта по факту окажется больше multisegment_threshold,
257             объект будет скачан несколькими запросами с заголовком Range (Ñ‚.е. multi segment download).
258              
259             Ð’ данный момент есть workaround для бага http://lists.ceph.com/pipermail/ceph-users-ceph.com/2016-June/010704.html,
260             в связи с ним всегда делается лишний HTTP запрос - запрос длины файла. Плюс не исключён Race condition.
261              
262             =cut
263              
264             sub download_to_file {
265             my ($self, $key, $fh_or_filename) = @_;
266              
267             my $fh = do {
268             if (ref $fh_or_filename) {
269             seek($fh_or_filename, SEEK_SET, 0);
270             truncate($fh_or_filename, 0);
271             $fh_or_filename
272             }
273             else {
274             open my $f, ">", $fh_or_filename;
275             binmode $f;
276             $f;
277             }
278             };
279              
280             # workaround for CEPH bug http://lists.ceph.com/pipermail/ceph-users-ceph.com/2016-June/010704.html
281             my $cephsize = $self->size($key);
282             if (defined($cephsize) && $cephsize == 0) {
283             return 0;
284             }
285             else {
286             # / workaround for CEPH bug
287             my $size = 0;
288             _download($self, $key, sub {
289             $size += length($_[0]);
290             print $fh $_[0] or confess "Error writing to file $!"
291             }) or return;
292             return $size;
293             }
294             }
295              
296             =head2 _download
297              
298             Приватный метод для download/download_to_file
299              
300             Параметры:
301              
302             1) self
303              
304             2) имя ключа
305              
306             3) appender - замыкание в которое будут передаваться данные для записи. оно должно аккумулировать их куда-то
307             себе или писать в файл, который оно само знает.
308              
309             =cut
310              
311             sub _download {
312             my ($self, $key, $appender) = @_;
313              
314             confess "Bucket name is required" unless $self->{bucket};
315              
316             _check_ascii_key($key);
317              
318             my $offset = 0;
319             my $check_md5 = undef;
320             my $md5 = Digest::MD5->new;
321             my $got_etag = undef;
322             while() {
323             my ($dataref, $bytesleft, $etag, $custom_md5) = $self->{driver}->download_with_range($key, $offset, $offset + $self->{multisegment_threshold});
324              
325             # Если объект не найден - возвращаем undef
326             # даже если при мультисегментном скачивании объект неожиданно исчез на каком-то сегменте, значит
327             # его кто-то удалил, нужно всё же вернуть undef
328             # При этом, при скачивании в файл, часть данных может быть уже записана. Удаляйте их сами.
329             return unless ($dataref);
330              
331             if (defined $got_etag) {
332             # Во время скачивания, кто-то подменил файл (ETag изменился), Ð’ соотв. с HTTP, ETag гарантированно
333             # будет разным для разных файлов (но не факт что одинаковым для одинаковых)
334             # Ð’ этом случае падаем.. Наверное можно в будущем делать retry запросов..
335             confess "File changed during download. Race condition. Please retry request"
336             unless $got_etag eq $etag;
337             }
338             else {
339             $got_etag = $etag;
340             }
341              
342             # Проверяем md5 только если ETag "нормальный" с md5 (был не multipart upload)
343             if (!defined $check_md5) {
344             my ($etag_md5) = $etag =~ /^([0-9a-f]+)$/;
345              
346             confess "ETag looks like valid md5 and x-amz-meta-md5 presents but they do not match"
347             if ($etag_md5 && $custom_md5 && $etag_md5 ne $custom_md5);
348             if ($etag_md5) {
349             $check_md5 = $etag_md5;
350             } elsif ($custom_md5) {
351             $check_md5 = $custom_md5;
352             } else {
353             $check_md5 = 0;
354             }
355             }
356             if ($check_md5) {
357             $md5->add($$dataref);
358             }
359              
360             $offset += length($$dataref);
361             $appender->($$dataref);
362             last unless $bytesleft;
363             };
364             if ($check_md5) {
365             my $got_md5 = $md5->hexdigest;
366             confess "MD5 missmatch, got $got_md5, expected $check_md5" unless $got_md5 eq $check_md5;
367             }
368             1;
369             }
370              
371             =head2 size
372              
373             Возвращает размер объекта с именем $key в байтах,
374             если ключ не существует, возвращает undef
375              
376             =cut
377              
378             sub size {
379             my ($self, $key) = @_;
380              
381             confess "Bucket name is required" unless $self->{bucket};
382              
383             _check_ascii_key($key);
384              
385             $self->{driver}->size($key);
386             }
387              
388             =head2 delete
389              
390             Удаляет объект с именем $key, ничего не возвращает. Если объект
391             не существует, не выдаёт ошибку
392              
393             =cut
394              
395             sub delete {
396             my ($self, $key) = @_;
397              
398             confess "Bucket name is required" unless $self->{bucket};
399              
400             _check_ascii_key($key);
401              
402             $self->{driver}->delete($key);
403             }
404              
405             =head2 query_string_authentication_uri
406              
407             Возвращает Query String Authentication URL для ключа $key, с экспайром $expires
408              
409             $expires - epoch время. но низкоуровневая библиотека может принимать другие форматы. убедитесь
410             что входные данные валидированы и вы передаёте именно epoch
411              
412             Заменяет хост, если есть опция query_string_authentication_host_replace (см. конструктор)
413              
414             =cut
415              
416             sub query_string_authentication_uri {
417             my ($self, $key, $expires) = @_;
418              
419             _check_ascii_key($key);
420             $expires or confess "Missing expires";
421              
422             my $uri = $self->{driver}->query_string_authentication_uri($key, $expires);
423             if ($self->{query_string_authentication_host_replace}) {
424             my $replace = $self->{query_string_authentication_host_replace};
425             $replace .= '/' unless $replace =~ m!/$!;
426             $uri =~ s!^https?://[^/]+/!$replace!;
427             }
428             $uri;
429             }
430              
431             =head2 get_buckets_list
432              
433             Returns buckets list
434              
435             WARNING
436              
437             Метод падает c ошибкой
438             Attribute (owner_id) does not pass the type constraint because: Validation failed for 'OwnerId'
439             Уведомления направлены разрабтчикам:
440             http://tracker.ceph.com/issues/16806 и https://github.com/rustyconover/net-amazon-s3/issues/18
441              
442             =cut
443              
444             sub get_buckets_list {
445             my ($self) = @_;
446              
447             return $self->{driver}->get_buckets_list;
448             }
449              
450             =head2 list_multipart_uploads
451              
452             Возвращает список multipart загрузок в бакете
453              
454             =cut
455              
456             sub list_multipart_uploads {
457             my ($self) = @_;
458              
459             confess "Bucket name is required" unless $self->{bucket};
460              
461             return $self->{driver}->list_multipart_uploads();
462             }
463              
464             =head2 delete_multipart_upload
465              
466             Удаляет multipart загрузку в бакете
467              
468             Параметры позиционные: $key, $upload_id
469              
470             Ничего не возвращает
471              
472             =cut
473              
474             sub delete_multipart_upload {
475             my ( $self, $key, $upload_id ) = @_;
476              
477             confess "Bucket name is required" unless $self->{bucket};
478             confess "key and upload ID is required" unless $key && $upload_id;
479              
480             $self->{driver}->delete_multipart_upload($key, $upload_id);
481             }
482              
483             1;