File Coverage

lib/MediaCloud/JobManager/Broker/RabbitMQ.pm
Criterion Covered Total %
statement 42 307 13.6
branch 0 98 0.0
condition 0 31 0.0
subroutine 14 39 35.9
pod 0 11 0.0
total 56 486 11.5


line stmt bran cond sub pod time code
1             package MediaCloud::JobManager::Broker::RabbitMQ;
2              
3             #
4             # RabbitMQ job broker (using Celery protocol)
5             #
6             # Usage:
7             #
8             # MediaCloud::JobManager::Broker::RabbitMQ->new();
9             #
10              
11 1     1   11 use strict;
  1         4  
  1         47  
12 1     1   8 use warnings;
  1         2  
  1         60  
13 1     1   8 use Modern::Perl "2012";
  1         3  
  1         12  
14              
15 1     1   292 use Moose;
  1         3  
  1         11  
16             with 'MediaCloud::JobManager::Broker';
17              
18 1     1   9828 use Net::AMQP::RabbitMQ;
  1         4358  
  1         37  
19 1     1   9 use UUID::Tiny ':std';
  1         2  
  1         234  
20 1     1   330 use Tie::Cache;
  1         2214  
  1         31  
21 1     1   487 use JSON;
  1         6295  
  1         7  
22 1     1   161 use Data::Dumper;
  1         3  
  1         61  
23 1     1   6 use Readonly;
  1         2  
  1         41  
24              
25 1     1   6 use Log::Log4perl qw(:easy);
  1         2  
  1         9  
26             Log::Log4perl->easy_init(
27                 {
28                     level => $DEBUG,
29                     utf8 => 1,
30                     layout => "%d{ISO8601} [%P]: %m%n"
31                 }
32             );
33              
34             # flush sockets after every write
35             $| = 1;
36              
37 1     1   832 use MediaCloud::JobManager;
  1         2  
  1         18  
38 1     1   5 use MediaCloud::JobManager::Job;
  1         2  
  1         3032  
