File Coverage

blib/lib/IPC/Queue/Duplex.pm
Criterion Covered Total %
statement 52 84 61.9
branch 10 30 33.3
condition 2 12 16.6
subroutine 9 11 81.8
pod 5 5 100.0
total 78 142 54.9


line stmt bran cond sub pod time code
1             # -*-cperl-*-
2             #
3             # IPC::Queue::Duplex - Filesystem based request / response queue
4             # Copyright (c) Ashish Gulhati
5             #
6             # $Id: lib/IPC/Queue/Duplex.pm v1.009 Tue Oct 16 21:48:32 PDT 2018 $
7              
8             package IPC::Queue::Duplex;
9              
10 3     3   163790 use warnings;
  3         22  
  3         81  
11 3     3   14 use strict;
  3         3  
  3         45  
12 3     3   1775 use File::Temp;
  3         52000  
  3         185  
13 3     3   1216 use File::Copy qw(cp);
  3         5489  
  3         141  
14 3     3   1105 use IPC::Queue::Duplex::Job;
  3         7  
  3         82  
15 3     3   15 use Fcntl qw(:flock);
  3         6  
  3         2115  
16              
17             our ( $VERSION ) = '$Revision: 1.009 $' =~ /\s+([\d\.]+)/;
18              
19             sub new {
20 4     4 1 162 my ($class, %args) = @_;
21 4 50 33     112 return unless $args{Dir} and -d $args{Dir};
22 4         22 bless { Dir => $args{Dir} }, $class;
23             }
24              
25             sub add {
26 10     10 1 3901 my ($self, $jobstr) = @_;
27 10 50       71 return unless $jobstr;
28 10         117 my $job = File::Temp->new( DIR => $self->{Dir}, SUFFIX => '.iqd');
29 10         4211 print $job "$jobstr\n";
30             # $job->unlink_on_destroy(0);
31 10         79 $job->close; my $filename = $job->filename; $filename =~ s/\.iqd/.job/;
  10         329  
  10         107  
32 10         24 cp($job->filename,$filename);
33 10         3320 bless { File => $filename }, 'IPC::Queue::Duplex::Job';
34             }
35              
36             sub addfile {
37 0     0 1 0 my ($self, $filename, $jobstr) = @_;
38 0 0 0     0 return unless $filename and $jobstr;
39 0         0 my $filetemp = $filename; $filetemp .= '.iqdtmp';
  0         0  
40 0         0 open JOBFILE, ">$filetemp";
41 0         0 print JOBFILE "$jobstr\n";
42 0         0 close JOBFILE;
43 0         0 rename $filetemp, $filename;
44 0         0 bless { File => $filename }, 'IPC::Queue::Duplex::Job';
45             }
46              
47             sub get {
48 623     623 1 4253 my $self = shift;
49 623         914 my $filework = '';
50 623         778 while (1) {
51 623         957 my $filename; my $oldestage;
52 623         42948 foreach my $file (glob("$self->{Dir}/*.job")) {
53 10 50 33     87 if( !defined($filename) or -M $file > $oldestage ) {
54 10         30 $filename = $file;
55 10         291 $oldestage = -M $file;
56             }
57             }
58 623 100       2717 last unless $filename;
59 10 50       125 next unless -e $filename;
60 10         463 open(my $jobfh, "+<", $filename);
61 10 50       126 close $jobfh, next unless flock($jobfh, LOCK_EX | LOCK_NB);
62 10         27 $filework = $filename; $filework =~ s/\.job/.wrk/;
  10         76  
63 10 50       677 close $jobfh, last if rename $filename,$filework;
64             }
65 623 100       1200 if ($filework) {
66 10         251 open (JOB, $filework);
67 10         169 my $jobstr = <JOB>; chomp $jobstr;
  10         28  
68 10         359 close JOB;
69 10         141 return bless { File => $filework, Request => $jobstr }, 'IPC::Queue::Duplex::Job';
70             }
71 613         1753 return undef;
72             }
73              
74             sub getresponse {
75 0     0 1   my $self = shift;
76 0           my ($filename, $filefin);
77 0           while (1) {
78 0           my $oldestage;
79 0           foreach my $file (glob("$self->{Dir}/*.fin")) {
80 0 0 0       if( !defined($filename) or -M $file > $oldestage ) {
81 0           $filename = $file;
82 0           $oldestage = -M $file;
83             }
84             }
85 0 0         last unless $filename;
86 0 0         next unless -e $filename;
87 0           open(my $jobfh, "+<", $filename);
88 0 0         close $jobfh, next unless flock($jobfh, LOCK_EX | LOCK_NB);
89 0           $filefin = $filename; $filefin =~ s/\.fin/.rsp/;
  0            
90 0 0         close $jobfh, last if rename $filename,$filefin;
91             }
92 0 0         if ($filefin) {
93 0           open (JOBFIN, $filefin);
94 0           my $response = <JOBFIN>; chomp $response;
  0            
95 0           close JOBFIN; unlink $filefin;
  0            
96 0           return bless { File => $filename, Response => $response }, 'IPC::Queue::Duplex::Job';
97             }
98 0           return undef;
99             }
100              
101             1; # End of IPC::Queue::Duplex
102              
103             =head1 NAME
104              
105             IPC::Queue::Duplex - Filesystem based request / response queue
106              
107             =head1 VERSION
108              
109             $Revision: 1.009 $
110             $Date: Tue Oct 16 21:48:32 PDT 2018 $
111              
112             =head1 SYNOPSIS
113              
114             (Enqueuer)
115              
116             use IPC::Queue::Duplex;
117              
118             my $client = new IPC::Queue::Duplex (Dir => $dir);
119             my $job = $client->add($jobstr);
120             my $response = $job->response;
121              
122             (Worker)
123              
124             use IPC::Queue::Duplex;
125              
126             my $server = new IPC::Queue::Duplex (Dir => $dir);
127             my $job = $server->get;
128             process_job($job);
129             $job->finish($result);
130              
131             =head1 METHODS
132              
133             =head2 new
134              
135             Creates and returns a new IPC::Queue::Duplex object. Requires one
136             named parameter:
137              
138             =over
139              
140             Dir - The directory that will contain the queue for this object. It's
141             important to use a unique directory per queue.
142              
143             =back
144              
145             =head2 add
146              
147             Adds a job to the queue and returns an IPC::Queue::Duplex::Job
148             object. A single argument is required, the job request as a string.
149              
150             =head2 get
151              
152             Gets a job from the queue and returns an IPC::Queue::Duplex::Job
153             object. Returns undef if there is no job waiting.
154              
155             =head2 addfile
156              
157             Adds a job to the queue, with an explicitly provided filename, and
158             returns an IPC::Queue::Duplex::Job object. Two arguments are required:
159             the filename to use for the job, and the job request string, in that
160             order.
161              
162             =head2 getresponse
163              
164             Get a response from the queue. Normally a requester would hold on to
165             their job object and get the response by calling the response method
166             on that object. However, a requester can also handle responses
167             asynchronously and call this method to get the next waiting response
168             instead. Returns an IPC::Queue::Duplex::Job object, or undef if there
169             is no response on the queue.
170              
171             =head1 AUTHOR
172              
173             Ashish Gulhati, C<< >>
174              
175             =head1 BUGS
176              
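177             Please report any bugs or feature requests to C<bug-ipc-queue-duplex at rt.cpan.org>, or through
178             the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=IPC-Queue-Duplex>. I will be notified, and then you'll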
179             automatically be notified of progress on your bug as I make changes.
180              
181             =head1 SUPPORT
182              
183             You can find documentation for this module with the perldoc command.
184              
185             perldoc IPC::Queue::Duplex
186              
187             You can also look for information at:
188              
189             =over 4
190              
191             =item * RT: CPAN's request tracker
192              
193             L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=IPC-Queue-Duplex>
194              
195             =item * AnnoCPAN: Annotated CPAN documentation
196              
197             L<http://annocpan.org/dist/IPC-Queue-Duplex>
198              
199             =item * CPAN Ratings
200              
201             L<http://cpanratings.perl.org/d/IPC-Queue-Duplex>
202              
203             =item * Search CPAN
204              
205             L<http://search.cpan.org/dist/IPC-Queue-Duplex/>
206              
207             =back
208              
209             =head1 LICENSE AND COPYRIGHT
210              
211             Copyright (c) Ashish Gulhati.
212              
213             This software package is Open Software; you can use, redistribute,
214             and/or modify it under the terms of the Open Artistic License 2.0.
215              
216             Please see L for the full license
217             terms, and ensure that the license grant applies to you before using
218             or modifying this software. By using or modifying this software, you
219             indicate your agreement with the license terms.