File Coverage

lib/Redis/JobQueue.pm
Criterion Covered Total %
statement 92 385 23.9
branch 0 156 0.0
condition 0 91 0.0
subroutine 31 72 43.0
pod 15 15 100.0
total 138 719 19.1


line stmt bran cond sub pod time code
1             package Redis::JobQueue;
2              
3             =head1 NAME
4              
5             Redis::JobQueue - Job queue management implemented using Redis server.
6              
7             =head1 VERSION
8              
9             This documentation refers to C version 1.18
10              
11             =cut
12              
13             #-- Pragmas --------------------------------------------------------------------
14              
15 62     62   5401845 use 5.010;
  62         144  
16 62     62   262 use strict;
  62         65  
  62         1019  
17 62     62   188 use warnings;
  62         73  
  62         2178  
18              
19             # ENVIRONMENT ------------------------------------------------------------------
20              
21             our $VERSION = '1.18';
22              
23 62         2523 use Exporter qw(
24             import
25 62     62   233 );
  62         74  
26             our @EXPORT_OK = qw(
27             DEFAULT_SERVER
28             DEFAULT_PORT
29             DEFAULT_TIMEOUT
30             NAMESPACE
31             E_NO_ERROR
32             E_MISMATCH_ARG
33             E_DATA_TOO_LARGE
34             E_NETWORK
35             E_MAX_MEMORY_LIMIT
36             E_JOB_DELETED
37             E_REDIS
38             );
39              
40             #-- load the modules -----------------------------------------------------------
41              
42 62     62   199 use Carp;
  62         74  
  62         2848  
43 62     62   23302 use Data::UUID;
  62         27547  
  62         3033  
44 62         2784 use Digest::SHA1 qw(
45             sha1_hex
46 62     62   23250 );
  62         26727  
47 62         4490 use List::Util qw(
48             min
49             shuffle
50 62     62   300 );
  62         79  
51 62         330 use List::MoreUtils qw(
52             firstidx
53 62     62   23793 );
  62         399070  
54 62     62   26838 use Mouse;
  62         38818  
  62         381  
55 62     62   18536 use Mouse::Util::TypeConstraints;
  62         66  
  62         495  
56 62         4030 use Params::Util qw(
57             _ARRAY0
58             _INSTANCE
59             _NONNEGINT
60             _STRING
61 62     62   30468 );
  62         108572  
62 62     62   35845 use Redis '1.976';
  62         1110175  
  62         1871  
63 62         3884 use Redis::JobQueue::Job qw(
64             STATUS_CREATED
65             STATUS_WORKING
66             STATUS_COMPLETED
67             STATUS_FAILED
68 62     62   20797 );
  62         5517  
69 62         2914 use Redis::JobQueue::Util qw(
70             format_message
71 62     62   17941 );
  62         99  
72 62         3426 use Storable qw(
73             nfreeze
74             thaw
75 62     62   34005 );
  62         143889  
76 62     62   336 use Try::Tiny;
  62         70  
  62         4127  
77              
78             #-- declarations ---------------------------------------------------------------
79              
80             =head1 SYNOPSIS
81              
82             use 5.010;
83             use strict;
84             use warnings;
85              
86             #-- Common
87             use Redis::JobQueue qw(
88             DEFAULT_SERVER
89             DEFAULT_PORT
90             );
91              
92             my $connection_string = DEFAULT_SERVER.':'.DEFAULT_PORT;
93             my $jq = Redis::JobQueue->new( redis => $connection_string );
94              
95             #-- Producer
96             my $job = $jq->add_job(
97             {
98             queue => 'xxx',
99             workload => \'Some stuff',
100             expire => 12*60*60, # 12h,
101             }
102             );
103              
104             #-- Worker
105             sub xxx {
106             my $job = shift;
107              
108             my $workload = ${ $job->workload };
109             # do something with workload;
110              
111             $job->result( 'XXX JOB result comes here' );
112             }
113              
114             while ( $job = $jq->get_next_job(
115             queue => 'xxx',
116             blocking => 1,
117             ) ) {
118             $job->status( 'working' );
119             $jq->update_job( $job );
120              
121             # do my stuff
122             xxx( $job );
123              
124             $job->status( 'completed' );
125             $jq->update_job( $job );
126             }
127              
128             #-- Consumer
129             my $id = $ARGV[0];
130             my $status = $jq->get_job_data( $id, 'status' );
131              
132             if ( $status eq 'completed' ) {
133             # it is now safe to remove it from JobQueue, since it's completed
134             my $job = $jq->load_job( $id );
135              
136             $jq->delete_job( $id );
137             say 'Job result: ', ${ $job->result };
138             } else {
139             say "Job is not complete, has current '$status' status";
140             }
141              
142             To see a brief but working code example of the C
143             package usage look at the L section.
144              
145             Description of the used by C data
146             structures (on Redis server) is provided in L section.
147              
148             =head1 ABSTRACT
149              
150             The C package is a set of Perl modules which
151             allows creation of a simple job queue based on Redis server capabilities.
152              
153             =head1 DESCRIPTION
154              
155             The main features of the package are:
156              
157             =over 3
158              
159             =item *
160              
161             Supports the automatic creation of job queues, job status monitoring,
162             updating the job data set, obtaining a consistent job from the queue,
163             removing jobs, and the classification of possible errors.
164              
165             =item *
166              
167             Contains various reusable components that can be used separately or together.
168              
169             =item *
170              
171             Provides an object oriented API.
172              
173             =item *
174              
175             Support of storing arbitrary job-related data structures.
176              
177             =item *
178              
179             Simple methods for organizing producer, worker, and consumer clients.
180              
181             =back
182              
183             =head3 Atributes
184              
185             =over
186              
187             =item C
188              
189             An id that uniquely identifies the job, scalar.
190              
191             =item C
192              
193             Queue name in which job is placed, scalar.
194              
195             =item C
196              
197             For how long (seconds) job data structures will be kept in memory.
198              
199             =item C
200              
201             Job status, scalar. See L L section
202             for the list of pre-defined statuses.
203             Can be also set to any arbitrary value.
204              
205             =item C, C
206              
207             User-set data structures which will be serialized before stored in Redis server.
208             Suitable for passing large amounts of data.
209              
210             =item C<*>
211              
212             Any other custom-named field passed to L or L method
213             will be stored as metadata scalar in Redis server.
214             Suitable for storing scalar values with fast access
215             (will be serialized before stored in Redis server).
216              
217             =back
218              
219             =cut
220              
221             =head2 EXPORT
222              
223             None by default.
224              
225             The following additional constants, defining defaults for various parameters, are available for export:
226              
227             =over
228              
229             =item C
230              
231             Maximum size of the data stored in C, C: 512MB.
232              
233             =cut
234 62     62   244 use constant MAX_DATASIZE => 512*1024*1024; # A String value can be at max 512 Megabytes in length.
  62         66  
  62         3621  
235              
236             =item C
237              
238             Default address of the Redis server - C<'localhost'>.
239              
240             =cut
241 62     62   282 use constant DEFAULT_SERVER => 'localhost';
  62         82  
  62         2854  
242              
243             =item C
244              
245             Default port of the Redis server - 6379.
246              
247             =cut
248 62     62   218 use constant DEFAULT_PORT => 6379;
  62         66  
  62         2514  
249              
250             =item C
251              
252             Maximum time (in seconds) to wait for a new job from the queue,
253             0 - unlimited.
254              
255             =cut
256 62     62   214 use constant DEFAULT_TIMEOUT => 0; # 0 for an unlimited timeout
  62         57  
  62         2200  
257              
258             =item C
259              
260             Namespace used for keys on the Redis server - C<'JobQueue'>.
261              
262             =cut
263 62     62   214 use constant NAMESPACE => 'JobQueue';
  62         62  
  62         2266  
264              
265             =item Error codes are identified
266              
267             More details about error codes are provided in L section.
268              
269             =back
270              
271             Possible error codes:
272              
273             =cut
274              
275             =over 3
276              
277             =item C
278              
279             0 - No error
280              
281             =cut
282 62     62   210 use constant E_NO_ERROR => 0;
  62         61  
  62         2120  
