File Coverage

blib/lib/MultiThread.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             ########################################
2             #
3             # Author: David Spadea
4             # Web: http://www.spadea.net
5             #
6             # This code is released under the same terms
7             # as the PERL interpreter.
8             #
9             ########################################
10              
11             package MultiThread;
12              
13             our $VERSION = '0.9';
14              
15             package MultiThread::Base;
16              
17             require 5.008;
18              
19 1     1   36680 use strict;
  1         3  
  1         40  
20 1     1   6 use warnings;
  1         2  
  1         29  
21              
22 1     1   5369 use threads;
  0            
  0            
23             use threads::shared;
24             use Thread::Queue;
25             use Data::Dumper;
26             use Sys::CPU;
27              
28             use Storable qw(freeze thaw);
29              
30             sub new
31             {
32             # Base constructor; MultiThread::Pipeline and MultiThread::WorkerPool reach it via SUPER::new.
33             my $class = shift;
34              
35             my $self = {};
36             # share() is imported from threads::shared and is applied to three slots of the object.
37             share($$self{ProcessingCount});
38              
39             $$self{ProcessingCount} = 0;   # raised by send_request, lowered by get_response
40             share($$self{Shutdown});
41             share($$self{Responses});
42             # Thread handles are pushed here by start_pipeline/start_pool and joined in shutdown.
43             $$self{Threads} = [];
44              
45             $self = bless($self, $class);
46              
47             return $self;
48              
49             }
50              
51             sub _request_queue   # Internal accessor: returns whatever is stored in the Requests slot.
52             {
53             my $self = shift;
54             return $$self{Requests};   # assigned in Pipeline::new and WorkerPool::start_pool
55             }
56              
57              
58             sub _response_queue   # Internal accessor: returns whatever is stored in the Responses slot.
59             {
60             my $self = shift;
61             return $$self{Responses};   # assigned in start_pipeline and start_pool
62             }
63              
64             sub shutdown   # Raise the Shutdown flag, then join every thread recorded in Threads; returns 1.
65             {
66             my $self = shift;
67             $$self{Shutdown} = 1;   # worker() returns once it sees this flag with nothing to dequeue
68              
69             foreach my $thread (@{ $$self{Threads} } )
70             {
71             $thread->join if ($thread->tid);   # join is skipped when tid is false
72             }
73              
74             return 1;
75             }
76              
77             sub worker   # Thread body: poll $inputq, run $workersub on each ticket, push the result to $outputq.
78             {
79             my $self = shift;
80             my $workersub = shift;
81             my $inputq = shift;
82             my $outputq = shift;
83             # Tickets travel as Storable-frozen hashes. Returns 0 once Shutdown is set and nothing was dequeued.
84             while(1)
85             {
86             my $ticket = $inputq->dequeue_nb;
87             if (! $ticket)
88             {
89             # Only shut down if all work has been processed (no requests).
90             if ($$self{Shutdown})
91             {
92             return 0;
93             }
94              
95             sleep 1;
96             }
97             else
98             {
99              
100             $ticket = thaw($ticket);
101            
102             #printf ("%s thr %d request: %s",
103             # ref($self), threads->tid, Dumper($$ticket{Request}) );
104             # eval traps a die() in the worker sub so that it is reported on the ticket instead.
105             my @resp = eval { $workersub->( @{ $$ticket{Request} }) };
106             # Unconditional declaration: "my $x = ... if COND" has undefined behaviour (see perlsyn).
107             my $exception = $@ ? $@ : undef;
108              
109             $$ticket{Response} = \@resp;
110             $$ticket{Request} = \@resp; # in case we're sending this downstream
111             $$ticket{Exception} = $exception;
112              
113             my $resp = freeze( $ticket );
114              
115             $outputq->enqueue( $resp );
116             # $exception and $resp are fresh lexicals on every pass, so no manual reset is needed.
117              
118              
119             }
120             }
121             }
122              
123             sub pending_responses   # Returns 1 while requests are uncollected or responses are queued, else 0.
124             {
125             my $self = shift;
126             # ProcessingCount is raised by send_request and lowered by get_response.
127             return ( $$self{ProcessingCount} + $$self{Responses}->pending ) > 0 ? 1 : 0;
128             }
129              
130             sub send_request   # Wrap the arguments in a ticket, enqueue it frozen on Requests, return the ticket number.
131             {
132             my $self = shift;
133             my @request = @_;
134              
135             $$self{TicketNumber}++; # no need to lock. Only modified in main thread.
136              
137             # OriginalRequest is set here and never modified. It should be sent back to the caller in the response queue.
138             my $reqticket = { TicketNumber => $$self{TicketNumber}, Request => \@request, OriginalRequest => \@request };
139             # freeze() comes from Storable; the queue carries the serialized ticket.
140             $$self{Requests}->enqueue(freeze($reqticket));
141             $$self{ProcessingCount}++;   # one more response outstanding; see pending_responses
142              
143             return $$self{TicketNumber};
144             }
145              
146             sub get_response   # Fetch and thaw one response ticket; pass NoWait => 1 for a non-blocking read.
147             {
148             my $self = shift;
149              
150             my %opts = @_;
151              
152             my $resp;
153              
154             if ($opts{NoWait})
155             {
156             $resp = thaw ( $$self{Responses}->dequeue_nb );
157             }
158             else
159             {
160             $resp = thaw ( $$self{Responses}->dequeue );
161             }
162             # Guarded: delete on an undef $resp would autovivify it into {} and wrongly lower the count below.
163             delete $$resp{Request} if $resp; # Was probably modified. Remove to eliminate confusion.
164              
165             $$self{ProcessingCount}-- if $resp;
166             return $resp;
167             }
168              
169              
170             package MultiThread::Pipeline;
171              
172             =head1 MultiThread::Pipeline
173              
174             use MultiThread;
175            
176             my $pool = MultiThread::WorkerPool->new( EntryPoint => \&add_one );
177             my $pipeline = MultiThread::Pipeline->new( Pipeline => [ $pool, \&add_two ] );
178              
179             # Push 10 requests into the queue for processing.
180             # Worker processing will begin immediately.
181             map {
182             $ticketnum = $pipeline->send_request( $_ );
183             } ( 1..10 );
184              
185             # Gather responses back from the response queue. They may
186             # not be in the original order. Use the TicketNumber or OriginalRequest
187             # attributes of the ticket to identify the work unit. TicketNumber will
188             # correspond to the ticket number returned by $pipeline->send_request().
189             #
190             # DO NOT count on TicketNumber being an integer. It may be necessary
191             # to use alphanumeric at some point to avoid numeric overflows for large
192             # workloads or long-running processes. Simply compare TicketNumbers
193             # as strings, and you'll be safe.
194              
195             while ( $pipeline->pending_responses )
196             {
197             # get_response has a NoWait => 1 option for non-blocking reads
198             # if you'd rather write a polling loop instead.
199            
200             my $ticket = $pipeline->get_response; # or get_response( NoWait => 1)
201             printf "Answer was %s\n", $$ticket{Response}->[0];
202             }
203              
204             $pipeline->shutdown;
205              
206             sub add_one {
207             my $input = shift;
208             return $input + 1;
209             }
210              
211             sub add_two {
212             my $input = shift;
213             return $input + 2;
214             }
215              
216             =cut
217              
218             =head1 PURPOSE
219              
220             This module implements a Pipeline multithreading model. Several concurrent
221             threads are started -- one for each subroutine in the pipeline. The subs and
222             other MultiThread objects are daisy-chained together by queues. The output queue
223             of one step in the pipeline is the input queue of the following step.
224              
225             In the contrived example above, add_one is run by a WorkerPool object, and the
226             WorkerPool object is placed first in the pipeline. It takes the request
227             and adds one to it, returning the result. The result of add_one is fed as a request
228             directly into add_two, which adds two and returns the result. Because add_two is the final
229             step in the chain, its output will be returned to the user via the get_response method.
230              
231             MultiThread::Pipeline is great when you have multiple steps that take different times to complete.
232             MultiThread::Pipeline handles the inter-step queuing for you, so you don't need to worry about
233             what happens when one step outruns another. Each step simply processes asynchronously
234             as quickly as it can.
235              
236             One major consideration with MultiThread::Pipeline versus MultiThread::WorkerPool is that
237             MultiThread::Pipeline starts one thread for every sub in the pipeline, without regard for the
238             number of CPUs on the system.
239              
240             =cut
241              
242              
243             =head1 METHODS
244              
245             =cut
246              
247             require 5.008;
248              
249             use strict;
250             use warnings;
251              
252             use base qw( MultiThread::Base );
253             use Thread::Queue;
254             use Data::Dumper;
255              
256             use Storable qw(freeze thaw);
257              
258             =head2 new
259              
260             Create a new MultiThread::Pipeline object.
261              
262             MultiThread::Pipeline->new( %opts);
263              
264             =head3 Pipeline
265              
266             This required parameter takes an arrayref of coderefs or other MultiThread objects
267             which represent the pipeline.
268              
269             A single thread will be started for each coderef, and they will be daisychained together
270             in the order given in the array. The first sub will consume the original request,
271             and the last sub in the chain will return its results to the caller.
272              
273             You can also mix in other pre-instantiated MultiThread objects, and they will
274             function as expected. In the synopsis example, the first step in the Pipeline is a
275             WorkerPool, the results of which are fed into the &add_two sub. You can theoretically
276             use as many MultiThread objects as you want in a Pipeline and they should all play nice
277             together.
278              
279             =cut
280              
281              
282             sub new
283             {
284             # Constructor: requires the Pipeline option, picks the request queue, then starts the pipeline.
285             my $class = shift;
286             my %opts = @_;
287              
288             unless ( $opts{Pipeline} )
289             {
290             print "You must supply a arrayref of coderefs using the Pipeline parameter!\n";
291             return undef;
292             }
293              
294             my $self = $class->SUPER::new;
295             $$self{Pipeline} = $opts{Pipeline};
296            
297             my %defaults = (
298             );
299             # %defaults is empty, so this map currently changes nothing.
300             map {
301             $opts{$_} = $defaults{$_} unless defined $opts{$_};
302             } keys %defaults;
303            
304             # Allow MultiThread objects to lead the PipeLine. This should work
305             # whether the object is a WorkerPool or another PipeLine.
306            
307             #printf ("First Pipeline ref is a %s\n", ref ($$self{Pipeline}[0]));
308            
309             if (ref ($$self{Pipeline}[0]) =~ /^MultiThread/ )
310             {
311             $$self{Requests} = $$self{Pipeline}[0]->_request_queue;   # reuse the leading object's input queue
312             }
313             elsif ( ref($$self{Pipeline}[0]) eq 'CODE' )
314             {
315             $$self{Requests} = Thread::Queue->new();
316             }
317             # NOTE(review): any other kind of first element leaves Requests unset.
318              
319              
320             $self = bless($self, $class);
321              
322             $self->start_pipeline($$self{Pipeline});
323              
324             return $self;
325              
326             }
327              
328             # Bridge the output and input queues of two back-to-back MultiThread::* objects.
329             # Because the objects are already constructed, we can't dictate the queues they
330             # use internally. This sub will run in its own thread and will be inserted
331             # automatically into the pipeline wherever two MultiThread objects are
332             # back to back.
333             sub _bridge_queues
334             {
335             #print "Bridging values: " . Dumper(\@_);
336             return (@_);   # identity step: hands its arguments straight back
337             }
338              
339             sub start_pipeline
340             {
341             my $self = shift;
342             my $entrypoints = shift;
343             # Chains the steps together with queues and starts one worker thread per coderef; returns 1.
344             my ($inputq, $outputq);
345             my (@newentries);
346             # Pass 1: copy the steps, adding a _bridge_queues coderef between adjacent MultiThread objects.
347             my $nextworker = 0;
348             foreach my $step ( @{$entrypoints} )
349             {
350             $nextworker++;
351             push @newentries, $step;
352            
353             if ( ref($step) =~ /^MultiThread/ and ref( $entrypoints->[$nextworker] ) =~ /^MultiThread/ )
354             {
355             #print "Detected back-to-back MultiThread objects. Inserting bridge...\n";
356             push @newentries, \&_bridge_queues;
357             }
358             }
359            
360             #print "New pipeline: " . Dumper(\@newentries);
361            
362             $entrypoints = \@newentries;
363              
364             $inputq = $$self{Requests};
365             # Pass 2: each step's output queue becomes the input queue of the step after it.
366             $nextworker = 0;
367             foreach my $worker (@{$entrypoints})
368             {
369             $nextworker++;
370             #printf "Worker is a %s\n", ref($worker);
371             if (ref($worker) eq 'CODE')
372             {
373             # We need to look ahead in the pipeline to see if the next
374             # object to be chained in has an existing input queue. If so,
375             # we need to use that as our current-item outputq.
376            
377             if ( defined $entrypoints->[$nextworker] and ref( $entrypoints->[$nextworker] ) =~ /^MultiThread/ )
378             {
379             $outputq = $entrypoints->[$nextworker]->_request_queue;
380             #print "Got outputq $outputq\n";
381             }
382             else
383             {
384             $outputq = Thread::Queue->new;
385             }
386            
387             my $t = threads->create(\&MultiThread::Base::worker, $self, $worker, $inputq, $outputq);
388             push (@{ $$self{Threads} }, $t) if ($t);
389             }
390             elsif ( ref($worker) =~ '^MultiThread' )
391             {
392             # Next worker will NEVER be a MultiThread object
393             # because we re-wrote the pipeline to break up MT objects
394             # with a _bridge_queues CODEREF.
395            
396             $outputq = $worker->_response_queue;
397             }
398              
399             $inputq = $outputq;
400             }
401             # The final step's output queue is the one get_response reads from.
402             $$self{Responses} = $outputq;
403              
404             return 1;
405             }
406              
407             # In a Pipeline situation where the Pipeline may contain other MultiThread::*
408             # objects, we need to shut them down first before shutting down the parent
409             # structures. Otherwise we'll be leaking threads like crazy.
410              
411             sub shutdown
412             {
413             my $self = shift;
414             # Shut down nested MultiThread objects first, then this object's own threads via the base class.
415             foreach my $worker (@{ $self->{Pipeline} } )
416             {
417             if ( ref($worker) =~ /^MultiThread/ )
418             {
419             $worker->shutdown;
420             }
421             }
422            
423             return $self->SUPER::shutdown;   # result of MultiThread::Base::shutdown
424             }
425              
426              
427              
428             package MultiThread::WorkerPool;
429              
430             =head1 MultiThread::WorkerPool
431              
432             use MultiThread;
433              
434             my $workerpool = MultiThread::WorkerPool->new( EntryPoint => \&add_one );
435              
436             # Push 10 requests into the queue for processing.
437             # Worker processing will begin immediately.
438             map {
439             $ticketnum = $workerpool->send_request( $_ );
440             } ( 1..10 );
441              
442             # Gather responses back from the response queue. They may
443             # not be in the original order. Use the TicketNumber or OriginalRequest
444             # attributes of the ticket to identify the work unit. TicketNumber will
445             # correspond to the ticket number returned by $workerpool->send_request().
446             #
447             # DO NOT count on TicketNumber being an integer. It may be necessary
448             # to use alphanumeric at some point to avoid numeric overflows for large
449             # workloads or long-running processes. Simply compare TicketNumbers
450             # as strings, and you'll be safe.
451              
452             while ( $workerpool->pending_responses )
453             {
454             # get_response has a NoWait => 1 option for non-blocking reads
455             # if you'd rather write a polling loop instead.
456            
457             my $ticket = $workerpool->get_response; # or get_response( NoWait => 1)
458             printf "Answer was %s\n", $$ticket{Response}->[0];
459             }
460              
461             $workerpool->shutdown;
462              
463             sub add_one {
464             my $input = shift;
465             return $input + 1;
466             }
467              
468              
469             =cut
470              
471             =head1 PURPOSE
472              
473             This module implements a WorkerPool multithreading model. Several concurrent
474             threads are started using a single sub for processing. All requests are serviced
475             in parallel using the sub provided.
476              
477             MultiThread::WorkerPool is ideal when you have many items that must all be processed
478             similarly, as quickly as possible. Simply write the sub that will handle the processing
479             and hand it off to MultiThread::WorkerPool to run several instances of your sub
480             to process your work items.
481              
482             All items are put onto a single work queue, and the first available thread will
483             consume and process it. All threads in a Worker Pool are identical. Compare this
484             to a MultiThread::Pipeline, where each thread runs a different subroutine.
485              
486             =cut
487              
488             =head1 METHODS
489              
490             =cut
491              
492              
493             require 5.008;
494              
495             use strict;
496             use warnings;
497              
498             # This has to be before "use Thread::Queue"!
499             use base qw(MultiThread::Base);
500              
501             use threads::shared;
502             use Thread::Queue;
503             use Data::Dumper;
504              
505             =head2 new
506              
507             Create a new MultiThread::WorkerPool object.
508              
509             MultiThread::WorkerPool->new( %opts );
510              
511             =head3 MaxWorkers
512              
513             The MaxWorkers parameter overrides automatic detection of CPU count. Normally,
514             the WorkerPool will figure out how many CPUs are on the host machine, and will
515             start an equal number of workers. If it incorrectly detects CPU count for your machine,
516             or if you know it's safe to start more or less, you can use this parameter
517             to do so.
518              
519             =head3 EntryPoint
520              
521             Pass in a sub reference to the initial sub to be called in each thread. This sub will
522             be called for each item on the queue, and will run in parallel with itself.
523              
524             =cut
525              
526             sub new
527             {
528             # Constructor: requires EntryPoint, defaults MaxWorkers to get_CPU_count(), then starts the pool.
529             my $class = shift;
530             my %opts = @_;
531              
532             unless ( $opts{EntryPoint} )
533             {
534             print "You must supply a coderef using the EntryPoint parameter!\n";
535             return undef;
536             }
537              
538             my %defaults = (
539             MaxWorkers => &get_CPU_count()
540             );
541              
542             # Fill in every option the caller left undefined.
543             map {
544             $opts{$_} = $defaults{$_} unless defined $opts{$_};
545             } keys %defaults;
546              
547             my $self = $class->SUPER::new;
548              
549             $$self{EntryPoint} = $opts{EntryPoint};
550             $$self{MaxWorkers} = $opts{MaxWorkers};
551            
552             #printf "Starting %d worker threads.\n", $$self{MaxWorkers};
553              
554             $self = bless($self, $class);
555              
556             $self->start_pool;   # creates the queues and the worker threads
557              
558             return $self;
559              
560             }
561              
562             =head1 worker_count
563              
564             Returns the number of worker threads in this WorkerPool instance.
565              
566             =cut
567              
568             sub worker_count   # Returns the MaxWorkers value stored by new.
569             {
570             my $self = shift;
571             return $self->{MaxWorkers};
572             }
573              
574             # I think this can be combined with MultiThread::Pipeline::start_pipeline and moved to MultiThread::Base.
575             # They're very similar.
576             sub start_pool
577             {
578             my $self = shift;
579             # Creates the request/response queues and starts MaxWorkers threads on the EntryPoint sub; returns 1.
580             my $class = ref($self);   # not used again in this sub
581              
582             my $inputq = Thread::Queue->new;
583             my $outputq = Thread::Queue->new;
584             my $entrypoint = $$self{EntryPoint};
585              
586             $$self{Requests} = $inputq;
587             $$self{Responses} = $outputq;
588              
589             share($inputq);
590             # Every thread runs MultiThread::Base::worker against the same pair of queues.
591             for (my $x = 0; $x < $$self{MaxWorkers}; $x++)
592             {
593             my $t = threads->create(\&MultiThread::Base::worker, $self, $entrypoint, $inputq, $outputq);
594              
595             push (@{ $$self{Threads} }, $t) if ($t);   # keep the handle so shutdown can join it
596             }
597              
598             return 1;
599             }
600              
601             sub get_CPU_count   # CPU count as reported by Sys::CPU, falling back to 1 when that value is false.
602             {
603             my $procs = Sys::CPU::cpu_count();
604             return $procs ? $procs : 1; # In case cpu_count returns 0 or undef
605             }
606              
607             =head1 COMMON INSTANCE METHODS
608              
609             Both MultiThread::WorkerPool and MultiThread::Pipeline derive from MultiThread::Base,
610             so they share a number of methods.
611              
612             =head2 pending_responses
613              
614             Returns a boolean signifying whether there are still outstanding requests to be
615             processed. This will return true until the last response has been collected.
616              
617             This method takes no arguments.
618              
619              
620             =head2 get_response
621              
622             When worker subs finish their work, their return values are put back onto a response queue
623             for collection. Call this method on your WorkerPool or Pipeline object to retrieve the
624             return tickets, one at a time.
625              
626             The value returned is not only the return value of the worker thread. It is a hash containing
627             TicketNumber, OriginalRequest, Response, and Exception.
628              
629             =head3 Exception
630              
631             If the Pipeline or WorkerPool die()'s, this will be set to the die() message. This is so that
632             a single request's problem does not prevent other requests from running to completion.
633              
634             I do not provide a facility for allowing one request to kill the whole program because there's
635             no way of knowing the state of each request at the time the program died. If you really want
636             that behavior, do something like this in your get_response loop:
637              
638             die($$ticket{Exception}) if ($$ticket{Exception});
639              
640              
641             =head3 OriginalRequest
642              
643             The original request as given to send_request. This is necessary because the Request
644             is set to the return value of each sub for use in Pipelining.
645              
646             =head3 Response
647              
648             The return values of the final sub in a Pipeline or the Worker in a WorkerPool. This will be an
649             ARRAYREF because PERL subs can return arrays. You'll need to dereference it. Versions of MultiThread
650             prior to 0.9 only handled a single return value, which is why dereferencing was not necessary prior to
651             that version.
652              
653             =head3 TicketNumber
654              
655             The request number assigned to the request and returned to the caller of send_request. This
656             number persists throughout the process for the purpose of matching up the response with the
657             request.
658              
659             =head2 send_request
660              
661             This enqueues a request for processing. It takes no arguments of its own; all arguments
662             given will be passed directly to the subs you provide as @_. Call this exactly as you
663             would call your worker/pipeline methods directly. Just remember that any arguments
664             given must be serializable by the Storable module.
665              
666             This sub returns a unique ticket number (unique within the scope of the current instance).
667             This ticket number will be present in the response as well, so you can match up the
668             request with the response ticket if you need to.
669              
670             =head2 shutdown
671              
672             Tell the WorkerPool or Pipeline that it should finish its pending processing, stop the worker
673             processes, and exit. Shutdown will wait for all threads to exit before returning to the caller.
674             In cases of nested objects, e.g. Pipelines containing WorkerPools, the parent object will call
675             shutdown() on its child MultiThread objects as well.
676              
677             =cut
678              
679              
680             =head1 BUGS
681              
682             Be careful that you're passing serializable data types that can be freeze()'d and thaw()'d.
683             These modules make extensive use of Thread::Queue, which requires all structures
684             be serialized before being passed onto the queues.
685              
686              
687             =head1 AUTHOR
688              
689             David Spadea
690             http://www.spadea.net
691              
692             =cut
693              
694             =head1 COPYRIGHT & LICENSE
695              
696             Copyright 2008 David Spadea, all rights reserved.
697              
698             This program is free software; you can redistribute it and/or modify it
699             under the same terms as Perl itself.
700              
701              
702             =cut
703              
704             1;
705