File Coverage

blib/lib/IPC/Concurrency/DBI.pm
Criterion Covered Total %
statement 64 66 96.9
branch 19 30 63.3
condition 9 16 56.2
subroutine 14 15 93.3
pod 9 9 100.0
total 115 136 84.5


line stmt bran cond sub pod time code
1             package IPC::Concurrency::DBI;
2              
3 11     11   28254 use warnings;
  11         40  
  11         345  
4 11     11   51 use strict;
  11         11  
  11         223  
5              
6 11     11   7432 use Data::Dumper;
  11         99925  
  11         891  
7 11     11   6743 use Data::Validate::Type;
  11         84469  
  11         858  
8 11     11   119 use Carp;
  11         18  
  11         838  
9              
10 11     11   6611 use IPC::Concurrency::DBI::Application;
  11         25  
  11         9778  
11              
12              
13             =head1 NAME
14              
15             IPC::Concurrency::DBI - Control how many instances of an application run in parallel, using DBI as the IPC method.
16              
17              
18             =head1 VERSION
19              
20             Version 1.2.0
21              
22             =cut
23              
24             our $VERSION = '1.2.0';
25              
26              
27             =head1 SYNOPSIS
28              
29             This module controls how many instances of a given program are allowed to run
30             in parallel. It does not manage forking or starting those instances.
31              
32             You can use this module for example to prevent more than one instance of a
33             program from running at any given time, or to never have more than N instances
34             running in parallel to prevent exhausting all the available resources.
35              
36             It uses DBI as a storage layer for information about instances and applications,
37             which is particularly useful in contexts where Sarbanes-Oxley regulations allow
38             you database access but not file write rights in production environments.
39              
40             # Configure the concurrency object.
41             use IPC::Concurrency::DBI;
42             my $concurrency_manager = IPC::Concurrency::DBI->new(
43             'database_handle' => $dbh,
44             'verbose' => 1,
45             );
46              
47             # Create the tables that the concurrency manager needs to store information
48             # about the applications and instances.
49             $concurrency_manager->create_tables();
50              
51             # Register cron_script.pl as an application we want to limit to 10 parallel
52             # instances. We only need to do this once, obviously.
53             $concurrency_manager->register_application(
54             name => 'cron_script.pl',
55             maximum_instances => 10,
56             );
57              
58             # Retrieve the application.
59             my $application = $concurrency_manager->get_application(
60             name => 'cron_script.pl',
61             );
62              
63             # Count how many instances are currently running.
64             my $instances_count = $application->get_instances_count();
65              
66             # NOT IMPLEMENTED YET: Get a list of what instances are currently running.
67             # my $instances = $application->get_instances_list()
68              
69             # Start a new instance of the application. If this returns undef, we've
70             # reached the limit.
71             unless ( my $instance = $application->start_instance() )
72             {
73             print "Too many instances of $0 are already running.\n";
74             exit;
75             }
76              
77             # [...] Do some work.
78              
79             # Now that the application is about to exit, flag the instance as completed.
80             # (note: this is implicit when $instance is destroyed).
81             $instance->finish();
82              
83              
84             =head1 SUPPORTED DATABASES
85              
86             This distribution currently supports:
87              
88             =over 4
89              
90             =item * SQLite
91              
92             =item * MySQL
93              
94             =item * PostgreSQL
95              
96             =back
97              
98             Please contact me if you need support for another database type, I'm always
99             glad to add extensions if you can help me with testing.
100              
101              
102             =head1 METHODS
103              
104             =head2 new()
105              
106             Create a new IPC::Concurrency::DBI object.
107              
108             my $concurrency_manager = IPC::Concurrency::DBI->new(
109             'database_handle' => $dbh,
110             'verbose' => 1,
111             );
112              
113             Arguments:
114              
115             =over 4
116              
117             =item * database handle
118              
119             Mandatory, a DBI object.
120              
121             =item * verbose
122              
123             Optional, see verbose() for options.
124              
125             =back
126              
127             =cut
128              
129             sub new
130             {
131 10     10 1 125581 my ( $class, %args ) = @_;
132 10         35 my $database_handle = delete( $args{'database_handle'} );
133 10         29 my $verbose = delete( $args{'verbose'} );
134              
135             # Check parameters.
136 10 50       83 croak "Argument 'database_handle' is required to create a new IPC::Concurrency::DBI object"
137             unless defined( $database_handle );
138 10 50       312 croak "Argument 'database_handle' is not a DBI object"
139             if !Data::Validate::Type::is_instance( $database_handle, class => 'DBI::db' );
140              
141             # Create the object.
142 10         369 my $self = bless(
143             {
144             'database_handle' => $database_handle,
145             'verbose' => 0,
146             },
147             $class,
148             );
149              
150 10 100       85 $self->set_verbose( $verbose )
151             if defined( $verbose );
152              
153 10         43 return $self;
154             }
155              
156              
157             =head2 register_application()
158              
159             Register a new application with the concurrency manager and define the maximum
160             number of instances that should be allowed to run in parallel.
161              
162             $concurrency_manager->register_application(
163             name => 'cron_script.pl',
164             maximum_instances => 10,
165             );
166              
167             'name' is a unique name for the application. It can be the name of the script
168             for a cron script, for example.
169              
170             'maximum_instances' is the maximum number of instances that should be allowed to
171             run in parallel.
172              
173             =cut
174              
175             sub register_application
176             {
177 8     8 1 4581 my ( $self, %args ) = @_;
178 8         16 my $name = delete( $args{'name'} );
179 8         14 my $maximum_instances = delete( $args{'maximum_instances'} );
180              
181             # Check parameters.
182 8 100 66     69 croak 'The name of the application must be defined'
183             if !defined( $name ) || ( $name eq '' );
184 7 100       42 croak 'The application name is longer than 255 characters'
185             if length( $name ) > 255;
186 6 100 66     37 croak 'The maximum number of instances must be defined'
187             if !defined( $maximum_instances ) || ( $maximum_instances eq '' );
188 5 100       23 croak 'The maximum number of instances must be a strictly positive integer'
189             if !Data::Validate::Type::is_number( $maximum_instances, strictly_positive => 1 );
190              
191             # Insert the new application.
192 4         76 my $database_handle = $self->get_database_handle();
193 4         14 my $time = time();
194 4         41 my $rows_affected = $database_handle->do(
195             q|
196             INSERT INTO ipc_concurrency_applications( name, current_instances, maximum_instances, created, modified )
197             VALUES( ?, 0, ?, ?, ? )
198             |,
199             {},
200             $name,
201             $maximum_instances,
202             $time,
203             $time,
204             );
205 3 50       278252 croak 'Cannot execute SQL: ' . $database_handle->errstr()
206             if defined( $database_handle->errstr() );
207              
208 3 50 33     225 return defined( $rows_affected ) && $rows_affected == 1 ? 1 : 0;
209             }
210              
211              
212             =head2 get_application()
213              
214             Retrieve an application by name or by application ID.
215              
216             # Retrieve the application by name.
217             my $application = $concurrency_manager->get_application(
218             name => 'cron_script.pl',
219             );
220             die 'Application not found'
221             unless defined( $application );
222              
223             # Retrieve the application by ID.
224             my $application = $concurrency_manager->get_application(
225             id => 12345,
226             );
227             die 'Application not found'
228             unless defined( $application );
229              
230             =cut
231              
232             sub get_application
233             {
234 5     5 1 747 my ( $self, %args ) = @_;
235 5         13 my $name = delete( $args{'name'} );
236 5         8 my $application_id = delete( $args{'id'} );
237 5         18 my $database_handle = $self->get_database_handle();
238              
239 5         31 return IPC::Concurrency::DBI::Application->new(
240             name => $name,
241             id => $application_id,
242             database_handle => $database_handle,
243             );
244             }
245              
246              
247             =head2 create_tables()
248              
249             Create the tables that the concurrency manager needs to store information about
250             the applications and instances.
251              
252             $concurrency_manager->create_tables(
253             drop_if_exist => $boolean, #default 0
254             );
255              
256             By default, it won't drop any table but you can force that by setting
257             'drop_if_exist' to 1.
258              
259             =cut
260              
261             sub create_tables
262             {
263 1     1 1 48 my ( $self, %args ) = @_;
264 1         3 my $drop_if_exist = delete( $args{'drop_if_exist'} );
265 1         6 my $database_handle = $self->get_database_handle();
266              
267             # Defaults.
268 1 50 33     5 $drop_if_exist = 0
269             if !defined( $drop_if_exist ) || !$drop_if_exist;
270              
271             # Check the database type.
272 1         8 my $database_type = $self->get_database_type();
273 1 50       9 croak "This database type ($database_type) is not supported yet, please email the maintainer of the module for help"
274             if $database_type !~ m/^(?:SQLite|mysql|Pg)$/x;
275              
276             # Table definitions.
277 1         5 my $tables_sql =
278             {
279             SQLite =>
280             q|
281             CREATE TABLE ipc_concurrency_applications
282             (
283             ipc_concurrency_application_id INTEGER PRIMARY KEY AUTOINCREMENT,
284             name varchar(255) NOT NULL,
285             current_instances INTEGER NOT NULL default '0',
286             maximum_instances INTEGER NOT NULL default '0',
287             created bigint(20) NOT NULL default '0',
288             modified bigint(20) NOT NULL default '0',
289             UNIQUE (name)
290             )
291             |,
292             mysql =>
293             q|
294             CREATE TABLE ipc_concurrency_applications
295             (
296             ipc_concurrency_application_id BIGINT(20) UNSIGNED NOT NULL auto_increment,
297             name VARCHAR(255) NOT NULL,
298             current_instances INT(10) UNSIGNED NOT NULL default '0',
299             maximum_instances INT(10) UNSIGNED NOT NULL default '0',
300             created bigint(20) UNSIGNED NOT NULL default '0',
301             modified bigint(20) UNSIGNED NOT NULL default '0',
302             PRIMARY KEY (ipc_concurrency_application_id),
303             UNIQUE KEY idx_ipc_concurrency_applications_name (name)
304             )
305             ENGINE=InnoDB
306             |,
307             Pg =>
308             q|
309             CREATE TABLE ipc_concurrency_applications
310             (
311             ipc_concurrency_application_id BIGSERIAL,
312             name VARCHAR(255) NOT NULL,
313             current_instances INT NOT NULL default '0',
314             maximum_instances INT NOT NULL default '0',
315             created BIGINT NOT NULL default '0',
316             modified BIGINT NOT NULL default '0',
317             PRIMARY KEY (ipc_concurrency_application_id),
318             CONSTRAINT idx_ipc_concurrency_applications_name UNIQUE (name)
319             )
320             |,
321             };
322             croak "No table definition found for database type '$database_type'"
323 1 50       4 if !defined( $tables_sql->{ $database_type } );
324              
325             # Create the table that will hold the list of applications as well as
326             # a summary of the information about instances.
327 1 50       3 if ( $drop_if_exist )
328             {
329 0 0       0 $database_handle->do( q|DROP TABLE IF EXISTS ipc_concurrency_applications| )
330             || croak 'Cannot execute SQL: ' . $database_handle->errstr();
331             }
332             $database_handle->do(
333 1 50       9 $tables_sql->{ $database_type }
334             ) || croak 'Cannot execute SQL: ' . $database_handle->errstr();
335              
336             # TODO: create a separate table to hold information about what instances
337             # are currently running.
338              
339 1         9441458 return 1;
340             }
341              
342              
343             =head1 ACCESSORS
344              
345             =head2 get_database_handle()
346              
347             Returns the database handle used for this object.
348              
349             my $database_handle = $concurrency_manager->get_database_handle();
350              
351             =cut
352              
353             sub get_database_handle
354             {
355 13     13 1 1116 my ( $self ) = @_;
356              
357 13         41 return $self->{'database_handle'};
358             }
359              
360              
361             =head2 get_database_type()
362              
363             Return the database type corresponding to the database handle associated
364             with the L object.
365              
366             my $database_type = $concurrency_manager->get_database_type();
367              
368             =cut
369              
370             sub get_database_type
371             {
372 2     2 1 436 my ( $self ) = @_;
373              
374 2         6 my $database_handle = $self->get_database_handle();
375              
376 2   50     49 return $database_handle->{'Driver'}->{'Name'} || '';
377             }
378              
379              
380             =head2 get_verbose()
381              
382             Return the verbosity level, which is used in the module to determine when and
383             what type of debugging statements / information should be warned out.
384              
385             See C for the possible values this function can return.
386              
387             warn 'Verbose' if $queue->get_verbose();
388              
389             warn 'Very verbose' if $queue->get_verbose() > 1;
390              
391             =cut
392              
393             sub get_verbose
394             {
395 4     4 1 697 my ( $self ) = @_;
396              
397 4         22 return $self->{'verbose'};
398             }
399              
400              
401             =head2 set_verbose()
402              
403             Control the verbosity of the warnings in the code:
404              
405             =over 4
406              
407             =item * 0 will not display any warning;
408              
409             =item * 1 will only give one line warnings about the current operation;
410              
411             =item * 2 will also usually output the SQL queries performed.
412              
413             =back
414              
415             $queue->set_verbose(1); # turn on verbose information
416              
417             $queue->set_verbose(2); # be extra verbose
418              
419             $queue->set_verbose(0); # quiet now!
420              
421             =cut
422              
423             sub set_verbose
424             {
425 10     10 1 67 my ( $self, $verbose ) = @_;
426              
427 10   100     81 $self->{'verbose'} = ( $verbose || 0 );
428              
429 10         20 return;
430             }
431              
432              
433             =head1 DEPRECATED METHODS
434              
435             =head2 verbose()
436              
437             Please use C and C instead.
438              
439             =cut
440              
441             sub verbose
442             {
443 0     0 1   croak 'verbose() has been deprecated, please use get_verbose() / set_verbose() instead.';
444             }
445              
446              
447             =head1 BUGS
448              
449             Please report any bugs or feature requests through the web interface at
450             L.
451             I will be notified, and then you'll automatically be notified of progress on
452             your bug as I make changes.
453              
454              
455             =head1 SUPPORT
456              
457             You can find documentation for this module with the perldoc command.
458              
459             perldoc IPC::Concurrency::DBI
460              
461              
462             You can also look for information at:
463              
464             =over 4
465              
466             =item * GitHub's request tracker
467              
468             L
469              
470             =item * AnnoCPAN: Annotated CPAN documentation
471              
472             L
473              
474             =item * CPAN Ratings
475              
476             L
477              
478             =item * MetaCPAN
479              
480             L
481              
482             =back
483              
484              
485             =head1 AUTHOR
486              
487             L,
488             C<< >>.
489              
490              
491             =head1 ACKNOWLEDGEMENTS
492              
493             I originally developed this project for ThinkGeek
494             (L). Thanks for allowing me to open-source it!
495              
496             Thanks to Jacob Rose C<< >> for suggesting the idea of
497             this module and brainstorming with me about the features it should offer.
498              
499              
500             =head1 COPYRIGHT & LICENSE
501              
502             Copyright 2011-2017 Guillaume Aubert.
503              
504             This code is free software; you can redistribute it and/or modify it under the
505             same terms as Perl 5 itself.
506              
507             This program is distributed in the hope that it will be useful, but WITHOUT ANY
508             WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
509             PARTICULAR PURPOSE. See the LICENSE file for more details.
510              
511             =cut
512              
513             1;