283              
284             =item C
285              
286             1 - Invalid argument of C or other L.
287              
288             =cut
289 62     62   193 use constant E_MISMATCH_ARG => 1;
  62         109  
  62         2283  
290              
291             =item C
292              
293             2 - Provided data is too large.
294              
295             =cut
296 62     62   206 use constant E_DATA_TOO_LARGE => 2;
  62         86  
  62         2626  
297              
298             =item C
299              
300             3 - Error connecting to Redis server.
301              
302             =cut
303 62     62   209 use constant E_NETWORK => 3;
  62         61  
  62         2502  
304              
305             =item C
306              
307             4 - Command failed because its execution requires more than allowed memory, set in C.
308              
309             =cut
310 62     62   221 use constant E_MAX_MEMORY_LIMIT => 4;
  62         82  
  62         2196  
311              
312             =item C
313              
314             5 - Job's data was removed.
315              
316             =cut
317 62     62   189 use constant E_JOB_DELETED => 5;
  62         55  
  62         2086  
318              
319             =item C
320              
321             6 - Other error on Redis server.
322              
323             =back
324              
325             =cut
326 62     62   202 use constant E_REDIS => 6;
  62         74  
  62         233212  
327              
328             our @ERROR = (
329             'No error',
330             'Invalid argument',
331             'Data is too large',
332             'Error in connection to Redis server',
333             "Command not allowed when used memory > 'maxmemory'",
334             'job was removed by maxmemory-policy',
335             'job was removed prior to use',
336             'Redis error',
337             );
338              
339             my $_ID_IN_QUEUE_FIELD = '__id_in_queue__';
340              
341             my $NAMESPACE = NAMESPACE;
342              
343             my @job_fields = Redis::JobQueue::Job->job_attributes; # sorted list
344             splice @job_fields, ( firstidx { $_ eq 'meta_data' } @job_fields ), 1;
345             my %job_fnames = map { $_ => 1 } @job_fields;
346              
347             my $uuid = new Data::UUID;
348              
349             my %lua_script_body;
350             my $lua_id_rgxp = '([^:]+)$';
351              
352             my $lua_body_start = <<"END_BODY_START";
353             local job_id = ARGV[1]
354             local data_fields = { unpack( ARGV, 2 ) }
355              
356             local job_key = '${NAMESPACE}:'..job_id
357             local job_exists = redis.call( 'EXISTS', job_key )
358             local job_data = {}
359              
360             if job_exists == 1 then
361             END_BODY_START
362              
363             my $lua_body_end = <<"END_BODY_END";
364             end
365             return { job_exists, unpack( job_data ) }
366             END_BODY_END
367              
368             # Deletes the job data in Redis server
369             $lua_script_body{delete_job} = <<"END_DELETE_JOB";
370             local job_id = ARGV[1]
371              
372             local job_key = '${NAMESPACE}:'..job_id
373             if redis.call( 'EXISTS', job_key ) == 1 then
374             redis.call( 'LREM', '${NAMESPACE}:queue:'..redis.call( 'HGET', job_key, 'queue' ), 0, redis.call( 'HGET', job_key, '${_ID_IN_QUEUE_FIELD}' ) )
375             return redis.call( 'DEL', job_key )
376             else
377             return nil
378             end
379             END_DELETE_JOB
380              
381             # Adds a job to the queue on the Redis server
382             $lua_script_body{load_job} = <<"END_LOAD_JOB";
383             $lua_body_start
384              
385             for _, field in ipairs( redis.call( 'HKEYS', job_key ) ) do
386             if field ~= '${_ID_IN_QUEUE_FIELD}' then
387             -- return the field names and values for the data fields
388             table.insert( job_data, field )
389             table.insert( job_data, redis.call( 'HGET', job_key, field ) )
390             end
391             end
392              
393             $lua_body_end
394             END_LOAD_JOB
395              
396             # Data of the job is requested from the Redis server
397             $lua_script_body{get_job_data} = <<"END_GET_JOB_DATA";
398             $lua_body_start
399              
400             for _, field in ipairs( data_fields ) do
401             table.insert( job_data, redis.call( 'HGET', job_key, field ) )
402             end
403              
404             $lua_body_end
405             END_GET_JOB_DATA
406              
407             # Gets queue status from the Redis server
408             $lua_script_body{queue_status} = <<"END_QUEUE_STATUS";
409             local queue = ARGV[1]
410             local tm = tonumber( ARGV[2] ) -- a floating seconds since the epoch
411              
412             local queue_key = '${NAMESPACE}:queue:'..queue
413             local queue_status = {}
414              
415             -- if it is necessary to determine the status of a particular queue
416             if queue then
417             -- Queue length
418             queue_status[ 'length' ] = redis.call( 'LLEN', queue_key )
419             -- if the queue is set
420             if queue_status[ 'length' ] > 0 then
421             -- for each item in the queue
422             for _, in_queue_id in ipairs( redis.call( 'LRANGE', queue_key, 0, -1 ) ) do
423             -- select the job ID and determine the value of a field 'created'
424             local created = redis.call( 'HGET', '${NAMESPACE}:'..in_queue_id:match( '^(%S+)' ), 'created' )
425             if created then
426             created = tonumber( created ) -- values are stored as strings (a floating seconds since the epoch)
427             -- initialize the calculated values
428             if not queue_status[ 'max_job_age' ] then
429             queue_status[ 'max_job_age' ] = 0
430             queue_status[ 'min_job_age' ] = 0
431             end
432             -- time of birth
433             if queue_status[ 'max_job_age' ] == 0 or created < queue_status[ 'max_job_age' ] then
434             queue_status[ 'max_job_age' ] = created
435             end
436             if queue_status[ 'min_job_age' ] == 0 or created > queue_status[ 'min_job_age' ] then
437             queue_status[ 'min_job_age' ] = created
438             end
439             end
440             end
441              
442             -- time of birth -> age
443             if queue_status[ 'max_job_age' ] then -- queue_status[ 'min_job_age' ] also ~= 0
444             -- The age of the old job (the lifetime of the queue)
445             queue_status[ 'max_job_age' ] = tm - queue_status[ 'max_job_age' ]
446             -- The age of the younger job
447             queue_status[ 'min_job_age' ] = tm - queue_status[ 'min_job_age' ]
448             -- Duration of queue activity (the period during which new jobs were created)
449             queue_status[ 'lifetime' ] = queue_status[ 'max_job_age' ] - queue_status[ 'min_job_age' ]
450             end
451             end
452             end
453              
454             -- all jobs in the queue, including inactive ones
455             queue_status[ 'all_jobs' ] = 0
456             -- review all job keys on the server, including inactive ones
457             for _, key in ipairs( redis.call( 'KEYS', '${NAMESPACE}:*' ) ) do
458             -- counting on the basic structures of jobs
459             if key:find( '^${NAMESPACE}:${lua_id_rgxp}' ) then
460             -- consider only the structure related to a given queue
461             if redis.call( 'HGET', key, 'queue' ) == queue then
462             queue_status[ 'all_jobs' ] = queue_status[ 'all_jobs' ] + 1
463             end
464             end
465             end
466              
467             local result_status = {}
468             -- memorize the names and values of what was possible to calculate
469             for key, val in pairs( queue_status ) do
470             table.insert( result_status, key )
471             table.insert( result_status, tostring( val ) )
472             end
473              
474             return result_status
475             END_QUEUE_STATUS
476              
477             # Gets a list of job IDs on the Redis server
478             $lua_script_body{get_job_ids} = <<"END_GET_JOB_IDS";
479             local is_queued = tonumber( ARGV[1] )
480             local queues = { unpack( ARGV, 3, 2 + ARGV[2] ) }
481             local statuses = { unpack( ARGV, 3 + ARGV[2] ) }
482              
483             local tmp_ids = {}
484              
485             if is_queued == 1 then -- jobs in the queues
486             -- if not limited to specified queues
487             if #queues == 0 then
488             -- determine the queues still have not served the job
489             for _, queue_key in ipairs( redis.call( 'KEYS', '${NAMESPACE}:queue:*' ) ) do
490             table.insert( queues, queue_key:match( '${lua_id_rgxp}' ) )
491             end
492             end
493             -- view each specified queue
494             for _, queue in ipairs( queues ) do
495             -- for each identifier contained in the queue
496             for _, in_queue_id in ipairs( redis.call( 'LRANGE', '${NAMESPACE}:queue:'..queue, 0, -1 ) ) do
497             -- distinguish and remember the ID of the job
498             tmp_ids[ in_queue_id:match( '^(%S+)' ) ] = 1
499             end
500             end
501             else -- all jobs on the server
502             local all_job_ids = {}
503             for _, key in ipairs( redis.call( 'KEYS', '${NAMESPACE}:*' ) ) do
504             -- considering only the basic structure of jobs
505             if key:find( '^${NAMESPACE}:${lua_id_rgxp}' ) then
506             -- forming a "hash" of jobs IDs
507             all_job_ids[ key:match( '${lua_id_rgxp}' ) ] = 1
508             end
509             end
510             -- if a restriction is set on the queues
511             if #queues > 0 then
512             -- analyze each job on a server
513             for job_id, _ in pairs( all_job_ids ) do
514             -- get the name of the job queue
515             local tmp_queue = redis.call( 'HGET', '${NAMESPACE}:'..job_id, 'queue' )
516             -- associate job queue name with the names of the queues from the restriction
517             for _, queue in ipairs( queues ) do
518             if tmp_queue == queue then
519             -- memorize the appropriate job ID
520             tmp_ids[ job_id ] = 1
521             break
522             end
523             end
524             end
525             else
526             -- if there is no restriction on the queues, then remember all the jobs on the server
527             tmp_ids = all_job_ids
528             end
529             end
530              
531             -- if the restriction is set by the statuses
532             if #statuses > 0 then
533             -- analyze each job from a previously created "hash"
534             for job_id, _ in pairs( tmp_ids ) do
535             -- determine the current status of the job
536             local job_status = redis.call( 'HGET', '${NAMESPACE}:'..job_id, 'status' )
537             -- associate job status with the statuses from the restriction
538             for _, status in ipairs( statuses ) do
539             if job_status == status then
540             -- mark the job satisfying the restriction
541             tmp_ids[ job_id ] = 2 -- Filter by status sign
542             break
543             end
544             end
545             end
546             -- reanalyze each job from a previously created "hash"
547             for job_id, _ in pairs( tmp_ids ) do
548             if tmp_ids[ job_id ] ~= 2 then
549             -- remove unfiltered by status
550             tmp_ids[ job_id ] = nil
551             end
552             end
553             end
554              
555             local job_ids = {}
556             for id, _ in pairs( tmp_ids ) do
557             -- remember identifiers of selected jobs
558             table.insert( job_ids, id )
559             end
560              
561             return job_ids
562             END_GET_JOB_IDS
563              
564             #-- constructor ----------------------------------------------------------------
565              
566             =head2 CONSTRUCTOR
567              
568             =head3 C $server, timeout =E $timeout, check_maxmemory =E $mode )>
569              
570             Creates a new C object to communicate with Redis server.
571             If invoked without any arguments, the constructor C creates and returns
572             a C object that is configured with the default settings and uses
573             local C server.
574              
575             In the parameters of the constructor 'redis' may be either a Redis object
576             or a server string or a hash reference of parameters to create a Redis object.
577              
578             Optional C boolean argument (default is true)
579             defines if attempt is made to find out maximum available memory from Redis.
580              
581             In some cases Redis implementation forbids such request,
582             but setting to false can be used as a workaround.
583              
584             =head3 Caveats related to connection with Redis server
585              
586             =over 3
587              
588             =item *
589              
590             According to L documentation:
591              
592             This module consider that any data sent to the Redis server is a raw octets string,
593             even if it has utf8 flag set.
594             And it doesn't do anything when getting data from the Redis server.
595              
596             =item *
597              
598             Non-serialize-able fields (like status or message) passed in UTF-8 can not be
599             correctly restored from Redis server. To avoid potential data corruction, passing
600             UTF-8 encoded value causes error.
601              
602             =item *
603              
604             L value is used when a L class object is
605             passed to the C constructor without additional C argument.
606              
607             =back
608              
609             This example illustrates C call with all possible arguments:
610              
611             my $jq = Redis::JobQueue->new(
612             redis => "$server:$port", # Connection info for Redis which hosts queue
613             timeout => $timeout, # wait time (in seconds)
614             # for blocking call of get_next_job.
615             # Set 0 for unlimited wait time
616             );
617             # or
618             my $jq = Redis::JobQueue->new(
619             redis => { # The hash reference of parameters
620             # to create a Redis object
621             server => "$server:$port",
622             },
623             );
624              
625             The following examples illustrate other uses of the C method:
626              
627             $jq = Redis::JobQueue->new();
628             my $next_jq = Redis::JobQueue->new( $jq );
629              
630             my $redis = Redis->new( server => "$server:$port" );
631             $next_jq = Redis::JobQueue->new(
632             $redis,
633             timeout => $timeout,
634             );
635             # or
636             $next_jq = Redis::JobQueue->new(
637             redis => $redis,
638             timeout => $timeout,
639             );
640              
641             An invalid argument causes die (C).
642              
643             =cut
644             around BUILDARGS => sub {
645             my $orig = shift;
646             my $class = shift;
647              
648             if ( _INSTANCE( $_[0], 'Redis' ) ) {
649             my $redis = shift;
650             return $class->$orig(
651             # have to look into the Redis object ...
652             redis => $redis->{server},
653             # it is impossible to know from Redis now ...
654             #timeout => $redis->???,
655             _redis => $redis,
656             @_
657             );
658             } elsif ( _INSTANCE( $_[0], 'Test::RedisServer' ) ) {
659             # to test only
660             my $redis = shift;
661             # have to look into the Test::RedisServer object ...
662             my $conf = $redis->conf;
663             $conf->{server} = '127.0.0.1:'.$conf->{port} unless exists $conf->{server};
664             return $class->$orig(
665             redis => $conf->{server},
666             _redis => Redis->new( %$conf ),
667             @_
668             );
669             } elsif ( _INSTANCE( $_[0], __PACKAGE__ ) ) {
670             my $jq = shift;
671             return $class->$orig(
672             redis => $jq->_server,
673             _redis => $jq->_redis,
674             timeout => $jq->timeout,
675             @_
676             );
677             } else {
678             my %args = @_;
679             my $redis = $args{redis};
680             if ( _INSTANCE( $redis, 'Redis' ) ) {
681             delete $args{redis};
682             return $class->$orig(
683             # have to look into the Redis object ...
684             redis => $redis->{server},
685             # it is impossible to know from Redis now ...
686             #timeout => $redis->???,
687             _redis => $redis,
688             %args,
689             );
690             }
691             elsif ( ref( $redis ) eq 'HASH' ) {
692             $args{_use_external_connection} = 0;
693             my $conf = $redis;
694             $conf->{server} = DEFAULT_SERVER.':'.DEFAULT_PORT unless exists $conf->{server};
695             delete $args{redis};
696             return $class->$orig(
697             redis => $conf->{server},
698             _redis => Redis->new( %$conf ),
699             %args,
700             );
701             } else {
702             return $class->$orig( %args );
703             }
704             }
705             };
706              
707             sub BUILD {
708 0     0 1   my ( $self ) = @_;
709              
710 0 0         $self->_redis( $self->_redis_constructor )
711             unless ( $self->_redis );
712 0 0         if ( $self->_check_maxmemory ) {
713 0           my ( undef, $max_datasize ) = $self->_call_redis( 'CONFIG', 'GET', 'maxmemory' );
714 0 0         defined( _NONNEGINT( $max_datasize ) )
715             or die $self->_error( E_NETWORK );
716 0 0         $self->max_datasize( min $max_datasize, $self->max_datasize )
717             if $max_datasize;
718             }
719              
720 0           my ( $major, $minor ) = $self->_redis->info->{redis_version} =~ /^(\d+)\.(\d+)/;
721 0 0 0       if ( $major < 2 || $major == 2 && $minor <= 4 ) {
      0        
722 0           $self->_error( E_REDIS );
723 0           confess 'Needs Redis server version 2.6 or higher';
724             }
725             }
726              
727             #-- public attributes ----------------------------------------------------------
728              
729             =head2 METHODS
730              
731             The following methods are available for object of the C class:
732              
733             =cut
734              
735             =head3 C
736              
737             Accessor to the C attribute.
738              
739             Returns current value of the attribute if called without an argument.
740              
741             Non-negative integer value can be used to specify a new value of the maximum
742             waiting time for queue (of the L method). Use
743             C = 0 for an unlimited wait time.
744              
745             =cut
746             has 'timeout' => (
747             is => 'rw',
748             isa => 'Redis::JobQueue::Job::NonNegInt',
749             default => DEFAULT_TIMEOUT,
750             );
751              
752             =head3 C
753              
754             Provides access to the C attribute.
755              
756             Returns current value of the attribute if called with no argument.
757              
758             Non-negative integer value can be used to specify a new value for the maximum
759             size of data in the attributes of a
760             L object.
761              
762             The check is done before sending data to the module L,
763             after possible processing by methods of module L
764             (attributes L, L
765             and L).
766             It is automatically serialized.
767              
768             The C attribute value is used in the L
769             and data entry job operations on the Redis server.
770              
771             The L uses the smaller of the values of 512MB and
772             C limit from a F file.
773              
774             =cut
775             has 'max_datasize' => (
776             is => 'rw',
777             isa => 'Redis::JobQueue::Job::NonNegInt',
778             default => MAX_DATASIZE,
779             );
780              
781             =head3 C
782              
783             Returns the code of the last identified error.
784              
785             See L section for description of error codes.
786              
787             =cut
788             has 'last_errorcode' => (
789             reader => 'last_errorcode',
790             writer => '_set_last_errorcode',
791             isa => 'Int',
792             default => 0,
793             );
794              
795             #-- private attributes ---------------------------------------------------------
796              
797             has '_server' => (
798             is => 'rw',
799             init_arg => 'redis',
800             isa => 'Str',
801             default => DEFAULT_SERVER.':'.DEFAULT_PORT,
802             trigger => sub {
803             my $self = shift;
804             $self->_server( $self->_server.':'.DEFAULT_PORT )
805             unless $self->_server =~ /:/;
806             },
807             );
808              
809             has '_redis' => (
810             is => 'rw',
811             isa => 'Maybe[Redis]',
812             default => undef,
813             );
814              
815             has '_transaction' => (
816             is => 'rw',
817             isa => 'Bool',
818             default => undef,
819             );
820              
821             has '_lua_scripts' => (
822             is => 'ro',
823             isa => 'HashRef[Str]',
824             lazy => 1,
825             init_arg => undef,
826             builder => sub { return {}; },
827             );
828              
829             has '_check_maxmemory' => (
830             is => 'ro',
831             init_arg => 'check_maxmemory',
832             isa => 'Bool',
833             default => 1,
834             );
835              
836             has '_use_external_connection' => (
837             is => 'rw',
838             isa => 'Bool',
839             default => 1,
840             );
841              
842             #-- public methods -------------------------------------------------------------
843              
844             =head3 C
845              
846             Returns error message of the last identified error.
847              
848             See L section for more info on errors.
849              
850             =cut
851             sub last_error {
852 0     0 1   my ( $self ) = @_;
853              
854 0           return $ERROR[ $self->last_errorcode ];
855             }
856              
857             =head3 C 1 )>
858              
859             Adds a job to the queue on the Redis server.
860              
861             The first argument should be either L
862             object (which is modified by the method) or a reference to a hash representing
863             L - in the latter case
864             a new L object is created.
865              
866             Returns a L object with a new
867             unique identifier.
868              
869             Job status is set to L.
870              
871             C optionally takes arguments in key-value pairs.
872              
873             The following example illustrates a C call with all possible arguments:
874              
875             my $job_data = {
876             id => '4BE19672-C503-11E1-BF34-28791473A258',
877             queue => 'lovely_queue', # required
878             job => 'strong_job', # optional attribute
879             expire => 12*60*60,
880             status => 'created',
881             workload => \'Some stuff',
882             result => \'JOB result comes here',
883             };
884              
885             my $job = Redis::JobQueue::Job->new( $job_data );
886              
887             my $resulting_job = $jq->add_job( $job );
888             # or
889             $resulting_job = $jq->add_job(
890             $pre_job,
891             LPUSH => 1,
892             );
893              
894             If used with the optional argument C 1>, the job is placed at the beginnig of
895             the queue and will be returned by the next call to get_next_job.
896              
897             TTL of job data on Redis server is set in accordance with the L
898             attribute of the L object. Make
899             sure it's higher than the time needed to process all the jobs in the queue.
900              
901             =cut
902             sub add_job {
903 0     0 1   my $self = shift;
904              
905 0           $self->_error( E_NO_ERROR );
906 0 0 0       ref( $_[0] ) eq 'HASH' || _INSTANCE( $_[0], 'Redis::JobQueue::Job' )
907             || confess $self->_error( E_MISMATCH_ARG );
908 0 0         my $job = _INSTANCE( $_[0], 'Redis::JobQueue::Job' ) ? shift : Redis::JobQueue::Job->new( shift );
909              
910 0           my %args = ( @_ );
911              
912 0           my ( $id, $key );
913 0           do {
914 0           $self->_call_redis( 'UNWATCH' );
915 0           $key = $self->_jkey( $id = $uuid->create_str );
916 0           $self->_call_redis( 'WATCH', $key );
917             } while ( $self->_call_redis( 'EXISTS', $self->_jkey( $id ) ) );
918              
919 0           $job->id( $id );
920 0           my $expire = $job->expire;
921              
922             # transaction start
923 0           $self->_call_redis( 'MULTI' );
924              
925             $self->_call_redis( 'HSET', $key, $_, $job->$_ // q{} )
926 0   0       for @job_fields;
927             $self->_call_redis( 'HSET', $key, $_, $job->meta_data( $_ ) // q{} )
928 0   0       for keys %{ $job->meta_data };
  0            
929              
930 0           my $id_in_list = $id;
931              
932 0 0         if ( $expire ) {
933 0           $id_in_list .= ' '.( time + $expire );
934 0           $self->_call_redis( 'EXPIRE', $key, $expire );
935             }
936              
937 0           $self->_call_redis( 'HSET', $key, $_ID_IN_QUEUE_FIELD, $id_in_list );
938              
939 0 0         $self->_call_redis( $args{LPUSH} ? 'LPUSH' : 'RPUSH', $self->_qkey( $job->queue ), $id_in_list );
940              
941             # transaction end
942 0   0       $self->_call_redis( 'EXEC' ) // return;
943              
944 0           $job->clear_modified;
945 0           return $job;
946             }
947              
948             =head3 C
949              
950             Data of the job is requested from the Redis server. First argument can be either
951             a job ID or L object.
952              
953             Returns C when the job was not found on Redis server.
954              
955             The method returns the jobs data from the Redis server.
956             See L for the list of standard jobs data fields.
957              
958             The method returns a reference to a hash of the standard jobs data fields if only the first
959             argument is specified.
960              
961             If given a key name C<$data_key>, it returns data corresponding to
962             the key or C when the value is undefined or key is not in the data or metadata.
963              
964             The following examples illustrate uses of the C method:
965              
966             my $data_href = $jq->get_job_data( $id );
967             # or
968             $data_href = $jq->get_job_data( $job );
969             # or
970             my $data_key = 'foo';
971             my $data = $jq->get_job_data( $job->id, $data_key );
972              
973             You can specify a list of names of key data or metadata.
974             In this case it returns the corresponding list of data. For example:
975              
976             my ( $status, $foo ) = $jq->get_job_data( $job->id, 'status', 'foo' );
977              
978             See L for more informations about
979             the jobs metadata.
980              
981             =cut
982             sub get_job_data {
983 0     0 1   my ( $self, $id_source, @data_keys ) = @_;
984              
985 0           my $job_id = $self->_get_job_id( $id_source );
986 0           my $data_fields = scalar @data_keys;
987 0 0         my @right_keys = map { _STRING( $_ ) || q{} } @data_keys;
  0            
988 0           my %right_names = map { $_ => 1 } grep { $_ } @right_keys;
  0            
  0            
989              
990 0           my @additional = ();
991 0 0         if ( $data_fields ) {
992 0 0         if ( exists $right_names{elapsed} ) {
993 0           foreach my $field ( qw( started completed failed ) ) {
994 0 0         push @additional, $field if !exists $right_names{ $field };
995             }
996             }
997             } else {
998 0           @additional = @job_fields;
999             }
1000 0           my @all_fields = ( @right_keys, @additional );
1001 0           my $total_fields = scalar( @all_fields );
1002              
1003 0           $self->_error( E_NO_ERROR );
1004              
1005 0           my $tm = time;
1006 0           my ( $job_exists, @data ) = $self->_call_redis(
1007             $self->_lua_script_cmd( 'get_job_data' ),
1008             0,
1009             $job_id,
1010             @all_fields,
1011             );
1012              
1013 0 0         return unless $job_exists;
1014              
1015 0           for ( my $i = 0; $i < $total_fields; ++$i ) {
1016 0           my $field = $all_fields[ $i ];
1017 0 0 0       if ( $field ne 'elapsed' && ( $field =~ /^(workload|result)$/ || !$job_fnames{ $field } ) ) {
      0        
1018 0 0         $data[ $i ] = ${ thaw( $data[ $i ] ) }
  0            
1019             if $data[ $i ];
1020             }
1021             }
1022              
1023 0 0         if ( !$data_fields ) {
1024 0           my %result_data;
1025 0           for ( my $i = 0; $i < $total_fields; ++$i ) {
1026 0           $result_data{ $all_fields[ $i ] } = $data[ $i ];
1027             }
1028              
1029 0 0         if ( my $started = $result_data{started} ) {
1030             $result_data{elapsed} = (
1031             $result_data{completed}
1032             || $result_data{failed}
1033             || time
1034 0   0       ) - $started;
1035             } else {
1036 0           $result_data{elapsed} = undef;
1037             }
1038              
1039 0           return \%result_data;
1040             } else {
1041 0           for ( my $i = 0; $i < $data_fields; ++$i ) {
1042 0 0         if ( $right_keys[ $i ] eq 'elapsed' ) {
1043 0 0   0     if ( my $started = $data[ firstidx { $_ eq 'started' } @all_fields ] ) {
  0            
1044             $data[ $i ] = (
1045 0     0     $data[ firstidx { $_ eq 'completed' } @all_fields ]
1046 0   0 0     || $data[ firstidx { $_ eq 'failed' } @all_fields ]
  0            
1047             || $tm
1048             ) - $started;
1049             } else {
1050 0           $data[ $i ] = undef;
1051             }
1052             }
1053              
1054 0 0         return $data[0] unless wantarray;
1055             }
1056              
1057 0           return @data;
1058             }
1059             }
1060              
1061             =head3 C
1062              
1063             The list of names of metadata fields of the job is requested from the Redis server.
1064             First argument can be either a job ID or
1065             L object.
1066              
1067             Returns empty list when the job was not found on Redis server or
1068             the job does not have metadata.
1069              
1070             The following examples illustrate uses of the C method:
1071              
1072             my @fields = $jq->get_job_meta_fields( $id );
1073             # or
1074             @fields = $jq->get_job_meta_fields( $job );
1075             # or
1076             @fields = $jq->get_job_meta_fields( $job->id );
1077              
1078             See L for more informations about
1079             the jobs metadata.
1080              
1081             =cut
1082             sub get_job_meta_fields {
1083 0     0 1   my ( $self, $id_source ) = @_;
1084              
1085 0 0         return grep { !$job_fnames{ $_ } && $_ ne $_ID_IN_QUEUE_FIELD } $self->_call_redis( 'HKEYS', $self->_jkey( $self->_get_job_id( $id_source ) ) );
  0            
1086             }
1087              
1088             =head3 C
1089              
1090             Loads job data from the Redis server. The argument is either job ID or
1091             a L object.
1092              
1093             Method returns the object corresponding to the loaded job.
1094             Returns C if the job is not found on the Redis server.
1095              
1096             The following examples illustrate uses of the C method:
1097              
1098             $job = $jq->load_job( $id );
1099             # or
1100             $job = $jq->load_job( $job );
1101              
1102             =cut
1103             sub load_job {
1104 0     0 1   my ( $self, $id_source ) = @_;
1105              
1106 0           my $job_id = $self->_get_job_id( $id_source );
1107              
1108 0           $self->_error( E_NO_ERROR );
1109              
1110 0           my ( $job_exists, @job_data ) = $self->_call_redis(
1111             $self->_lua_script_cmd( 'load_job' ),
1112             0,
1113             $job_id,
1114             @job_fields,
1115             );
1116              
1117 0 0         return unless $job_exists;
1118              
1119 0           my ( $pre_job, $key, $val );
1120 0           while ( @job_data ) {
1121 0           $key = shift @job_data;
1122 0           $val = shift @job_data;
1123 0 0         if ( $job_fnames{ $key } ) {
1124 0           $pre_job->{ $key } = $val;
1125             } else {
1126 0           $pre_job->{meta_data}->{ $key } = $val;
1127             }
1128             }
1129              
1130 0           foreach my $field ( qw( workload result ) ) {
1131 0           $pre_job->{ $field } = ${ thaw( $pre_job->{ $field } ) }
1132 0 0         if $pre_job->{ $field };
1133             }
1134 0 0         if ( $pre_job->{meta_data} ) {
1135 0           my $meta_data = $pre_job->{meta_data};
1136 0           foreach my $field ( keys %$meta_data ) {
1137 0           $meta_data->{ $field } = ${ thaw( $meta_data->{ $field } ) }
1138 0 0         if $meta_data->{ $field };
1139             }
1140             }
1141              
1142 0           my $new_job = Redis::JobQueue::Job->new( $pre_job );
1143 0           $new_job->clear_modified;
1144              
1145 0           return $new_job;
1146             }
1147              
1148             =head3 C $queue_name, blocking =E 1 )>
1149              
1150             Selects the job identifier which is at the beginning of the queue.
1151              
1152             C takes arguments in key-value pairs.
1153             You can specify a queue name or a reference to an array of queue names.
1154             Queues from the list are processed in random order.
1155              
1156             By default, each queue is processed in a separate request with the result
1157             returned immediately if a job is found (waiting) in that queue. If no waiting
1158             job found, returns undef.
1159             In case optional C argument is true, all queues are processed in
1160             a single request to Redis server and if no job is found waiting in queue(s),
1161             get_next_job execution will be paused for up to C seconds or until
1162             a job becomes available in any of the listed queues.
1163              
1164             Use C = 0 for an unlimited wait time.
1165             Default - C is false (0).
1166              
1167             Method returns the job object corresponding to the received job identifier.
1168             Returns the C if there is no job in the queue.
1169              
1170             These examples illustrate a C call with all the valid arguments:
1171              
1172             $job = $jq->get_next_job(
1173             queue => 'xxx',
1174             blocking => 1,
1175             );
1176             # or
1177             $job = $jq->get_next_job(
1178             queue => [ 'aaa', 'bbb' ],
1179             blocking => 1,
1180             );
1181              
1182             TTL job data for the job resets on the Redis server in accordance with
1183             the L attribute of the job object.
1184              
1185             =cut
1186             sub get_next_job {
1187 0     0 1   my $self = shift;
1188              
1189 0 0         !( scalar( @_ ) % 2 )
1190             || confess $self->_error( E_MISMATCH_ARG );
1191 0           my %args = ( @_ );
1192 0           my $queues = $args{queue};
1193 0           my $blocking = $args{blocking};
1194 0           my $only_id = $args{_only_id};
1195              
1196 0 0 0       $queues = [ $queues // () ]
1197             if !ref( $queues );
1198              
1199 0           foreach my $arg ( ( @{$queues} ) ) {
  0            
1200 0 0         defined _STRING( $arg )
1201             or confess $self->_error( E_MISMATCH_ARG );
1202             }
1203              
1204 0           my @keys = ();
1205 0           push @keys, map { $self->_qkey( $_ ) } @$queues;
  0            
1206              
1207 0 0         if ( @keys ) {
1208 0           @keys = shuffle( @keys );
1209              
1210 0           my $full_id;
1211 0 0         if ( $blocking ) {
1212             # 'BLPOP' waiting time of a given $self->timeout parameter
1213 0           my @cmd = ( 'BLPOP', ( @keys ), $self->timeout );
1214 0           while (1) {
1215 0           ( undef, $full_id ) = $self->_call_redis( @cmd );
1216             # if the job is no longer
1217 0 0         last unless $full_id;
1218              
1219 0           my $ret = $self->_get_next_job( $full_id, $only_id );
1220 0 0         return $ret if $ret;
1221             }
1222             } else {
1223             # 'LPOP' takes only one queue name at a time
1224 0           foreach my $key ( @keys ) {
1225 0 0         next unless $self->_call_redis( 'EXISTS', $key );
1226 0           my @cmd = ( 'LPOP', $key );
1227 0           while (1) {
1228 0           $full_id = $self->_call_redis( @cmd );
1229             # if the job is no longer
1230 0 0         last unless $full_id;
1231              
1232 0           my $ret = $self->_get_next_job( $full_id, $only_id );
1233 0 0         return $ret if $ret;
1234             }
1235             }
1236             }
1237             }
1238              
1239 0           return;
1240             }
1241              
1242             =head3 C $queue_name, blocking =E 1 )>
1243              
1244             Like L, but returns job identifier only.
1245              
1246             TTL job data for the job does not reset on the Redis server.
1247              
1248             =cut
1249             sub get_next_job_id {
1250 0     0 1   my $self = shift;
1251              
1252 0           return $self->get_next_job( @_, _only_id => 1 );
1253             }
1254              
1255             sub _get_next_job {
1256 0     0     my ( $self, $full_id, $only_id ) = @_;
1257              
1258 0           my ( $id, $expire_time ) = split ' ', $full_id;
1259 0           my $key = $self->_jkey( $id );
1260 0 0         if ( $self->_call_redis( 'EXISTS', $key ) ) {
1261 0 0         if ( $only_id ) {
1262 0           return $id;
1263             } else {
1264 0           my $job = $self->load_job( $id );
1265 0 0         if ( my $expire = $job->expire ) {
1266 0           $self->_call_redis( 'EXPIRE', $key, $expire );
1267             }
1268 0           return $job;
1269             }
1270             } else {
1271 0 0 0       if ( !$expire_time || time < $expire_time ) {
1272 0           confess format_message( '%s %s', $id, $self->_error( E_JOB_DELETED ) );
1273             }
1274             # If the queue contains a job identifier that has already been removed due
1275             # to expiration, the cycle will ensure the transition
1276             # to the next job ID selection
1277 0           return;
1278             }
1279             }
1280              
1281             =head3 C
1282              
1283             Saves job data changes to the Redis server. Job ID is obtained from
1284             the argument, which can be a L
1285             object.
1286              
1287             Returns the number of attributes that were updated if the job was found on the Redis
1288             server and C if it was not.
1289             When you change a single attribute, returns C<2> because L also changes.
1290              
1291             Changing the L attribute is ignored.
1292              
1293             The following examples illustrate uses of the C method:
1294              
1295             $jq->update_job( $job );
1296              
1297             TTL job data for the job resets on the Redis server in accordance with
1298             the L attribute of the job object.
1299              
1300             =cut
1301             sub update_job {
1302 0     0 1   my ( $self, $job ) = @_;
1303              
1304 0 0         _INSTANCE( $job, 'Redis::JobQueue::Job' )
1305             or confess $self->_error( E_MISMATCH_ARG );
1306              
1307 0           my @modified = $job->modified_attributes;
1308 0 0         return 0 unless @modified;
1309              
1310 0           my $id = $job->id;
1311 0           my $key = $self->_jkey( $id );
1312 0           $self->_call_redis( 'WATCH', $key );
1313 0 0         if ( !$self->_call_redis( 'EXISTS', $key ) ) {
1314 0           $self->_call_redis( 'UNWATCH' );
1315 0           return;
1316             }
1317              
1318             # transaction start
1319 0           $self->_call_redis( 'MULTI' );
1320              
1321 0           my $expire = $job->expire;
1322 0 0         if ( $expire ) {
1323 0           $self->_call_redis( 'EXPIRE', $key, $expire );
1324             } else {
1325 0           $self->_call_redis( 'PERSIST', $key );
1326             }
1327              
1328 0           my $updated = 0;
1329 0           foreach my $field ( @modified ) {
1330 0 0 0       if ( !$job_fnames{ $field } ) {
    0          
1331 0   0       $self->_call_redis( 'HSET', $key, $field, $job->meta_data( $field ) // q{} );
1332 0           ++$updated;
1333             } elsif ( $field ne 'expire' && $field ne 'id' ) {
1334 0   0       $self->_call_redis( 'HSET', $key, $field, $job->$field // q{} );
1335 0           ++$updated;
1336             } else {
1337             # Field 'expire' and 'id' shall remain unchanged
1338             }
1339             }
1340              
1341             # transaction end
1342 0   0       $self->_call_redis( 'EXEC' ) // return;
1343 0           $job->clear_modified;
1344              
1345 0           return $updated;
1346             }
1347              
1348             =head3 C
1349              
1350             Deletes job's data from Redis server.
1351             The Job ID is obtained from the argument, which can be either a string or
1352             a L object.
1353              
1354             Returns true if job and its metadata was successfully deleted from Redis server.
1355             False if jobs or its metadata wasn't found.
1356              
1357             The following examples illustrate uses of the C method:
1358              
1359             $jq->delete_job( $job );
1360             # or
1361             $jq->delete_job( $id );
1362              
1363             Use this method soon after receiving the results of the job to free memory on
1364             the Redis server.
1365              
1366             See description of the C data structure used on the Redis server
1367             at the L section.
1368              
1369             Note that job deletion time is proportional to number of jobs currently in the queue.
1370              
1371             =cut
1372             sub delete_job {
1373 0     0 1   my ( $self, $id_source ) = @_;
1374              
1375 0 0 0       defined( _STRING( $id_source ) ) || _INSTANCE( $id_source, 'Redis::JobQueue::Job' )
1376             || confess $self->_error( E_MISMATCH_ARG );
1377              
1378 0           $self->_error( E_NO_ERROR );
1379              
1380 0 0         return $self->_call_redis(
1381             $self->_lua_script_cmd( 'delete_job' ),
1382             0,
1383             ref( $id_source ) ? $id_source->id : $id_source,
1384             );
1385             }
1386              
1387             =head3 C
1388              
1389             Gets list of job IDs on the Redis server.
1390             These IDs are identifiers of job data structures, not only the identifiers which
1391             get derived from the queue by L.
1392              
1393             The following examples illustrate simple uses of the C method
1394             (IDs of all existing jobs):
1395              
1396             @jobs = $jq->get_job_ids;
1397              
1398             These are identifiers of jobs data structures related to the queue
1399             determined by the arguments.
1400              
1401             C takes arguments in key-value pairs.
1402             You can specify a queue name or job status
1403             (or a reference to an array of queue names or job statuses)
1404             to filter the list of identifiers.
1405              
1406             You can also specify an argument C (true or false).
1407              
1408             When filtering by the names of the queues and C is set to true,
1409             only the identifiers of the jobs which have not yet been derived from
1410             the queues using L are returned.
1411              
1412             Filtering by status returns the task IDs whose status is exactly the same
1413             as the specified status.
1414              
1415             The following examples illustrate uses of the C method:
1416              
1417             # filtering the identifiers of jobs data structures
1418             @ids = $jq->get_job_ids( queue => 'some_queue' );
1419             # or
1420             @ids = $jq->get_job_ids( queue => [ 'foo_queue', 'bar_queue' ] );
1421             # or
1422             @ids = $jq->get_job_ids( status => STATUS_COMPLETED );
1423             # or
1424             @ids = $jq->get_job_ids( status => [ STATUS_COMPLETED, STATUS_FAILED ] );
1425              
1426             # filter the IDs are in the queues
1427             @ids = $jq->get_job_ids( queued => 1, queue => 'some_queue' );
1428             # etc.
1429              
1430             =cut
1431             sub get_job_ids {
1432 0     0 1   my $self = shift;
1433              
1434 0 0         confess format_message( '%s (Odd number of elements in hash assignment)', $self->_error( E_MISMATCH_ARG ) )
1435             if ( scalar( @_ ) % 2 );
1436              
1437 0           my %args = @_;
1438              
1439             # Here are the arguments to references to arrays
1440 0           foreach my $field ( qw( queue status ) ) {
1441 0 0 0       $args{ $field } = [ $args{ $field } ] if exists( $args{ $field } ) && ref( $args{ $field } ) ne 'ARRAY';
1442             }
1443              
1444 0           my @queues = grep { _STRING( $_ ) } @{ $args{queue} };
  0            
  0            
1445 0           my @statuses = grep { _STRING( $_ ) } @{ $args{status} };
  0            
  0            
1446              
1447 0           $self->_error( E_NO_ERROR );
1448              
1449             my @ids = $self->_call_redis(
1450             $self->_lua_script_cmd( 'get_job_ids' ),
1451             0,
1452 0 0         $args{queued} ? 1 : 0,
    0          
    0          
1453             scalar( @queues ), # the number of queues to filter
1454             scalar( @queues ) ? @queues : (), # the queues to filter
1455             scalar( @statuses ) ? @statuses : (), # the statuses to filter
1456             );
1457              
1458 0           return @ids;
1459             }
1460              
1461             =head3 C
1462              
1463             Returns the address of the Redis server used by the queue
1464             (in form of 'host:port').
1465              
1466             The following example illustrates use of the C method:
1467              
1468             $redis_address = $jq->server;
1469              
1470             =cut
1471             sub server {
1472 0     0 1   my ( $self ) = @_;
1473              
1474 0           return $self->_server;
1475             }
1476              
1477             =head3 C
1478              
1479             This command is used to test connection to Redis server.
1480              
1481             Returns 1 if a connection is still alive or 0 otherwise.
1482              
1483             The following example illustrates use of the C method:
1484              
1485             $is_alive = $jq->ping;
1486              
1487             External connections to the server object (eg, C <$redis = Redis->new( ... );>),
1488             and the queue object can continue to work after calling ping only if the method returned 1.
1489              
1490             If there is no connection to the Redis server (methods return 0), the connection to the server closes.
1491             In this case, to continue working with the queue,
1492             you must re-create the C object with the L method.
1493             When using an external connection to the server,
1494             to check the connection to the server you can use the C<$redis->echo( ... )> call.
1495             This is useful to avoid closing the connection to the Redis server unintentionally.
1496              
1497             =cut
1498             sub ping {
1499 0     0 1   my ( $self ) = @_;
1500              
1501 0           $self->_error( E_NO_ERROR );
1502              
1503 0           my $ret;
1504             try {
1505 0     0     $ret = $self->_redis->ping;
1506             } catch {
1507 0     0     $self->_redis_exception( $_ );
1508 0           };
1509              
1510 0 0 0       return( ( $ret // '' ) eq 'PONG' ? 1 : 0 );
1511             }
1512              
1513             =head3 C
1514              
1515             Closes connection to the Redis server.
1516              
1517             The following example illustrates use of the C method:
1518              
1519             $jq->quit;
1520              
1521             It does not close the connection to the Redis server if it is an external connection provided
1522             to queue constructor as existing L object.
1523             When using an external connection (eg, C<$redis = Redis-> new (...);>),
1524             to close the connection to the Redis server, call C<$redis->quit> after calling this method.
1525              
1526             =cut
1527             sub quit {
1528 0     0 1   my ( $self ) = @_;
1529              
1530 0 0 0       return if $] >= 5.14 && ${^GLOBAL_PHASE} eq 'DESTRUCT';
1531              
1532 0           $self->_error( E_NO_ERROR );
1533              
1534 0 0         unless ( $self->_use_external_connection ) {
1535             try {
1536 0     0     $self->_redis->quit;
1537             } catch {
1538 0     0     $self->_redis_exception( $_ );
1539 0           };
1540             }
1541              
1542 0           return;
1543             }
1544              
1545             =head3 C
1546              
1547             Gets queue status from the Redis server.
1548             Queue name is obtained from the argument. The argument can be either a
1549             string representing a queue name or a
1550             L object.
1551              
1552             Returns a reference to a hash describing state of the queue or a reference
1553             to an empty hash when the queue wasn't found.
1554              
1555             The following examples illustrate uses of the C method:
1556              
1557             $qstatus = $jq->queue_status( $queue_name );
1558             # or
1559             $qstatus = $jq->queue_status( $job );
1560              
1561             The returned hash contains the following information related to the queue:
1562              
1563             =over 3
1564              
1565             =item * C
1566              
1567             The number of jobs in the active queue that are waiting to be selected by L and then processed.
1568              
1569             =item * C
1570              
1571             The number of ALL jobs tagged with the queue, i.e. including those that were processed before and other jobs,
1572             not presently waiting in the active queue.
1573              
1574             =item * C
1575              
1576             The age of the oldest job (the lifetime of the queue) in the active queue.
1577              
1578             =item * C
1579              
1580             The age of the youngest job in the active queue.
1581              
1582             =item * C
1583              
1584             Time it currently takes for a job to pass through the queue.
1585              
1586             =back
1587              
1588             Statistics based on the jobs that have not yet been removed.
1589             Some fields may be missing if the status of the job prevents determining
1590             the desired information (eg, there are no jobs in the queue).
1591              
1592             =cut
1593             # Statistics based on the jobs that have not yet been removed
1594             sub queue_status {
1595 0     0 1   my ( $self, $maybe_queue ) = @_;
1596              
1597 0 0 0       defined( _STRING( $maybe_queue ) ) || _INSTANCE( $maybe_queue, 'Redis::JobQueue::Job' )
1598             || confess $self->_error( E_MISMATCH_ARG );
1599              
1600 0 0         $maybe_queue = $maybe_queue->queue
1601             if ref $maybe_queue;
1602              
1603 0           $self->_error( E_NO_ERROR );
1604              
1605 0           my %qstatus = $self->_call_redis(
1606             $self->_lua_script_cmd( 'queue_status' ),
1607             0,
1608             $maybe_queue,
1609             Time::HiRes::time,
1610             );
1611              
1612 0           return \%qstatus;
1613             }
1614              
1615             #-- private methods ------------------------------------------------------------
1616              
1617             sub _redis_exception {
1618 0     0     my ( $self, $error ) = @_;
1619              
1620             # Use the error messages from Redis.pm
1621 0 0 0       if (
    0 0        
      0        
      0        
      0        
1622             $error =~ /^Could not connect to Redis server at /
1623             || $error =~ /^Can't close socket: /
1624             || $error =~ /^Not connected to any server/
1625             # Maybe for pub/sub only
1626             || $error =~ /^Error while reading from Redis server: /
1627             || $error =~ /^Redis server closed connection/
1628             ) {
1629 0           $self->_error( E_NETWORK );
1630             } elsif (
1631             $error =~ /[\S+] ERR command not allowed when used memory > 'maxmemory'/
1632             || $error =~ /[\S+] OOM command not allowed when used memory > 'maxmemory'/
1633             ) {
1634 0           $self->_error( E_MAX_MEMORY_LIMIT );
1635             } else {
1636 0           $self->_error( E_REDIS );
1637             }
1638              
1639 0 0         if ( $self->_transaction ) {
1640             try {
1641 0     0     $self->_redis->discard;
1642 0           };
1643 0           $self->_transaction( 0 );
1644             }
1645 0           die $error;
1646             }
1647              
1648             sub _redis_constructor {
1649 0     0     my ( $self ) = @_;
1650              
1651 0           $self->_error( E_NO_ERROR );
1652 0           my $redis;
1653              
1654             try {
1655 0     0     $redis = Redis->new(
1656             server => $self->_server,
1657             );
1658             } catch {
1659 0     0     $self->_redis_exception( $_ );
1660 0           };
1661 0           return $redis;
1662             }
1663              
1664             # Keep in mind the default 'redis.conf' values:
1665             # Close the connection after a client is idle for N seconds (0 to disable)
1666             # timeout 300
1667              
1668             # Send a request to Redis
1669             sub _call_redis {
1670 0     0     my $self = shift;
1671 0           my $method = shift;
1672              
1673 0           my @result;
1674 0           $self->_error( E_NO_ERROR );
1675              
1676 0           my $error;
1677             # if you use "$method eq 'HSET'" then use $_[0..2] to reduce data copies
1678             # $_[0] - key
1679             # $_[1] - field
1680             # $_[2] - value
1681 0 0 0       if ( $method eq 'HSET' && $_[1] eq $_ID_IN_QUEUE_FIELD ) {
    0 0        
    0 0        
      0        
1682 0           my ( $key, $field, $value ) = @_;
1683             try {
1684 0     0     @result = $self->_redis->$method(
1685             $key,
1686             $field,
1687             $value,
1688             );
1689             } catch {
1690 0     0     $error = $_;
1691 0           };
1692             } elsif ( $method eq 'HSET' && ( $_[1] =~ /^(workload|result)$/ || !$job_fnames{ $_[1] } ) ) {
1693 0           my $data_ref = \nfreeze( \$_[2] );
1694              
1695 0 0         if ( length( $$data_ref ) > $self->max_datasize ) {
1696 0 0         if ( $self->_transaction ) {
1697             try {
1698 0     0     $self->_redis->discard;
1699 0           };
1700 0           $self->_transaction( 0 );
1701             }
1702             # 'die' as maybe too long to analyze the data output from the 'confess'
1703 0           die format_message( '%s: %s', $self->_error( E_DATA_TOO_LARGE ), $_[1] );
1704             }
1705              
1706 0           my ( $key, $field ) = @_;
1707             try {
1708 0     0     @result = $self->_redis->$method(
1709             $key,
1710             $field,
1711             $$data_ref,
1712             );
1713             } catch {
1714 0     0     $error = $_;
1715 0           };
1716             } elsif ( $method eq 'HSET' && utf8::is_utf8( $_[2] ) ) {
1717             # In our case, the user sends data to the job queue and then gets it back.
1718             # Workload and result are fine with Unicode - they're properly serialized by Storable and stored as bytes in Redis;
1719             # Storable takes care about Unicode etc.
1720             # The main string job data fields (id, queue, job, status, message),
1721             # however, is not serialized for performance and convenience reasons, and stored in Redis as-is.
1722             # If there is Unicode in these fields, we have the following options:
1723             # 1. Assume everything is Unicode, turn utf-8 encoding in Redis.pm settings and take a substantial performance hit;
1724             # as we store the biggest parts of job data - workload and result - serialized already,
1725             # encoding and decoding them again is not a good idea.
1726             # 2. Assume that all fields is Unicode, encode and decode it;
1727             # this may lead to subtle errors if user provides it which is binary, not Unicode.
1728             # 3. Detect Unicode data and store "utf-8" flag along with data on redis,
1729             # to decode only utf-8 data when it is requested by user.
1730             # This makes data management more complicated.
1731             # 4. Assume that data is for application internal use,
1732             # and that application must ensure that it does not contain Unicode;
1733             # if Unicode is really needed, it should be either stored in workload or result,
1734             # or the application must take care about encoding and decoding Unicode data before sending to the job queue.
1735             # The job queue will throw an exception if Unicode data is encountered.
1736             #
1737             # We choose (4) as it is consistent, does not degrade performance and does not cause subtle errors with damaged data.
1738              
1739             # For non-serialized fields: UTF8 can not be transferred to the Redis server
1740 0           confess format_message( '%s (utf8 in %s)', $self->_error( E_MISMATCH_ARG ), $_[1] );
1741             } else {
1742 0           my @args = @_;
1743             try {
1744 0     0     @result = $self->_redis->$method( @args );
1745             } catch {
1746 0     0     $error = $_;
1747 0           };
1748             }
1749              
1750 0 0         $self->_redis_exception( $error )
1751             if $error;
1752              
1753 0 0         $self->_transaction( 1 )
1754             if $method eq 'MULTI';
1755 0 0         if ( $method eq 'EXEC' ) {
1756 0           $self->_transaction( 0 );
1757 0   0       $result[0] // return; # 'WATCH': the transaction is not entered at all
1758             }
1759              
1760 0 0 0       if ( $method eq 'HGET' and $_[1] =~ /^(workload|result)$/ ) {
1761 0 0         if ( $result[0] ) {
1762 0           $result[0] = ${ thaw( $result[0] ) };
  0            
1763 0 0         $result[0] = ${ $result[0] } if ref( $result[0] ) eq 'SCALAR';
  0            
1764             }
1765             }
1766              
1767 0 0         return wantarray ? @result : $result[0];
1768             }
1769              
1770             sub _jkey {
1771 0     0     my $self = shift;
1772              
1773 0           local $" = ':';
1774 0           return "$NAMESPACE:@_";
1775             }
1776              
1777             sub _qkey {
1778 0     0     my $self = shift;
1779              
1780 0           local $" = ':';
1781 0           return "$NAMESPACE:queue:@_";
1782             }
1783              
1784             sub _get_job_id {
1785 0     0     my ( $self, $id_source ) = @_;
1786              
1787 0 0 0       defined( _STRING( $id_source ) ) || _INSTANCE( $id_source, 'Redis::JobQueue::Job' )
1788             || confess $self->_error( E_MISMATCH_ARG );
1789              
1790 0 0         return ref( $id_source ) ? $id_source->id : $id_source;
1791             }
1792              
1793             sub _lua_script_cmd {
1794 0     0     my ( $self, $name ) = @_;
1795              
1796 0           my $sha1 = $self->_lua_scripts->{ $name };
1797 0 0         unless ( $sha1 ) {
1798 0           $sha1 = $self->_lua_scripts->{ $name } = sha1_hex( $lua_script_body{ $name } );
1799 0 0         unless ( ( $self->_call_redis( 'SCRIPT', 'EXISTS', $sha1 ) )[0] ) {
1800 0           return( 'EVAL', $lua_script_body{ $name } );
1801             }
1802             }
1803 0           return( 'EVALSHA', $sha1 );
1804             }
1805              
1806             sub _error {
1807 0     0     my ( $self, $error_code ) = @_;
1808              
1809 0           $self->_set_last_errorcode( $error_code );
1810 0           return $self->last_error;
1811             }
1812              
1813             #-- Closes and cleans up -------------------------------------------------------
1814              
1815 62     62   365 no Mouse::Util::TypeConstraints;
  62         89  
  62         468  
1816 62     62   7242 no Mouse; # keywords are removed from the package
  62         83  
  62         234  
1817             __PACKAGE__->meta->make_immutable();
1818              
1819             __END__