File Coverage

blib/lib/MongoDB/GridFSBucket/UploadStream.pm
Criterion Covered Total %
statement 54 136 39.7
branch 0 40 0.0
condition 0 11 0.0
subroutine 18 34 52.9
pod 7 8 87.5
total 79 229 34.5


line stmt bran cond sub pod time code
1             # Copyright 2015 - present MongoDB, Inc.
2             #
3             # Licensed under the Apache License, Version 2.0 (the "License");
4             # you may not use this file except in compliance with the License.
5             # You may obtain a copy of the License at
6             #
7             # http://www.apache.org/licenses/LICENSE-2.0
8             #
9             # Unless required by applicable law or agreed to in writing, software
10             # distributed under the License is distributed on an "AS IS" BASIS,
11             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12             # See the License for the specific language governing permissions and
13             # limitations under the License.
14              
15 59     59   426 use strict;
  59         143  
  59         1844  
16 59     59   324 use warnings;
  59         2414  
  59         2133  
17             package MongoDB::GridFSBucket::UploadStream;
18              
19             # ABSTRACT: File handle abstraction for uploading
20              
21 59     59   325 use version;
  59         133  
  59         373  
22             our $VERSION = 'v2.2.1';
23              
24 59     59   4507 use Moo;
  59         134  
  59         2477  
25 59     59   20247 use BSON::Bytes;
  59         163  
  59         4051  
26 59     59   343 use BSON::OID;
  59         148  
  59         1496  
27 59     59   363 use BSON::Time;
  59         162  
  59         1602  
28 59     59   367 use Encode;
  59         153  
  59         5773  
29 59     59   430 use MongoDB::Error;
  59         162  
  59         6849  
30 59     59   430 use Time::HiRes qw/time/;
  59         2125  
  59         2682  
31 59         2626 use Types::Standard qw(
32             Str
33             Maybe
34             HashRef
35             ArrayRef
36             InstanceOf
37 59     59   8366 );
  59         170  
38 59         391 use MongoDB::_Types qw(
39             Boolish
40             NonNegNum
41 59     59   77748 );
  59         2018  
42 59     59   66580 use MongoDB::_Constants;
  59         143  
  59         6766  
43 59     59   390 use Digest::MD5;
  59         134  
  59         3818  
44 59     59   376 use bytes;
  59         129  
  59         471  
45 59     59   3866 use namespace::clean -except => 'meta';
  59         133  
  59         425  
