File Coverage

blib/lib/RPC/Lite/Server.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             package RPC::Lite::Server;
2              
3 3     3   18 use strict;
  3         6  
  3         119  
4              
5 3     3   3565 use threads;
  0            
  0            
6             use threads::shared;
7              
8             use RPC::Lite::Session;
9             use RPC::Lite::SessionManager;
10              
11             use RPC::Lite::Request;
12             use RPC::Lite::Response;
13             use RPC::Lite::Error;
14             use RPC::Lite::Signature;
15              
16             use Data::Dumper;
17              
18             my $DEBUG = $ENV{RPC_LITE_DEBUG};
19              
20             my $systemPrefix = 'system';
21             my $workerThreadsDefault = 10;
22              
23             =pod
24              
25             =head1 NAME
26              
27             RPC::Lite::Server - Lightweight RPC server framework.
28              
29             =head1 SYNOPSIS
30              
31             use strict;
32              
33             use RPC::Lite::Server;
34              
35             my $server = ExampleServer->new(
36             {
37             Transports => [ 'TCP:ListenPort=10000,LocalAddr=localhost' ],
38             Threaded => 1,
39             }
40             );
41              
42             $server->Loop;
43              
44             ###########################
45              
46             package ExampleServer;
47              
48             use base qw(RPC::Lite::Server);
49              
50             sub Initialize
51             {
52             my $self = shift;
53              
54             $self->AddSignature( 'GetTime=int:' ); # optional signatures
55             }
56              
57             sub GetTime
58             {
59             return time();
60             }
61              
62             ...
63              
64             =head1 DESCRIPTION
65              
66             RPC::Lite::Server implements a very lightweight remote process
67             communications server framework. It can use arbitrary Transport
68             (RPC::Lite::Transport) and Serialization (RPC::Lite::Serializer)
69             mechanisms. It supports optional method signatures and threading.
70              
71             =cut
72              
73             my %defaultMethods = (
74             "$systemPrefix.Uptime" => \&_Uptime,
75             "$systemPrefix.RequestCount" => \&_RequestCount,
76             "$systemPrefix.SystemRequestCount" => \&_SystemRequestCount,
77             "$systemPrefix.GetSignatures" => \&_GetSignatures,
78             "$systemPrefix.GetSignature" => \&_GetSignature,
79             );
80              
81             sub SessionManager { $_[0]->{sessionmanager} = $_[1] if @_ > 1; $_[0]->{sessionmanager} }
82             sub StartTime { $_[0]->{starttime} = $_[1] if @_ > 1; $_[0]->{starttime} }
83             sub Threaded { $_[0]->{threaded} = $_[1] if @_ > 1; $_[0]->{threaded} }
84             sub ThreadPool { $_[0]->{threadpool} = $_[1] if @_ > 1; $_[0]->{threadpool} }
85             sub PoolJobs { $_[0]->{pooljobs} = $_[1] if @_ > 1; $_[0]->{pooljobs} }
86             sub WorkerThreads { $_[0]->{workerthreads} = $_[1] if @_ > 1; $_[0]->{workerthreads} }
87             sub Signatures { $_[0]->{signatures} = $_[1] if @_ > 1; $_[0]->{signatures} }
88              
89             sub RequestCount
90             {
91             lock( $_[0]->{requestcount} );
92             $_[0]->{requestcount} = $_[1] if @_ > 1;
93             return $_[0]->{requestcount};
94             }
95              
96             sub SystemRequestCount
97             {
98             lock( $_[0]->{systemrequestcount} );
99             $_[0]->{systemrequestcount} = $_[1] if @_ > 1;
100             return $_[0]->{systemrequestcount};
101             }
102              
103             sub __IncRequestCount { $_[0]->__IncrementSharedField( 'requestcount' ) }
104             sub __IncSystemRequestCount { $_[0]->__IncrementSharedField( 'systemrequestcount' ) }
105              
106             # helper for atomic counters
107             sub __IncrementSharedField
108             {
109             my $self = shift;
110             my $fieldName = shift;
111              
112             lock( $self->{$fieldName} );
113             return ++$self->{$fieldName};
114             }
115              
116             =pod
117              
118             =over 4
119              
120             =item C
121              
122             Creates a new RPC::Lite::Server object. Takes a hash reference to specify
123             arguments.
124              
125             =over 4
126              
127             =item Supported Arguments
128              
129             =over 4
130              
131             =item Transports
132              
133             An array reference to transport specifications which will determine which
134             transport layers are initialized by the Session Manager.
135              
136             =item Threaded
137              
138             Boolean value indicating whether or not the server should operate in a
139             threaded mode where requests are handed to worker threads for completion.
140              
141             This functionality depends on having the Thread::Pool module installed.
142              
143             This functionality can also seriously impact the way a server must be
144             implemented to handle concurrency, etc. It is not recommended that
145             this option be used unless you understand the necessary precautions
146             that must be taken when implementing threaded applications.
147              
148             =item WortherThreads
149              
150             Specifies the number of worker threads to use when threading is enabled.
151             Defaults to 10.
152              
153             =back
154              
155             =back
156              
157             =cut
158              
159             sub new
160             {
161             my $class = shift;
162             my $args = shift;
163              
164             my $self = { requestcount => undef, systemrequestcount => undef };
165             bless $self, $class;
166             share( $self->{requestcount} );
167             share( $self->{systemrequestcount} );
168              
169             $self->StartTime( time() ); # no need to share; set once and copied to children
170             $self->RequestCount( 0 );
171             $self->SystemRequestCount( 0 );
172              
173             $self->Threaded( $args->{Threaded} );
174             $self->WorkerThreads( defined( $args->{WorkerThreads} ) ? $args->{WorkerThreads} : $workerThreadsDefault );
175              
176             $self->Signatures( {} );
177              
178             $self->Initialize( $args ) if ( $self->can( 'Initialize' ) );
179              
180             $self->__InitializeThreadPool();
181             $self->__InitializeSessionManager( $args->{Transports} );
182            
183             $self->SessionManager->StartListening();
184            
185             return $self;
186             }
187              
188             sub __InitializeSessionManager
189             {
190             my $self = shift;
191             my $transportSpecs = shift;
192              
193             my $sessionManager = RPC::Lite::SessionManager->new(
194             {
195             TransportSpecs => $transportSpecs,
196             }
197             );
198              
199             die( "Could not create SessionManager!" ) if !$sessionManager;
200              
201             $self->SessionManager( $sessionManager );
202             }
203              
204             sub __InitializeThreadPool
205             {
206             my $self = shift;
207              
208             # abort if threading not requested, or already initialized
209             return if !$self->Threaded or $self->ThreadPool;
210              
211             if ( __PACKAGE__->IsThreadingSupported )
212             {
213             __Debug( 'threading enabled' );
214             eval "use Thread::Pool";
215             my $pool = Thread::Pool->new(
216             {
217             'workers' => $self->WorkerThreads,
218             'do' => sub { my $result = $self->__DispatchRequest( @_ ); return $result; },
219             }
220             );
221             $self->ThreadPool( $pool );
222             $self->PoolJobs( {} );
223             }
224             else
225             {
226             __Debug( 'threading requested, but not available' );
227             warn "Disabling threading for lack of Thread::Pool module."; # FIXME is this useful, or is the __Debug enough?
228             $self->Threaded( 0 );
229             }
230             }
231              
232             ############
233             # These are public methods that server authors may call.
234              
235             =pod
236              
237             =item C
238              
239             Returns true if server multithreading support is available, false otherwise.
240              
241             This is a class method, eg:
242              
243             my $server = RPC::Lite::Server->new(
244             {
245             ...,
246             Threaded => RPC::Lite::Server::IsThreadingSupported() ? 1 : 0,
247             }
248             );
249              
250             WARNING: Calling this before forking and doing some threading stuff in the child process
251             may hang the child. The ithreads docs even say it doesn't play well with fork(). It is
252             just mentioned here because it isn't obvious that calling this method does any thread stuff.
253              
254             =cut
255              
256             sub IsThreadingSupported
257             {
258             # FIXME need to make 'use threads' conditional on having ithreads available, otherwise a perl compiled without threads will just die when using this module.
259             eval "use Thread::Pool";
260             return $@ ? 0 : 1;
261             }
262              
263              
264             =pod
265              
266             =item C
267              
268             Loops, calling HandleRequest, and HandleResponses, does not return. Useful for a trivial server that doesn't need
269             to do anything else in its event loop.
270              
271             =cut
272              
273             sub Loop
274             {
275             my $self = shift;
276              
277             while ( 1 )
278             {
279             $self->HandleRequests();
280             $self->HandleResponses();
281             }
282             }
283              
284             =pod
285              
286             =item C
287              
288             Handles all pending requests, dispatching them to the underlying RPC implementation class.
289              
290             Instead of calling C some servers may implement their own update loops,
291             calling C repeatedly.
292              
293             =cut
294              
295             sub HandleRequests
296             {
297             my $self = shift;
298              
299             $self->SessionManager->PumpSessions();
300              
301             my @readySessions = $self->SessionManager->GetReadySessions();
302              
303             foreach my $session ( @readySessions )
304             {
305             my $request = $session->GetRequest();
306             next if !defined $request;
307              
308             if ( $self->Threaded ) # asynchronous operation
309             {
310             __Debug( "passing request to thread pool" );
311             # god, dirty, we need to save this return value or the
312             # results will be discarded...
313             my $jobId = $self->ThreadPool->job( $request );
314             $self->PoolJobs->{$jobId} = $session->Id();
315             }
316             else # synchronous
317             {
318             my $result = $self->__DispatchRequest( $request );
319             $session->Write( $result ) if defined( $result );
320             }
321             }
322             }
323              
324             =pod
325              
326             =item C
327              
328             When threading is enabled, this method looks for completed requests
329             and returns them to the requesting client.
330              
331             =cut
332              
333             # pump the thread pool and write out responses to clients
334             sub HandleResponses
335             {
336             my $self = shift;
337              
338             return if !$self->Threaded;
339              
340             my @readyJobs = $self->ThreadPool->results();
341             __Debug( "jobs finished: " . scalar( @readyJobs ) ) if @readyJobs;
342             foreach my $jobId ( @readyJobs )
343             {
344             my $response = $self->ThreadPool->result( $jobId );
345             my $sessionId = $self->PoolJobs->{$jobId};
346             my $session = $self->SessionManager->GetSession( $sessionId );
347             if ( defined( $session ) )
348             {
349             $session->Write( $response );
350             __Debug( " id:$jobId" );
351             }
352             delete $self->PoolJobs->{$jobId};
353             }
354             }
355              
356             =pod
357              
358             =item C
359              
360             Adds a signature for the given method. Signatures can be used to verify
361             that clients and servers agree on method specifications. However, they
362             are optional because most RPC implementations are done with close
363             coupling of server and client development where developers are unlikely
364             to need verification of server/client agreement.
365              
366             See RPC::Lite::Signature for details on the format for specifying
367             signatures.
368              
369             =cut
370              
371             sub AddSignature
372             {
373             my $self = shift;
374             my $signatureString = shift;
375              
376             my $signature = RPC::Lite::Signature->new( $signatureString );
377              
378             if ( !$self->can( $signature->MethodName() ) )
379             {
380             warn( "Attempted to add a signature for a method [" . $signature->MethodName . "] we are not capable of!" );
381             return;
382             }
383              
384             $self->Signatures->{ $signature->MethodName } = $signature;
385             }
386              
387             #
388             #############
389              
390             ##############
391             # The following are private methods.
392              
393             sub __FindMethod
394             {
395             my ( $self, $methodName ) = @_;
396              
397             __Debug( "looking for method in: " . ref( $self ) );
398             my $coderef = $self->can( $methodName ) || $defaultMethods{$methodName};
399              
400             return $coderef;
401             }
402              
403             sub __DispatchRequest
404             {
405             my $self = shift;
406             my $request = shift;
407              
408             my $method = $self->__FindMethod( $request->Method );
409             my $response = undef;
410              
411             if ( $method )
412             {
413              
414             # implementation package has the method, so we call it with the params
415             __Debug( "dispatching to: " . $request->Method );
416             eval { $response = $method->( $self, @{ $request->Params } ) }; # may return a pre-encoded Response, or just some data
417             __Debug( " returned:\n\n" );
418             __Debug( Dumper $response );
419             if ( $@ )
420             {
421             __Debug( "method died" );
422              
423             # attempt to detect an Error.pm object
424             my $error = $@;
425             if ( UNIVERSAL::isa( $@, 'Error' ) )
426             {
427             $error = { %{$@} }; # copy the blessed hashref into a ref to a plain one
428             }
429              
430             $response = RPC::Lite::Error->new( $error ); # FIXME security issue - exposing implementation details to the client
431             }
432             elsif ( !UNIVERSAL::isa( $response, 'RPC::Lite::Response' ) )
433             {
434              
435             # method just returned some plain data, so we construct a Response object with it
436            
437             $response = RPC::Lite::Response->new( $response );
438             }
439              
440             # else, the method returned a Response object already so we just let it be
441             }
442             else
443             {
444              
445             # implementation package doesn't have the method
446             $response = RPC::Lite::Error->new( "unknown method: " . $request->Method );
447             }
448              
449             $response->Id( $request->Id ); # make sure the response's id matches the request's id
450              
451             use Data::Dumper;
452             __Debug( "returning:\n\n" );
453             __Debug( Dumper $response );
454              
455             ###########################################################
456             ## keep track of how many method calls we've handled...
457             if ( $request->Method !~ /^$systemPrefix\./ )
458             {
459             $self->__IncRequestCount();
460             }
461             else
462             {
463             $self->__IncSystemRequestCount();
464             }
465              
466             return $response;
467             }
468              
469             #=============
470              
471             sub __Debug
472             {
473             return if !$DEBUG;
474              
475             my $message = shift;
476             my ( $package, $filename, $line, $subroutine ) = caller( 1 );
477             my $threadId = threads->tid;
478             print STDERR "[$threadId] $subroutine: $message\n";
479             }
480              
481             #=============
482              
483             sub _Uptime
484             {
485             my $self = shift;
486              
487             return time() - $self->StartTime;
488             }
489              
490             sub _RequestCount
491             {
492             my $self = shift;
493              
494             return $self->RequestCount;
495             }
496              
497             sub _SystemRequestCount
498             {
499             my $self = shift;
500              
501             return $self->SystemRequestCount;
502             }
503              
504             sub _GetSignatures
505             {
506             my $self = shift;
507              
508             my @signatures;
509              
510             foreach my $methodName ( keys( %{ $self->Signatures } ) )
511             {
512             my $signature = $self->Signatures->{$methodName};
513              
514             push( @signatures, $signature->AsString() );
515             }
516              
517             return \@signatures;
518             }
519              
520             sub _GetSignature
521             {
522             my $self = shift;
523             my $methodName = shift;
524              
525             return $self->Signatures->{$methodName}->AsString();
526             }
527              
528             =pod
529              
530             =back
531              
532             =head1 AUTHORS
533              
534             Andrew Burke (aburke@bitflood.org)
535             Jeremy Muhlich (jmuhlich@bitflood.org)
536              
537             =cut
538              
539             1;