File Coverage

blib/lib/POE/Component/Metabase/Relay/Server/Queue.pm
Criterion Covered Total %
statement 107 153 69.9
branch 13 50 26.0
condition 3 18 16.6
subroutine 31 40 77.5
pod 1 2 50.0
total 155 263 58.9


line stmt bran cond sub pod time code
1             package POE::Component::Metabase::Relay::Server::Queue;
2             $POE::Component::Metabase::Relay::Server::Queue::VERSION = '0.36';
3             # ABSTRACT: Submission queue for the metabase relay
4              
5 1     1   4 use strict;
  1         1  
  1         25  
6 1     1   3 use warnings;
  1         1  
  1         21  
7 1     1   3 use POE qw[Component::EasyDBI];
  1         1  
  1         4  
8 1     1   45307 use POE::Component::Client::HTTP;
  1         67163  
  1         28  
9 1     1   6 use POE::Component::Resolver;
  1         2  
  1         40  
10 1     1   459 use POE::Component::Metabase::Client::Submit;
  1         12196  
  1         23  
11 1     1   6 use CPAN::Testers::Report ();
  1         1  
  1         12  
12 1     1   366 use Metabase::User::Profile ();
  1         1785  
  1         19  
13 1     1   381 use Metabase::User::Secret ();
  1         187  
  1         19  
14 1     1   416 use Module::Load::Conditional qw[can_load];
  1         12581  
  1         64  
15 1     1   8 use JSON ();
  1         3  
  1         16  
16 1     1   3 use Params::Util qw[_HASH];
  1         1  
  1         39  
17 1     1   4 use Time::HiRes ();
  1         2  
  1         13  
18 1     1   3 use Data::UUID;
  1         2  
  1         51  
19              
20 1     1   3 use constant DELAY => 150;
  1         1  
  1         64  
21              
22              
23             my $sql = {
24             'create' => 'CREATE TABLE IF NOT EXISTS queue ( id varchar(150), submitted varchar(32), attempts INTEGER, data BLOB )',
25             'insert' => 'INSERT INTO queue values(?,?,?,?)',
26             'delete' => 'DELETE FROM queue where id = ?',
27             'queue' => 'SELECT * FROM queue ORDER BY attempts ASC, submitted ASC limit ', # the limit is appended via "submissions"
28             'update' => 'UPDATE queue SET attempts = ? WHERE id = ?',
29             'addidx' => [
30             'CREATE INDEX IF NOT EXISTS queue_id ON queue ( id )',
31             'CREATE INDEX IF NOT EXISTS queue_att_sub ON queue ( attempts, submitted )',
32             ],
33             };
34              
35 1     1   444 use MooseX::POE;
  1         288444  
  1         4  
36 1     1   141940 use MooseX::Types::URI qw[Uri];
  1         125809  
  1         5  