46              
47             #pod =attr chunk_size_bytes
48             #pod
49             #pod The number of bytes per chunk. Defaults to the C of the
50             #pod originating bucket object.
51             #pod
52             #pod This will be stored in the C field of the file document on
53             #pod a successful upload.
54             #pod
55             #pod =cut
56              
57             has chunk_size_bytes => (
58             is => 'ro',
59             isa => NonNegNum,
60             default => 255 * 1024,
61             );
62              
63             #pod =attr filename
64             #pod
65             #pod The filename to store the file under. Note that filenames are NOT necessarily unique.
66             #pod
67             #pod This will be stored in the C field of the file document on
68             #pod a successful upload.
69             #pod
70             #pod =cut
71              
72             has filename => (
73             is => 'ro',
74             isa => Str,
75             );
76              
77             #pod =attr metadata
78             #pod
79             #pod An optional hashref for storing arbitrary metadata about the file.
80             #pod
81             #pod If defined, this will be stored in the C field of the file
82             #pod document on a successful upload.
83             #pod
84             #pod =cut
85              
86             has metadata => (
87             is => 'ro',
88             isa => Maybe [HashRef],
89             );
90              
91             #pod =attr content_type (DEPRECATED)
92             #pod
93             #pod An optional MIME type. This field should only be used for backwards
94             #pod compatibility with older GridFS implementations. New applications should
95             #pod store the content type in the metadata hash if needed.
96             #pod
97             #pod If defined, this will be stored in the C field of the file
98             #pod document on a successful upload.
99             #pod
100             #pod =cut
101              
102             has content_type => (
103             is => 'ro',
104             isa => Str,
105             );
106              
107             #pod =attr aliases (DEPRECATED)
108             #pod
109             #pod An optional array of aliases. This field should only be used for backwards
110             #pod compatibility with older GridFS implementations. New applications should
111             #pod store aliases in the metadata hash if needed.
112             #pod
113             #pod If defined, this will be stored in the C field of the file
114             #pod document on a successful upload.
115             #pod
116             #pod =cut
117              
118             has aliases => (
119             is => 'ro',
120             isa => ArrayRef [Str],
121             );
122              
123             has _bucket => (
124             is => 'ro',
125             isa => InstanceOf ['MongoDB::GridFSBucket'],
126             required => 1,
127             );
128              
129             #pod =method id
130             #pod
131             #pod $id = $stream->id;
132             #pod
133             #pod The id of the file created by the stream. It will be stored in the C<_id>
134             #pod field of the file document on a successful upload. Some upload methods
135             #pod require specifying an id at upload time. Defaults to a newly-generated
136             #pod L or BSON codec specific equivalent.
137             #pod
138             #pod =cut
139              
140             has id => (
141             is => 'lazy',
142             );
143              
144             sub _build_id {
145 0     0     my $self = shift;
146 0           my $creator = $self->_bucket->bson_codec->can("create_oid");
147 0 0         return $creator ? $creator->() : BSON::OID->new();
148             }
149              
150             has _closed => (
151             is => 'rwp',
152             isa => Boolish,
153             default => 0,
154             );
155              
156             has _buffer => (
157             is => 'rwp',
158             isa => Str,
159             default => '',
160             );
161              
162             has _length => (
163             is => 'rwp',
164             isa => NonNegNum,
165             default => 0,
166             );
167              
168             has _md5 => (
169             is => 'lazy',
170             isa => InstanceOf ['Digest::MD5'],
171             );
172              
173             sub _build__md5 {
174 0     0     return Digest::MD5->new;
175             }
176              
177             has _chunk_buffer_length => (
178             is => 'lazy',
179             isa => NonNegNum,
180             );
181              
182             sub _build__chunk_buffer_length {
183 0     0     my ($self) = @_;
184 0           my $docsize = $self->chunk_size_bytes + 36;
185 0           return MAX_GRIDFS_BATCH_SIZE - $docsize;
186             }
187              
188             has _current_chunk_n => (
189             is => 'rwp',
190             isa => NonNegNum,
191             default => 0,
192             );
193              
194             #pod =method fh
195             #pod
196             #pod my $fh = $stream->fh;
197             #pod print $fh, 'test data...';
198             #pod close $fh
199             #pod
200             #pod Returns a new file handle tied to this instance of UploadStream that can be
201             #pod operated on with the built-in functions C, C, C,
202             #pod C and C.
203             #pod
204             #pod B:
205             #pod
206             #pod Allowing one of these tied filehandles to fall out of scope will NOT cause
207             #pod close to be called. This is due to the way tied file handles are
208             #pod implemented in Perl. For close to be called implicitly, all tied
209             #pod filehandles and the original object must go out of scope.
210             #pod
211             #pod Each file handle retrieved this way is tied back to the same object, so
212             #pod calling close on multiple tied file handles and/or the original object will
213             #pod have the same effect as calling close on the original object multiple
214             #pod times.
215             #pod
216             #pod =cut
217              
218             sub fh {
219 0     0 1   my ($self) = @_;
220 0           my $fh = IO::Handle->new();
221 0           tie *$fh, 'MongoDB::GridFSBucket::UploadStream', $self;
222 0           return $fh;
223             }
224              
225             sub _flush_chunks {
226 0     0     my ( $self, $all ) = @_;
227 0           my @chunks = ();
228 0           my $data;
229 0   0       while ( length $self->{_buffer} >= $self->chunk_size_bytes
      0        
230             || ( $all && length $self->{_buffer} > 0 ) )
231             {
232 0           $data = substr $self->{_buffer}, 0, $self->chunk_size_bytes, '';
233              
234 0           push @chunks,
235             {
236             files_id => $self->id,
237             n => int( $self->_current_chunk_n ),
238             data => BSON::Bytes->new( data => $data ),
239             };
240 0           $self->{_current_chunk_n} += 1;
241             }
242 0 0         if ( scalar(@chunks) ) {
243 0           eval { $self->_bucket->_chunks->insert_many( \@chunks ) };
  0            
244 0 0         if ($@) {
245 0           MongoDB::GridFSError->throw("Error inserting chunks: $@");
246             }
247             }
248             }
249              
250             sub _write_data {
251 0     0     my ( $self, $data ) = @_;
252 0           Encode::_utf8_off($data); # force it to bytes for transmission
253 0           $self->{_buffer} .= $data;
254 0           $self->{_length} += length $data;
255 0 0         $self->_md5->add($data) unless $self->_bucket->disable_md5;
256 0 0         $self->_flush_chunks if length $self->{_buffer} >= $self->_chunk_buffer_length;
257             }
258              
259             #pod =method abort
260             #pod
261             #pod $stream->abort;
262             #pod
263             #pod Aborts the upload by deleting any chunks already uploaded to the database
264             #pod and closing the stream.
265             #pod
266             #pod =cut
267              
268             sub abort {
269 0     0 1   my ($self) = @_;
270 0 0         if ( $self->_closed ) {
271 0           warn 'Attempted to abort an already closed UploadStream';
272 0           return;
273             }
274              
275 0           $self->_bucket->_chunks->delete_many( { files_id => $self->id } );
276 0           $self->_set__closed(1);
277             }
278              
279             #pod =method close
280             #pod
281             #pod $file_doc = $stream->close;
282             #pod
283             #pod Closes the stream and flushes any remaining data to the database. Once this is
284             #pod done a file document is created in the GridFS bucket, making the uploaded file
285             #pod visible in subsequent queries or downloads.
286             #pod
287             #pod On success, the file document hash reference is returned as a convenience.
288             #pod
289             #pod B
290             #pod
291             #pod =for :list
292             #pod * Calling close will also cause any tied file handles created for the
293             #pod stream to also close.
294             #pod * C will be automatically called when a stream object is destroyed.
295             #pod When called this way, any errors thrown will not halt execution.
296             #pod * Calling C repeatedly will warn.
297             #pod
298             #pod =cut
299              
300             sub close {
301 0     0 1   my ($self) = @_;
302 0 0         if ( $self->_closed ) {
303 0           warn 'Attempted to close an already closed MongoDB::GridFSBucket::UploadStream';
304 0           return;
305             }
306 0           $self->_flush_chunks(1);
307 0 0         my $filedoc = {
308             _id => $self->id,
309             length => $self->_length,
310             chunkSize => $self->chunk_size_bytes,
311             uploadDate => BSON::Time->new(),
312             filename => $self->filename,
313             ( $self->_bucket->disable_md5 ? () : (md5 => $self->_md5->hexdigest) ),
314             };
315 0 0         $filedoc->{'contentType'} = $self->content_type if $self->content_type;
316 0 0         $filedoc->{'metadata'} = $self->metadata if $self->metadata;
317 0 0         $filedoc->{'aliases'} = $self->aliases if $self->aliases;
318 0           eval { $self->_bucket->_files->insert_one($filedoc) };
  0            
319 0 0         if ($@) {
320 0           MongoDB::GridFSError->throw("Error inserting file document: $@");
321             }
322 0           $self->_set__closed(1);
323 0           return $filedoc;
324             }
325              
326             #pod =method fileno
327             #pod
328             #pod if ( $stream->fileno ) { ... }
329             #pod
330             #pod Works like the builtin C, but it returns -1 if the stream is open
331             #pod and undef if closed.
332             #pod
333             #pod =cut
334              
335             sub fileno {
336 0     0 1   my ($self) = @_;
337 0 0         return if $self->_closed;
338 0           return -1;
339             }
340              
341             #pod =method print
342             #pod
343             #pod $stream->print(@data);
344             #pod
345             #pod Works like the builtin C.
346             #pod
347             #pod =cut
348              
349             sub print {
350 0     0 1   my $self = shift;
351 0 0         return if $self->_closed;
352 0 0         my $fsep = defined($,) ? $, : '';
353 0 0         my $osep = defined($\) ? $\ : '';
354 0           my $output = join( $fsep, @_ ) . $osep;
355 0           $self->_write_data($output);
356 0           return 1;
357             }
358              
359             #pod =method printf
360             #pod
361             #pod $stream->printf($format, @data);
362             #pod
363             #pod Works like the builtin C.
364             #pod
365             #pod =cut
366              
367             sub printf {
368 0     0 1   my $self = shift;
369 0           my $format = shift;
370 0           local $\;
371 0           $self->print( sprintf( $format, @_ ) );
372             }
373              
374             #pod =method syswrite
375             #pod
376             #pod $stream->syswrite($buffer);
377             #pod $stream->syswrite($buffer, $length);
378             #pod $stream->syswrite($buffer, $length, $offset);
379             #pod
380             #pod Works like the builtin C.
381             #pod
382             #pod =cut
383              
384             sub syswrite {
385 0     0 1   my ( $self, $buff, $len, $offset ) = @_;
386 0           my $bufflen = length $buff;
387              
388 0 0         $len = $bufflen unless defined $len;
389 0 0         if ( $len < 0 ) {
390 0           MongoDB::UsageError->throw(
391             'Negative length passed to MongoDB::GridFSBucket::DownloadStream->read');
392             }
393              
394 0   0       $offset ||= 0;
395              
396 0           local $\;
397 0           $self->print( substr( $buff, $offset, $len ) );
398             }
399              
400             sub DEMOLISH {
401 0     0 0   my ($self) = @_;
402 0 0         $self->close unless $self->_closed;
403             }
404              
405             sub TIEHANDLE {
406 0     0     my ( $class, $self ) = @_;
407 0           return $self;
408             }
409              
410             sub BINMODE {
411 0     0     my ( $self, $mode ) = @_;
412 0 0 0       if ( !$mode || $mode eq ':raw' ) {
413 0           return 1;
414             }
415 0           $! = "binmode for " . __PACKAGE__ . " only supports :raw mode.";
416             return
417 0           }
418              
419             {
420 59     59   162301 no warnings 'once';
  59         189  
  59         6305  
421             *PRINT = \&print;
422             *PRINTF = \&printf;
423             *WRITE = \&syswrite;
424             *CLOSE = \&close;
425             *FILENO = \&fileno;
426             }
427              
428             my @unimplemented = qw(
429             EOF
430             GETC
431             READ
432             READLINE
433             SEEK
434             TELL
435             );
436              
437             for my $u (@unimplemented) {
438 59     59   439 no strict 'refs';
  59         140  
  59         6101  
439             my $l = lc($u);
440             *{$u} = sub {
441 0     0     MongoDB::UsageError->throw( "$l() not available on " . __PACKAGE__ );
442             };
443             }
444              
445             1;
446              
447             __END__