File Coverage

blib/lib/RPC/Lite/Client.pm
Criterion Covered Total %
statement 24 166 14.4
branch 0 68 0.0
condition 0 12 0.0
subroutine 8 31 25.8
pod 11 18 61.1
total 43 295 14.5


line stmt bran cond sub pod time code
1             package RPC::Lite::Client;
2            
3 3     3   15 use strict;
  3         7  
  3         91  
4            
5 3     3   25 use RPC::Lite;
  3         6  
  3         62  
6 3     3   1555 use RPC::Lite::MessageQuantizer;
  3         7  
  3         71  
7 3     3   1530 use RPC::Lite::Request;
  3         7  
  3         65  
8 3     3   1547 use RPC::Lite::Response;
  3         15  
  3         75  
9 3     3   1509 use RPC::Lite::Error;
  3         6  
  3         64  
10 3     3   15 use RPC::Lite::Notification;
  3         4  
  3         57  
11            
12 3     3   3322 use Data::Dumper;
  3         32925  
  3         6193  
13            
14             our $DEFAULTSERIALIZER = 'JSON';
15            
16             =pod
17            
18             =head1 NAME
19            
20             RPC::Lite::Client - Lightweight RPC client framework.
21            
22             =head1 SYNOPSIS
23            
24             use RPC::Lite::Client;
25            
26             my $client = RPC::Lite::Client->new(
27             {
28             Transport => 'TCP:Host=blah.foo.com,Port=10000',
29             Serializer => 'JSON', # JSON is actually the default,
30             # this argument is unnecessary
31             }
32             );
33            
34             my $result = $client->Request('HelloWorld');
35            
36             =head1 DESCRIPTION
37            
38             RPC::Lite::Client implements a very lightweight remote process
39             communications client framework. It can use arbitrary Transport
40             (RPC::Lite::Transport) and Serialization (RPC::Lite::Serializer)
41             mechanisms.
42            
43             =over 4
44            
45             =cut
46            
47 0 0   0 0   sub SerializerType { $_[0]->{serializertype} = $_[1] if @_ > 1; $_[0]->{serializertype} }
  0            
48 0 0   0 1   sub Serializer { $_[0]->{serializer} = $_[1] if @_ > 1; $_[0]->{serializer} }
  0            
49 0 0   0 1   sub Transport { $_[0]->{transport} = $_[1] if @_ > 1; $_[0]->{transport} }
  0            
50 0 0   0 0   sub IdCounter { $_[0]->{idcounter} = $_[1] if @_ > 1; $_[0]->{idcounter} }
  0            
51 0 0   0 0   sub CallbackIdMap { $_[0]->{callbackidmap} = $_[1] if @_ > 1; $_[0]->{callbackidmap} }
  0            
52 0 0   0 0   sub Connected { $_[0]->{connected} = $_[1] if @_ > 1; $_[0]->{connected} }
  0            
53 0 0   0 1   sub DieOnError { $_[0]->{dieonerror} = $_[1] if @_ > 1; $_[0]->{dieonerror} }
  0            
54 0 0   0 0   sub MessageQueue { $_[0]->{messagequeue} = $_[1] if @_ > 1; $_[0]->{messagequeue} }
  0            
55 0 0   0 0   sub MessageQuantizer { $_[0]->{messagequantizer} = $_[1] if @_ > 1; $_[0]->{messagequantizer} }
  0            
56 0 0   0 0   sub Stream { $_[0]->{stream} = $_[1] if @_ > 1; $_[0]->{stream} }
  0            
