File Coverage

lib/Redis/JobQueue.pm
Criterion Covered Total %
statement 42 44 95.4
branch n/a
condition n/a
subroutine 15 15 100.0
pod n/a
total 57 59 96.6


line stmt bran cond sub pod time code
1             package Redis::JobQueue;
2              
3             =head1 NAME
4              
5             Redis::JobQueue - Job queue management implemented using Redis server.
6              
7             =head1 VERSION
8              
9             This documentation refers to C<Redis::JobQueue> version 1.17
10              
11             =cut
12              
13             #-- Pragmas --------------------------------------------------------------------
14              
15 17     17   1603422 use 5.010;
  17         43  
16 17     17   54 use strict;
  17         16  
  17         243  
17 17     17   47 use warnings;
  17         23  
  17         577  
18              
19             # ENVIRONMENT ------------------------------------------------------------------
20              
21             our $VERSION = '1.17';
22              
23 17         639 use Exporter qw(
24             import
25 17     17   56 );
  17         20  
26             our @EXPORT_OK = qw(
27             DEFAULT_SERVER
28             DEFAULT_PORT
29             DEFAULT_TIMEOUT
30             NAMESPACE
31             E_NO_ERROR
32             E_MISMATCH_ARG
33             E_DATA_TOO_LARGE
34             E_NETWORK
35             E_MAX_MEMORY_LIMIT
36             E_JOB_DELETED
37             E_REDIS
38             );
39              
40             #-- load the modules -----------------------------------------------------------
41              
42 17     17   52 use Carp;
  17         20  
  17         698  
43 17     17   5939 use Data::UUID;
  17         7118  
  17         815  
44 17         751 use Digest::SHA1 qw(
45             sha1_hex
46 17     17   6237 );
  17         7154  
47 17         1211 use List::Util qw(
48             min
49             shuffle
50 17     17   77 );
  17         19  
51 17         84 use List::MoreUtils qw(
52             firstidx
53 17     17   6378 );
  17         110749  
54 17     17   7644 use Mouse;
  17         37499  
  17         89  
55 17     17   4723 use Mouse::Util::TypeConstraints;
  17         18  
  17         82  
56 17         985 use Params::Util qw(
57             _ARRAY0
58             _INSTANCE
59             _NONNEGINT
60             _STRING
61 17     17   7748 );
  17         30471  
62 17     17   9360 use Redis '1.976';
  17         308272  
  17         458  
63 17         917 use Redis::JobQueue::Job qw(
64             STATUS_CREATED
65             STATUS_WORKING
66             STATUS_COMPLETED
67             STATUS_FAILED
68 17     17   5176 );
  17         1385  
69 0           use Redis::JobQueue::Util qw(
70             format_message
71 17     17   14385 );
  0            
72             use Storable qw(
73             nfreeze
74             thaw
75             );
76             use Try::Tiny;
77              
78             #-- declarations ---------------------------------------------------------------
79              
80             =head1 SYNOPSIS
81              
82             use 5.010;
83             use strict;
84             use warnings;
85              
86             #-- Common
87             use Redis::JobQueue qw(
88             DEFAULT_SERVER
89             DEFAULT_PORT
90             );
91              
92             my $connection_string = DEFAULT_SERVER.':'.DEFAULT_PORT;
93             my $jq = Redis::JobQueue->new( redis => $connection_string );
94              
95             #-- Producer
96             my $job = $jq->add_job(
97             {
98             queue => 'xxx',
99             workload => \'Some stuff',
100             expire => 12*60*60, # 12h,
101             }
102             );
103              
104             #-- Worker
105             sub xxx {
106             my $job = shift;
107              
108             my $workload = ${ $job->workload };
109             # do something with workload;
110              
111             $job->result( 'XXX JOB result comes here' );
112             }
113              
114             while ( $job = $jq->get_next_job(
115             queue => 'xxx',
116             blocking => 1,
117             ) ) {
118             $job->status( 'working' );
119             $jq->update_job( $job );
120              
121             # do my stuff
122             xxx( $job );
123              
124             $job->status( 'completed' );
125             $jq->update_job( $job );
126             }
127              
128             #-- Consumer
129             my $id = $ARGV[0];
130             my $status = $jq->get_job_data( $id, 'status' );
131              
132             if ( $status eq 'completed' ) {
133             # it is now safe to remove it from JobQueue, since it's completed
134             my $job = $jq->load_job( $id );
135              
136             $jq->delete_job( $id );
137             say 'Job result: ', ${ $job->result };
138             } else {
139             say "Job is not complete, has current '$status' status";
140             }
141              
142             To see a brief but working code example of the C
143             package usage look at the L section.
144              
145             Description of the used by C data
146             structures (on Redis server) is provided in L section.
147              
148             =head1 ABSTRACT
149              
150             The C package is a set of Perl modules which
151             allows creation of a simple job queue based on Redis server capabilities.
152              
153             =head1 DESCRIPTION
154              
155             The main features of the package are:
156              
157             =over 3
158              
159             =item *
160              
161             Supports the automatic creation of job queues, job status monitoring,
162             updating the job data set, obtaining a consistent job from the queue,
163             removing jobs, and the classification of possible errors.
164              
165             =item *
166              
167             Contains various reusable components that can be used separately or together.
168              
169             =item *
170              
171             Provides an object oriented API.
172              
173             =item *
174              
175             Support of storing arbitrary job-related data structures.
176              
177             =item *
178              
179             Simple methods for organizing producer, worker, and consumer clients.
180              
181             =back
182              
183             =head3 Attributes
184              
185             =over
186              
187             =item C
188              
189             An id that uniquely identifies the job, scalar.
190              
191             =item C
192              
193             Queue name in which job is placed, scalar.
194              
195             =item C
196              
197             For how long (seconds) job data structures will be kept in memory.
198              
199             =item C
200              
201             Job status, scalar. See L L section
202             for the list of pre-defined statuses.
203             Can be also set to any arbitrary value.
204              
205             =item C, C
206              
207             User-set data structures which will be serialized before stored in Redis server.
208             Suitable for passing large amounts of data.
209              
210             =item C<*>
211              
212             Any other custom-named field passed to L or L method
213             will be stored as metadata scalar in Redis server.
214             Suitable for storing scalar values with fast access
215             (will be serialized before stored in Redis server).
216              
217             =back
218              
219             =cut
220              
221             =head2 EXPORT
222              
223             None by default.
224              
225             The following additional constants, defining defaults for various parameters, are available for export:
226              
227             =over
228              
229             =item C<MAX_DATASIZE>
230              
231             Maximum size of the data stored in C, C: 512MB.
232              
233             =cut
234             use constant MAX_DATASIZE => 512*1024*1024; # A String value can be at max 512 Megabytes in length.
235              
236             =item C<DEFAULT_SERVER>
237              
238             Default address of the Redis server - C<'localhost'>.
239              
240             =cut
241             use constant DEFAULT_SERVER => 'localhost';
242              
243             =item C<DEFAULT_PORT>
244              
245             Default port of the Redis server - 6379.
246              
247             =cut
248             use constant DEFAULT_PORT => 6379;
249              
250             =item C<DEFAULT_TIMEOUT>
251              
252             Maximum time (in seconds) to wait for a new job from the queue,
253             0 - unlimited.
254              
255             =cut
256             use constant DEFAULT_TIMEOUT => 0; # 0 for an unlimited timeout
257              
258             =item C<NAMESPACE>
259              
260             Namespace used for keys on the Redis server - C<'JobQueue'>.
261              
262             =cut
263             use constant NAMESPACE => 'JobQueue';
264              
265             =item Error codes are identified
266              
267             More details about error codes are provided in L section.
268              
269             =back
270              
271             Possible error codes:
272              
273             =cut
274              
275             =over 3
276              
277             =item C<E_NO_ERROR>
278              
279             0 - No error
280              
281             =cut
282             use constant E_NO_ERROR => 0;
283              
284             =item C<E_MISMATCH_ARG>
285              
286             1 - Invalid argument of C or other L.
287              
288             =cut
289             use constant E_MISMATCH_ARG => 1;
290              
291             =item C<E_DATA_TOO_LARGE>
292              
293             2 - Provided data is too large.
294              
295             =cut
296             use constant E_DATA_TOO_LARGE => 2;
297              
298             =item C<E_NETWORK>
299              
300             3 - Error connecting to Redis server.
301              
302             =cut
303             use constant E_NETWORK => 3;
304              
305             =item C<E_MAX_MEMORY_LIMIT>
306              
307             4 - Command failed because its execution requires more than allowed memory, set in C.
308              
309             =cut
310             use constant E_MAX_MEMORY_LIMIT => 4;
311              
312             =item C<E_JOB_DELETED>
313              
314             5 - Job's data was removed.
315              
316             =cut
317             use constant E_JOB_DELETED => 5;
318              
319             =item C<E_REDIS>
320              
321             6 - Other error on Redis server.
322              
323             =back
324              
325             =cut
326             use constant E_REDIS => 6;
327              
328             our @ERROR = (
              # Human-readable messages; last_error() returns the element of this
              # list whose index equals last_errorcode.
              # NOTE(review): the list holds 8 messages (indexes 0..7) while the
              # error constants declared in this file run from E_NO_ERROR (0) to
              # E_REDIS (6). As written, index 5 (E_JOB_DELETED) is
              # 'job was removed by maxmemory-policy', index 6 (E_REDIS) is
              # 'job was removed prior to use', and 'Redis error' sits at index 7.
              # Confirm against the _error implementation whether this is intended.
329             'No error',
330             'Invalid argument',
331             'Data is too large',
332             'Error in connection to Redis server',
333             "Command not allowed when used memory > 'maxmemory'",
334             'job was removed by maxmemory-policy',
335             'job was removed prior to use',
336             'Redis error',
337             );
338              
# Name of the service field of the job hash. The Lua scripts below read it when
# removing a job from its queue list and skip it when returning job data.
my $_ID_IN_QUEUE_FIELD = '__id_in_queue__';

