File Coverage

blib/lib/POE/Component/RemoteTail.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package POE::Component::RemoteTail;
2              
3 2     2   457 use strict;
  2         3  
  2         56  
4 2     2   6 use warnings;
  2         2  
  2         44  
5 2     2   1420 use POE;
  0            
  0            
6             use POE::Component::RemoteTail::Job;
7             use POE::Component::IKC::Server;
8             use POE::Wheel::Run;
9             use POE::Filter::Reference;
10             use Class::Inspector;
11             use Data::Dumper;
12             use constant DEBUG => 0;
13             use UNIVERSAL::require;
14              
15             our $VERSION = '0.00001_00';
16              
17             *debug = DEBUG
18             ? sub {
19             my $mess = shift;
20             print STDERR $mess, "\n";
21             }
22             : sub { };
23              
24             sub spawn {
25             my $class = shift;
26             my $self = $class->new(@_);
27              
28             $self->{alias} ||= "tailer";
29             $self->{port} ||= 9999;
30             $self->{name} ||= "RemoteTail";
31              
32             POE::Component::IKC::Server->spawn(
33             port => $self->{port},
34             name => $self->{name},
35             );
36              
37             POE::Session->create(
38             object_states => [ $self => Class::Inspector->methods($class) ], );
39              
40             }
41              
42             sub new {
43             my $class = shift;
44             my %args = @_;
45              
46             return bless {%args}, $class;
47             }
48              
49             sub job {
50             my $self = shift;
51             my %args = @_;
52              
53             my $job = POE::Component::RemoteTail::Job->new(%args);
54             return $job;
55             }
56              
57             sub execute {
58             my ( $self, $kernel, $session, $heap, $arg ) =
59             @_[ OBJECT, KERNEL, SESSION, HEAP, ARG0 ];
60              
61             $heap->{postback} = $arg->{postback};
62             my $job = $arg->{job};
63             $job->{port} = $self->{port};
64              
65             $kernel->post( $session, "_spawn_child" => $job );
66             }
67              
68             sub stop_tail {
69             my ( $self, $kernel, $session, $heap, $job ) =
70             @_[ OBJECT, KERNEL, SESSION, HEAP, ARG0 ];
71              
72             debug("STOP:$job->{id}");
73             my $task = $heap->{task}->{ $job->{id} };
74             $task->kill;
75             delete $heap->{task}->{ $job->{id} };
76             undef $job;
77             }
78              
79             sub _start {
80             my ( $self, $kernel ) = @_[ OBJECT, KERNEL ];
81              
82             $kernel->alias_set( $self->{alias} );
83             $kernel->call( IKC => publish => $self->{alias}, ["_ikc_logger"] );
84             }
85              
86             sub _ikc_logger {
87             my ( $self, $kernel, $heap, $request ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
88              
89             my ( $data, $rsvp ) = @$request;
90             my ( $host, $log ) = @$data;
91              
92             $heap->{postback}->( $host, $log );
93             $kernel->call( IKC => post => $rsvp, 1 );
94             }
95              
96             sub _spawn_child {
97             my ( $self, $kernel, $session, $heap, $job, $sender ) =
98             @_[ OBJECT, KERNEL, SESSION, HEAP, ARG0, SENDER ];
99              
100             # prepare ...
101             my $class = $job->{process_class};
102             $class->require or die(@!);
103             $class->new();
104              
105             my %program = ( Program => sub { $class->process_entry($job) }, );
106             $SIG{CHLD} = "IGNORE";
107              
108             # run wheel
109             my $task = POE::Wheel::Run->new(
110             %program,
111             StdioFilter => POE::Filter::Line->new(),
112             StdoutEvent => "_got_child_stdout",
113             StderrEvent => "_got_child_stderr",
114             CloseEvent => "_got_child_close",
115             );
116              
117             $heap->{task}->{ $task->ID } = $task;
118             $job->{id} = $task->ID;
119             }
120              
121             sub _got_child_stdout {
122             my $stdout = $_[ARG0];
123             debug("STDOUT:$stdout");
124             }
125              
126             sub _got_child_stderr {
127             my $stderr = $_[ARG0];
128             debug("STDERR:$stderr");
129             }
130              
131             sub _got_child_close {
132             my ( $heap, $task_id ) = @_[ HEAP, ARG0 ];
133             delete $heap->{task}->{$task_id};
134             debug("CLOSE:$task_id");
135             }
136              
137             1;
138              
139             __END__