File Coverage

blib/lib/Log/Saftpresse/Input/Redis.pm
Criterion Covered Total %
statement 9 34 26.4
branch 0 4 0.0
condition n/a
subroutine 3 10 30.0
pod 0 5 0.0
total 12 53 22.6


line stmt bran cond sub pod time code
1             package Log::Saftpresse::Input::Redis;
2              
3 1     1   1627 use Moose;
  1         2  
  1         7  
4              
5             # ABSTRACT: log input for reading a redis queue
6             our $VERSION = '1.6'; # VERSION
7              
8              
9             extends 'Log::Saftpresse::Input';
10              
11 1     1   6785 use Redis;
  1         32946  
  1         54  
12 1     1   11 use JSON qw(decode_json);
  1         3  
  1         15  
13              
14             has 'server' => ( is => 'ro', isa => 'Str',
15             default => '127.0.0.1:6379'
16             );
17             has 'sock' => ( is => 'ro', isa => 'Maybe[Str]' );
18             has 'db' => ( is => 'ro', isa => 'Int', default => 0 );
19              
20             has '_redis' => ( is => 'rw', isa => 'Redis', lazy => 1,
21             default => sub {
22             my $self = shift;
23             return $self->_connect_redis;
24             },
25             );
26              
27             has 'queue' => ( is => 'ro', isa => 'Str', default => 'logs' );
28              
29             has 'max_bulk' => ( is => 'ro', isa => 'Int', default => 100 );
30              
31             sub _connect_redis {
32 0     0     my $self = shift;
33 0 0         my $r = Redis->new(
34             defined $self->sock ? (
35             sock => $self->sock,
36             ) : (
37             server => $self->server,
38             ),
39             );
40 0           $r->select( $self->db );
41 0           return $r;
42             }
43              
44             sub io_handles {
45 0     0 0   my $self = shift;
46 0           return;
47             }
48              
49             sub queue_len {
50 0     0 0   my $self = shift;
51 0           return $self->_redis->llen($self->queue);
52             }
53              
54             sub read_events {
55 0     0 0   my ( $self ) = @_;
56 0           my @queue;
57             my @events;
58              
59 0           my $cnt = $self->queue_len;
60 0 0         if( $cnt > $self->max_bulk ) {
61 0           $cnt = $self->max_bulk;
62             }
63              
64 0           foreach (1..$cnt) {
65 0     0     $self->_redis->rpop($self->queue, sub {push @queue, $_[0]});
  0            
66             }
67 0           $self->_redis->wait_all_responses;
68              
69 0           foreach my $entry ( grep { defined $_ } @queue ) {
  0            
70 0           push( @events, decode_json($entry) );
71             }
72              
73 0           return @events;
74             }
75              
76             sub eof {
77 0     0 0   my $self = shift;
78 0           return 0; # queues dont have an end?
79             }
80              
81             sub can_read {
82 0     0 0   my $self = shift;
83 0           return $self->queue_len;
84             }
85              
86              
87             1;
88              
89             __END__
90              
91             =pod
92              
93             =encoding UTF-8
94              
95             =head1 NAME
96              
97             Log::Saftpresse::Input::Redis - log input for reading a redis queue
98              
99             =head1 VERSION
100              
101             version 1.6
102              
103             =head1 Description
104              
105             This input reads new events from a redis queue.
106              
107             =head1 Synopsis
108              
109             <Input myapp>
110             module = "Redis"
111             server = "127.0.0.1:6379"
112             # sock = "/path/to/socket"
113             db = 0
114             queue = "logs"
115             </Input>
116              
117             =head1 Format
118              
119             Format is expected to be in JSON format.
120             Each event must be a hash.
121              
122             =head1 AUTHOR
123              
124             Markus Benning <ich@markusbenning.de>
125              
126             =head1 COPYRIGHT AND LICENSE
127              
128             This software is Copyright (c) 1998 by James S. Seymour, 2015 by Markus Benning.
129              
130             This is free software, licensed under:
131              
132             The GNU General Public License, Version 2, June 1991
133              
134             =cut