# Lexical copy of the NAMESPACE constant, so that it can be interpolated into
# the Lua heredocs.
my $NAMESPACE = NAMESPACE;

# Names of the job attributes without 'meta_data' (job_attributes returns a
# sorted list).
# The entry is filtered out with grep rather than located with firstidx and
# spliced: firstidx returns -1 when nothing matches, and splicing at -1 would
# silently drop the last attribute instead.
my @job_fields = grep { $_ ne 'meta_data' } Redis::JobQueue::Job->job_attributes;
# Lookup table of the same names.
my %job_fnames = map { $_ => 1 } @job_fields;

# Shared UUID generator (direct method call instead of indirect object syntax).
my $uuid = Data::UUID->new;

# Lua sources keyed by script name; filled in by the heredocs that follow.
my %lua_script_body;
# Lua pattern capturing the trailing part of a key that contains no ':'.
my $lua_id_rgxp = '([^:]+)$';
351              
352             my $lua_body_start = <<"END_BODY_START";
353             local job_id = ARGV[1]
354             local data_fields = { unpack( ARGV, 2 ) }
355              
356             local job_key = '${NAMESPACE}:'..job_id
357             local job_exists = redis.call( 'EXISTS', job_key )
358             local job_data = {}
359              
360             if job_exists == 1 then
361             END_BODY_START
362              
363             my $lua_body_end = <<"END_BODY_END";
364             end
365             return { job_exists, unpack( job_data ) }
366             END_BODY_END
367              
368             # Deletes the job data in Redis server
369             $lua_script_body{delete_job} = <<"END_DELETE_JOB";
370             local job_id = ARGV[1]
371              
372             local job_key = '${NAMESPACE}:'..job_id
373             if redis.call( 'EXISTS', job_key ) == 1 then
374             redis.call( 'LREM', '${NAMESPACE}:queue:'..redis.call( 'HGET', job_key, 'queue' ), 0, redis.call( 'HGET', job_key, '${_ID_IN_QUEUE_FIELD}' ) )
375             return redis.call( 'DEL', job_key )
376             else
377             return nil
378             end
379             END_DELETE_JOB
380              
381             # Loads the job data (field names and values) from the Redis server
382             $lua_script_body{load_job} = <<"END_LOAD_JOB";
383             $lua_body_start
384              
385             for _, field in ipairs( redis.call( 'HKEYS', job_key ) ) do
386             if field ~= '${_ID_IN_QUEUE_FIELD}' then
387             -- return the field names and values for the data fields
388             table.insert( job_data, field )
389             table.insert( job_data, redis.call( 'HGET', job_key, field ) )
390             end
391             end
392              
393             $lua_body_end
394             END_LOAD_JOB
395              
396             # Data of the job is requested from the Redis server
397             $lua_script_body{get_job_data} = <<"END_GET_JOB_DATA";
398             $lua_body_start
399              
400             for _, field in ipairs( data_fields ) do
401             table.insert( job_data, redis.call( 'HGET', job_key, field ) )
402             end
403              
404             $lua_body_end
405             END_GET_JOB_DATA
406              
407             # Gets queue status from the Redis server
408             $lua_script_body{queue_status} = <<"END_QUEUE_STATUS";
409             local queue = ARGV[1]
410             local tm = tonumber( ARGV[2] ) -- a floating seconds since the epoch
411              
412             local queue_key = '${NAMESPACE}:queue:'..queue
413             local queue_status = {}
414              
415             -- if it is necessary to determine the status of a particular queue
416             if queue then
417             -- Queue length
418             queue_status[ 'length' ] = redis.call( 'LLEN', queue_key )
419             -- if the queue is set
420             if queue_status[ 'length' ] > 0 then
421             -- for each item in the queue
422             for _, in_queue_id in ipairs( redis.call( 'LRANGE', queue_key, 0, -1 ) ) do
423             -- select the job ID and determine the value of a field 'created'
424             local created = redis.call( 'HGET', '${NAMESPACE}:'..in_queue_id:match( '^(%S+)' ), 'created' )
425             if created then
426             created = tonumber( created ) -- values are stored as strings (a floating seconds since the epoch)
427             -- initialize the calculated values
428             if not queue_status[ 'max_job_age' ] then
429             queue_status[ 'max_job_age' ] = 0
430             queue_status[ 'min_job_age' ] = 0
431             end
432             -- time of birth
433             if queue_status[ 'max_job_age' ] == 0 or created < queue_status[ 'max_job_age' ] then
434             queue_status[ 'max_job_age' ] = created
435             end
436             if queue_status[ 'min_job_age' ] == 0 or created > queue_status[ 'min_job_age' ] then
437             queue_status[ 'min_job_age' ] = created
438             end
439             end
440             end
441              
442             -- time of birth -> age
443             if queue_status[ 'max_job_age' ] then -- queue_status[ 'min_job_age' ] also ~= 0
444             -- The age of the old job (the lifetime of the queue)
445             queue_status[ 'max_job_age' ] = tm - queue_status[ 'max_job_age' ]
446             -- The age of the younger job
447             queue_status[ 'min_job_age' ] = tm - queue_status[ 'min_job_age' ]
448             -- Duration of queue activity (the period during which new jobs were created)
449             queue_status[ 'lifetime' ] = queue_status[ 'max_job_age' ] - queue_status[ 'min_job_age' ]
450             end
451             end
452             end
453              
454             -- all jobs in the queue, including inactive ones
455             queue_status[ 'all_jobs' ] = 0
456             -- review all job keys on the server, including inactive ones
457             for _, key in ipairs( redis.call( 'KEYS', '${NAMESPACE}:*' ) ) do
458             -- counting on the basic structures of jobs
459             if key:find( '^${NAMESPACE}:${lua_id_rgxp}' ) then
460             -- consider only the structure related to a given queue
461             if redis.call( 'HGET', key, 'queue' ) == queue then
462             queue_status[ 'all_jobs' ] = queue_status[ 'all_jobs' ] + 1
463             end
464             end
465             end
466              
467             local result_status = {}
468             -- memorize the names and values of what was possible to calculate
469             for key, val in pairs( queue_status ) do
470             table.insert( result_status, key )
471             table.insert( result_status, tostring( val ) )
472             end
473              
474             return result_status
475             END_QUEUE_STATUS
476              
477             # Gets a list of job IDs on the Redis server
478             $lua_script_body{get_job_ids} = <<"END_GET_JOB_IDS";
479             local is_queued = tonumber( ARGV[1] )
480             local queues = { unpack( ARGV, 3, 2 + ARGV[2] ) }
481             local statuses = { unpack( ARGV, 3 + ARGV[2] ) }
482              
483             local tmp_ids = {}
484              
485             if is_queued == 1 then -- jobs in the queues
486             -- if not limited to specified queues
487             if #queues == 0 then
488             -- determine the queues still have not served the job
489             for _, queue_key in ipairs( redis.call( 'KEYS', '${NAMESPACE}:queue:*' ) ) do
490             table.insert( queues, queue_key:match( '${lua_id_rgxp}' ) )
491             end
492             end
493             -- view each specified queue
494             for _, queue in ipairs( queues ) do
495             -- for each identifier contained in the queue
496             for _, in_queue_id in ipairs( redis.call( 'LRANGE', '${NAMESPACE}:queue:'..queue, 0, -1 ) ) do
497             -- distinguish and remember the ID of the job
498             tmp_ids[ in_queue_id:match( '^(%S+)' ) ] = 1
499             end
500             end
501             else -- all jobs on the server
502             local all_job_ids = {}
503             for _, key in ipairs( redis.call( 'KEYS', '${NAMESPACE}:*' ) ) do
504             -- considering only the basic structure of jobs
505             if key:find( '^${NAMESPACE}:${lua_id_rgxp}' ) then
506             -- forming a "hash" of jobs IDs
507             all_job_ids[ key:match( '${lua_id_rgxp}' ) ] = 1
508             end
509             end
510             -- if a restriction is set on the queues
511             if #queues > 0 then
512             -- analyze each job on a server
513             for job_id, _ in pairs( all_job_ids ) do
514             -- get the name of the job queue
515             local tmp_queue = redis.call( 'HGET', '${NAMESPACE}:'..job_id, 'queue' )
516             -- associate job queue name with the names of the queues from the restriction
517             for _, queue in ipairs( queues ) do
518             if tmp_queue == queue then
519             -- memorize the appropriate job ID
520             tmp_ids[ job_id ] = 1
521             break
522             end
523             end
524             end
525             else
526             -- if there is no restriction on the queues, then remember all the jobs on the server
527             tmp_ids = all_job_ids
528             end
529             end
530              
531             -- if the restriction is set by the statuses
532             if #statuses > 0 then
533             -- analyze each job from a previously created "hash"
534             for job_id, _ in pairs( tmp_ids ) do
535             -- determine the current status of the job
536             local job_status = redis.call( 'HGET', '${NAMESPACE}:'..job_id, 'status' )
537             -- associate job status with the statuses from the restriction
538             for _, status in ipairs( statuses ) do
539             if job_status == status then
540             -- mark the job satisfying the restriction
541             tmp_ids[ job_id ] = 2 -- Filter by status sign
542             break
543             end
544             end
545             end
546             -- reanalyze each job from a previously created "hash"
547             for job_id, _ in pairs( tmp_ids ) do
548             if tmp_ids[ job_id ] ~= 2 then
549             -- remove unfiltered by status
550             tmp_ids[ job_id ] = nil
551             end
552             end
553             end
554              
555             local job_ids = {}
556             for id, _ in pairs( tmp_ids ) do
557             -- remember identifiers of selected jobs
558             table.insert( job_ids, id )
559             end
560              
561             return job_ids
562             END_GET_JOB_IDS
563              
564             #-- constructor ----------------------------------------------------------------
565              
566             =head2 CONSTRUCTOR
567              
568             =head3 C<new( redis =E<gt> $server, timeout =E<gt> $timeout, check_maxmemory =E<gt> $mode )>
569              
570             Creates a new C object to communicate with Redis server.
571             If invoked without any arguments, the constructor C creates and returns
572             a C object that is configured with the default settings and uses
573             local C server.
574              
575             In the parameters of the constructor 'redis' may be either a Redis object
576             or a server string or a hash reference of parameters to create a Redis object.
577              
578             Optional C<check_maxmemory> boolean argument (default is true)
579             defines if attempt is made to find out maximum available memory from Redis.
580              
581             In some cases Redis implementation forbids such request,
582             but setting to false can be used as a workaround.
583              
584             =head3 Caveats related to connection with Redis server
585              
586             =over 3
587              
588             =item *
589              
590             According to L documentation:
591              
592             This module consider that any data sent to the Redis server is a raw octets string,
593             even if it has utf8 flag set.
594             And it doesn't do anything when getting data from the Redis server.
595              
596             =item *
597              
598             Non-serialize-able fields (like status or message) passed in UTF-8 can not be
599             correctly restored from Redis server. To avoid potential data corruption, passing
600             UTF-8 encoded value causes error.
601              
602             =item *
603              
604             L value is used when a L class object is
605             passed to the C constructor without additional C argument.
606              
607             =back
608              
609             This example illustrates C call with all possible arguments:
610              
611             my $jq = Redis::JobQueue->new(
612             redis => "$server:$port", # Connection info for Redis which hosts queue
613             timeout => $timeout, # wait time (in seconds)
614             # for blocking call of get_next_job.
615             # Set 0 for unlimited wait time
616             );
617             # or
618             my $jq = Redis::JobQueue->new(
619             redis => { # The hash reference of parameters
620             # to create a Redis object
621             server => "$server:$port",
622             },
623             );
624              
625             The following examples illustrate other uses of the C method:
626              
627             $jq = Redis::JobQueue->new();
628             my $next_jq = Redis::JobQueue->new( $jq );
629              
630             my $redis = Redis->new( server => "$server:$port" );
631             $next_jq = Redis::JobQueue->new(
632             $redis,
633             timeout => $timeout,
634             );
635             # or
636             $next_jq = Redis::JobQueue->new(
637             redis => $redis,
638             timeout => $timeout,
639             );
640              
641             An invalid argument causes die (C).
642              
643             =cut
# Normalizes every supported calling convention of new() into the key/value
# list understood by the original BUILDARGS.
#
# Accepted argument lists:
#   - a Redis object, optionally followed by key/value pairs;
#   - a Test::RedisServer object (tests only), optionally followed by pairs;
#   - another Redis::JobQueue object, whose server, connection and timeout
#     are reused, optionally followed by pairs;
#   - plain key/value pairs, where 'redis' may be a Redis object, a hash
#     reference of Redis->new parameters, or anything else (passed through).
#
# Pairs given explicitly by the caller come last and therefore override the
# values derived here. Hash references supplied by the caller (or returned by
# Test::RedisServer->conf) are never modified: the default 'server' entry is
# added to a shallow copy only.
around BUILDARGS => sub {
    my $orig  = shift;
    my $class = shift;

    if ( _INSTANCE( $_[0], 'Redis' ) ) {
        my $redis = shift;
        return $class->$orig(
            # have to look into the Redis object ...
            redis  => $redis->{server},
            # the timeout cannot be obtained from the Redis object
            _redis => $redis,
            @_
        );
    }

    if ( _INSTANCE( $_[0], 'Test::RedisServer' ) ) {
        # to test only
        my $test_server = shift;
        # copy the configuration so the Test::RedisServer object stays untouched
        my %conf = %{ $test_server->conf };
        $conf{server} = '127.0.0.1:'.$conf{port} unless exists $conf{server};
        return $class->$orig(
            redis  => $conf{server},
            _redis => Redis->new( %conf ),
            @_
        );
    }

    if ( _INSTANCE( $_[0], __PACKAGE__ ) ) {
        my $jq = shift;
        return $class->$orig(
            redis   => $jq->_server,
            _redis  => $jq->_redis,
            timeout => $jq->timeout,
            @_
        );
    }

    my %args  = @_;
    my $redis = $args{redis};

    if ( _INSTANCE( $redis, 'Redis' ) ) {
        delete $args{redis};
        return $class->$orig(
            # have to look into the Redis object ...
            redis  => $redis->{server},
            # the timeout cannot be obtained from the Redis object
            _redis => $redis,
            %args,
        );
    }

    if ( ref( $redis ) eq 'HASH' ) {
        # the connection is created here, not supplied by the caller
        $args{_use_external_connection} = 0;
        # copy the parameters so the caller's hash is not modified
        my %conf = %{ $redis };
        $conf{server} = DEFAULT_SERVER.':'.DEFAULT_PORT unless exists $conf{server};
        delete $args{redis};
        return $class->$orig(
            redis  => $conf{server},
            _redis => Redis->new( %conf ),
            %args,
        );
    }

    return $class->$orig( %args );
};
706              
# Post-construction initialisation:
#   - creates the Redis connection when none was supplied;
#   - when check_maxmemory is enabled, lowers max_datasize to the server's
#     'maxmemory' setting if that setting is non-zero and smaller;
#   - refuses to work with a Redis server older than version 2.6.
sub BUILD {
    my $self = shift;

    # Open a connection only when the constructor arguments did not provide one.
    if ( !$self->_redis ) {
        $self->_redis( $self->_redis_constructor );
    }

    if ( $self->_check_maxmemory ) {
        # The first element of the reply is discarded, the second is the limit.
        my ( undef, $server_limit ) = $self->_call_redis( 'CONFIG', 'GET', 'maxmemory' );
        die $self->_error( E_NETWORK )
            unless defined( _NONNEGINT( $server_limit ) );
        # A zero limit leaves the current max_datasize untouched.
        if ( $server_limit ) {
            $self->max_datasize( min( $server_limit, $self->max_datasize ) );
        }
    }

    my ( $major, $minor ) = $self->_redis->info->{redis_version} =~ /^(\d+)\.(\d+)/;
    my $too_old = $major < 2 || ( $major == 2 && $minor <= 4 );
    if ( $too_old ) {
        $self->_error( E_REDIS );
        confess 'Needs Redis server version 2.6 or higher';
    }
}
726              
727             #-- public attributes ----------------------------------------------------------
728              
729             =head2 METHODS
730              
731             The following methods are available for object of the C class:
732              
733             =cut
734              
735             =head3 C<timeout>
736              
737             Accessor to the C attribute.
738              
739             Returns current value of the attribute if called without an argument.
740              
741             Non-negative integer value can be used to specify a new value of the maximum
742             waiting time for queue (of the L method). Use
743             C = 0 for an unlimited wait time.
744              
745             =cut
# Maximum waiting time in seconds; DEFAULT_TIMEOUT unless given to the constructor.
has timeout => (
    isa     => 'Redis::JobQueue::Job::NonNegInt',
    is      => 'rw',
    default => DEFAULT_TIMEOUT,
);
751              
752             =head3 C<max_datasize>
753              
754             Provides access to the C attribute.
755              
756             Returns current value of the attribute if called with no argument.
757              
758             Non-negative integer value can be used to specify a new value for the maximum
759             size of data in the attributes of a
760             L object.
761              
762             The check is done before sending data to the module L,
763             after possible processing by methods of module L
764             (attributes L, L
765             and L).
766             It is automatically serialized.
767              
768             The C attribute value is used in the L
769             and data entry job operations on the Redis server.
770              
771             The L uses the smaller of the values of 512MB and
772             C limit from a F file.
773              
774             =cut
# Upper bound for the size of stored data; starts at MAX_DATASIZE and may be
# lowered by BUILD.
has max_datasize => (
    isa     => 'Redis::JobQueue::Job::NonNegInt',
    is      => 'rw',
    default => MAX_DATASIZE,
);
780              
781             =head3 C<last_errorcode>
782              
783             Returns the code of the last identified error.
784              
785             See L section for description of error codes.
786              
787             =cut
# Code of the last identified error: public reader, private writer.
has last_errorcode => (
    isa     => 'Int',
    default => 0,
    reader  => 'last_errorcode',
    writer  => '_set_last_errorcode',
);
794              
795             #-- private attributes ---------------------------------------------------------
796              
# Server address; filled from the 'redis' constructor argument.
has _server => (
    is       => 'rw',
    isa      => 'Str',
    init_arg => 'redis',
    default  => DEFAULT_SERVER.':'.DEFAULT_PORT,
    # Appends the default port whenever the stored value contains no ':'.
    trigger  => sub {
        my ( $self ) = @_;
        return if $self->_server =~ /:/;
        $self->_server( $self->_server.':'.DEFAULT_PORT );
    },
);

