File Coverage

blib/lib/POE/Component/Client/MogileFS.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package POE::Component::Client::MogileFS;
2              
3 1     1   23463 use warnings;
  1         2  
  1         31  
4 1     1   6 use strict;
  1         2  
  1         39  
5              
6 1     1   6 use Carp qw(carp croak);
  1         7  
  1         71  
7 1     1   377 use POE qw(Wheel::Run Filter::Reference);
  0            
  0            
8             use MogileFS::Client;
9              
10             sub spawn {
11             my ($class, %args) = @_;
12             %args = (
13             max_concurrent => 10,
14             alias => __PACKAGE__,
15             todo => [],
16             %args
17             );
18             my $self = bless {}, $class;
19             $self->{session_id} = POE::Session->create(
20             object_states => [
21             $self => {
22             _start => '_start',
23             _stop => '_stop',
24             add_tasks => 'handle_add_task',
25             shutdown => 'shutdown',
26             next_task => 'next_task',
27             task_result => 'handle_task_result',
28             task_done => 'handle_task_done',
29             task_debug => 'handle_task_debug',
30             killme => 'handle_killme',
31             },
32             ],
33             heap => { args => \%args,
34             todo => $args{todo},
35             },
36             )->ID;
37              
38             return $self;
39             }
40              
41             sub session_id {
42             return $_[0]->{session_id};
43             }
44              
45             sub shutdown {
46             my ($kernel, $heap) = @_[KERNEL, HEAP];
47             if (@{$heap->{todo}}) {
48             $kernel->delay('shutdown', 1);
49             return;
50             }
51             else {
52             $kernel->yield('_stop');
53             return;
54             }
55             if (keys %{$heap->{task}}) {
56             $kernel->delay('shutdown', 1);
57             return;
58             }
59             else {
60             $kernel->yield('_stop');
61             return;
62             }
63             }
64              
65             sub handle_task_result {
66             my ($kernel,$heap,$result) = @_[KERNEL,HEAP,ARG0];
67             return unless defined $heap->{args}->{result};
68             if (ref($heap->{args}->{result}) eq 'ARRAY') {
69             $kernel->post($heap->{args}->{result}->[0],
70             $heap->{args}->{result}->[1], $result);
71             }
72             }
73              
74             sub handle_task_debug {
75             my ($kernel,$heap,$result) = @_[KERNEL,HEAP,ARG0];
76             return unless defined $heap->{args}->{debug};
77             if (ref($heap->{args}->{debug}) eq 'ARRAY') {
78             $kernel->post($heap->{args}->{debug}->[0],
79             $heap->{args}->{debug}->[1], $result);
80             }
81             }
82              
83             sub _start {
84             my ($kernel, $heap) = @_[KERNEL, HEAP];
85             $kernel->alias_set($heap->{args}->{alias});
86             $kernel->sig(CHLD => 'killme');
87             $kernel->yield('next_task');
88             }
89              
90             sub next_task {
91             my ($kernel, $heap) = @_[KERNEL, HEAP];
92             my $max_con = $heap->{args}->{max_concurrent};
93             while ( scalar keys %{$heap->{task}} < $max_con ) {
94             my $next_task = shift @{$heap->{todo}};
95             if (defined $next_task) {
96             my $filter = POE::Filter::Reference->new('Storable');
97             my $task = POE::Wheel::Run->new(
98             Program => sub { do_stuff($next_task) },
99             StdoutFilter => $filter,
100             StdoutEvent => 'task_result',
101             StderrEvent => 'task_debug',
102             CloseEvent => 'task_done',
103             CloseOnCall => 1,
104             );
105             $heap->{task}->{ $task->ID } = $task;
106             }
107             else {
108             delete $heap->{todo};
109             $kernel->delay('next_task', 1);
110             last;
111             }
112             }
113             }
114              
115             sub _stop {
116             my ($kernel, $heap) = @_[KERNEL, HEAP];
117             $kernel->alias_remove($heap->{args}->{alias});
118             delete $heap->{args};
119             delete $heap->{todo};
120             delete $heap->{task};
121             }
122              
123             sub handle_add_task {
124             my ($kernel, $heap, $tasks) = @_[KERNEL, HEAP, ARG0];
125             push @{$heap->{todo}}, @{$tasks};
126             $kernel->yield('next_task');
127             }
128              
129             sub handle_task_done {
130             my ( $kernel, $heap, $task_id ) = @_[ KERNEL, HEAP, ARG0 ];
131             $heap->{task}->{$task_id}->kill(9);
132             delete $heap->{task}->{$task_id};
133             unless (scalar keys %{$heap->{task}}) {
134             delete $heap->{task};
135             }
136             $kernel->yield("next_task");
137             }
138              
139             sub handle_killme {
140             my ($kernel, $pid, $child_error) = @_[KERNEL, ARG1, ARG2];
141             #we could do something here or something
142             $kernel->sig_handled;
143             }
144              
145             #not a POE function
146             #it's run from separate process
147             #can only run single MogileFS::Client methods that are run on the
148             #$mogc object
149             #so no printing to a file handle, use store_content instead
150             #
151             sub do_stuff {
152             binmode(STDOUT); # Required for this to work on MSWin32
153             my $task = shift;
154             my $filter = POE::Filter::Reference->new('Storable');
155             croak 'no domain in todo' unless $task->{domain};
156             croak 'no trackers in todo' unless @{$task->{trackers}};
157             croak 'no MogileFS::Client method in todo' unless $task->{method};
158             croak 'no taskname in todo' unless $task->{taskname};
159             my ($success, $mogc, $mogmethod);
160             carp "$@" unless eval {
161             $mogc = MogileFS::Client->new(
162             domain => $task->{domain},
163             hosts => $task->{trackers},
164             )
165             };
166             carp 'no MogileFS::Client object' unless defined $mogc;
167             $mogmethod = $task->{method};
168             $success = eval {
169             $mogc->$mogmethod(@{$task->{args}});
170             };
171             carp "$@" unless $success;
172             my %result = (
173             status => $success,
174             task => $task,
175             );
176             my $output = $filter->put( [ \%result ] );
177             print @$output;
178             }
179              
180             =head1 NAME
181              
182             POE::Component::Client::MogileFS - an async MogileFS client for POE
183              
184             =head1 VERSION
185              
186             Version 0.02
187              
188             =cut
189              
190             our $VERSION = '0.02';
191              
192             =head1 SYNOPSIS
193              
194             use POE qw(Component::Client::MogileFS);
195              
196             my $num = 500;
197             my @tasks;
198             foreach (1..$num) {
199             my $key = $_ .'x'. int(rand($num)) . time();
200             my $data = $key x 1000;
201             push @tasks, {
202             method => 'store_content',
203             domain => 'testdomain',
204             trackers => [qw/192.168.0.31:6001/],
205             args => [$key, 'testclass', $data],
206             taskname => $key.':testclass:testdomain',
207             };
208             }
209              
210             POE::Session->create(
211             inline_states => {
212             _start => \&start_session,
213             storesomestuff => \&storesomestuff,
214             debugging => \&debugging,
215             }
216             );
217              
218             sub start_session {
219             my ($kernel, $heap, $session) = @_[KERNEL, HEAP, SESSION];
220             POE::Component::Client::MogileFS->spawn(
221             alias => 'mog',
222             max_concurrent => 50,
223             result => [$session, 'storesomestuff'],
224             debug => [$session, 'debugging'],
225             );
226             $kernel->post('mog', 'add_tasks', \@tasks);
227             }
228              
229             my $count = 0;
230             sub storesomestuff {
231             my ($kernel,$result) = @_[KERNEL,ARG0];
232             if ($result->{status}) {
233             $count++;
234             print "$count RESULT ".$result->{task}->{taskname};
235             print " SUCCESS ".$result->{status}."\n";
236             }
237             else {
238             print "$count RESULT ".$result->{task}->{taskname};
239             print " FAILED\n";
240             $kernel->post('mog', 'add_tasks', [$result->{task}]);
241             }
242             $kernel->post('mog', 'shutdown') if $count == $num;
243             }
244              
245             sub debugging {
246             my $result = $_[ARG0];
247             print "DEBUG $result\n";
248             }
249              
250             $poe_kernel->run();
251              
252             =head1 DESCRIPTION
253              
254             POE::Component::Client::MogileFS is a POE component that uses Wheel::Run
255             to fork off child processes which will execute MogileFS::Client methods
256             with your provided data asyncronously. By default it will not allow more
257             than 10 concurrent connections, but you can adjust that as needed.
258              
259             This is my first go at a POE::Component so the api may change in future,
260             and I'm really open to suggestions for improvement/features.
261              
262             =head1 FUNCTIONS
263              
264             =head2 spawn
265              
266             Can take the following arguments:
267              
268             alias => 'alias of mogilefs session or __PACKAGE__ by default',
269             max_concurrent => 'max number of concurrent children - default 10',
270             result => ['session alias or id', 'eventname'],
271             debug => ['session alias or id', 'eventname'],
272             todo => ['list of hashes of jobs todo']
273              
274             =head2 session_id
275              
276             Returns the session id
277              
278             =head2 add_tasks
279              
280             $kernel->post('session', 'add_tasks', \@tasks);
281              
282             Takes an arraref of hashes, each hash represents one MogileFS::Client method
283             to call and should have the following keys:
284              
285             {method => 'MogileFS::Client method name',
286             domain => 'domain to use',
287             trackers => [array ref of trackers],
288             args => [arrayref, of, args, for, method],
289             taskname => 'name of this task',
290             }
291              
292             =head2 shutdown
293              
294             $kernel->post('session', 'shutdown');
295              
296             Kills the MogileFS session and cleans up.
297              
298             =head2 result
299              
300             If result is set in spawn, then your event will get a hashref in ARG0.
301              
302             $_[ARG0]->{status} is whatever is returned from the MogileFS method you
303             called, typically undef means it failed.
304              
305             $_[ARG0]->{task} is the task you originally gave add_task.
306              
307             This should allow you to retry if something fails and it's appropriate to
308             do so (the synopsis contains an example of doing so, using store_content).
309              
310             =head2 debug
311              
312             Returns back all the warnings and errors MogileFS::Client will spew in
313             ARG0. Mostly just useful for debugging.
314              
315             =head1 STUFF
316              
317             This module is kinda simplistic in what MogileFS::Client methods you can
318             use. Essentially all it does is create a new MogileFS::Client object in
319             each child, call the method you provide on that object with the arguments
320             you specified and return the result. Obviously this won't work for any
321             methods that want more than one operation on the same object. For example
322             my $fh = $mogc->newfile( ... ); print $fh 'foobar'; close $fh; isn't going
323             to work. Instead use store_content;
324              
325             The session won't go away until you shutdown. This is to allow you to do
326             things like this:
327              
328             $kernel->post('session', 'add_tasks', \@tasks);
329             $kernel->post('session', 'add_tasks', \@moretasks);
330              
331             and to re-add your tasks from result if they failed.
332              
333             =head1 AUTHOR
334              
335             mock, C<< >>
336              
337             =head1 BUGS
338              
339             Please report any bugs or feature requests to
340             C, or through the web interface at
341             L.
342             I will be notified, and then you'll automatically be notified of progress on
343             your bug as I make changes.
344              
345             =head1 SUPPORT
346              
347             You can find documentation for this module with the perldoc command.
348              
349             perldoc POE::Component::Client::MogileFS
350              
351             You can also look for information at:
352              
353             =over 4
354              
355             =item * AnnoCPAN: Annotated CPAN documentation
356              
357             L
358              
359             =item * CPAN Ratings
360              
361             L
362              
363             =item * RT: CPAN's request tracker
364              
365             L
366              
367             =item * Search CPAN
368              
369             L
370              
371             =back
372              
373             =head1 COPYRIGHT & LICENSE
374              
375             Copyright 2007 RJ Research, Inc, all rights reserved.
376              
377             This program is free software; you can redistribute it and/or modify it
378             under the same terms as Perl itself.
379              
380             =cut
381              
382             1; # End of POE::Component::Client::MogileFS