57            
58             =pod
59            
60             =item C<new>
61            
62             Creates a new RPC::Lite::Client object. Takes a hash reference of arguments.
63            
64             =over 4
65            
66             =item Supported Arguments
67            
68             =over 4
69            
70             =item Serializer
71            
72             A string specifying the RPC::Lite::Serializer to use when communicating
73             with the server. See 'perldoc RPC::Lite::Serializers' for a list of
74             supported serializers.
75            
76             =item Transport
77            
78             A string specifying the transport layer to use to connect to the server.
79             The string is of the format:
80            
81             <transport type>[:[<argument>=<value>[,<argument>=<value>...]]]
82            
83             Eg, for a TCP connection to the host 'blah.foo.com' on port 10000:
84            
85             TCP:Host=blah.foo.com,Port=10000
86            
87             See 'perldoc RPC::Lite::Transports' for a list of supported transport
88             mechanisms.
89            
90             =item ManualConnect
91            
92             A boolean value indicating whether or not you wish to connect manually,
93             rather than at object instantiation. If set to true, you are required
94             to call Connect() on the client object before attempting to make
95             requests.
96            
97             =item DieOnError
98            
99             If true, errors from the server will die(). If false, a warning will
100             be emitted (warn()) and undef will be returned from C<Request>. True
101             by default.
102            
103             =back
104            
105             =back
106            
107             =cut
108            
109             sub new
110             {
111 0     0 1   my $class = shift;
112 0           my $args = shift;
113            
114 0           my $self = {};
115 0           bless $self, $class;
116            
117 0           $self->Connected( 0 );
118            
119 0           $self->MessageQuantizer( RPC::Lite::MessageQuantizer->new() );
120            
121 0           $self->__InitializeSerializer( $args->{Serializer} );
122 0           $self->__InitializeTransport( $args->{Transport} );
123            
124 0           $self->MessageQueue( [] );
125 0           $self->IdCounter( 1 );
126 0           $self->CallbackIdMap( {} );
127 0           $self->Stream( '' );
128            
129             # default to death on error
130 0 0         $self->DieOnError( exists( $args->{DieOnError} ) ? $args->{DieOnError} : 1 );
131            
132 0 0         $self->Initialize( $args ) if ( $self->can( 'Initialize' ) );
133            
134 0 0         if ( !$args->{ManualConnect} )
135             {
136 0 0         if ( !$self->Connect() )
137             {
138 0           print "Could not connect to server!\n";
139 0           exit 1;
140             }
141             }
142            
143 0           return $self;
144             }
145            
146             sub __InitializeSerializer
147             {
148 0     0     my $self = shift;
149 0           my $serializerType = shift;
150            
151 0 0         $serializerType = $DEFAULTSERIALIZER if ( !length( $serializerType ) );
152            
153 0           my $serializerClass = 'RPC::Lite::Serializer::' . $serializerType;
154            
155 0           eval "use $serializerClass";
156 0 0         if ( $@ )
157             {
158 0           die( "Could not load serializer of type [$serializerClass]" );
159             }
160            
161 0           my $serializer = $serializerClass->new();
162 0 0         if ( !defined( $serializer ) )
163             {
164 0           die( "Could not construct serializer: $serializerClass" );
165             }
166            
167 0           $self->SerializerType( $serializerType );
168 0           $self->Serializer( $serializer );
169             }
170            
171             sub __InitializeTransport
172             {
173 0     0     my $self = shift;
174            
175 0           my $transportSpec = shift;
176            
177 0           my ( $transportType, $transportArgString ) = split( ':', $transportSpec, 2 );
178            
179 0           my $transportClass = 'RPC::Lite::Transport::' . $transportType;
180            
181 0           eval "use $transportClass";
182 0 0         if ( $@ )
183             {
184 0           die( "Could not load transport of type [$transportClass]" );
185             }
186            
187 0           my $transport = $transportClass->new( $transportArgString );
188 0 0         if ( !defined( $transport ) )
189             {
190 0           die( "Could not construct transport: $transportClass" );
191             }
192            
193 0           $self->Transport( $transport );
194             }
195            
196             ############
197             # These are public methods
198            
199             =pod
200            
201             =item C<Connect()>
202            
203             Explicitly connects to the server. Unless ManualConnect was passed to C<new>, the
204             constructor calls this for you; requests sent while unconnected are not transmitted.
205            
206             =cut
207            
208             sub Connect
209             {
210 0     0 1   my $self = shift;
211            
212 0 0         return 1 if ( $self->Connected() );
213            
214 0 0         return 0 if ( !$self->Transport->Connect() );
215            
216 0           my $handshakeContent = sprintf( $RPC::Lite::HANDSHAKEFORMATSTRING, $RPC::Lite::VERSION, $self->SerializerType(), $self->Serializer->GetVersion() );
217 0           $self->Transport->WriteData( $self->MessageQuantizer->Pack( $handshakeContent ) );
218            
219 0           $self->Connected( 1 );
220 0           return 1;
221             }
222            
223             =pod
224            
225             =item C<Request($methodName[, param[, ...]])>
226            
227             Sends a request to the server. Returns a native object that is the result of the request.
228            
229             =cut
230            
231             sub Request
232             {
233 0     0 1   my $self = shift;
234            
235 0           my $response = $self->RequestResponseObject( @_ );
236            
237             # if it's an error (user has turned off fatal errors), return undef, otherwise return the result
238 0 0         return $response->isa( 'RPC::Lite:Error' ) ? undef : $response->Result;
239             }
240            
241             =pod
242            
243             =item C<AsyncRequest($callBack, $methodName[, param[, ...]])>
244            
245             Sends an asynchronous request to the server. Takes a callback code
246             reference. After calling this, you'll probably want to call
247             HandleResponse in a loop to check for a response from the server, at
248             which point your callback will be executed and passed a native object
249             which is the result of the call.
250            
251             =cut
252            
253             sub AsyncRequest
254             {
255 0     0 1   my $self = shift;
256 0           my $callBack = shift;
257 0           my $methodName = shift;
258            
259             # __SendRequest returns the Id the given request was assigned
260 0           my $requestId = $self->__SendRequest( RPC::Lite::Request->new( $methodName, \@_ ) );
261 0           $self->CallbackIdMap->{$requestId} = [ $callBack, 0 ]; # coderef, bool: wants RPC::Lite::Response object
262             }
263            
264             =pod
265            
266             =item C<RequestResponseObject($methodName[, param[, ...]])>
267            
268             Sends a request to the server. Returns an RPC::Lite::Response object.
269            
270             May be mixed in with calls to AsyncRequest. Not threadsafe.
271            
272             =cut
273            
274             # FIXME better name?
275             sub RequestResponseObject
276             {
277 0     0 1   my $self = shift;
278 0           my $method = shift;
279            
280 0           my $request = RPC::Lite::Request->new( $method, \@_ ); # pass arrayref of remaining args as method params
281 0           $self->__SendRequest( $request );
282            
283 0           my $response;
284             # Loop until the matching response comes back (i.e. this is blocking).
285             # We throw away any response with a mismatched Id assuming it was generated
286             # by an AsyncRequest call, in which case __GetResponse will run the callback.
287             # Note that this isn't threadsafe, because we might throw away a response to a
288             # non-async request generated by another thread. The moral of the story is,
289             # separate threads need separate Client objects, or async requests.
290             # XXX: Most of the client code will probably not be threadsafe. Maybe we should just state that up front. Or should we make an attempt to be threadsafe?
291 0   0       do {
292 0           $response = $self->__GetResponse();
293             } until (defined $response and $response->Id == $request->Id);
294            
295 0           return $response;
296             }
297            
298             =pod
299            
300             =item C<AsyncRequestResponseObject($callBack, $methodName[, param[, ...]])>
301            
302             Sends an asynchronous request to the server. Takes a callback code
303             reference. After calling this, you'll probably want to call
304             HandleResponse in a loop to check for a response from the server, at
305             which point your callback will be executed and passed an RPC::Lite::Response
306             object holding the result of the call.
307            
308             =cut
309            
310             sub AsyncRequestResponseObject
311             {
312 0     0 1   my $self = shift;
313 0           my $callBack = shift;
314 0           my $methodName = shift;
315            
316             # __SendRequest returns the Id the given request was assigned
317 0           my $requestId = $self->__SendRequest( RPC::Lite::Request->new( $methodName, \@_ ) );
318 0           $self->CallbackIdMap->{$requestId} = [ $callBack, 1 ]; # coderef, bool: wants RPC::Lite::Response object
319             }
320            
321            
322             =pod
323            
324             =item C<Notify($methodName[, param[, ...]])>
325            
326             Sends a 'notification' to the server. That is, it makes a request,
327             but expects no response.
328            
329             =cut
330            
331             sub Notify
332             {
333 0     0 1   my $self = shift;
334 0           $self->__SendRequest( RPC::Lite::Notification->new( shift, \@_ ) ); # method and params arrayref
335             }
336            
337             # FIXME sub NotifyResponse, for trapping local transport errors cleanly?
338            
339            
340             =pod
341            
342             =item C<HandleResponse([$timeout])>
343            
344             Checks for a response from the server. Useful mostly in conjunction
345             with AsyncRequest. You can pass a timeout, or the Transport's default
346             timeout will be used. Returns an Error object if there was an error,
347             otherwise returns undef.
348            
349             =cut
350            
351             sub HandleResponse
352             {
353 0     0 1   my $self = shift;
354 0           my $timeout = shift;
355            
356 0           return $self->__GetResponse($timeout);
357             }
358            
359             ##############
360             # The following are private methods.
361            
362             sub __SendRequest
363             {
364 0     0     my ( $self, $request ) = @_; # request could be a Notification
365            
366 0 0         return -1 if ( !$self->Connected() );
367            
368 0           my $id = $self->IdCounter( $self->IdCounter + 1 );
369 0           $request->Id( $id );
370 0           my $serializedContent = $self->Serializer->Serialize( $request );
371 0           $self->Transport->WriteData( $self->MessageQuantizer->Pack( $serializedContent ) );
372            
373 0           return $id;
374             }
375            
376             sub __GetResponse
377             {
378 0     0     my $self = shift;
379 0           my $timeout = shift;
380            
381             # if our queue is empty, try to get some new messages
382 0           my $message;
383 0 0         if ( !@{ $self->MessageQueue } )
  0            
384             {
385 0           my $newData = $self->Transport->ReadData( $timeout );
386            
387 0 0 0       if ( !defined( $newData ) or !length( $newData ) )
388             {
389 0 0 0       if ( $timeout or $self->Transport->Timeout )
390             {
391 0           return; # no error, just no response yet
392             }
393             else
394             {
395 0           return RPC::Lite::Error->new( " Error reading data from server !" );
396             }
397             }
398            
399 0           $self->Stream( $self->Stream . $newData );
400            
401 0           $self->__ProcessStream();
402             }
403            
404 0           $message= shift @{ $self->MessageQueue() };
  0            
405            
406 0 0         return undef if ( !defined( $message ) );
407            
408 0           my $response = $self->Serializer->Deserialize( $message );
409            
410 0 0         if ( !defined( $response ) )
411             {
412 0           return RPC::Lite::Error->new( " Could not deserialize response !" );
413             }
414            
415 0 0         if ( $response->isa( 'RPC::Lite::Error' ) )
416             {
417            
418             # NOTE: We had some code here that tried to reconstruct Error.pm
419             # objects that came over the wire, but that doesn't work very
420             # well in other languages, along with some other drawbacks in
421             # implementation. We need to look more at the best way to deal
422             # with errors.
423            
424             # this is the default, and is simplest for most users
425 0 0         if ( $self->DieOnError() )
426             {
427 0           die( $response->Error );
428             }
429             else
430             {
431 0           warn( $response->Error );
432             }
433             }
434            
435 0 0         if ( exists( $self->CallbackIdMap->{ $response->Id } ) )
436             {
437 0           my ( $codeRef, $wantsResponseObject ) = @{ $self->CallbackIdMap->{ $response->Id } };
  0            
438            
439             # wrap the callback in some sanity checking
440 0 0 0       if ( defined( $codeRef ) && ref( $codeRef ) eq 'CODE' )
441             {
442 0 0         if ( $wantsResponseObject )
443             {
444 0           $codeRef->( $response );
445             }
446             else
447             {
448 0           $codeRef->( $response->Result );
449             }
450             }
451            
452 0           delete $self->CallbackIdMap->{ $response->Id };
453             }
454             else
455             {
456 0           return $response;
457             }
458             }
459            
460             # FIXME this is in Session, it's a shame it's cut and pasted here
461             sub __ProcessStream
462             {
463 0     0     my $self = shift;
464            
465 0 0         return undef if ( !length( $self->Stream ) );
466            
467 0           my $quantized = $self->MessageQuantizer->Quantize( $self->Stream );
468            
469 0           push( @{$self->MessageQueue}, @{ $quantized->{messages} } );
  0            
  0            
470            
471 0           $self->Stream( $quantized->{remainder} );
472            
473 0           return scalar( @{$self->MessageQueue} );
  0            
474             }
475             1;