# Redis connection object; undef until one is supplied or created in BUILD.
has _redis => (
    is      => 'rw',
    isa     => 'Maybe[Redis]',
    default => undef,
);

# Boolean flag, initially undef.
has _transaction => (
    is      => 'rw',
    isa     => 'Bool',
    default => undef,
);

# Hash of strings, built lazily as an empty hash; not settable via constructor.
has _lua_scripts => (
    is       => 'ro',
    isa      => 'HashRef[Str]',
    init_arg => undef,
    lazy     => 1,
    builder  => sub { return +{}; },
);

# Value of the 'check_maxmemory' constructor argument (true by default);
# consulted by BUILD before asking the server for 'maxmemory'.
has _check_maxmemory => (
    is       => 'ro',
    isa      => 'Bool',
    init_arg => 'check_maxmemory',
    default  => 1,
);

# True by default; BUILDARGS sets it to 0 when the connection is created from
# a hash reference of parameters.
has _use_external_connection => (
    is      => 'rw',
    isa     => 'Bool',
    default => 1,
);
841              
842             #-- public methods -------------------------------------------------------------
843              
844             =head3 C<last_error>
845              
846             Returns error message of the last identified error.
847              
848             See L section for more info on errors.
849              
850             =cut
# Returns the message that corresponds to the last recorded error code.
sub last_error {
    my $self = shift;

    # The error code doubles as the index into the @ERROR message table.
    my $code = $self->last_errorcode;

    return $ERROR[ $code ];
}
856              
857             =head3 C<add_job( $job_data, LPUSH =E<gt> 1 )>
858              
859             Adds a job to the queue on the Redis server.
860              
861             The first argument should be either L
862             object (which is modified by the method) or a reference to a hash representing
863             L - in the latter case
864             a new L object is created.
865              
866             Returns a L object with a new
867             unique identifier.
868              
869             Job status is set to L.
870              
871             C optionally takes arguments in key-value pairs.
872              
873             The following example illustrates a C call with all possible arguments:
874              
875             my $job_data = {
876             id => '4BE19672-C503-11E1-BF34-28791473A258',
877             queue => 'lovely_queue', # required
878             job => 'strong_job', # optional attribute
879             expire => 12*60*60,
880             status => 'created',
881             workload => \'Some stuff',
882             result => \'JOB result comes here',
883             };
884              
885             my $job = Redis::JobQueue::Job->new( $job_data );
886              
887             my $resulting_job = $jq->add_job( $job );
888             # or
889             $resulting_job = $jq->add_job(
890             $pre_job,
891             LPUSH => 1,
892             );
893              
894             If used with the optional argument C<LPUSH =E<gt> 1>, the job is placed at the beginning of
895             the queue and will be returned by the next call to get_next_job.
896              
897             TTL of job data on Redis server is set in accordance with the L
898             attribute of the L object. Make
899             sure it's higher than the time needed to process all the jobs in the queue.
900              
901             =cut
# Stores a job on the Redis server and appends (or, with LPUSH => 1,
# prepends) its identifier to the job's queue.
#
# Arguments:
#   first  - a Redis::JobQueue::Job object, or a plain hash reference from
#            which a new job object is constructed;
#   rest   - optional key/value pairs; only LPUSH is consulted.
#
# Returns the job object (with a freshly generated id) on success, or
# nothing when EXEC yields an undefined result.
# Confesses with E_MISMATCH_ARG when the first argument is neither a job
# object nor a hash reference.
sub add_job {
    my ( $self, @params ) = @_;

    $self->_error( E_NO_ERROR );

    my $source = shift @params;
    my $job;
    if ( _INSTANCE( $source, 'Redis::JobQueue::Job' ) ) {
        $job = $source;
    } elsif ( ref( $source ) eq 'HASH' ) {
        $job = Redis::JobQueue::Job->new( $source );
    } else {
        confess $self->_error( E_MISMATCH_ARG );
    }

    my %args = @params;

    # Keep generating identifiers until one is found whose key is not in use.
    # The chosen key stays WATCHed for the transaction below.
    my ( $id, $key );
    while ( 1 ) {
        $self->_call_redis( 'UNWATCH' );
        $id  = $uuid->create_str;
        $key = $self->_jkey( $id );
        $self->_call_redis( 'WATCH', $key );
        last unless $self->_call_redis( 'EXISTS', $self->_jkey( $id ) );
    }

    $job->id( $id );
    my $expire = $job->expire;

    # transaction start
    $self->_call_redis( 'MULTI' );

    # Standard job fields first, then the metadata; undefined values are
    # stored as empty strings.
    foreach my $field ( @job_fields ) {
        $self->_call_redis( 'HSET', $key, $field, $job->$field // q{} );
    }
    foreach my $meta_name ( keys %{ $job->meta_data } ) {
        $self->_call_redis( 'HSET', $key, $meta_name, $job->meta_data( $meta_name ) // q{} );
    }

    # The queue entry is the id, followed by the absolute expiration time
    # when the job has a TTL.
    my $id_in_list = $id;
    if ( $expire ) {
        $id_in_list .= ' '.( time + $expire );
        $self->_call_redis( 'EXPIRE', $key, $expire );
    }

    $self->_call_redis( 'HSET', $key, $_ID_IN_QUEUE_FIELD, $id_in_list );

    my $push_cmd = $args{LPUSH} ? 'LPUSH' : 'RPUSH';
    $self->_call_redis( $push_cmd, $self->_qkey( $job->queue ), $id_in_list );

    # transaction end
    return unless defined $self->_call_redis( 'EXEC' );

    $job->clear_modified;

    return $job;
}
947              
948             =head3 C<get_job_data( $job, $data_key )>
949              
950             Data of the job is requested from the Redis server. First argument can be either
951             a job ID or L object.
952              
953             Returns C<undef> when the job was not found on Redis server.
954              
955             The method returns the jobs data from the Redis server.
956             See L for the list of standard jobs data fields.
957              
958             The method returns a reference to a hash of the standard jobs data fields if only the first
959             argument is specified.
960              
961             If given a key name C<$data_key>, it returns data corresponding to
962             the key or C when the value is undefined or key is not in the data or metadata.
963              
964             The following examples illustrate uses of the C method:
965              
966             my $data_href = $jq->get_job_data( $id );
967             # or
968             $data_href = $jq->get_job_data( $job );
969             # or
970             my $data_key = 'foo';
971             my $data = $jq->get_job_data( $job->id, $data_key );
972              
973             You can specify a list of names of key data or metadata.
974             In this case it returns the corresponding list of data. For example:
975              
976             my ( $status, $foo ) = $jq->get_job_data( $job->id, 'status', 'foo' );
977              
978             See L<meta_data|Redis::JobQueue::Job/meta_data> for more information about
979             the jobs metadata.
980              
981             =cut
# Fetches job data from the Redis server.
#
# Arguments:
#   $id_source - job id or job object (resolved through _get_job_id);
#   @data_keys - optional names of data/metadata fields to fetch.
#
# Returns:
#   - nothing when the job does not exist;
#   - without @data_keys: a hash reference with all standard job fields
#     plus the computed 'elapsed';
#   - with @data_keys: in list context exactly one value per requested key
#     (in the order requested), otherwise the value for the first key.
#
# 'elapsed' is computed here as (completed || failed || now) - started and
# is undef for a job that has not been started.
sub get_job_data {
    my ( $self, $id_source, @data_keys ) = @_;

    my $job_id      = $self->_get_job_id( $id_source );
    my $data_fields = scalar @data_keys;
    my @right_keys  = map { _STRING( $_ ) || q{} } @data_keys;
    my %right_names = map { $_ => 1 } grep { $_ } @right_keys;

    # Fields fetched in addition to the requested ones: everything when no
    # keys were given, otherwise just the timestamps 'elapsed' is derived from.
    my @additional;
    if ( !$data_fields ) {
        @additional = @job_fields;
    } elsif ( exists $right_names{elapsed} ) {
        @additional = grep { !exists $right_names{ $_ } } qw( started completed failed );
    }
    my @all_fields = ( @right_keys, @additional );

    $self->_error( E_NO_ERROR );

    # One "now" for the whole call, so both return shapes compute 'elapsed'
    # against the same instant.
    my $tm = time;
    my ( $job_exists, @data ) = $self->_call_redis(
        $self->_lua_script_cmd( 'get_job_data' ),
        0,
        $job_id,
        @all_fields,
    );

    return unless $job_exists;

    # workload, result and every non-standard (metadata) field are stored
    # serialized; 'elapsed' is never stored at all.
    foreach my $i ( 0 .. $#all_fields ) {
        my $field = $all_fields[ $i ];
        next if $field eq 'elapsed';
        next unless $field =~ /^(workload|result)$/ || !$job_fnames{ $field };

        $data[ $i ] = ${ thaw( $data[ $i ] ) }
            if $data[ $i ];
    }

    if ( !$data_fields ) {
        my %result_data;
        @result_data{ @all_fields } = @data[ 0 .. $#all_fields ];

        my $started = $result_data{started};
        $result_data{elapsed} = $started
            ? ( $result_data{completed} || $result_data{failed} || $tm ) - $started
            : undef;

        return \%result_data;
    }

    if ( exists $right_names{elapsed} ) {
        # Position of the first occurrence of every fetched field.
        my %index_of;
        $index_of{ $all_fields[ $_ ] } //= $_ for 0 .. $#all_fields;

        my ( $started, $completed, $failed ) = @data[ @index_of{ qw( started completed failed ) } ];
        my $elapsed = $started
            ? ( $completed || $failed || $tm ) - $started
            : undef;

        foreach my $i ( 0 .. $data_fields - 1 ) {
            $data[ $i ] = $elapsed
                if $right_keys[ $i ] eq 'elapsed';
        }
    }

    # The helper timestamps fetched only for the 'elapsed' computation must
    # not leak into the result: one value per requested key.
    $#data = $data_fields - 1
        if @data > $data_fields;

    return wantarray ? @data : $data[0];
}
1060              
1061             =head3 C<get_job_meta_fields( $job )>
1062              
1063             The list of names of metadata fields of the job is requested from the Redis server.
1064             First argument can be either a job ID or
1065             L object.
1066              
1067             Returns empty list when the job was not found on Redis server or
1068             the job does not have metadata.
1069              
1070             The following examples illustrate uses of the C method:
1071              
1072             my @fields = $jq->get_job_meta_fields( $id );
1073             # or
1074             @fields = $jq->get_job_meta_fields( $job );
1075             # or
1076             @fields = $jq->get_job_meta_fields( $job->id );
1077              
1078             See L<meta_data|Redis::JobQueue::Job/meta_data> for more information about
1079             the jobs metadata.
1080              
1081             =cut
# Lists the names of the metadata fields stored for a job: every hash key
# of the job that is neither a standard job field nor the internal
# "id in queue" bookkeeping field.
sub get_job_meta_fields {
    my ( $self, $id_source ) = @_;

    my $job_key     = $self->_jkey( $self->_get_job_id( $id_source ) );
    my @stored_keys = $self->_call_redis( 'HKEYS', $job_key );

    return grep { $_ ne $_ID_IN_QUEUE_FIELD && !$job_fnames{ $_ } } @stored_keys;
}
1087              
1088             =head3 C<load_job( $job )>
1089              
1090             Loads job data from the Redis server. The argument is either job ID or
1091             a L object.
1092              
1093             Method returns the object corresponding to the loaded job.
1094             Returns C<undef> if the job is not found on the Redis server.
1095              
1096             The following examples illustrate uses of the C method:
1097              
1098             $job = $jq->load_job( $id );
1099             # or
1100             $job = $jq->load_job( $job );
1101              
1102             =cut
# Loads a job from the Redis server and returns it as a new
# Redis::JobQueue::Job object with its modification flags cleared.
# Returns nothing when the job does not exist.
sub load_job {
    my ( $self, $id_source ) = @_;

    my $job_id = $self->_get_job_id( $id_source );

    $self->_error( E_NO_ERROR );

    my ( $job_exists, @job_data ) = $self->_call_redis(
        $self->_lua_script_cmd( 'load_job' ),
        0,
        $job_id,
        @job_fields,
    );

    return unless $job_exists;

    # The reply is a flat name/value list: standard fields go to the top
    # level, everything else is collected as metadata.
    my $pre_job;
    while ( my ( $name, $value ) = splice @job_data, 0, 2 ) {
        if ( $job_fnames{ $name } ) {
            $pre_job->{ $name } = $value;
        } else {
            $pre_job->{meta_data}->{ $name } = $value;
        }
    }

    # workload, result and all metadata values are stored serialized.
    foreach my $field ( qw( workload result ) ) {
        next unless $pre_job->{ $field };
        $pre_job->{ $field } = ${ thaw( $pre_job->{ $field } ) };
    }
    if ( my $meta_data = $pre_job->{meta_data} ) {
        foreach my $value ( values %{ $meta_data } ) {
            # $value aliases the hash slot, so the assignment updates it in place
            $value = ${ thaw( $value ) }
                if $value;
        }
    }

    my $new_job = Redis::JobQueue::Job->new( $pre_job );
    $new_job->clear_modified;

    return $new_job;
}
1147              
1148             =head3 C<get_next_job( queue =E<gt> $queue_name, blocking =E<gt> 1 )>
1149              
1150             Selects the job identifier which is at the beginning of the queue.
1151              
1152             C takes arguments in key-value pairs.
1153             You can specify a queue name or a reference to an array of queue names.
1154             Queues from the list are processed in random order.
1155              
1156             By default, each queue is processed in a separate request with the result
1157             returned immediately if a job is found (waiting) in that queue. If no waiting
1158             job found, returns undef.
1159             In case optional C argument is true, all queues are processed in
1160             a single request to Redis server and if no job is found waiting in queue(s),
1161             get_next_job execution will be paused for up to C seconds or until
1162             a job becomes available in any of the listed queues.
1163              
1164             Use C = 0 for an unlimited wait time.
1165             Default - C is false (0).
1166              
1167             Method returns the job object corresponding to the received job identifier.
1168             Returns C<undef> if there is no job in the queue.
1169              
1170             These examples illustrate a C call with all the valid arguments:
1171              
1172             $job = $jq->get_next_job(
1173             queue => 'xxx',
1174             blocking => 1,
1175             );
1176             # or
1177             $job = $jq->get_next_job(
1178             queue => [ 'aaa', 'bbb' ],
1179             blocking => 1,
1180             );
1181              
1182             TTL job data for the job resets on the Redis server in accordance with
1183             the L attribute of the job object.
1184              
1185             =cut
# Takes the next job identifier from the given queue(s) and returns the
# corresponding job (or just its id when the private '_only_id' flag is set).
#
# Arguments (key/value pairs):
#   queue    - a queue name or a reference to an array of queue names;
#   blocking - when true, a single BLPOP over all queues is used and waits
#              up to $self->timeout; otherwise each queue is polled with LPOP.
#
# Returns nothing when no job could be obtained.
# Confesses with E_MISMATCH_ARG on an odd argument count or on a queue name
# that is not a non-empty string.
sub get_next_job {
    my ( $self, @params ) = @_;

    confess $self->_error( E_MISMATCH_ARG )
        if scalar( @params ) % 2;

    my %args = @params;
    my ( $queues, $blocking, $only_id ) = @args{ qw( queue blocking _only_id ) };

    # Normalise a single name (or no name at all) to an array reference
    if ( !ref( $queues ) ) {
        $queues = defined( $queues ) ? [ $queues ] : [];
    }

    foreach my $name ( @{ $queues } ) {
        confess $self->_error( E_MISMATCH_ARG )
            unless defined _STRING( $name );
    }

    my @keys = map { $self->_qkey( $_ ) } @{ $queues };
    return unless @keys;

    # Queues are served in random order
    @keys = shuffle( @keys );

    if ( $blocking ) {
        # 'BLPOP' waiting time of a given $self->timeout parameter
        my @cmd = ( 'BLPOP', @keys, $self->timeout );
        # BLPOP replies with ( queue key, element ); stop when nothing came back
        while ( my $full_id = ( $self->_call_redis( @cmd ) )[1] ) {
            # _get_next_job returns nothing for an entry whose job has already
            # expired; in that case try the next entry
            my $found = $self->_get_next_job( $full_id, $only_id );
            return $found if $found;
        }
    } else {
        # 'LPOP' takes only one queue name at a time
        foreach my $key ( @keys ) {
            next unless $self->_call_redis( 'EXISTS', $key );

            while ( my $full_id = $self->_call_redis( 'LPOP', $key ) ) {
                # skip entries whose job has already expired
                my $found = $self->_get_next_job( $full_id, $only_id );
                return $found if $found;
            }
        }
    }

    return;
}
1241              
1242             =head3 C<get_next_job_id( queue =E<gt> $queue_name, blocking =E<gt> 1 )>
1243              
1244             Like L, but returns job identifier only.
1245              
1246             TTL job data for the job does not reset on the Redis server.
1247              
1248             =cut
# Same as get_next_job, but asks it (through the private '_only_id' flag)
# to return the job identifier instead of a job object.
sub get_next_job_id {
    my ( $self, @params ) = @_;

    return $self->get_next_job( @params, _only_id => 1 );
}
1254              
# Resolves a queue entry to a job.
#
# Arguments:
#   $full_id - queue entry: the job id, optionally followed by a space and
#              the absolute expiration time of the job data;
#   $only_id - when true, return the job id instead of the job object.
#
# Returns the job id / job object when the job data exists. For a job object
# the TTL of the job data is renewed from the job's 'expire' attribute.
# Returns nothing when the job data is gone and the entry's expiration time
# has passed, so the caller can move on to the next queue entry.
# Confesses with E_JOB_DELETED when the data is gone although the entry has
# no expiration time or has not expired yet.
sub _get_next_job {
    my ( $self, $full_id, $only_id ) = @_;

    my ( $id, $expire_time ) = split ' ', $full_id;
    my $key = $self->_jkey( $id );

    if ( $self->_call_redis( 'EXISTS', $key ) ) {
        return $id if $only_id;

        # The job may expire or be deleted between the EXISTS check and the
        # load; load_job then returns nothing. Treat that exactly like a job
        # that was missing in the first place instead of calling a method on
        # an undefined value.
        if ( my $job = $self->load_job( $id ) ) {
            if ( my $expire = $job->expire ) {
                $self->_call_redis( 'EXPIRE', $key, $expire );
            }
            return $job;
        }
    }

    if ( !$expire_time || time < $expire_time ) {
        confess format_message( '%s %s', $id, $self->_error( E_JOB_DELETED ) );
    }

    # If the queue contains a job identifier that has already been removed due
    # to expiration, the cycle will ensure the transition
    # to the next job ID selection
    return;
}
1280              
1281             =head3 C<update_job( $job )>
1282              
1283             Saves job data changes to the Redis server. Job ID is obtained from
1284             the argument, which can be a L
1285             object.
1286              
1287             Returns the number of attributes that were updated if the job was found on the Redis
1288             server and C<undef> if it was not.
1289             When you change a single attribute, returns C<2> because L also changes.
1290              
1291             Changing the L attribute is ignored.
1292              
1293             The following examples illustrate uses of the C method:
1294              
1295             $jq->update_job( $job );
1296              
1297             TTL job data for the job resets on the Redis server in accordance with
1298             the L attribute of the job object.
1299              
1300             =cut
# Writes the modified attributes of a job object back to the Redis server.
#
# Returns:
#   0        - when the job has no modified attributes (nothing is sent);
#   nothing  - when the job is not on the server, or EXEC yields an
#              undefined result;
#   a count  - the number of fields written otherwise.
#
# The TTL of the job data is renewed from the job's 'expire' attribute, or
# removed when that attribute is false. Changes to 'id' and 'expire'
# themselves are never written.
# Confesses with E_MISMATCH_ARG when the argument is not a job object.
sub update_job {
    my ( $self, $job ) = @_;

    confess $self->_error( E_MISMATCH_ARG )
        unless _INSTANCE( $job, 'Redis::JobQueue::Job' );

    my @modified = $job->modified_attributes;
    return 0 if !@modified;

    my $key = $self->_jkey( $job->id );

    # Watch the key first, so the existence check and the transaction
    # below refer to the same state of the job.
    $self->_call_redis( 'WATCH', $key );
    unless ( $self->_call_redis( 'EXISTS', $key ) ) {
        $self->_call_redis( 'UNWATCH' );
        return;
    }

    # transaction start
    $self->_call_redis( 'MULTI' );

    my $expire  = $job->expire;
    my @ttl_cmd = $expire
        ? ( 'EXPIRE', $key, $expire )
        : ( 'PERSIST', $key );
    $self->_call_redis( @ttl_cmd );

    my $updated = 0;
    foreach my $field ( @modified ) {
        my $value;
        if ( !$job_fnames{ $field } ) {
            # not a standard field, hence metadata
            $value = $job->meta_data( $field );
        } elsif ( $field eq 'expire' || $field eq 'id' ) {
            # Field 'expire' and 'id' shall remain unchanged
            next;
        } else {
            $value = $job->$field;
        }

        $self->_call_redis( 'HSET', $key, $field, $value // q{} );
        ++$updated;
    }

    # transaction end
    return unless defined $self->_call_redis( 'EXEC' );

    $job->clear_modified;

    return $updated;
}
1347              
1348             =head3 C<delete_job( $job )>
1349              
1350             Deletes job's data from Redis server.
1351             The Job ID is obtained from the argument, which can be either a string or
1352             a L object.
1353              
1354             Returns true if job and its metadata was successfully deleted from Redis server.
1355             False if jobs or its metadata wasn't found.
1356              
1357             The following examples illustrate uses of the C method:
1358              
1359             $jq->delete_job( $job );
1360             # or
1361             $jq->delete_job( $id );
1362              
1363             Use this method soon after receiving the results of the job to free memory on
1364             the Redis server.
1365              
1366             See description of the C data structure used on the Redis server
1367             at the L section.
1368              
1369             Note that job deletion time is proportional to number of jobs currently in the queue.
1370              
1371             =cut
# Deletes a job from the Redis server by running the 'delete_job' Lua
# script and returns that script's result.
# The argument is either a job id string or a job object.
# Confesses with E_MISMATCH_ARG for anything else.
sub delete_job {
    my ( $self, $id_source ) = @_;

    my $is_valid = defined( _STRING( $id_source ) )
        || _INSTANCE( $id_source, 'Redis::JobQueue::Job' );
    confess $self->_error( E_MISMATCH_ARG )
        unless $is_valid;

    $self->_error( E_NO_ERROR );

    my $job_id = ref( $id_source ) ? $id_source->id : $id_source;

    return $self->_call_redis(
        $self->_lua_script_cmd( 'delete_job' ),
        0,
        $job_id,
    );
}
1386              
1387             =head3 C<get_job_ids>
1388              
1389             Gets list of job IDs on the Redis server.
1390             These IDs are identifiers of job data structures, not only the identifiers which
1391             get derived from the queue by L.
1392              
1393             The following examples illustrate simple uses of the C method
1394             (IDs of all existing jobs):
1395              
1396             @jobs = $jq->get_job_ids;
1397              
1398             These are identifiers of jobs data structures related to the queue
1399             determined by the arguments.
1400              
1401             C takes arguments in key-value pairs.
1402             You can specify a queue name or job status
1403             (or a reference to an array of queue names or job statuses)
1404             to filter the list of identifiers.
1405              
1406             You can also specify an argument C (true or false).
1407              
1408             When filtering by the names of the queues and C is set to true,
1409             only the identifiers of the jobs which have not yet been derived from
1410             the queues using L are returned.
1411              
1412             Filtering by status returns the task IDs whose status is exactly the same
1413             as the specified status.
1414              
1415             The following examples illustrate uses of the C method:
1416              
1417             # filtering the identifiers of jobs data structures
1418             @ids = $jq->get_job_ids( queue => 'some_queue' );
1419             # or
1420             @ids = $jq->get_job_ids( queue => [ 'foo_queue', 'bar_queue' ] );
1421             # or
1422             @ids = $jq->get_job_ids( status => STATUS_COMPLETED );
1423             # or
1424             @ids = $jq->get_job_ids( status => [ STATUS_COMPLETED, STATUS_FAILED ] );
1425              
1426             # filter the IDs are in the queues
1427             @ids = $jq->get_job_ids( queued => 1, queue => 'some_queue' );
1428             # etc.
1429              
1430             =cut
# Returns the list of job ids known to the Redis server, optionally
# filtered.
#
# Arguments (key/value pairs):
#   queue  - a queue name or an array reference of queue names;
#   status - a job status or an array reference of statuses;
#   queued - boolean flag handed to the 'get_job_ids' Lua script as 1 or 0.
#
# Filter values that are not non-empty strings are silently dropped.
# Confesses with E_MISMATCH_ARG on an odd number of arguments.
sub get_job_ids {
    my ( $self, @params ) = @_;

    if ( scalar( @params ) % 2 ) {
        confess format_message( '%s (Odd number of elements in hash assignment)', $self->_error( E_MISMATCH_ARG ) );
    }

    my %args = @params;

    # Collect the usable values of both filters; a single value is treated
    # as a one-element list.
    my %filter;
    foreach my $field ( qw( queue status ) ) {
        my @candidates;
        if ( !exists $args{ $field } ) {
            @candidates = ();
        } elsif ( ref( $args{ $field } ) eq 'ARRAY' ) {
            @candidates = @{ $args{ $field } };
        } else {
            @candidates = ( $args{ $field } );
        }

        $filter{ $field } = [ grep { _STRING( $_ ) } @candidates ];
    }
    my @queues   = @{ $filter{queue} };
    my @statuses = @{ $filter{status} };

    $self->_error( E_NO_ERROR );

    my @ids = $self->_call_redis(
        $self->_lua_script_cmd( 'get_job_ids' ),
        0,
        $args{queued} ? 1 : 0,
        scalar( @queues ),      # the number of queues to filter
        @queues,                # the queues to filter
        @statuses,              # the statuses to filter
    );

    return @ids;
}
1460              
1461             =head3 C<server>
1462              
1463             Returns the address of the Redis server used by the queue
1464             (in form of 'host:port').
1465              
1466             The following example illustrates use of the C method:
1467              
1468             $redis_address = $jq->server;
1469              
1470             =cut
# Public read-only view of the private _server attribute.
sub server {
    my $self = shift;

    return $self->_server;
}
1476              
1477             =head3 C<ping>
1478              
1479             This command is used to test connection to Redis server.
1480              
1481             Returns 1 if a connection is still alive or 0 otherwise.
1482              
1483             The following example illustrates use of the C method:
1484              
1485             $is_alive = $jq->ping;
1486              
1487             External connections to the server object (eg, C<< $redis = Redis->new( ... ); >>),
1488             and the queue object can continue to work after calling ping only if the method returned 1.
1489              
1490             If there is no connection to the Redis server (methods return 0), the connection to the server closes.
1491             In this case, to continue working with the queue,
1492             you must re-create the C object with the L method.
1493             When using an external connection to the server,
1494             to check the connection to the server you can use the C<< $redis->echo( ... ) >> call.
1495             This is useful to avoid closing the connection to the Redis server unintentionally.
1496              
1497             =cut
# Tests the connection: returns 1 when the Redis client answers 'PONG',
# 0 otherwise. Exceptions from the client are passed to _redis_exception.
sub ping {
    my ( $self ) = @_;

    $self->_error( E_NO_ERROR );

    # Stays undefined when the call throws
    my $reply;
    try {
        $reply = $self->_redis->ping;
    } catch {
        $self->_redis_exception( $_ );
    };

    return 1 if defined( $reply ) && $reply eq 'PONG';
    return 0;
}
1512              
1513             =head3 C<quit>
1514              
1515             Closes connection to the Redis server.
1516              
1517             The following example illustrates use of the C method:
1518              
1519             $jq->quit;
1520              
1521             It does not close the connection to the Redis server if it is an external connection provided
1522             to queue constructor as existing L object.
1523             When using an external connection (eg, C<< $redis = Redis->new( ... ); >>),
1524             to close the connection to the Redis server, call C<< $redis->quit >> after calling this method.
1525              
1526             =cut
# Closes the connection to the Redis server, unless the connection is an
# external one (see _use_external_connection). Always returns nothing.
# Exceptions from the client are passed to _redis_exception.
sub quit {
    my ( $self ) = @_;

    # Do nothing during global destruction.
    # $] is a decimal of the form 5.014000 for perl 5.14.0, so the version
    # must be written as 5.014: a comparison against 5.14 is never true and
    # would leave this guard dead. ${^GLOBAL_PHASE} exists since perl 5.14.
    return if $] >= 5.014 && ${^GLOBAL_PHASE} eq 'DESTRUCT';

    $self->_error( E_NO_ERROR );

    unless ( $self->_use_external_connection ) {
        try {
            $self->_redis->quit;
        } catch {
            $self->_redis_exception( $_ );
        };
    }

    return;
}
1544              
1545             =head3 C<queue_status>
1546              
1547             Gets queue status from the Redis server.
1548             Queue name is obtained from the argument. The argument can be either a
1549             string representing a queue name or a
1550             L object.
1551              
1552             Returns a reference to a hash describing state of the queue or a reference
1553             to an empty hash when the queue wasn't found.
1554              
1555             The following examples illustrate uses of the C method:
1556              
1557             $qstatus = $jq->queue_status( $queue_name );
1558             # or
1559             $qstatus = $jq->queue_status( $job );
1560              
1561             The returned hash contains the following information related to the queue:
1562              
1563             =over 3
1564              
1565             =item * C
1566              
1567             The number of jobs in the active queue that are waiting to be selected by L and then processed.
1568              
1569             =item * C
1570              
1571             The number of ALL jobs tagged with the queue, i.e. including those that were processed before and other jobs,
1572             not presently waiting in the active queue.
1573              
1574             =item * C
1575              
1576             The age of the oldest job (the lifetime of the queue) in the active queue.
1577              
1578             =item * C
1579              
1580             The age of the youngest job in the active queue.
1581              
1582             =item * C<lifetime>
1583              
1584             Time it currently takes for a job to pass through the queue.
1585              
1586             =back
1587              
1588             Statistics are based on the jobs that have not yet been removed.
1589             Some fields may be missing if the status of the job prevents determining
1590             the desired information (e.g., there are no jobs in the queue).
1591              
1592             =cut
# Runs the 'queue_status' Lua script for a queue and returns its reply as a
# hash reference.
#
# $maybe_queue - either a queue name (string) or a Redis::JobQueue::Job
#                object, in which case its queue() value is used.
# Dies (confess) with the E_MISMATCH_ARG message for any other argument.
sub queue_status {
    my ( $self, $maybe_queue ) = @_;

    my $is_acceptable = defined( _STRING( $maybe_queue ) )
        || _INSTANCE( $maybe_queue, 'Redis::JobQueue::Job' );
    confess $self->_error( E_MISMATCH_ARG )
        unless $is_acceptable;

    # A job object stands in for the queue it belongs to.
    my $queue_name = ref( $maybe_queue ) ? $maybe_queue->queue : $maybe_queue;

    $self->_error( E_NO_ERROR );

    # The script command is resolved first, then the current time is sampled.
    my @script_cmd = $self->_lua_script_cmd( 'queue_status' );
    my %qstatus = $self->_call_redis(
        @script_cmd,
        0,                      # number of KEYS passed to the script
        $queue_name,
        Time::HiRes::time(),
    );

    return \%qstatus;
}
1614              
1615             #-- private methods ------------------------------------------------------------
1616              
# Classifies an exception caught around a Redis call, records the matching
# error code through _error(), abandons any transaction in progress and then
# re-throws the original exception.
#
# $error - the caught exception; it is matched as a string.
#
# Never returns normally: always ends with "die $error".
sub _redis_exception {
    my ( $self, $error ) = @_;

    # Use the error messages from Redis.pm
    if (
               $error =~ /^Could not connect to Redis server at /
            || $error =~ /^Can't close socket: /
            || $error =~ /^Not connected to any server/
            # Maybe for pub/sub only
            || $error =~ /^Error while reading from Redis server: /
            || $error =~ /^Redis server closed connection/
        ) {
        $self->_error( E_NETWORK );
    } elsif (
            # The server reports the memory limit either as 'ERR ...' or as
            # 'OOM ...'. The text is matched wherever it appears, so it is
            # recognized both with and without a leading "[command]" prefix.
            $error =~ /\b(?:ERR|OOM) command not allowed when used memory > 'maxmemory'/
        ) {
        $self->_error( E_MAX_MEMORY_LIMIT );
    } else {
        $self->_error( E_REDIS );
    }

    if ( $self->_transaction ) {
        # Best effort: a failure of DISCARD itself is deliberately ignored,
        # the original error is the one that gets reported.
        try {
            $self->_redis->discard;
        };
        $self->_transaction( 0 );
    }

    die $error;
}
1647              
# Creates a Redis client connected to the server given by $self->_server.
# Resets the last error to E_NO_ERROR first. Any exception from Redis->new
# is passed to _redis_exception(), which records an error code and re-throws.
# Returns the new Redis object.
sub _redis_constructor {
    my ( $self ) = @_;

    $self->_error( E_NO_ERROR );

    # The catch block never returns normally, so $redis is only ever
    # assigned the result of a successful constructor call.
    my $redis = try {
        Redis->new( server => $self->_server );
    } catch {
        $self->_redis_exception( $_ );
    };

    return $redis;
}
1663              
1664             # Keep in mind the default 'redis.conf' values:
1665             # Close the connection after a client is idle for N seconds (0 to disable)
1666             # timeout 300
1667              
# Send a request to Redis.
#
# Usage: $self->_call_redis( $method, @args )
#
# For 'HSET' the arguments are ( key, field, value ):
#   - the $_ID_IN_QUEUE_FIELD field is sent as-is;
#   - 'workload', 'result' and any field not listed in %job_fnames are
#     serialized with nfreeze() and checked against max_datasize;
#   - any other field must not carry the utf8 flag (see the note below).
# For 'HGET' of 'workload' or 'result' a true reply is deserialized with thaw().
# 'MULTI' / 'EXEC' switch the internal transaction flag on / off.
#
# Returns the reply: the whole list in list context, its first element
# otherwise. Returns nothing when 'EXEC' yields an undefined first element.
# Exceptions from the Redis client are routed through _redis_exception().
sub _call_redis {
    my $self   = shift;
    my $method = shift;

    $self->_error( E_NO_ERROR );

    my ( @result, $error );

    # Executes $method with the given arguments, storing the reply in @result
    # and a thrown exception, if any, in $error.
    my $send = sub {
        my $args = \@_;         # the 'try' block has an @_ of its own
        try {
            @result = $self->_redis->$method( @{ $args } );
        } catch {
            $error = $_;
        };
        return;
    };

    my $is_hset = $method eq 'HSET';

    # For 'HSET': $_[0] - key, $_[1] - field, $_[2] - value.
    # They are inspected through @_ to avoid copying a potentially large value.
    if ( $is_hset && $_[1] eq $_ID_IN_QUEUE_FIELD ) {
        my ( $key, $field, $value ) = @_;
        $send->( $key, $field, $value );
    } elsif ( $is_hset && ( $_[1] =~ /^(workload|result)$/ || !$job_fnames{ $_[1] } ) ) {
        my $data_ref = \nfreeze( \$_[2] );

        if ( length( $$data_ref ) > $self->max_datasize ) {
            if ( $self->_transaction ) {
                # Best effort: errors from DISCARD are ignored here.
                try {
                    $self->_redis->discard;
                };
                $self->_transaction( 0 );
            }
            # 'die' as maybe too long to analyze the data output from the 'confess'
            die format_message( '%s: %s', $self->_error( E_DATA_TOO_LARGE ), $_[1] );
        }

        my ( $key, $field ) = @_;
        $send->( $key, $field, $$data_ref );
    } elsif ( $is_hset && utf8::is_utf8( $_[2] ) ) {
        # In our case, the user sends data to the job queue and then gets it back.
        # Workload and result are fine with Unicode - they are properly serialized
        # by Storable and stored as bytes in Redis; Storable takes care of Unicode etc.
        # The main string job data fields (id, queue, job, status, message),
        # however, are not serialized for performance and convenience reasons,
        # and are stored in Redis as-is.
        # If there is Unicode in these fields, we have the following options:
        # 1. Assume everything is Unicode, turn on utf-8 encoding in Redis.pm settings
        #    and take a substantial performance hit; as we store the biggest parts of
        #    job data - workload and result - serialized already, encoding and
        #    decoding them again is not a good idea.
        # 2. Assume that all fields are Unicode, encode and decode them;
        #    this may lead to subtle errors if the user provides data which is binary,
        #    not Unicode.
        # 3. Detect Unicode data and store a "utf-8" flag along with the data in Redis,
        #    to decode only utf-8 data when it is requested by the user.
        #    This makes data management more complicated.
        # 4. Assume that the data is for application internal use, and that the
        #    application must ensure that it does not contain Unicode; if Unicode is
        #    really needed, it should be either stored in workload or result, or the
        #    application must take care of encoding and decoding Unicode data before
        #    sending it to the job queue.
        #    The job queue will throw an exception if Unicode data is encountered.
        #
        # We choose (4) as it is consistent, does not degrade performance and does
        # not cause subtle errors with damaged data.

        # For non-serialized fields: UTF8 can not be transferred to the Redis server
        confess format_message( '%s (utf8 in %s)', $self->_error( E_MISMATCH_ARG ), $_[1] );
    } else {
        my @args = @_;
        $send->( @args );
    }

    $self->_redis_exception( $error )
        if $error;

    if ( $method eq 'MULTI' ) {
        $self->_transaction( 1 );
    }
    if ( $method eq 'EXEC' ) {
        $self->_transaction( 0 );
        # 'WATCH': the transaction is not entered at all
        return unless defined $result[0];
    }

    # Serialized fields come back frozen; restore the stored value.
    if ( $method eq 'HGET' && $_[1] =~ /^(workload|result)$/ && $result[0] ) {
        $result[0] = ${ thaw( $result[0] ) };
        $result[0] = ${ $result[0] } if ref( $result[0] ) eq 'SCALAR';
    }

    return wantarray ? @result : $result[0];
}
1769              
# Builds a key string: $NAMESPACE followed by ':' and the arguments joined
# with ':'. The separator after $NAMESPACE is present even with no arguments.
sub _jkey {
    my ( $self, @parts ) = @_;

    return join( ':', $NAMESPACE, join( ':', @parts ) );
}
1776              
# Builds a key string: "$NAMESPACE:queue:" followed by the arguments joined
# with ':'. The trailing separator is present even with no arguments.
sub _qkey {
    my ( $self, @parts ) = @_;

    return join( ':', $NAMESPACE, 'queue', join( ':', @parts ) );
}
1783              
# Resolves a job ID from either an ID string or a Redis::JobQueue::Job
# object (whose id() is returned).
# Dies (confess) with the E_MISMATCH_ARG message for any other argument.
sub _get_job_id {
    my ( $self, $id_source ) = @_;

    confess $self->_error( E_MISMATCH_ARG )
        unless defined( _STRING( $id_source ) )
            || _INSTANCE( $id_source, 'Redis::JobQueue::Job' );

    return $id_source->id
        if ref( $id_source );

    return $id_source;
}
1792              
# Returns the command prefix used to run the named Lua script:
#   ( 'EVALSHA', $sha1 ) - when the digest is already cached in _lua_scripts,
#                          or 'SCRIPT EXISTS' reports a true value for it;
#   ( 'EVAL', $body )    - otherwise, so that the script source is sent.
# The SHA1 digest of the script body is cached in _lua_scripts on first use.
sub _lua_script_cmd {
    my ( $self, $name ) = @_;

    if ( my $known_sha1 = $self->_lua_scripts->{ $name } ) {
        return( 'EVALSHA', $known_sha1 );
    }

    # First use of this script by this object: remember its digest.
    my $sha1 = sha1_hex( $lua_script_body{ $name } );
    $self->_lua_scripts->{ $name } = $sha1;

    my ( $exists ) = $self->_call_redis( 'SCRIPT', 'EXISTS', $sha1 );

    return $exists
        ? ( 'EVALSHA', $sha1 )
        : ( 'EVAL', $lua_script_body{ $name } );
}
1805              
# Stores $error_code as the last error code and returns whatever
# last_error() then reports.
sub _error {
    my $self = shift;
    my ( $error_code ) = @_;

    $self->_set_last_errorcode( $error_code );

    return $self->last_error;
}
1812              
1813             #-- Closes and cleans up -------------------------------------------------------
1814              
# Unimport the Mouse helpers so their keywords are removed from the package,
# then finalize the class definition.
no Mouse::Util::TypeConstraints;
no Mouse;
__PACKAGE__->meta->make_immutable;
1818              
1819             __END__