File Coverage

blib/lib/Forks/Queue/Shmem.pm
Criterion Covered Total %
statement 71 73 97.2
branch 17 26 65.3
condition 8 16 50.0
subroutine 7 7 100.0
pod 1 1 100.0
total 104 123 84.5


line stmt bran cond sub pod time code
1             package Forks::Queue::Shmem;
2 56     56   232084 use base 'Forks::Queue::File';
  56         128  
  56         10643  
3 56     56   319 use strict;
  56         84  
  56         1102  
4 56     56   221 use warnings;
  56         84  
  56         1266  
5 56     56   241 use Carp;
  56         131  
  56         44050  
6              
7             our $VERSION = '0.13';
8             our $DEV_SHM = "/dev/shm";
9             our $DEBUG;
10             *DEBUG = \$Forks::Queue::DEBUG;
11              
12             sub new {
13 43     43 1 12875 my $class = shift;
14 43         450 my %opts = (%Forks::Queue::OPTS, @_);
15              
16 43 50       2264 if (! -d $DEV_SHM) {
17 0         0 croak "\$DEV_SHM not set to a valid shared memory virtual filesystem";
18             }
19              
20 43 100       223 if ($opts{file}) {
21 22   33     174 $opts{loc} //= $opts{file};
22 22         104 $opts{loc} =~ s{.*/(.)}{$1};
23 22         48 $opts{loc} =~ s{/+$}{};
24 22         117 $opts{file} = "$DEV_SHM/" . $opts{loc};
25             } else {
26 21         171 $opts{file} = _impute_file();
27             }
28              
29 43         240 $opts{lock} = $opts{file} . ".lock";
30 43   50     164 $opts{limit} //= -1;
31 43   50     121 $opts{on_limit} //= 'fail';
32 43   50     138 $opts{style} //= 'fifo';
33 43         83 my $list = delete $opts{list};
34              
35 43         126 my $fh;
36              
37 43   50     420 $opts{_header_size} //= 2048;
38 43         96 $opts{_end} = 0; # whether "end" has been called for this obj
39 43         129 $opts{_pos} = 0; # "cursor", index of next item to shift out
40 43         79 $opts{_tell} = $opts{_header_size}; # file position of cursor
41              
42 43         96 $opts{_count} = 0; # index of next item to be appended
43 43         230 $opts{_pids} = { Forks::Queue::File::_PID() => 'P' };
44             # $opts{_pids} = { $$ => 'P' };
45 43         294 $opts{_qid} = Forks::Queue::Util::QID();
46              
47             # how often to refactor the queue file. use small values to keep file
48             # sizes small and large values to improve performance
49 43   50     338 $opts{_maintenance_freq} //= 32;
50              
51             open $fh, '>>', $opts{lock}
52 43 50       3188 or die "Forks::Queue::Shmem: ",
53             "failed to create lock file '$opts{lock}': $!";
54 43 50       837 close $fh or die;
55              
56 43         761 my $self = bless { %opts }, $class;
57              
58 43 100 66     293 if ($opts{join} && -f $opts{file}) {
59 2         23 $DB::single = 1;
60 2 50       58 open $fh, '+<', $opts{file} or die;
61 2         15 $self->{_fh} = *$fh;
62 2         10 my $fhx = select $fh; $| = 1; select $fhx;
  2         12  
  2         13  
63 2     2   27 Forks::Queue::File::_SYNC { $self->_read_header } $self;
  2         21  
64             } else {
65 41 100       524 if (-f $opts{file}) {
66 2         1264 carp "Forks::Queue: Queue file $opts{file} already exists. ",
67             "Expect trouble if another process created this file.";
68             }
69 41 50       1327 open $fh, '>>', $opts{file} or croak(
70             "Forks::Queue: could not create queue file $opts{file}: $!");
71 41 50       616 close $fh or croak(
72             "Forks::Queue: bizarre error closing queue file $opts{file} $!");
73              
74 41 50       921 open $fh, '+<', $opts{file} or croak(
75             "Forks::Queue: error re-opening queue file $opts{file} $!");
76              
77 41         263 my $fx = select $fh;
78 41         144 $| = 1;
79 41         175 select $fx;
80              
81 41         436 $self->{_fh} = *$fh;
82 41         315 seek $fh, 0, 0;
83              
84 41         181 $self->{_locked}++;
85 41         547 $self->_write_header;
86 41         412 $self->{_locked}--;
87 41 50       358 if (tell($fh) < $self->{_header_size}) {
88 41         474 print $fh "\0" x ($self->{_header_size} - tell($fh));
89             }
90             }
91 43 100       165 if (defined($list)) {
92 5 50       18 if (ref($list) eq 'ARRAY') {
93 5         53 $self->push( @$list );
94             } else {
95 0         0 carp "Forks::Queue::new: 'list' option must be an array ref";
96             }
97             }
98              
99 43         348 return $self;
100             }
101              
102             my $id = 0;
103             sub _impute_file {
104 21     21   148 my $base = $0;
105 21         311 $base =~ s{.*[/\\](.)}{$1};
106 21         140 $base =~ s{[/\\]$}{};
107 21         53 $id++;
108 21         282 return "$DEV_SHM/shmq-$$-$id-$base";
109             }
110              
111             1;
112              
113             =head1 NAME
114              
115             Forks::Queue::Shmem - Forks::Queue implementation using shared memory
116              
117             =head1 SYNOPSIS
118              
119             use Forks::Queue::Shmem;
120             $q = Forks::Queue::Shmem->new;
121              
122             use Forks::Queue;
123             $q = Forks::Queue->new( impl => 'Shmem', ... );
124              
125             =head1 VERSION
126              
127             0.13
128              
129             =head1 DESCRIPTION
130              
131             Shared memory implementation of L<Forks::Queue>.
132             Only available on systems that have a C</dev/shm> virtual filesystem.
133              
134             A shared memory implementation is appropriate for programs that
135             rapidly update the queue but are not likely to let the size of data
136             in the queue exceed the available memory on the host machine.
137             Use L<Forks::Queue::File> if you demand high capacity for your queue.
138              
139             See L<Forks::Queue> for the public API to this class.
140              
141             =head2 Constructor options
142              
143             In addition to the standard options described in
144             L<Forks::Queue>, the
145             C<Forks::Queue::Shmem> constructor also recognizes some
146             additional options:
147              
148             =over 4
149              
150             =item * file
151              
152             The name of the file that holds the queue data. An absolute
153             pathname should not be provided here. The virtual queue file
154             will reside under the shared memory virtual filesystem
155             (probably C</dev/shm>) on your system, if it exists.
156              
157             =item * style
158              
159             =item * limit
160              
161             =item * on_limit
162              
163             =item * join
164              
165             =item * persist
166              
167             See L<Forks::Queue> for descriptions of these options.
168              
169             =item * debug
170              
171             Boolean value to enable or disable debugging on this queue,
172             overriding the value in C<$Forks::Queue::DEBUG>.
173              
174             =back
175              
176             =head1 LICENSE AND COPYRIGHT
177              
178             Copyright (c) 2017-2019, Marty O'Brien.
179              
180             This library is free software; you can redistribute it and/or modify
181             it under the same terms as Perl itself, either Perl version 5.10.1 or,
182             at your option, any later version of Perl 5 you may have available.
183              
184             See http://dev.perl.org/licenses/ for more information.
185              
186             =cut