39              
40             # RabbitMQ default timeout
41             Readonly my $RABBITMQ_DEFAULT_TIMEOUT => 60;
42              
43             # Default number of attempts to connect to RabbitMQ before giving up
44             Readonly my $RABBITMQ_DEFAULT_RETRIES => 60;
45              
46             # RabbitMQ delivery modes
47             Readonly my $RABBITMQ_DELIVERY_MODE_NONPERSISTENT => 1;
48             Readonly my $RABBITMQ_DELIVERY_MODE_PERSISTENT => 2;
49              
50             # RabbitMQ queue durability
51             Readonly my $RABBITMQ_QUEUE_TRANSIENT => 0;
52             Readonly my $RABBITMQ_QUEUE_DURABLE => 1;
53              
54             # RabbitMQ priorities
55             Readonly my %RABBITMQ_PRIORITIES => (
56                 $MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_LOW => 0,
57                 $MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_NORMAL => 1,
58                 $MediaCloud::JobManager::Job::MJM_JOB_PRIORITY_HIGH => 2,
59             );
60              
61             # JSON (de)serializer
62             my $json = JSON->new->allow_nonref->canonical->utf8;
63              
64             # RabbitMQ connection credentials
65             has '_hostname' => ( is => 'rw', isa => 'Str' );
66             has '_port' => ( is => 'rw', isa => 'Int' );
67             has '_username' => ( is => 'rw', isa => 'Str' );
68             has '_password' => ( is => 'rw', isa => 'Str' );
69             has '_vhost' => ( is => 'rw', isa => 'Str' );
70             has '_timeout' => ( is => 'rw', isa => 'Int' );
71             has '_retries' => ( is => 'rw', isa => 'Int' );
72              
73             # RabbitMQ connection pool for every connection ID (PID + credentials)
74             my %_rabbitmq_connection_for_connection_id;
75              
76             # "reply_to" queues for connection ID + function name
77             #
78             # We emulate Celery's RPC via RabbitMQ behavior in which results are being
79             # stuffed in per-client result queues and can be retrieved only by the same
80             # client that requested the job using run_remotely() or add_to_queue():
81             #
82             # http://docs.celeryproject.org/en/latest/userguide/tasks.html#rpc-result-backend-rabbitmq-qpid
83             my %_reply_to_queues_for_connection_id_function_name;
84              
85             # Memory-limited results cache for connection ID + function name
86             #
87             # When fetching messages from "reply_to" queue for a specific name,
88             # run_remotely() can't requeue messages that don't belong to a specific job ID
89             # so it has to put them somewhere. This hash of hashes serves as a backlog for
90             # unused job results.
91             #
92             # It's not ideal that some job results might get invalidated but Celery does
93             # that too (purges results deemed too old).
94             my %_results_caches_for_connection_id_function_name;
95              
96             # Limits of results cache above
97             Readonly my $RABBITMQ_RESULTS_CACHE_MAXCOUNT => 1024 * 100;
98             Readonly my $RABBITMQ_RESULTS_CACHE_MAXBYTES => 1024 * 1024 * 10;
99              
100             # Constructor
101             sub BUILD
102             {
103 0     0 0       my $self = shift;
104 0               my $args = shift;
105              
106 0   0           $self->_hostname( $args->{ hostname } // 'localhost' );
107 0   0           $self->_port( $args->{ port } // 5672 );
108 0   0           $self->_username( $args->{ username } // 'guest' );
109 0   0           $self->_password( $args->{ password } // 'guest' );
110 0               my $default_vhost = '/';
111 0   0           $self->_vhost( $args->{ vhost } // $default_vhost );
112 0   0           $self->_timeout( $args->{ timeout } // $RABBITMQ_DEFAULT_TIMEOUT );
113 0   0           $self->_retries( $args->{ retries } // $RABBITMQ_DEFAULT_RETRIES );
114              
115             # Connect to the current connection ID (PID + credentials)
116 0               my $mq = $self->_mq();
117             }
118              
119             # Used to uniquely identify RabbitMQ connections (by connection credentials and
120             # PID) so that we know when to reconnect
121             sub _connection_identifier($)
122             {
123 0     0         my $self = shift;
124              
125             # Reconnect when running on a fork too
126 0               my $pid = $$;
127              
128 0               return sprintf(
129                     'PID=%d; hostname=%s; port=%d; username: %s; password=%s; vhost=%s, timeout=%d, retries=%d',
130                     $pid, $self->_hostname, $self->_port, $self->_username,
131                     $self->_password, $self->_vhost, $self->_timeout, $self->_retries
132                 );
133             }
134              
135             # Returns RabbitMQ connection handler for the current connection ID
136             sub _mq($)
137             {
138 0     0         my $self = shift;
139              
140 0               my $conn_id = $self->_connection_identifier();
141              
142 0 0             unless ( $_rabbitmq_connection_for_connection_id{ $conn_id } )
143                 {
144              
145             # Connect to RabbitMQ, open channel
146 0                   DEBUG( "Connecting to RabbitMQ (PID: $$, hostname: " .
147                           $self->_hostname . ", port: " . $self->_port . ", username: " . $self->_username . ")..." );
148              
149             # RabbitMQ might not yet be up at the time of connecting, so retry up to "_retries" times, sleeping one second after each failed attempt
150 0                   my $mq;
151 0                   my $connected = 0;
152 0                   my $last_error_message;
153 0                   for ( my $retry = 0 ; $retry < $self->_retries ; ++$retry )
154                     {
155 0                       eval {
156 0 0                         if ( $retry > 0 )
157                             {
158 0                               DEBUG( "Retrying #$retry..." );
159                             }
160              
161 0                           $mq = Net::AMQP::RabbitMQ->new();
162 0                           $mq->connect(
163                                 $self->_hostname,
164                                 {
165                                     user => $self->_username,
166                                     password => $self->_password,
167                                     port => $self->_port,
168                                     vhost => $self->_vhost,
169                                     timeout => $self->_timeout,
170                                 }
171                             );
172                         };
173 0 0                     if ( $@ )
174                         {
175 0                           $last_error_message = $@;
176 0                           WARN( "Unable to connect to RabbitMQ, will retry: $last_error_message" );
177 0                           sleep( 1 );
178                         }
179                         else
180                         {
181 0                           $connected = 1;
182 0                           last;
183                         }
184                     }
185 0 0                 unless ( $connected )
186                     {
187 0                       LOGDIE( "Unable to connect to RabbitMQ, giving up: $last_error_message" );
188                     }
189              
190 0                   my $channel_number = _channel_number();
191 0 0                 unless ( $channel_number )
192                     {
193 0                       LOGDIE( "Channel number is unset." );
194                     }
195              
196 0                   eval {
197 0                       $mq->channel_open( $channel_number );
198              
199             # Fetch one message at a time
200 0                       $mq->basic_qos( $channel_number, { prefetch_count => 1 } );
201                     };
202 0 0                 if ( $@ )
203                     {
204 0                       LOGDIE( "Unable to open channel $channel_number: $@" );
205                     }
206              
207 0                   $_rabbitmq_connection_for_connection_id{ $conn_id } = $mq;
208 0                   $_reply_to_queues_for_connection_id_function_name{ $conn_id } = ();
209 0                   $_results_caches_for_connection_id_function_name{ $conn_id } = ();
210                 }
211              
212 0               return $_rabbitmq_connection_for_connection_id{ $conn_id };
213             }
214              
215             # Returns "reply_to" queue name for current connection and provided function name
216             sub _reply_to_queue($$)
217             {
218 0     0         my ( $self, $function_name ) = @_;
219              
220 0               my $conn_id = $self->_connection_identifier();
221              
222 0 0             unless ( defined $_reply_to_queues_for_connection_id_function_name{ $conn_id } )
223                 {
224 0                   $_reply_to_queues_for_connection_id_function_name{ $conn_id } = ();
225                 }
226              
227 0 0             unless ( $_reply_to_queues_for_connection_id_function_name{ $conn_id }{ $function_name } )
228                 {
229 0                   my $reply_to_queue = _random_uuid();
230 0                   $_reply_to_queues_for_connection_id_function_name{ $conn_id }{ $function_name } = $reply_to_queue;
231                 }
232              
233 0               return $_reply_to_queues_for_connection_id_function_name{ $conn_id }{ $function_name };
234             }
235              
236             # Returns reference to results cache for current connection and provided function name
237             sub _results_cache_hashref($$)
238             {
239 0     0         my ( $self, $function_name ) = @_;
240              
241 0               my $conn_id = $self->_connection_identifier();
242              
243 0 0             unless ( defined $_results_caches_for_connection_id_function_name{ $conn_id } )
244                 {
245 0                   $_results_caches_for_connection_id_function_name{ $conn_id } = ();
246                 }
247              
248 0 0             unless ( defined $_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name } )
249                 {
250 0                   $_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name } = {};
251              
252 0                   tie %{ $_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name } }, 'Tie::Cache',
  0            
253                       {
254                         MaxCount => $RABBITMQ_RESULTS_CACHE_MAXCOUNT,
255                         MaxBytes => $RABBITMQ_RESULTS_CACHE_MAXBYTES
256                       };
257                 }
258              
259 0               return $_results_caches_for_connection_id_function_name{ $conn_id }{ $function_name };
260             }
261              
262             # Channel number we should be talking to
263             sub _channel_number()
264             {
265             # Each PID + credentials pair has its own connection so we can just use constant channel
266 0     0         return 1;
267             }
268              
269             sub _declare_queue($$$$;$)
270             {
271 0     0         my ( $self, $queue_name, $durable, $declare_and_bind_exchange, $lazy_queue ) = @_;
272              
273 0 0             unless ( defined $queue_name )
274                 {
275 0                   LOGCONFESS( 'Queue name is undefined' );
276                 }
277              
278 0               my $mq = $self->_mq();
279              
280 0               my $channel_number = _channel_number();
281 0               my $options = {
282                     durable => $durable,
283                     auto_delete => 0,
284                 };
285 0 0             my $arguments = {
286                     'x-max-priority' => _priority_count(),
287                     'x-queue-mode' => ( $lazy_queue ? 'lazy' : 'default' ),
288                 };
289              
290 0               eval { $mq->queue_declare( $channel_number, $queue_name, $options, $arguments ); };
  0            
291 0 0             if ( $@ )
292                 {
293 0                   LOGDIE( "Unable to declare queue '$queue_name': $@" );
294                 }
295              
296 0 0             if ( $declare_and_bind_exchange )
297                 {
298 0                   my $exchange_name = $queue_name;
299 0                   my $routing_key = $queue_name;
300              
301 0                   eval {
302 0                       $mq->exchange_declare(
303                             $channel_number,
304                             $exchange_name,
305                             {
306                                 durable => $durable,
307                                 auto_delete => 0,
308                             }
309                         );
310 0                       $mq->queue_bind( $channel_number, $queue_name, $exchange_name, $routing_key );
311                     };
312 0 0                 if ( $@ )
313                     {
314 0                       LOGDIE( "Unable to bind queue '$queue_name' to exchange '$exchange_name': $@" );
315                     }
316                 }
317             }
318              
319             sub _declare_task_queue($$;$)
320             {
321 0     0         my ( $self, $queue_name, $lazy_queue ) = @_;
322              
323 0 0             unless ( defined $queue_name )
324                 {
325 0                   LOGCONFESS( 'Queue name is undefined' );
326                 }
327              
328 0               my $durable = $RABBITMQ_QUEUE_DURABLE;
329 0               my $declare_and_bind_exchange = 1;
330              
331 0               return $self->_declare_queue( $queue_name, $durable, $declare_and_bind_exchange, $lazy_queue );
332             }
333              
334             sub _declare_results_queue($$;$)
335             {
336 0     0         my ( $self, $queue_name, $lazy_queue ) = @_;
337              
338 0 0             unless ( defined $queue_name )
339                 {
340 0                   LOGCONFESS( 'Queue name is undefined' );
341                 }
342              
343 0               my $durable = $RABBITMQ_QUEUE_TRANSIENT;
344 0               my $declare_and_bind_exchange = 0;
345              
346 0               return $self->_declare_queue( $queue_name, $durable, $declare_and_bind_exchange, $lazy_queue );
347             }
348              
349             sub _publish_json_message($$$;$$)
350             {
351 0     0         my ( $self, $routing_key, $payload, $extra_options, $extra_props ) = @_;
352              
353 0               my $mq = $self->_mq();
354              
355 0 0             unless ( $routing_key )
356                 {
357 0                   LOGCONFESS( 'Routing key is undefined.' );
358                 }
359 0 0             unless ( $payload )
360                 {
361 0                   LOGCONFESS( 'Payload is undefined.' );
362                 }
363              
364 0               my $payload_json;
365 0               eval { $payload_json = $json->encode( $payload ); };
  0            
366 0 0             if ( $@ )
367                 {
368 0                   LOGDIE( "Unable to encode JSON message: $@" );
369                 }
370              
371 0               my $channel_number = _channel_number();
372              
373 0               my $options = {};
374 0 0             if ( $extra_options )
375                 {
376 0                   $options = { %{ $options }, %{ $extra_options } };
  0            
  0            
377                 }
378 0               my $props = {
379                     content_type => 'application/json',
380                     content_encoding => 'utf-8',
381                 };
382 0 0             if ( $extra_props )
383                 {
384 0                   $props = { %{ $props }, %{ $extra_props } };
  0            
  0            
385                 }
386              
387 0               eval { $mq->publish( $channel_number, $routing_key, $payload_json, $options, $props ); };
  0            
388 0 0             if ( $@ )
389                 {
390 0                   LOGDIE( "Unable to publish message to routing key '$routing_key': $@" );
391                 }
392             }
393              
394             sub _random_uuid()
395             {
396             # Celery uses v4 (random) UUIDs
397 0     0         return create_uuid_as_string( UUID_RANDOM );
398             }
399              
400             sub _priority_to_int($)
401             {
402 0     0         my $priority = shift;
403              
404 0 0             unless ( exists $RABBITMQ_PRIORITIES{ $priority } )
405                 {
406 0                   LOGDIE( "Unknown job priority: $priority" );
407                 }
408              
409 0               return $RABBITMQ_PRIORITIES{ $priority };
410             }
411              
412             sub _priority_count()
413             {
414 0     0         return scalar( keys( %RABBITMQ_PRIORITIES ) );
415             }
416              
417             sub _process_worker_message($$$)
418             {
419 0     0         my ( $self, $function_name, $message ) = @_;
420              
421 0               my $mq = $self->_mq();
422              
423 0               my $correlation_id = $message->{ props }->{ correlation_id };
424 0 0             unless ( $correlation_id )
425                 {
426 0                   LOGDIE( '"correlation_id" is empty.' );
427                 }
428              
429             # "reply_to" might be empty if sending back job results is disabled via
430             # !publish_results()
431 0               my $reply_to = $message->{ props }->{ reply_to };
432              
433 0   0           my $priority = $message->{ props }->{ priority } // 0;
434              
435 0               my $delivery_tag = $message->{ delivery_tag };
436 0 0             unless ( $delivery_tag )
437                 {
438 0                   LOGDIE( "'delivery_tag' is empty." );
439                 }
440              
441 0               my $payload_json = $message->{ body };
442 0 0             unless ( $payload_json )
443                 {
444 0                   LOGDIE( 'Message payload is empty.' );
445                 }
446              
447 0               my $payload;
448 0               eval { $payload = $json->decode( $payload_json ); };
  0            
449 0 0 0           if ( $@ or ( !$payload ) or ( ref( $payload ) ne ref( {} ) ) )
      0        
450                 {
451 0                   LOGDIE( 'Unable to decode payload JSON: ' . $@ );
452                 }
453              
454 0 0             if ( $payload->{ task } ne $function_name )
455                 {
456 0                   LOGDIE( "Task name is not '$function_name'; maybe you're using same queue for multiple types of jobs?" );
457                 }
458              
459 0               my $celery_job_id = $payload->{ id };
460 0               my $args = $payload->{ kwargs };
461              
462             # Do the job
463 0               my $job_result;
464 0               eval { $job_result = $function_name->run_locally( $args, $celery_job_id ); };
  0            
465 0               my $error_message = $@;
466              
467             # If the job has failed, run_locally() has already printed the error
468             # message multiple times at this point so we don't repeat outselves
469              
470 0 0             if ( $reply_to )
471                 { # undef if !publish_results()
472              
473             # Construct response based on whether the job succeeded or failed
474 0                   my $response;
475 0 0                 if ( $error_message )
476                     {
477 0                       ERROR( "Job '$celery_job_id' died: $@" );
478 0                       $response = {
479                             'status' => 'FAILURE',
480                             'traceback' => "Job died: $error_message",
481                             'result' => {
482                                 'exc_message' => 'Task has failed',
483                                 'exc_type' => 'Exception',
484                             },
485                             'task_id' => $celery_job_id,
486                             'children' => []
487                         };
488                     }
489                     else
490                     {
491 0                       $response = {
492                             'status' => 'SUCCESS',
493                             'traceback' => undef,
494                             'result' => $job_result,
495                             'task_id' => $celery_job_id,
496                             'children' => []
497                         };
498                     }
499              
500             # Send message back with the job result
501 0                   eval {
502 0                       $self->_declare_results_queue( $reply_to, $function_name->lazy_queue() );
503 0                       $self->_publish_json_message(
504                             $reply_to,
505                             $response,
506                             {
507             # Options
508                             },
509                             {
510             # Properties
511                                 delivery_mode => $RABBITMQ_DELIVERY_MODE_NONPERSISTENT,
512                                 priority => $priority,
513                                 correlation_id => $celery_job_id,
514                             }
515                         );
516                     };
517 0 0                 if ( $@ )
518                     {
519 0                       LOGDIE( "Unable to publish job $celery_job_id result: $@" );
520                     }
521                 }
522              
523             # ACK the message (mark the job as completed)
524 0               eval { $mq->ack( _channel_number(), $delivery_tag ); };
  0            
525 0 0             if ( $@ )
526                 {
527 0                   LOGDIE( "Unable to mark job $celery_job_id as completed: $@" );
528                 }
529             }
530              
531             sub start_worker($$)
532             {
533 0     0 0       my ( $self, $function_name ) = @_;
534              
535 0               my $mq = $self->_mq();
536              
537 0               $self->_declare_task_queue( $function_name, $function_name->lazy_queue() );
538              
539 0               my $consume_options = {
540              
541             # Don't assume that the job is finished when it reaches the worker
542                     no_ack => 0,
543                 };
544 0               my $consumer_tag = $mq->consume( _channel_number(), $function_name, $consume_options );
545              
546 0               INFO( "Consumer tag: $consumer_tag" );
547 0               INFO( "Worker is ready and accepting jobs" );
548 0               my $recv_timeout = 0; # block until message is received
549 0               while ( my $message = $mq->recv( 0 ) )
550                 {
551 0                   $self->_process_worker_message( $function_name, $message );
552                 }
553             }
554              
555             sub run_job_sync($$$$)
556             {
557 0     0 0       my ( $self, $function_name, $args, $priority ) = @_;
558              
559 0               my $mq = $self->_mq();
560              
561             # Post the job
562 0               my $publish_results = 1; # always publish results when running synchronously
563 0               my $celery_job_id = $self->_run_job_on_rabbitmq( $function_name, $args, $priority, $publish_results );
564              
565             # Declare result queue (ignore function's publish_results())
566 0               my $reply_to_queue = $self->_reply_to_queue( $function_name );
567 0               eval { $self->_declare_results_queue( $reply_to_queue, $function_name->lazy_queue() ); };
  0            
568 0 0             if ( $@ )
569                 {
570 0                   LOGDIE( "Unable to declare results queue '$reply_to_queue': $@" );
571                 }
572              
573 0               my $results_cache = $self->_results_cache_hashref( $function_name );
574              
575 0               my $message;
576 0 0             if ( exists $results_cache->{ $celery_job_id } )
577                 {
578             # Result for this job ID was fetched previously -- return from cache
579 0                   DEBUG( "Results message for job ID '$celery_job_id' found in cache" );
580 0                   $message = $results_cache->{ $celery_job_id };
581 0                   delete $results_cache->{ $celery_job_id };
582              
583                 }
584                 else
585                 {
586             # Result not yet fetched -- process the result queue
587              
588 0                   my $channel_number = _channel_number();
589 0                   my $consume_options = {};
590 0                   my $consumer_tag = $mq->consume( $channel_number, $reply_to_queue, $consume_options );
591              
592 0                   my $recv_timeout = 0; # block until message is received
593              
594 0                   while ( my $queue_message = $mq->recv( 0 ) )
595                     {
596 0                       my $correlation_id = $queue_message->{ props }->{ correlation_id };
597 0 0                     unless ( $correlation_id )
598                         {
599 0                           LOGDIE( '"correlation_id" is empty.' );
600                         }
601              
602 0 0                     if ( $correlation_id eq $celery_job_id )
603                         {
604 0                           DEBUG( "Found results message with job ID '$celery_job_id'." );
605 0                           $message = $queue_message;
606 0                           last;
607              
608                         }
609                         else
610                         {
611             # Message belongs to some other job -- add to cache and continue
612 0                           DEBUG( "Results message '$correlation_id' does not belong to job ID '$celery_job_id'." );
613 0                           $results_cache->{ $correlation_id } = $queue_message;
614                         }
615                     }
616                 }
617              
618 0 0             unless ( $message )
619                 {
620 0                   LOGDIE( "At this point, message should have been fetched either from broker or from cache" );
621                 }
622              
623 0               my $correlation_id = $message->{ props }->{ correlation_id };
624 0 0             unless ( $correlation_id )
625                 {
626 0                   LOGDIE( '"correlation_id" is empty.' );
627                 }
628 0 0             if ( $correlation_id ne $celery_job_id )
629                 {
630             # Message belongs to some other job -- requeue and skip
631 0                   DEBUG( "'correlation_id' ('$correlation_id') is not equal to job ID ('$celery_job_id')." );
632 0                   next;
633                 }
634              
635 0               my $payload_json = $message->{ body };
636 0 0             unless ( $payload_json )
637                 {
638 0                   LOGDIE( 'Message payload is empty.' );
639                 }
640              
641 0               my $payload;
642 0               eval { $payload = $json->decode( $payload_json ); };
  0            
643 0 0 0           if ( $@ or ( !$payload ) or ( ref( $payload ) ne ref( {} ) ) )
      0        
644                 {
645 0                   LOGDIE( 'Unable to decode payload JSON: ' . $@ );
646                 }
647              
648 0 0             if ( $payload->{ task_id } ne $celery_job_id )
649                 {
650 0                   LOGDIE( "'task_id' ('$payload->{ task_id }') is not equal to job ID ('$celery_job_id')." );
651                 }
652              
653             # Return job result
654 0 0             if ( $payload->{ status } eq 'SUCCESS' )
    0          
655                 {
656             # Job completed successfully
657 0                   return $payload->{ result };
658              
659                 }
660                 elsif ( $payload->{ status } eq 'FAILURE' )
661                 {
662             # Job failed -- pass the failure to the caller
663 0                   LOGDIE( "Job '$celery_job_id' failed: " . $payload->{ traceback } );
664              
665                 }
666                 else
667                 {
668             # Unknown value
669 0                   LOGDIE( "Unknown 'status' value: " . $payload->{ status } );
670                 }
671             }
672              
673             sub run_job_async($$$$)
674             {
675 0     0 0       my ( $self, $function_name, $args, $priority ) = @_;
676              
677 0               return $self->_run_job_on_rabbitmq( $function_name, $args, $priority, $function_name->publish_results() );
678             }
679              
# Build a Celery-style task message for a job and publish it to RabbitMQ.
#
# Arguments:
# * $self -- broker object
# * $function_name -- job class name; stored as the 'task' value, used as the
#   first argument and the 'exchange' option of _publish_json_message(), and
#   asked for retries() and lazy_queue()
# * $args -- hashref sent as 'kwargs'; undef is replaced with an empty hashref
# * $priority -- converted with _priority_to_int() into the message priority
# * $publish_results -- when true, a results queue is declared and its name is
#   sent as 'reply_to'; otherwise 'reply_to' is an empty string
#
# Returns the newly generated job ID, which is also sent as the message's
# 'correlation_id'. Calls LOGDIE() if $args is not a hashref, or if converting
# the priority or publishing the message throws.
sub _run_job_on_rabbitmq($$$$$)
{
    my $self = shift;
    my ( $function_name, $args, $priority, $publish_results ) = @_;

    # No arguments given -- run the job with an empty set of arguments
    $args = {} unless defined( $args );

    if ( ref( $args ) ne ref( {} ) )
    {
        LOGDIE( "'args' is not a hashref." );
    }

    my $celery_job_id = create_uuid_as_string( UUID_RANDOM );

    # Message body; everything except ID, retries, task name and keyword
    # arguments is left unset
    my $message = {
        'expires' => undef,
        'utc' => JSON::true,
        'args' => [],
        'chord' => undef,
        'callbacks' => undef,
        'errbacks' => undef,
        'taskset' => undef,
        'id' => $celery_job_id,
        'retries' => $function_name->retries(),
        'task' => $function_name,
        'timelimit' => [ undef, undef, ],
        'eta' => undef,
        'kwargs' => $args,
    };

    # The task queue has to be declared before anything is published to it
    $self->_declare_task_queue( $function_name, $function_name->lazy_queue() );

    # undef doesn't work with Net::AMQP::RabbitMQ, so "no reply queue" is
    # represented by an empty string
    my $reply_to_queue = '';
    if ( $publish_results )
    {
        # Declare result queue before posting a job (just like Celery does)
        $reply_to_queue = $self->_reply_to_queue( $function_name );
        $self->_declare_results_queue( $reply_to_queue, $function_name->lazy_queue() );
    }

    # Publish the message; any exception is reported together with the job ID
    eval {
        my $rabbitmq_priority = _priority_to_int( $priority );

        my %options = ( exchange => $function_name );
        my %properties = (
            delivery_mode => $RABBITMQ_DELIVERY_MODE_PERSISTENT,
            priority => $rabbitmq_priority,
            correlation_id => $celery_job_id,
            reply_to => $reply_to_queue,
        );

        $self->_publish_json_message( $function_name, $message, \%options, \%properties );
    };
    LOGDIE( "Unable to add job '$celery_job_id' to queue: $@" ) if $@;

    return $celery_job_id;
}
753              
# Translate a job handle into a job ID.
#
# For this broker the handle is returned unchanged.
#
# Arguments:
# * $self -- broker object (not used)
# * $job_handle -- job handle
#
# Returns $job_handle as-is.
sub job_id_from_handle($$)
{
    my $self = shift;
    my $job_handle = shift;

    return $job_handle;
}
760              
# Not implemented for the RabbitMQ broker: every call ends in LOGDIE().
#
# Arguments (accepted but not used):
# * $self -- broker object
# * $job -- job
# * $numerator, $denominator -- the two parts of the progress value
sub set_job_progress($$$$)
{
    my $self = shift;
    my ( $job, $numerator, $denominator ) = @_;

    LOGDIE( "FIXME not implemented." );
}
767              
# Not implemented for the RabbitMQ broker: every call ends in LOGDIE().
#
# Arguments (accepted but not used):
# * $self -- broker object
# * $function_name -- job class name
# * $job_id -- job ID
sub job_status($$$)
{
    my $self = shift;
    my ( $function_name, $job_id ) = @_;

    LOGDIE( "FIXME not implemented." );
}
774              
# Not implemented for the RabbitMQ broker: every call ends in LOGDIE().
#
# Arguments (accepted but not used):
# * $self -- broker object
sub show_jobs($)
{
    my ( $self ) = @_;

    LOGDIE( "FIXME not implemented." );
}
781              
# Not implemented for the RabbitMQ broker: every call ends in LOGDIE().
#
# Arguments (accepted but not used):
# * $self -- broker object
# * $job_id -- job ID
#
# The prototype lists two scalars so that it matches the two arguments read
# here, the same way the other subs in this package declare one '$' per
# argument (invocant included). Prototypes are ignored for method calls, so
# callers using $broker->cancel_job( $job_id ) are unaffected.
sub cancel_job($$)
{
    my ( $self, $job_id ) = @_;

    LOGDIE( "FIXME not implemented." );
}
788              
# Not implemented for the RabbitMQ broker: every call ends in LOGDIE().
#
# Arguments (accepted but not used):
# * $self -- broker object
#
# NOTE(review): the prototype declares two scalars but only $self is read
# here -- confirm the intended signature against the
# MediaCloud::JobManager::Broker role.
sub server_status($$)
{
    my ( $self ) = @_;

    LOGDIE( "FIXME not implemented." );
}
795              
# Not implemented for the RabbitMQ broker: every call ends in LOGDIE().
#
# Arguments (accepted but not used):
# * $self -- broker object
sub workers($)
{
    my ( $self ) = @_;

    LOGDIE( "FIXME not implemented." );
}
802              
803 1     1   19 no Moose; # gets rid of scaffolding
  1         3  
  1         16  
804              
805             1;
806