File Coverage

blib/lib/Sque.pm
Criterion Covered Total %
statement 23 40 57.5
branch 0 14 0.0
condition 0 9 0.0
subroutine 8 11 72.7
pod 4 4 100.0
total 35 78 44.8


line stmt bran cond sub pod time code
1 3     3   113645 use strict;
  3         8  
  3         110  
2 3     3   16 use warnings;
  3         4  
  3         145  
3             package Sque;
4             $Sque::VERSION = '0.010';
5 3     3   3698 use Any::Moose;
  3         129892  
  3         19  
6 3     3   1838 use Any::Moose '::Util::TypeConstraints';
  3         7  
  3         14  
7 3     3   4140 use Net::Stomp;
  3         61351  
  3         31  
8              
9 3     3   2344 use Sque::Job;
  3         12  
  3         33  
10 3     3   1935 use Sque::Worker;
  3         12  
  3         26  
11              
12             # ABSTRACT: Background job processing based on Resque, using Stomp
13              
14             subtype 'Sugar::Stomp' => as class_type('Net::Stomp');
15              
16             coerce 'Sugar::Stomp'
17             => from 'Str'
18             => via {
19             my ( $host, $port ) = split /:/;
20             my $stomp = Net::Stomp->new({ hostname => $host, port => $port });
21             $stomp->connect;
22             return $stomp;
23             };
24              
25             coerce 'Sugar::Stomp'
26             => from 'ArrayRef[Str]'
27             => via {
28             my ($a) = @_;
29             my $hosts = [];
30             for ( @$a ) {
31             my ( $host, $port ) = split /:/;
32             push @$hosts, { hostname => $host, port => $port };
33             }
34              
35             my $stomp = @$hosts > 1
36             ? Net::Stomp->new({ hosts => $hosts })
37             : Net::Stomp->new({
38             hostname => $hosts->[0]{hostname},
39             port => $hosts->[0]{port}
40             });
41              
42             $stomp->connect;
43             return $stomp;
44             };
45              
46             has stomp => (
47             is => 'ro',
48             lazy => 1,
49             coerce => 1,
50             isa => 'Sugar::Stomp',
51             default => sub { Net::Stomp->new->connect },
52             );
53              
54             has namespace => ( is => 'rw', default => sub { 'sque' });
55              
56             has worker => (
57             is => 'ro',
58             lazy => 1,
59             default => sub { Sque::Worker->new( sque => $_[0] ) },
60             );
61              
62             sub push {
63 0     0 1 0 my ( $self, $queue, $job ) = @_;
64 0 0 0     0 confess "Can't push an empty job." unless ( $job || ref $queue );
65 0 0       0 if( ref $queue ){
66 0 0       0 $job = $self->new_job($queue) unless ref $queue eq 'Sque::Job';
67 0         0 $queue = $job->queue;
68             } else {
69 0 0       0 $job = $self->new_job($job) unless ref $job eq 'Sque::Job';
70             }
71              
72 0         0 $self->stomp->send( {
73             persistent => 'true',
74             destination => $self->key( $queue ),
75             body => $job->encode,
76 0         0 %{ $job->headers },
77             } );
78             }
79              
80             sub pop {
81 0     0 1 0 my ( $self ) = @_;
82 0         0 my $frame = $self->stomp->receive_frame;
83 0 0       0 return unless $frame;
84              
85 0         0 $self->new_job({
86             frame => $frame,
87             queue => $frame->destination,
88             });
89             }
90              
91             sub new_job {
92 0     0 1 0 my ( $self, $job ) = @_;
93              
94 0 0 0     0 if ( $job && ref $job && ref $job eq 'HASH' ) {
    0 0        
95 0         0 return Sque::Job->new({ sque => $self, %$job });
96             }
97             elsif ( $job ) {
98 0         0 return Sque::Job->new({ sque => $self, payload => $job });
99             }
100 0         0 confess "Can't build an empty Sque::Job object.";
101             }
102              
103             sub key {
104 2     2 1 4 my $self = shift;
105 2         11 '/queue/' . $self->namespace . '/' . shift;
106             }
107              
108             __PACKAGE__->meta->make_immutable();
109              
110             1;
111              
112             __END__
113              
114             =pod
115              
116             =encoding UTF-8
117              
118             =head1 NAME
119              
120             Sque - Background job processing based on Resque, using Stomp
121              
122             =head1 VERSION
123              
124             version 0.010
125              
126             =head1 SYNOPSIS
127              
128             First you create a Sque instance where you configure the L<Net::Stomp>
129             backend and then you can start sending jobs to be done by workers:
130              
131             use Sque;
132              
133             my $s = Sque->new( stomp => '127.0.0.1:61613' );
134             # Or, for failover
135             $s = Sque->new( stomp => [ '127.0.0.1:61613', '127.0.0.2:61613' ] );
136              
137             $s->push( my_queue => {
138             class => 'My::Task',
139             args => [ 'Hello world!' ]
140             });
141              
142             You can also send by just using:
143              
144             $s->push({
145             class => 'My::Task',
146             args => [ 'Hello world!' ]
147             });
148              
149             In this case, the queue will be set automatically to the
150             job class name with colons removed, which in this
151             case would be 'MyTask'.
152              
153             You can set custom C<STOMP> headers by passing them in as follows:
154              
155             $s->push( my_queue => {
156             class => 'My::Task',
157             args => [ 'Hello world!' ],
158             headers => { header1 => 'val1', header2 => 'val2' }
159             });
160              
161             Additionally, the L<sque> command-line tool can be used to send messages:
162              
163             $ sque send -h 127.0.0.1 -p 61613 -c My::Task 'Hello world!'
164              
165             Background jobs can be any Perl module that implements a perform() function.
166             The L<Sque::Job> object is passed as the only argument to this function:
167              
168             package My::Task;
169             use strict;
170             use 5.10.0;
171              
172             sub perform {
173             my ( $job ) = @_;
174             say $job->args->[0];
175             }
176              
177             1;
178              
179             Background jobs can also be OO. The perform function will still be called
180             with the L<Sque::Job> object as the only argument:
181              
182             package My::Task;
183             use strict;
184             use 5.10.0;
185             use Moose;
186              
187             with 'Role::Awesome';
188              
189             has attr => ( is => 'ro', default => 'Where am I?' );
190              
191             sub perform {
192             my ( $self, $job ) = @_;
193             say $self->attr;
194             say $job->args->[0];
195             }
196              
197             1;
198              
199             Finally, you run your jobs by instantiating a L<Sque::Worker> and telling it
200             to listen to one or more queues:
201              
202             use Sque;
203              
204             my $w = Sque->new( stomp => '127.0.0.1:61613' )->worker;
205             $w->add_queues('my_queue');
206             $w->work;
207              
208             Or you can simply use the L<sque> command-line tool which uses L<App::Sque>
209             like so:
210              
211             $ sque work --host 127.0.0.1 --port 61613 --workers 5 --lib ./lib --lib ./lib2 --queues Queue1,Queue2,Queue3
212              
213             =head1 DESCRIPTION
214              
215             This is a copy of L<resque-perl|https://github.com/diegok/resque-perl>
216             by L<Diego Kuperman|https://github.com/diegok> simplified a little bit
217             (for better or worse) and made to work with any stomp server rather than Redis.
218              
219             =head1 ATTRIBUTES
220              
221             =head2 stomp
222              
223             A Stomp Client on this sque instance.
224              
225             =head2 namespace
226              
227             Namespace for queues, default is 'sque'
228              
229             =head2 worker
230              
231             A L<Sque::Worker> on this sque instance.
232              
233             =head1 METHODS
234              
235             =head2 push
236              
237             Pushes a job onto a queue. Queue name should be a string and the
238             item should be a L<Sque::Job> object or a hashref containing:
239             class - The String name of the job class to run.
240             args - An arrayref of arguments to pass to the job.
241              
242             Example:
243              
244             $sque->push( archive => { class => 'Archive', args => [ 35, 'tar' ] } )
245              
246             =head2 pop
247              
248             Pops the next job from the Stomp connection (no queue argument is taken).
249             Returns a L<Sque::Job> object, or nothing if no frame was received.
250              
251             =head2 key
252              
253             Concatenate C<< $self->namespace >> with the received queue name
254             to build the Stomp destination name for this sque instance.
255              
256             =head2 new_job
257              
258             Build a L<Sque::Job> object on this system for the given
259             hashref(see L<Sque::Job>) or string(payload for object).
260              
261             =head1 ATTRIBUTES
262              
263             =head1 HELPER METHODS
264              
265             =head1 TODO
266              
267             =over 4
268              
269             =item * Make App::Sque that will let you run sque and just pass it the
270             stomp server/port, queue list, lib directories (if needed), and
271             number of workers.
272              
273             =item * More (real) tests.
274              
275             =back
276              
277             =head1 AUTHOR
278              
279             William Wolf <throughnothing@gmail.com>
280              
281             =head1 COPYRIGHT AND LICENSE
282              
283              
284             William Wolf has dedicated the work to the Commons by waiving all of his
285             or her rights to the work worldwide under copyright law and all related or
286             neighboring legal rights he or she had in the work, to the extent allowable by
287             law.
288              
289             Works under CC0 do not require attribution. When citing the work, you should
290             not imply endorsement by the author.
291              
292             =cut