37              
38             {
39 1     1   1214 use Moose::Util::TypeConstraints;
  1         2  
  1         4  
40              
41             my $ps = subtype as 'Str', where { $poe_kernel->alias_resolve( $_ ) };
42             coerce $ps, from 'Str', via { $poe_kernel->alias_resolve( $_ )->ID };
43              
44             has 'session' => (
45             is => 'ro',
46             isa => $ps,
47             coerce => 1,
48             );
49              
50 1     1   1359 no Moose::Util::TypeConstraints;
  1         2  
  1         4  
51             }
52              
53             has 'event' => (
54             is => 'ro',
55             isa => 'Str',
56             );
57              
58             has 'profile' => (
59             is => 'ro',
60             isa => 'Metabase::User::Profile',
61             required => 1,
62             );
63              
64             has 'secret' => (
65             is => 'ro',
66             isa => 'Metabase::User::Secret',
67             required => 1,
68             );
69              
70             has 'dsn' => (
71             is => 'ro',
72             isa => 'Str',
73             required => 1,
74             );
75              
76             has 'uri' => (
77             is => 'ro',
78             isa => Uri,
79             coerce => 1,
80             required => 1,
81             );
82              
83             has 'username' => (
84             is => 'ro',
85             isa => 'Str',
86             default => '',
87             );
88              
89             has 'password' => (
90             is => 'ro',
91             isa => 'Str',
92             default => '',
93             );
94              
95             has 'debug' => (
96             is => 'rw',
97             isa => 'Bool',
98             default => 0,
99             );
100              
101             has 'multiple' => (
102             is => 'ro',
103             isa => 'Bool',
104             default => 0,
105             writer => '_set_multiple',
106             );
107              
108             has 'db_opts' => (
109             is => 'ro',
110             isa => 'HashRef',
111             default => sub {{}},
112             );
113              
114             has 'no_relay' => (
115             is => 'rw',
116             isa => 'Bool',
117             default => 0,
118             trigger => sub {
119             my( $self, $new, $old ) = @_;
120             return if ! $self->_has_easydbi;
121             $self->yield( '_process_queue' ) if ! $new;
122             },
123             );
124              
125             has 'no_curl' => (
126             is => 'ro',
127             isa => 'Bool',
128             default => 0,
129             );
130              
131             has 'submissions' => (
132             is => 'rw',
133             isa => 'Int',
134             default => 10,
135             );
136              
137             has '_uuid' => (
138             is => 'ro',
139             isa => 'Data::UUID',
140             lazy_build => 1,
141             init_arg => undef,
142             );
143              
144             has '_easydbi' => (
145             is => 'ro',
146             isa => 'POE::Component::EasyDBI',
147             lazy_build => 1,
148             init_arg => undef,
149             );
150              
151             has _http_alias => (
152             is => 'ro',
153             isa => 'Str',
154             init_arg => undef,
155             writer => '_set_http_alias',
156             );
157              
158             has _resolver => (
159             is => 'ro',
160             isa => 'POE::Component::Resolver',
161             init_arg => undef,
162             writer => '_set_resolver',
163             );
164              
165             has '_processing' => (
166             is => 'ro',
167             isa => 'HashRef',
168             default => sub {{}},
169             );
170              
171             sub _build__easydbi {
172 1     1   2 my $self = shift;
173 1 50       33 POE::Component::EasyDBI->new(
174             alias => '',
175             dsn => $self->dsn,
176             username => $self->username,
177             password => $self->password,
178             ( _HASH( $self->db_opts ) ? ( options => $self->db_opts ) : () ),
179             );
180             }
181              
182             sub _build__uuid {
183 0     0   0 Data::UUID->new();
184             }
185              
186             sub spawn {
187 1     1 1 32 shift->new(@_);
188             }
189              
190             sub START {
191 1     1 0 262 my ($kernel,$self) = @_[KERNEL,OBJECT];
192 1         4 $self->_build_table;
193 1         49 $kernel->yield( 'do_vacuum', 'process' );
194 1 50 33     86 if ( !$self->no_curl && can_load( modules => { 'POE::Component::Curl::Multi' => '0.08' } ) ) {
    50          
195 0         0 $self->_set_multiple( 0 );
196 0         0 $self->_set_http_alias( join '-', __PACKAGE__, $self->get_session_id );
197 0         0 POE::Component::Curl::Multi->spawn(
198             Alias => $self->_http_alias,
199             FollowRedirects => 2,
200             );
201             }
202             elsif ( $self->multiple ) {
203 0         0 $self->_set_resolver( BINGOS::POE::Component::Resolver->new() );
204             }
205             else {
206 1         48 $self->_set_http_alias( join '-', __PACKAGE__, $self->get_session_id );
207 1         33 POE::Component::Client::HTTP->spawn(
208             Alias => $self->_http_alias,
209             FollowRedirects => 2,
210             );
211             }
212 1 50       1534 $kernel->yield( '_process_queue' ) if ! $self->no_relay;
213 1         35 return;
214             }
215              
216              
217             sub _build_table {
218 1     1   2 my $self = shift;
219              
220 1         38 $self->_easydbi;
221              
222 1 50       50 if ( $self->dsn =~ /^dbi\:SQLite/i ) {
223 1         40 $self->_easydbi->do(
224             sql => 'PRAGMA synchronous = OFF',
225             event => '_generic_db_result',
226             _ts => $self->_time,
227             );
228             }
229              
230             $self->_easydbi->do(
231             sql => $sql->{create},
232 1         159 event => '_generic_db_result',
233             _ts => $self->_time,
234             );
235              
236 1         57 foreach my $idx ( @{ $sql->{addidx} } ) {
  1         14  
237 2         129 $self->_easydbi->do(
238             sql => $idx,
239             event => '_generic_db_result',
240             _ts => $self->_time,
241             );
242             }
243             }
244              
245             event 'do_vacuum' => sub {
246 1     1   1012 my ($kernel,$self,$process) = @_[KERNEL,OBJECT,ARG0];
247 1 50       40 $self->_easydbi->do(
248             sql => 'VACUUM',
249             event => '_generic_db_result',
250             _ts => $self->_time,
251             ( $process ? ( _process => 1 ) : () ),
252             );
253              
254 1         83 $kernel->delay( 'do_vacuum' => DELAY * 60 );
255 1         84 return;
256             };
257              
258             event 'shutdown' => sub {
259 1     1   395 my ($kernel,$self) = @_[KERNEL,OBJECT];
260 1         5 $kernel->alarm_remove_all();
261 1         102 $kernel->post( $self->_easydbi->ID, 'shutdown' );
262 1         101 $kernel->post(
263             $self->_http_alias,
264             'shutdown',
265             );
266 1 50 33     120 $self->_resolver->_really_shutdown
267             if $self->multiple && $self->_resolver;
268 1         4 return;
269             };
270              
271             event '_generic_db_result' => sub {
272 5     5   1163382 my ($kernel,$self,$result) = @_[KERNEL,OBJECT,ARG0];
273 5 50       16 if ( $result->{error} ) {
274 0 0       0 warn "DB error (" . ( $self->_time - $result->{_ts} ) . "s): " . JSON->new->pretty(1)->encode( $result ) . "\n" if $self->debug;
275             }
276 5 100       16 $kernel->yield( '_process_queue' ) if $result->{_process};
277 5         61 return;
278             };
279              
280             event 'submit' => sub {
281 0     0   0 my ($kernel,$self,$fact) = @_[KERNEL,OBJECT,ARG0];
282 0 0 0     0 return unless $fact and $fact->isa('Metabase::Fact');
283 0         0 my $timestamp = $self->_time;
284 0         0 $kernel->yield( '_dispatch_event', 'enqueued', $fact );
285             $self->_easydbi->do(
286             sql => $sql->{insert},
287 0 0       0 event => '_generic_db_result',
288             placeholders => [ $self->_uuid->create_b64(), $timestamp, 0, $self->_encode_fact( $fact ) ],
289             ( $self->no_relay ? () : ( _process => 1 ) ),
290             _ts => $timestamp,
291             );
292 0         0 return;
293             };
294              
295             event '_process_queue' => sub {
296 2     2   303 my ($kernel,$self) = @_[KERNEL,OBJECT];
297 2 50       93 return if $self->no_relay;
298             # Processing event
299 2         19 $kernel->delay( '_process_queue', DELAY );
300             $self->_easydbi->arrayhash(
301 2 50       206 sql => $sql->{queue} . ( $self->multiple ? $self->submissions : 1 ),
302             event => '_queue_db_result',
303             _ts => $self->_time,
304             );
305 2         143 return;
306             };
307              
308             event '_queue_db_result' => sub {
309 2     2   1854 my ($kernel,$self,$result) = @_[KERNEL,OBJECT,ARG0];
310 2 50 33     13 if ( $result->{error} and $self->debug ) {
311 0         0 warn $result->{error}, "\n";
312 0         0 return;
313             }
314 2         3 foreach my $row ( @{ $result->{result} } ) {
  2         7  
315             # Have we seen this report before?
316 0 0       0 if ( exists $self->_processing->{ $row->{id} } ) {
317 0         0 next;
318             } else {
319 0         0 $self->_processing->{ $row->{id} }++;
320             }
321              
322             # Submit event ?
323 0         0 my $report = $self->_decode_fact( $row->{data} );
324             POE::Component::Metabase::Client::Submit->submit(
325             event => '_submit_status',
326             profile => $self->profile,
327             secret => $self->secret,
328             fact => $report,
329             uri => $self->uri->as_string,
330 0 0       0 context => [ $row->{id}, $row->{attempts}, $self->_time ],
331             ( $self->multiple ? ( resolver => $self->_resolver ) : ( http_alias => $self->_http_alias ) ),
332             );
333              
334             }
335 2         5 return;
336             };
337              
338             event '_dispatch_event' => sub {
339 0     0   0 my ($kernel,$self,@args) = @_[KERNEL,OBJECT,ARG0..$#_];
340 0 0 0     0 return unless $self->session and $self->event;
341 0         0 $kernel->post( $self->session, $self->event, @args );
342 0         0 return;
343             };
344              
345             event '_clear_processing' => sub {
346 0     0   0 my($kernel,$self,$id) = @_[KERNEL,OBJECT,ARG0];
347             delete $self->_processing->{ $id } if exists
348 0 0       0 $self->_processing->{ $id };
349              
350 0         0 return;
351             };
352              
353             event '_submit_status' => sub {
354 0     0   0 my ($kernel,$self,$res) = @_[KERNEL,OBJECT,ARG0];
355 0         0 my ($id,$attempts,$starttime) = @{ $res->{context} };
  0         0  
356 0         0 my $timestamp = $self->_time;
357 0         0 $kernel->delay_set( '_clear_processing' => DELAY, $id );
358 0 0       0 if ( $res->{success} ) {
359             # Success event
360 0 0       0 warn "Submit '$id' (" . ( $timestamp - $starttime ) . "s) success\n" if $self->debug;
361             $self->_easydbi->do(
362             sql => $sql->{delete},
363 0 0       0 event => '_generic_db_result',
364             placeholders => [ $id ],
365             ( $self->no_relay ? () : ( _process => 1 ) ),
366             _ts => $timestamp,
367             );
368             }
369             else {
370 0 0       0 warn "Submit '$id' (" . ( $timestamp - $starttime ) . "s) error: $res->{error}\n" . ( defined $res->{content} ? "$res->{content}\n" : '' ) if $self->debug;
    0          
371 0 0 0     0 if ( defined $res->{content} and $res->{content} =~ /GUID conflicts with an existing object/i ) {
372             # Duplicate event
373             $self->_easydbi->do(
374             sql => $sql->{delete},
375 0         0 event => '_generic_db_result',
376             placeholders => [ $id ],
377             _ts => $timestamp,
378             );
379             }
380             else {
381             # Re-enqueue event
382 0         0 $attempts++;
383             $self->_easydbi->do(
384             sql => $sql->{update},
385 0         0 event => '_generic_db_result',
386             placeholders => [ $attempts, $id ],
387             _ts => $timestamp,
388             );
389             }
390             }
391 0         0 return;
392             };
393              
394             sub _time {
395 7     7   97 return Time::HiRes::time;
396             }
397              
398             sub _encode_fact {
399 0     0     my $self = shift;
400 0           return JSON->new->encode( shift->as_struct );
401             }
402              
403             sub _decode_fact {
404 0     0     my $self = shift;
405 0           return CPAN::Testers::Report->from_struct( JSON->new->decode( shift ) );
406             }
407              
408 1     1   1595 no MooseX::POE;
  1         2  
  1         7  
409              
410             __PACKAGE__->meta->make_immutable;
411              
412             package
413             BINGOS::POE::Component::Resolver;
414              
415 1     1   184 use base qw[POE::Component::Resolver];
  1         1  
  1         171  
416              
417             #noop
418             sub shutdown {
419 0     0     return;
420             }
421              
422             sub _really_shutdown {
423 0     0     shift->SUPER::shutdown;
424             }
425              
426             1;
427              
428             __END__
429              
430             =pod
431              
432             =encoding UTF-8
433              
434             =head1 NAME
435              
436             POE::Component::Metabase::Relay::Server::Queue - Submission queue for the metabase relay
437              
438             =head1 VERSION
439              
440             version 0.36
441              
442             =head1 DESCRIPTION
443              
444             POE::Component::Metabase::Relay::Server::Queue is the submission queue for L<POE::Component::Metabase::Relay::Server>.
445              
446             It is based on L<POE::Component::EasyDBI> database and uses L<POE::Component::Metabase::Client::Submit> to send
447             reports to a L<Metabase> server.
448              
449             =for Pod::Coverage DELAY
450              
451             =for Pod::Coverage START
452              
453             =head1 CONSTRUCTOR
454              
455             =over
456              
457             =item C<spawn>
458              
459             Spawns a new component session and creates a SQLite database if it doesn't already exist.
460              
461             Takes a number of mandatory parameters:
462              
463             'dsn', a DBI DSN to use to store the submission queue;
464             'profile', a Metabase::User::Profile object;
465             'secret', a Metabase::User::Secret object;
466             'uri', the uri of metabase server to submit to;
467              
468             and a number of optional parameters:
469              
470             'username', a DSN username if required;
471             'password', a DSN password if required;
472             'db_opts', a hashref of DBD options that is passed to POE::Component::EasyDBI;
473             'debug', enable debugging information;
474             'multiple', set to true to enable the Queue to use multiple PoCo-Client-HTTPs, default 0;
475             'no_relay', set to true to disable report submissions to the Metabase, default 0;
476             'no_curl', set to true to disable automatic usage of POE::Component::Curl::Multi, default 0;
477             'submissions', an int to control the number of parallel http clients ( used only if multiple == 1 ), default 10;
478              
479             =back
480              
481             =head1 INPUT EVENTS
482              
483             =over
484              
485             =item C<submit>
486              
487             Takes one parameter a L<Metabase::Fact> to submit.
488              
489             =item C<shutdown>
490              
491             Terminates the component.
492              
493             =back
494              
495             =head1 SEE ALSO
496              
497             L<Metabase>
498              
499             L<Metabase::User::Profile>
500              
501             L<Metabase::User::Secret>
502              
503             L<POE::Component::Metabase::Client::Submit>
504              
505             L<POE::Component::Metabase::Relay::Server>
506              
507             L<POE::Component::EasyDBI>
508              
509             =head1 AUTHOR
510              
511             Chris Williams <chris@bingosnet.co.uk>
512              
513             =head1 COPYRIGHT AND LICENSE
514              
515             This software is copyright (c) 2016 by Chris Williams.
516              
517             This is free software; you can redistribute it and/or modify it under
518             the same terms as the Perl 5 programming language system itself.
519              
520             =cut