File Coverage

blib/lib/Forks/Queue/Shmem.pm
Criterion Covered Total %
statement 70 72 97.2
branch 17 26 65.3
condition 8 16 50.0
subroutine 7 7 100.0
pod 1 1 100.0
total 103 122 84.4


line stmt bran cond sub pod time code
1             package Forks::Queue::Shmem;
2 56     56   239800 use base 'Forks::Queue::File';
  56         135  
  56         11445  
3 56     56   368 use strict;
  56         125  
  56         1361  
4 56     56   356 use warnings;
  56         156  
  56         1382  
5 56     56   260 use Carp;
  56         102  
  56         50533  
6              
7             our $VERSION = '0.14';
8             our $DEV_SHM = "/dev/shm";
9             our $DEBUG;
10             *DEBUG = \$Forks::Queue::DEBUG;
11              
12             sub new {
13 43     43 1 2216 my $class = shift;
14 43         320 my %opts = (%Forks::Queue::OPTS, @_);
15              
16 43 50       1005 if (! -d $DEV_SHM) {
17 0         0 croak "\$DEV_SHM not set to a valid shared memory virtual filesystem";
18             }
19              
20 43 100       211 if ($opts{file}) {
21 22   33     254 $opts{loc} //= $opts{file};
22 22         92 $opts{loc} =~ s{.*/(.)}{$1};
23 22         62 $opts{loc} =~ s{/+$}{};
24 22         125 $opts{file} = "$DEV_SHM/" . $opts{loc};
25             } else {
26 21         185 $opts{file} = _impute_file();
27             }
28              
29 43         270 $opts{lock} = $opts{file} . ".lock";
30 43   50     158 $opts{limit} //= -1;
31 43   50     130 $opts{on_limit} //= 'fail';
32 43   50     138 $opts{style} //= 'fifo';
33 43         119 my $list = delete $opts{list};
34              
35 43   50     496 $opts{_header_size} //= 2048;
36 43         127 $opts{_end} = 0; # whether "end" has been called for this obj
37 43         97 $opts{_pos} = 0; # "cursor", index of next item to shift out
38 43         188 $opts{_tell} = $opts{_header_size}; # file position of cursor
39              
40 43         94 $opts{_count} = 0; # index of next item to be appended
41 43         226 $opts{_pids} = { Forks::Queue::File::_PID() => 'P' };
42             # $opts{_pids} = { $$ => 'P' };
43 43         246 $opts{_qid} = Forks::Queue::Util::QID();
44              
45             # how often to refactor the queue file. use small values to keep file
46             # sizes small and large values to improve performance
47 43   50     458 $opts{_maintenance_freq} //= 32;
48              
49             open my $fh5, '>>', $opts{lock}
50 43 50       3315 or die "Forks::Queue::Shmem: ",
51             "failed to create lock file '$opts{lock}': $!";
52 43 50       555 close $fh5 or die;
53              
54 43         919 my $self = bless { %opts }, $class;
55              
56 43 100 66     321 if ($opts{join} && -f $opts{file}) {
57 2         29 $DB::single = 1;
58 2 50       72 open my $fh6, '+<', $opts{file} or die;
59 2         14 $self->{_fh} = *$fh6;
60 2         19 my $fhx = select $fh6; $| = 1; select $fhx;
  2         14  
  2         13  
61 2     2   29 Forks::Queue::File::_SYNC { $self->_read_header } $self;
  2         19  
62             } else {
63 41 100       695 if (-f $opts{file}) {
64 2         960 carp "Forks::Queue: Queue file $opts{file} already exists. ",
65             "Expect trouble if another process created this file.";
66             }
67 41 50       1643 open my $fh8, '>>', $opts{file} or croak(
68             "Forks::Queue: could not create queue file $opts{file}: $!");
69 41 50       446 close $fh8 or croak(
70             "Forks::Queue: bizarre error closing queue file $opts{file} $!");
71              
72 41 50       1312 open my $fh7, '+<', $opts{file} or croak(
73             "Forks::Queue: error re-opening queue file $opts{file} $!");
74              
75 41         233 my $fx = select $fh7;
76 41         171 $| = 1;
77 41         211 select $fx;
78              
79 41         666 $self->{_fh} = *$fh7;
80 41         348 seek $fh7, 0, 0;
81              
82 41         228 $self->{_locked}++;
83 41         585 $self->_write_header;
84 41         432 $self->{_locked}--;
85 41 50       315 if (tell($fh7) < $self->{_header_size}) {
86 41         762 print $fh7 "\0" x ($self->{_header_size} - tell($fh7));
87             }
88             }
89 43 100       198 if (defined($list)) {
90 5 50       23 if (ref($list) eq 'ARRAY') {
91 5         86 $self->push( @$list );
92             } else {
93 0         0 carp "Forks::Queue::new: 'list' option must be an array ref";
94             }
95             }
96              
97 43         435 return $self;
98             }
99              
100             my $id = 0;
101             sub _impute_file {
102 21     21   282 my $base = $0;
103 21         332 $base =~ s{.*[/\\](.)}{$1};
104 21         161 $base =~ s{[/\\]$}{};
105 21         52 $id++;
106 21         292 return "$DEV_SHM/shmq-$$-$id-$base";
107             }
108              
109             1;
110              
111             =head1 NAME
112              
113             Forks::Queue::Shmem - Forks::Queue implementation using shared memory
114              
115             =head1 SYNOPSIS
116              
117             use Forks::Queue::Shmem;
118             $q = Forks::Queue::Shmem->new;
119              
120             use Forks::Queue;
121             $q = Forks::Queue->new( impl => 'Shmem, ... );
122              
123             =head1 VERSION
124              
125             0.14
126              
127             =head1 DESCRIPTION
128              
129             Shared memory implementation of L.
130             Only available on systems that have a C virtual filesystem.
131              
132             A shared memory implementation is appropriate for programs that
133             rapidly update the queue but are not likely to let the size of data
134             in the queue exceed the available memory on the host machine.
135             Use L if you demand high capacity for your queue.
136              
137             See L for the public API to this class.
138              
139             =head2 Constructor options
140              
141             In addition to the standard options described in
142             L, the
143             C constructor also recognizes some
144             additional options:
145              
146             =over 4
147              
148             =item * file
149              
150             The name of the filename to hold the queue data. An absolute
151             pathname should not be provided here. The virtual queue file
152             will reside under the shared memory virtual filesystem
153             (probably C) on your system, if it exists.
154              
155             =item * style
156              
157             =item * limit
158              
159             =item * on_limit
160              
161             =item * join
162              
163             =item * persist
164              
165             See L for descriptions of these options.
166              
167             =item * debug
168              
169             Boolean value to enable or disable debugging on this queue,
170             overriding the value in C<$Forks::Queue::DEBUG>.
171              
172             =back
173              
174             =head1 LICENSE AND COPYRIGHT
175              
176             Copyright (c) 2017-2019, Marty O'Brien.
177              
178             This library is free software; you can redistribute it and/or modify
179             it under the same terms as Perl itself, either Perl version 5.10.1 or,
180             at your option, any later version of Perl 5 you may have available.
181              
182             See http://dev.perl.org/licenses/ for more information.
183              
184             =cut