File Coverage

blib/lib/Mojo/Promise/Limitter.pm
Criterion Covered Total %
statement 37 37 100.0
branch 5 6 83.3
condition 2 2 100.0
subroutine 11 11 100.0
pod 2 2 100.0
total 57 58 98.2


line stmt bran cond sub pod time code
1             package Mojo::Promise::Limitter;
2 3     3   203772 use Mojo::Base 'Mojo::EventEmitter';
  3         559410  
  3         21  
3 3     3   6016 use Mojo::Promise;
  3         438913  
  3         17  
4              
5             our $VERSION = '0.001';
6              
7             has outstanding => 0;
8             has concurrency => 0;
9             has jobs => sub { [] };
10              
11             sub new {
12 2     2 1 200 my ($class, $concurrency) = @_;
13 2         30 $class->SUPER::new(concurrency => $concurrency);
14             }
15              
16             sub limit {
17 9     9 1 1031 my ($self, $sub, $name) = @_;
18 9   100     35 $name //= 'anon';
19 9 100       26 if ($self->outstanding < $self->concurrency) {
20 4         58 return $self->_run($sub, $name);
21             } else {
22 5         40 return $self->_queue($sub, $name);
23             }
24             }
25              
26             sub _run {
27 9     9   27 my ($self, $sub, $name) = @_;
28 9         21 $self->{outstanding}++;
29 9         45 $self->emit(run => $name);
30             $sub->()->then(
31 7     7   601795 sub { $self->_remove($name); @_ },
  7         1008  
32 2     2   301010 sub { $self->_remove($name); Mojo::Promise->reject(@_) },
  2         275  
33 9         142 );
34             }
35              
36             sub _queue {
37 5     5   12 my ($self, $sub, $name) = @_;
38 5         15 $self->emit(queue => $name);
39             Mojo::Promise->new(sub {
40 5     5   125 my ($resolve, $reject) = @_;
41 5         9 push @{$self->jobs}, { sub => $sub, name => $name, resolve => $resolve, reject => $reject };
  5         13  
42 5         71 });
43             }
44              
45             sub _dequeue {
46 9     9   25 my $self = shift;
47 9 100       22 if (my $job = shift @{$self->jobs}) {
  9         72  
48 5         51 $self->emit(dequeue => $job->{name});
49 5         86 $self->_run($job->{sub}, $job->{name})->then($job->{resolve}, $job->{reject});
50             }
51             }
52              
53             sub _remove {
54 9     9   34 my ($self, $name) = @_;
55 9         29 $self->{outstanding}--;
56 9         65 $self->emit(remove => $name);
57 9 50       183 if ($self->outstanding < $self->concurrency) {
58 9         159 $self->_dequeue;
59             }
60             }
61              
62             1;
63             __END__