File Coverage

blib/lib/Message/Passing/Output/Search/Elasticsearch.pm
Criterion Covered Total %
statement 12 23 52.1
branch 0 6 0.0
condition 0 3 0.0
subroutine 4 7 57.1
pod 1 1 100.0
total 17 40 42.5


line stmt bran cond sub pod time code
1             package Message::Passing::Output::Search::Elasticsearch;
2             $Message::Passing::Output::Search::Elasticsearch::VERSION = '0.004';
3             # ABSTRACT: index messages in Elasticsearch
4              
5 2     2   331686 use Moo;
  2         16  
  2         10  
6             use MooX::Types::MooseLike::Base
7 2     2   1104 qw( Str ArrayRef HashRef CodeRef is_CodeRef AnyOf ConsumerOf InstanceOf );
  2         9170  
  2         236  
8              
9 2     2   844 use Search::Elasticsearch::Async;
  2         11910  
  2         56  
10 2     2   12 use Promises backend => ['AnyEvent'];
  2         4  
  2         10  
11              
12             with 'Message::Passing::Role::Output';
13              
14              
15              
16             has es_params => (
17             is => 'ro',
18             isa => HashRef,
19             default => sub { {} },
20             );
21              
22              
23             has es => (
24             is => 'ro',
25             lazy => 1,
26             isa => ConsumerOf ['Search::Elasticsearch::Role::Client'],
27             builder => sub {
28 0     0     my $self = shift;
29 0           return Search::Elasticsearch::Async->new( %{ $self->es_params } );
  0            
30             },
31             );
32              
33              
34             has es_bulk_params => (
35             is => 'ro',
36             isa => HashRef,
37             default => sub { {} },
38             );
39              
40              
41             has es_bulk => (
42             is => 'ro',
43             lazy => 1,
44             isa => ConsumerOf [
45             'Search::Elasticsearch::Client::5_0::Role::Bulk',
46             'Search::Elasticsearch::Role::Is_Async'
47             ],
48             builder => sub {
49 0     0     my $self = shift;
50 0           return $self->es->bulk_helper( %{ $self->es_bulk_params } );
  0            
51             },
52             );
53              
54              
55             has type => (
56             is => 'ro',
57             required => 1,
58             isa => AnyOf [ Str, CodeRef ],
59             );
60              
61              
62             has index_name => (
63             is => 'ro',
64             required => 1,
65             isa => AnyOf [ Str, CodeRef ],
66             );
67              
68              
69             sub consume {
70 0     0 1   my ( $self, $data ) = @_;
71             return
72 0 0 0       unless defined $data && ref $data eq 'HASH';
73              
74             #if ( my $epochtime = delete $data->{epochtime} ) {
75             #$date = DateTime->from_epoch(epoch => $epochtime);
76             #}
77             #$date ||= DateTime->from_epoch(epoch => time());
78              
79 0 0         my $type =
80             is_CodeRef( $self->type )
81             ? $self->type->($data)
82             : $self->type;
83 0 0         my $index_name =
84             is_CodeRef( $self->index_name )
85             ? $self->index_name->($data)
86             : $self->index_name;
87              
88             #$self->_indexes->{$index_name} = 1;
89             # my $to_queue = {
90             # '@timestamp' => to_ISO8601DateTimeStr($date),
91             # '@tags' => [],
92             # '@type' => $type,
93             # '@source_host' => delete( $data->{hostname} ) || 'none',
94             # '@message' => exists( $data->{message} )
95             # ? delete( $data->{message} )
96             # : encode_json($data),
97             # '@fields' => $data,
98             # exists( $data->{uuid} ) ? ( id => delete( $data->{uuid} ) ) : (),
99             # };
100 0           $self->es_bulk->index(
101             { index => $index_name,
102             type => $type,
103             source => $data,
104             }
105             );
106             }
107              
108             1;
109              
110             __END__
111              
112             =pod
113              
114             =encoding UTF-8
115              
116             =head1 NAME
117              
118             Message::Passing::Output::Search::Elasticsearch - index messages in Elasticsearch
119              
120             =head1 VERSION
121              
122             version 0.004
123              
124             =head1 DESCRIPTION
125              
126             This output is intentionally kept simple to not add dependencies.
127             If you need a special format use a filter like
128             L<Message::Passing::Filter::ToLogstash> before sending messages to this
129             output.
130              
131             =head1 ATTRIBUTES
132              
133             =head2 es_params
134              
135             A hashref of L<Search::Elasticsearch::Async/"CREATING A NEW INSTANCE"> parameters.
136              
137             =head2 es
138              
139             A L<Search::Elasticsearch::Async> instance. Can either be passed directly or
140             gets constructed from L</es_params>.
141              
142             =head2 es_bulk_params
143              
144             A hashref of
145             L<Search::Elasticsearch::Client::5_0::Async::Bulk/"CREATING A NEW INSTANCE">
146             parameters.
147              
148             =head2 es_bulk
149              
150             A L<Search::Elasticsearch::Client::5_0::Async::Bulk> instance. Can either be
151             passed directly or gets constructed from L</es> and L</es_bulk_params> using
152             bulk_helper.
153              
154             =head2 type
155              
156             Can be either set to a fixed string or a coderef that's called for every
157             message to return the type depending on the contents of the message.
158              
159             =head2 index_name
160              
161             Can be either set to a fixed string or a coderef that's called for every
162             message to return the index name depending on the contents of the message.
163              
164             =head1 METHODS
165              
166             =head2 consume ($msg)
167              
168             Consumes a message, queuing it for consumption by Elasticsearch.
169             Assumes that the message is a hashref, skips silently in case it isn't.
170              
171             =head1 SEE ALSO
172              
173             =over
174              
175             =item L<Message::Passing>
176              
177             =back
178              
179             =head1 AUTHOR
180              
181             Alexander Hartmaier <abraxxa@cpan.org>
182              
183             =head1 COPYRIGHT AND LICENSE
184              
185             This software is copyright (c) 2017 by Alexander Hartmaier.
186              
187             This is free software; you can redistribute it and/or modify it under
188             the same terms as the Perl 5 programming language system itself.
189              
190             =cut