File Coverage

blib/lib/Sque/Worker.pm
Criterion Covered Total %
statement 9 42 21.4
branch 0 8 0.0
condition 0 3 0.0
subroutine 3 13 23.0
pod 6 6 100.0
total 18 72 25.0


line stmt bran cond sub pod time code
1             package Sque::Worker;
2             $Sque::Worker::VERSION = '0.010';
3 3     3   20 use Any::Moose;
  3         6  
  3         26  
4 3     3   1774 use Any::Moose '::Util::TypeConstraints';
  3         4  
  3         13  
5 3     3   29397 use Try::Tiny;
  3         4063  
  3         1991  
6              
7             with 'Sque::Encoder';
8              
9             # ABSTRACT: Does the hard work of babysitting Sque::Job's
10              
11             has _dying => (is => 'rw', default => 0);
12              
13             has logger => (is => 'rw');
14              
15             has sque => (
16             is => 'ro',
17             required => 1,
18             handles => [qw/ stomp key /]
19             );
20              
21             has queues => (
22             is => 'rw',
23             isa => 'HashRef',
24             lazy => 1,
25             default => sub {{}}
26             );
27              
28             has verbose => ( is => 'rw', default => sub {0} );
29              
30             sub BUILD {
31 0     0 1   my ($self) = @_;
32              
33             my $die_handler = sub {
34 0     0     my ($signal) = @_;
35 0           $self->log("Received $signal signal, dying...");
36 0           $self->_dying(1);
37 0           };
38              
39             # Setup die handlers
40 0           $SIG{$_} = $die_handler for qw(INT TERM KILL QUIT)
41             }
42              
43             sub work {
44 0     0 1   my ( $self ) = @_;
45 0           while( my $job = $self->sque->pop ) {
46 0           $job->worker($self);
47 0           my $reval = $self->perform($job);
48 0 0         exit 0 if $self->_dying;
49             }
50             }
51              
52             sub perform {
53 0     0 1   my ( $self, $job ) = @_;
54 0           my $ret;
55             try {
56 0     0     $ret = $job->perform;
57 0           $self->log( sprintf( "done: %s", $job->stringify ) );
58             } catch {
59 0     0     $self->log( sprintf( "%s failed: %s", $job->stringify, $_ ) );
60 0           };
61 0           $self->stomp->ack({ frame => $job->frame });
62 0           return $ret;
63             }
64              
65             sub reserve {
66 0     0 1   my ( $self ) = @_;
67 0           return $self->sque->pop;
68             }
69              
70             sub add_queues {
71 0     0 1   my $self = shift;
72 0 0         return unless @_;
73 0           for my $q ( @_ ) {
74 0 0         if(!$self->queues->{$q}){
75 0           $self->queues->{$q} = 1;
76 0           my $queue = $self->sque->key( $q );
77 0           $self->_subscribe_queue( $queue );
78             }
79             }
80 0           return $self;
81             }
82              
83             sub log {
84 0     0 1   my $self = shift;
85 0 0 0       $self->logger->DEBUG(@_) if $self->verbose and $self->logger;
86             }
87              
88             sub _subscribe_queue {
89 0     0     my ( $self, $q ) = @_;
90 0           $self->stomp->subscribe( {
91             destination => $q,
92             ack => 'client',
93             } );
94             };
95              
96             __PACKAGE__->meta->make_immutable();
97              
98             1;
99              
100             __END__
101              
102             =pod
103              
104             =encoding UTF-8
105              
106             =head1 NAME
107              
108             Sque::Worker - Does the hard work of babysitting Sque::Job's
109              
110             =head1 VERSION
111              
112             version 0.010
113              
114             =head1 ATTRIBUTES
115              
116             =head2 sque
117              
118             The L<Sque> object running this worker.
119              
120             =head2 queues
121              
122             Queues this worker should fetch jobs from.
123              
124             =head2 verbose
125              
126             Set to a true value to make this worker report what it is doing while
127             inside work().
128              
129             =head1 METHODS
130              
131             =head2 work
132              
133             Calling this method makes this worker start pulling and running jobs
134             from queues().
135              
136             This is the main loop: it runs for as long as the queue keeps returning jobs, and exits after the current job once a termination signal has been received.
137              
138             =head2 perform
139              
140             Call perform() on the given Sque::Job capturing and reporting
141             any exception.
142              
143             =head2 reserve
144              
145             Call reserve() to return the next job popped off the queue(s).
146              
147             =head2 add_queues
148              
149             Add one or more queues this worker should listen to.
150              
151             =head2 log
152              
153             If verbose() is true and a logger is set, this passes the message to the logger's DEBUG method.
154              
155             =head1 AUTHOR
156              
157             William Wolf <throughnothing@gmail.com>
158              
159             =head1 COPYRIGHT AND LICENSE
160              
161              
162             William Wolf has dedicated the work to the Commons by waiving all of his
163             or her rights to the work worldwide under copyright law and all related or
164             neighboring legal rights he or she had in the work, to the extent allowable by
165             law.
166              
167             Works under CC0 do not require attribution. When citing the work, you should
168             not imply endorsement by the author.
169              
170             =cut