File Coverage

lib/Mojo/IOLoop/ReadWriteProcess/Queue.pm
Criterion Covered Total %
statement 47 47 100.0
branch 10 12 83.3
condition 4 6 66.6
subroutine 12 12 100.0
pod 3 3 100.0
total 76 80 95.0


line stmt bran cond sub pod time code
1             package Mojo::IOLoop::ReadWriteProcess::Queue;
2 38     38   244 use Mojo::Base -base;
  38         78  
  38         253  
3 38     38   6110 use Mojo::IOLoop::ReadWriteProcess::Pool;
  38         80  
  38         1463  
4 38     38   209 use Mojo::IOLoop::ReadWriteProcess;
  38         75  
  38         343  
5 38     38   18556 use Mojo::IOLoop::ReadWriteProcess::Session;
  38         94  
  38         2154  
6              
7 38     38   280 use constant DEBUG => $ENV{MOJO_PROCESS_DEBUG};
  38         75  
  38         34072  
8              
9             has queue => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
10             has pool => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
11             has done => sub { Mojo::IOLoop::ReadWriteProcess::Pool->new() };
12             has session => sub { Mojo::IOLoop::ReadWriteProcess::Session->singleton };
13              
14             sub _dequeue {
15 40     40   91 my ($self, $process) = @_;
16              
17 40     75   102 $self->pool($self->pool->grep(sub { $process ne $_ }));
  75         918  
18 40 100 66     1054 shift @{$self->queue}
  32         298  
19             if ($self->queue->first && $self->pool->add($self->queue->first));
20             }
21              
22 359 100   359 1 914 sub exhausted { $_[0]->pool->size == 0 && shift->queue->size == 0 }
23              
24             sub consume {
25 4     4 1 12 my $self = shift;
26 4         16 $self->session->enable;
27 4         25 $self->done->maximum_processes(
28             $self->queue->maximum_processes + $self->pool->maximum_processes);
29 4         12 until ($self->exhausted) {
30 355         31074 sleep .5;
31 355         2261 $self->session->consume_collected_info;
32             $self->session->_protect(
33             sub {
34             $self->pool->each(
35             sub {
36 447         6497 my $p = shift;
37 447 50       937 return unless $p;
38 447 100       1122 return if exists $p->{started};
39 40         249 $p->{started}++;
40 40         997 $p->once(stop => sub { $self->done->add($p); $self->_dequeue($p) });
  40         4374  
  40         293  
41 40         1861 $p->start;
42 355     355   1043 });
43 355         2118 });
44             }
45             }
46              
47             sub add {
48 40     40 1 853 my $self = shift;
49 40   66     68 $self->pool->add(@_) // $self->queue->add(@_);
50             }
51              
52             sub AUTOLOAD {
53 4     4   82 our $AUTOLOAD;
54 4         10 my $fn = $AUTOLOAD;
55 4         40 $fn =~ s/.*:://;
56 4 100       22 return if $fn eq "DESTROY";
57 3         6 my $self = shift;
58             return (
59 3         12 eval { $self->pool->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $fn) },
60             (grep(/once|on|emit/, $fn))
61 3 50       7 ? eval { $self->queue->Mojo::IOLoop::ReadWriteProcess::Pool::_cmd(@_, $fn) }
  3         13  
62             : ());
63             }
64              
65             1;
66              
67             =encoding utf-8
68              
69             =head1 NAME
70              
71             Mojo::IOLoop::ReadWriteProcess::Queue - Queue for Mojo::IOLoop::ReadWriteProcess objects.
72              
73             =head1 SYNOPSIS
74              
75             use Mojo::IOLoop::ReadWriteProcess qw(queue process);
76             my $n_proc = 20;
77             my $fired;
78              
79             my $q = queue;
80              
81             $q->pool->maximum_processes(2); # Max 2 processes in parallel
82             $q->queue->maximum_processes(10); # Max queue is 10
83              
84             $q->add( process sub { return 42 } ) for 1..7;
85              
86             # Subscribe to all "stop" events in the pool
87             $q->once(stop => sub { $fired++; });
88              
89             # Consume the queue
90             $q->consume();
91              
92             my $all = $q->done; # All processes, Mojo::Collection of Mojo::IOLoop::ReadWriteProcess
93              
94             # Set your own running pool
95             $q->pool(parallel sub { return 42 } => 5);
96              
97             # Set your own queue
98             $q->queue(parallel sub { return 42 } => 20);
99              
100             $q->consume();
101              
102             =head1 METHODS
103              
104             L inherits all methods from L and implements
105             the following new ones.
106             Note: It proxies all the other methods of L for the whole process group.
107              
108             =head2 add
109              
110             use Mojo::IOLoop::ReadWriteProcess qw(queue process);
111             my $q = queue();
112             $q->add(sub { print "Hello 2! " });
113             $q->add(process sub { print "Hello 2! " });
114              
115             Add the process to the queue.
116              
117             =head2 consume
118              
119             use Mojo::IOLoop::ReadWriteProcess qw(queue);
120             my $q = queue();
121             $q->add(sub { print "Hello 2! " });
122             $q->add(process sub { print "Hello 2! " });
123             $q->consume; # executes and exhaust the processes
124              
125             Starts the processes and empties the queue.
126             Note: maximum_processes can be set both to the pool (number of process to be run in parallel),
127             and for the queue (that gets exhausted during the C phase).
128              
129             $q->pool->maximum_processes(2); # Max 2 processes in parallel
130             $q->queue->maximum_processes(10); # Max queue is 10
131              
132             =head2 exhausted
133              
134             use Mojo::IOLoop::ReadWriteProcess qw(queue);
135             my $q = queue();
136             $q->add(sub { print "Hello 2! " });
137             $q->add(process sub { print "Hello 2! " });
138             $q->consume; # executes and exhaust the processes
139             $q->exhausted; # 1
140              
141             Returns 1 if the queue is exhausted.
142              
143             =head1 ENVIRONMENT
144              
145             You can set the MOJO_PROCESS_MAXIMUM_PROCESSES environment variable to specify the
146             the maximum number of processes allowed in the pool and the queue, that are
147             L instances.
148              
149             MOJO_PROCESS_MAXIMUM_PROCESSES=10000
150              
151             =head1 LICENSE
152              
153             Copyright (C) Ettore Di Giacinto.
154              
155             This library is free software; you can redistribute it and/or modify
156             it under the same terms as Perl itself.
157              
158             =head1 AUTHOR
159              
160             Ettore Di Giacinto Eedigiacinto@suse.comE
161              
162             =cut