File Coverage

blib/lib/Mojo/Promise/Limiter.pm
Criterion Covered Total %
statement 37 37 100.0
branch 5 6 83.3
condition 2 2 100.0
subroutine 11 11 100.0
pod 2 2 100.0
total 57 58 98.2


line stmt bran cond sub pod time code
1             package Mojo::Promise::Limiter;
2 3     3   215114 use Mojo::Base 'Mojo::EventEmitter';
  3         592932  
  3         21  
3 3     3   6398 use Mojo::Promise;
  3         466540  
  3         20  
4              
5             our $VERSION = '0.100';
6              
7             has outstanding => 0;
8             has concurrency => 0;
9             has jobs => sub { [] };
10              
11             sub new {
12 2     2 1 214 my ($class, $concurrency) = @_;
13 2         31 $class->SUPER::new(concurrency => $concurrency);
14             }
15              
16             sub limit {
17 9     9 1 1203 my ($self, $sub, $name) = @_;
18 9   100     36 $name //= 'anon';
19 9 100       26 if ($self->outstanding < $self->concurrency) {
20 4         60 return $self->_run($sub, $name);
21             } else {
22 5         49 return $self->_queue($sub, $name);
23             }
24             }
25              
26             sub _run {
27 9     9   37 my ($self, $sub, $name) = @_;
28 9         25 $self->{outstanding}++;
29 9         44 $self->emit(run => $name);
30             $sub->()->then(
31 7     7   603460 sub { $self->_remove($name); @_ },
  7         1040  
32 2     2   300710 sub { $self->_remove($name); Mojo::Promise->reject(@_) },
  2         287  
33 9         135 );
34             }
35              
36             sub _queue {
37 5     5   19 my ($self, $sub, $name) = @_;
38 5         19 $self->emit(queue => $name);
39             Mojo::Promise->new(sub {
40 5     5   127 my ($resolve, $reject) = @_;
41 5         8 push @{$self->jobs}, { sub => $sub, name => $name, resolve => $resolve, reject => $reject };
  5         16  
42 5         74 });
43             }
44              
45             sub _dequeue {
46 9     9   30 my $self = shift;
47 9 100       23 if (my $job = shift @{$self->jobs}) {
  9         102  
48 5         63 $self->emit(dequeue => $job->{name});
49 5         73 $self->_run($job->{sub}, $job->{name})->then($job->{resolve}, $job->{reject});
50             }
51             }
52              
53             sub _remove {
54 9     9   38 my ($self, $name) = @_;
55 9         35 $self->{outstanding}--;
56 9         83 $self->emit(remove => $name);
57 9 50       239 if ($self->outstanding < $self->concurrency) {
58 9         181 $self->_dequeue;
59             }
60             }
61              
62             1;
63             __END__