File Coverage

blib/lib/POE/Component/Client/opentick/Protocol.pm
Criterion Covered Total %
statement 208 257 80.9
branch 35 68 51.4
condition 9 27 33.3
subroutine 48 60 80.0
pod 11 11 100.0
total 311 423 73.5


line stmt bran cond sub pod time code
1             package POE::Component::Client::opentick::Protocol;
2             #
3             # opentick.com POE client
4             #
5             # Protocol handling (only operates on data, no socket handling)
6             #
7             # infi/2008
8             #
9             # $Id: Protocol.pm 56 2009-01-08 16:51:14Z infidel $
10             #
11             # See docs/implementation-notes.txt for a detailed explanation of how
12             # this module works.
13             #
14             # Full user POD documentation after __END__
15             #
16              
17 2     2   9 use strict;
  2         4  
  2         76  
18 2     2   9 use warnings;
  2         4  
  2         52  
19 2     2   12 use Carp qw( croak );
  2         4  
  2         105  
20 2     2   12 use Data::Dumper;
  2         4  
  2         89  
21 2     2   11 use POE;
  2         4  
  2         14  
22              
23             # Ours.
24 2     2   769 use POE::Component::Client::opentick::Constants;
  2         5  
  2         532  
25 2     2   13 use POE::Component::Client::opentick::Util;
  2         4  
  2         174  
26 2     2   12 use POE::Component::Client::opentick::Output;
  2         4  
  2         134  
27 2     2   11 use POE::Component::Client::opentick::Error;
  2         4  
  2         102  
28 2     2   6591 use POE::Component::Client::opentick::ProtocolMsg;
  2         5  
  2         88  
29              
30             ###
31             ### Variables
32             ###
33              
34 2     2   17 use vars qw( $VERSION $TRUE $FALSE $KEEP $DELETE $poe_kernel );
  2         3  
  2         8628  
35              
36             ($VERSION) = q$Revision: 56 $ =~ /(\d+)/;
37             *TRUE = \1;
38             *FALSE = \0;
39             *KEEP = \0;
40             *DELETE = \1;
41              
42             # These arguments are for this object; pass the rest on.
43             my %valid_args = (
44             alias => $KEEP,
45             debug => $KEEP,
46             rawdata => $KEEP,
47             );
48              
49             my $state_base = 'POE::Component::Client::opentick::ProtocolMsg';
50              
51             ########################################################################
52             ### Public methods ###
53             ########################################################################
54              
55             sub new
56             {
57 1     1 1 7 my( $class, @args ) = @_;
58 1 50       5 croak( "$class requires an even number of parameters" ) if( @args & 1 );
59              
60 1         7 my $self = {
61             # User prefs
62             alias => OTDefault( 'alias' ),
63             rawdata => $FALSE, # user prefers to receive raw response
64             # data instead of ::Record objects
65             debug => $FALSE,
66             # Protocol settings
67             heartbeat => OTDefault( 'heartbeat' ), # beat delay in secs
68             request_timeout => OTDefault( 'request_timeout' ), # request timeout
69             # Protocol state
70             requests => {}, # outstanding requests keyed on ID
71             # stamp = timestamp
72             # cmd_id = command ID
73             # respcount = response count
74             # cancel_rqid = cancel request ID
75             # sender = sender POE ID
76             partial_data => '', # stash incomplete s
77             # Object containers
78             state_obj => undef, # object reference for ProtocolMsg
79             # handlers => {}, # loaded ProtocolMsg subclasses
80             # Statistical information
81             messages_sent => 0,
82             messages_recv => 0,
83             records_recv => 0,
84             errors_recv => 0,
85             };
86              
87 1         15 bless( $self, $class );
88              
89 1         7 my @leftovers = $self->initialize( @args );
90              
91             # Create a protocol state handler object with the leftover args
92 1         14 $self->{state_obj} =
93             POE::Component::Client::opentick::ProtocolMsg->new( @leftovers );
94              
95             # $self->_load_handler_subclasses();
96              
97 1         6 return( $self );
98             }
99              
100             # Initialize this object instance
101             sub initialize
102             {
103 1     1 1 6 my( $self, %args ) = @_;
104              
105             # Keep our things...
106 1         5 for( keys( %args ) )
107             {
108             # grab them regardless
109 7 100       31 $self->{lc $_} = $args{$_} if( exists( $valid_args{lc $_} ) );
110             # delete them if true
111 7 50       19 delete( $args{ $_ } ) if( $valid_args{lc $_} );
112             }
113              
114             # ... return the rest.
115 1         8 return( %args );
116             }
117              
118             # Construct a packet, register the request, and put the data on the wire
119             # XXX: Should we throttle outstanding requests here?
120             sub prepare_packet
121             {
122 4     4 1 9 my( $self, $sender_id, $cmd_id, @fields ) = @_;
123              
124             # Abort packet sending if non-existent request cancelled
125 4 50 33     15 if( OTCancel( $cmd_id ) && !$self->_request_exists( $fields[0] ) )
126             {
127 0         0 $self->_send_notification(
128             POE::Component::Client::opentick::Error->new(
129             CommandID => $cmd_id,
130             Message => 'No such request: ' . $fields[0],
131             )
132             );
133 0         0 return;
134             }
135              
136 4         19 my $req_id = $self->_add_request( $sender_id, $cmd_id );
137 4         16 my $packet = $self->_create_packet( $req_id, $cmd_id, @fields );
138              
139 4         16 $self->_inc_messages_sent();
140              
141             # Stash cancellation request ID for return packet
142 4 50       13 $self->_set_request_cancel_id( $req_id, $fields[0] )
143             if( OTCancel( $cmd_id ) );
144              
145 4         13 return( $packet, $req_id );
146             }
147              
148             # Handle and examine received packets
149             #
150             # This is complex, so here's the explanation:
151             # 1. If there is partial data stored from a previous run, prepend it.
152             # 2. Check the message length field.
153             # 3. If the data is still shorter than the message length, store as
154             # partial data for next loop and exit.
155             # 4. If it is long enough or longer, break off MsgLen bytes and process
156             # them, returning the remainder of the data to the caller.
157             # 5. Rinse and repeat (called in loop from caller).
158             #
159             # This is because the server can send packets that are smaller than a
160             # , the exact size of one , or containing multiple
161             # records per or multiple s per packet.
162             #
163             sub process_packet
164             {
165 3     3 1 8 my( $self, $data ) = @_;
166              
167 3         18 O_DEBUG( "process_packet( " . length( $data ) . " )" );
168              
169             # prepend the last packet received to the partial data, if apropo
170 3         17 $data = $self->_get_partial_data() . $data;
171 3         18 $self->_set_partial_data( undef );
172              
173             # check our length
174 3         13 my $msg_len = _get_message_length( $data );
175              
176 3         5 my( $leftover, $objects );
177             # Check if this packet contains a complete response
178 3 50       12 if( length( $data ) < ( $msg_len + 4 ) )
179             {
180 0         0 O_DEBUG( " packet not large enough; stashing." );
181             # Not large enough, stash it for next time.
182 0         0 $self->_set_partial_data( $data );
183 0         0 return ();
184             }
185             else # OK DESU.
186             {
187 3         7 my( $cmd_sts, $cmd_id, $req_id );
188              
189 3         12 O_DEBUG( " packet large enough; processing." );
190             # don't drop anything, store it for the next
191 3         11 $leftover = substr( $data, $msg_len + 4 );
192              
193 3         17 $self->_inc_messages_recv();
194              
195             # only work with one message, minus MessageLength
196 3         8 $data = substr( $data, 4, $msg_len );
197              
198 3         5 my( $msg_type );
199 3         11 ( $msg_type, $cmd_sts, $cmd_id, $req_id )
200             = _process_header( $data );
201              
202             # Drop message if invalid header or request_id not found
203 3 50       15 return( $leftover )
204             unless $self->_validate_header( $msg_type, $cmd_sts,
205             $cmd_id, $req_id );
206              
207             # chomp the header off, left only with the body.
208 3         11 $data = substr( $data, 12, $msg_len - 12 );
209              
210             # Everything is ready, process the body or notify of error
211 3 50       15 if( $cmd_sts == OTConstant( 'OT_STATUS_ERROR' ) )
212             {
213 0         0 push( @$objects, POE::Component::Client::opentick::Error->new(
214             RequestID => $req_id,
215             CommandID => $cmd_id,
216             Data => $data
217             ) );
218             }
219             else
220             {
221             # If this was a cancel response pkt, prune the original request.
222 3         13 $self->_cancel_commands( $req_id, $cmd_id );
223              
224             # FINALLY, process the body itself.
225 3         20 my $extradata;
226 3         13 ( $extradata, $objects ) =
227             $self->_process_body( $data, $req_id, $cmd_id );
228 3         10 $leftover .= $extradata;
229             }
230             }
231              
232 3         15 return( $leftover, $objects );
233             }
234              
235              
236             ########################################################################
237             ### Public Accessor methods ###
238             ########################################################################
239              
240             sub get_heartbeat_delay
241             {
242 1     1 1 2 my( $self ) = @_;
243              
244 1         9 return( $self->{heartbeat} );
245             }
246              
247             sub get_messages_sent
248             {
249 0     0 1 0 my( $self ) = @_;
250              
251 0         0 return( $self->{messages_sent} );
252             }
253              
254             sub get_messages_recv
255             {
256 0     0 1 0 my( $self ) = @_;
257              
258 0         0 return( $self->{messages_recv} );
259             }
260              
261             sub get_records_recv
262             {
263 0     0 1 0 my( $self ) = @_;
264              
265 0         0 return( $self->{records_recv} );
266             }
267              
268             sub get_errors_recv
269             {
270 0     0 1 0 my( $self ) = @_;
271              
272 0         0 return( $self->{errors_recv} );
273             }
274              
275              
276             ########################################################################
277             ### POE event handlers ###
278             ########################################################################
279              
280             # Generate a request packet to send to the server
281             # NOTE: This should be called with ->call() if you need the return value!
282             sub _ot_proto_issue_command
283             {
284 4     4   172 my( $self, $kernel, $sender, $cmd_id, @args )
285             = @_[OBJECT,KERNEL,SENDER,ARG0..$#_];
286              
287 4         15 my $sender_id = $sender->ID();
288              
289 4         26 O_DEBUG( sprintf( "_ot_proto_issue_command( %s ), from sender %s",
290             join( ', ', OTCommand( $cmd_id ), @args ),
291             $sender_id ) );
292              
293 4         18 my( $packet, $req_id )
294             = $self->prepare_packet( $sender_id, $cmd_id, @args );
295              
296 4 50       21 $kernel->call( $self->{alias}, '_ot_sock_send_packet', $packet )
297             if( $packet );
298              
299 4         31 return( $req_id );
300             }
301              
302             # Handle response packets from the server
303             sub _ot_proto_process_response
304             {
305 3     3   394 my( $self, $kernel, $data ) = @_[OBJECT,KERNEL,ARG0];
306 3         7 my( $cmd_sts, $cmd_id, $req_id, $objects );
307              
308 3         20 O_DEBUG( "_ot_proto_process_response( " . length( $data ) . " )" );
309              
310             # Loop to catch multiple messages sent per packet
311 3         13 while( $data )
312             {
313 3         18 ( $data, $objects ) = $self->process_packet( $data );
314              
315             # If we got something worthwhile...
316 3         10 for my $object ( @$objects )
317             {
318             # Notify the requestor of data or errors
319 1         7 $self->_send_notification( $object );
320             }
321              
322             # OPTIMIZATION: All messages in a single response will be from the
323             # same request, so SEPARATELY, for ONE OBJECT,
324             # Update the outstanding request list
325 3 100 66     26 if( @$objects and my $object = $objects->[0] )
326             {
327 1         6 $self->_update_requests( $object );
328             }
329             }
330              
331 3         15 return;
332             }
333              
334             # Handle End Of Data state from ProtocolMsg handlers
335             sub _ot_proto_end_of_data
336             {
337 0     0   0 my( $self, $kernel, $req_id, $cmd_id ) = @_[OBJECT, KERNEL, ARG0, ARG1];
338              
339             # Stab the request
340 0         0 my $sender = $self->_get_request_sender( $req_id );
341 0         0 $self->_prune_request( $req_id );
342              
343             # Notify the original requestor
344 0         0 $poe_kernel->yield( _notify_of_event =>
345             OTEvent( 'OT_REQUEST_COMPLETE' ),
346             [ $sender ], # extra sender list
347             $req_id,
348             $cmd_id );
349              
350 0         0 return;
351             }
352              
353             # Send a heartbeat and restart the timer
354             sub _ot_proto_heartbeat_send
355             {
356 1     1   120 my( $self, $kernel ) = @_[OBJECT,KERNEL];
357              
358 1         6 $kernel->call( $self->{alias},
359             '_ot_proto_issue_command',
360             OTConstant( 'OT_HEARTBEAT' ) );
361 1         9 $kernel->delay( '_ot_proto_heartbeat_send', $self->get_heartbeat_delay );
362              
363 1         159 return;
364             }
365              
366             # Stop the heartbeat timer
367             sub _ot_proto_heartbeat_stop
368             {
369 1     1   199 my( $self, $kernel ) = @_[OBJECT,KERNEL];
370              
371 1         6 $kernel->delay( '_ot_proto_heartbeat_send' );
372              
373 1         63 return;
374             }
375              
376             # Just a friendly wrapper to trap the event. Synchronously.
377             sub logout
378             {
379 1     1 1 37 my( $self, $kernel ) = @_[OBJECT,KERNEL];
380              
381 1         5 $kernel->call( $self->{alias},
382             '_ot_proto_issue_command',
383             OTConstant( 'OT_LOGOUT' )
384             );
385              
386 1         5 return;
387             }
388              
389             # Just a friendly wrapper to trap the event. Synchronously.
390             sub login
391             {
392 0     0 1 0 my( $self, $kernel ) = @_[OBJECT,KERNEL];
393              
394 0         0 $kernel->call( $self->{alias},
395             '_ot_proto_issue_command',
396             OTConstant( 'OT_LOGIN' )
397             );
398              
399 0         0 return;
400             }
401              
402             ########################################################################
403             ### Private methods ###
404             ########################################################################
405              
406             ### Requestor notification
407              
408             # Send notification to requestor
409             sub _send_notification
410             {
411 1     1   3 my( $self, $object ) = @_;
412              
413 1         6 my $cmd_id = $object->get_command_id();
414 1 50       6 return unless( $cmd_id );
415              
416 1         6 my $req_id = $object->get_request_id();
417 1         5 my $sender_id = $self->_get_request_sender( $req_id );
418 1         2 my $event;
419              
420 1 50       6 if( is_error( $object ) )
    50          
421             {
422 0         0 $event = OTEvent( 'OT_ON_ERROR' );
423 0         0 $self->_inc_errors_recv();
424             }
425             elsif( $object->is_eod )
426             {
427 0         0 $event = OTEvent( 'OT_REQUEST_COMPLETE' );
428             }
429             else
430             {
431 1         6 $event = OTEventByCommand( $cmd_id );
432 1 50       6 $self->_inc_records_recv()
433             if( $event eq OTEvent( 'OT_ON_DATA' ) );
434             }
435            
436             # SPECIAL CASE: We already sent the notification. Skip this.
437             # Have to send it high-priority.
438 1 50       5 undef( $sender_id ) if( $event eq OTEvent( 'OT_ON_LOGIN' ) );
439              
440             # G'wan and send it already already, already!
441 0         0 $poe_kernel->yield( _notify_of_event =>
442             $event,
443             [ $sender_id ],
444             $req_id,
445             $cmd_id,
446             # give them raw data if they really want it.
447             $self->{rawdata}
448 1 50       10 ? @{ $object->get_raw_data() }
449             : $object );
450              
451 1         85 return;
452             }
453              
454             ### Outgoing packet processing
455              
456             # Generate OT request packet
457             sub _create_packet
458             {
459 4     4   16 my( $self, $req_id, $cmd_id, @fields ) = @_;
460              
461 4         15 my $header = $self->_create_header( $req_id, $cmd_id );
462 4         15 my $body = $self->_create_body( $req_id, $cmd_id, @fields );
463 4         17 my $length =
464             $self->_create_msg_length( length( $header ) + length( $body ) );
465 4         10 my $packet = $length . $header . $body;
466              
467 4         10 return( $packet );
468             }
469              
470             # Generate MessageLength field.
471             sub _create_msg_length
472             {
473 4     4   9 my( $self, $msg_len ) = @_;
474              
475 4         12 my $junk = pack_binary( OTTemplate( 'MSG_LENGTH' ), $msg_len );
476              
477 4         11 return( $junk );
478             }
479              
480             # Generate OT packet header
481             sub _create_header
482             {
483 4     4   7 my( $self, $req_id, $cmd_id ) = @_;
484              
485 4         16 my $header = pack_binary(
486             OTTemplate( 'HEADER' ),
487             OTConstant( 'OT_MES_REQUEST' ),
488             OTConstant( 'OT_STATUS_OK' ),
489             $cmd_id,
490             $req_id,
491             );
492              
493 4         9 return( $header );
494             }
495              
496             # Generate OT packet message body
497             sub _create_body
498             {
499 4     4   11 my( $self, $req_id, $cmd_id, @fields ) = @_;
500              
501             # my $handler = $self->_get_state_handler( $cmd_id );
502 4         26 my $body = $self->{state_obj}->create_body( $req_id, $cmd_id, @fields );
503              
504 4         9 return( $body );
505             }
506              
507             ### Incoming packet processing
508              
509             # Return the MessageLength field
510             sub _get_message_length
511             {
512 3     3   11 my( $data ) = @_;
513              
514 3         21 my( $length ) = unpack_binary( OTTemplate( 'MSG_LENGTH' ), $data );
515              
516 3         26 return( $length );
517             }
518              
519             # Unpack a packet header
520             sub _process_header
521             {
522 3     3   7 my( $data ) = @_;
523              
524 3         13 my @fields = unpack_binary( OTTemplate( 'HEADER' ), $data );
525              
526 3         10 return( @fields );
527             }
528              
529             # Ensure the header fields are valid
530             # NOTE: I have generally tried to maintain the arg order of
531             # $sender_id, $request_id, $command_id, @etc
532             # throughout; but in functions that deal with packet contents themselves,
533             # the signature goes in packet contents order.
534             sub _validate_header
535             {
536 3     3   6 my( $self, $msg_type, $cmd_sts, $cmd_id, $req_id ) = @_;
537              
538 3 50       16 return( $FALSE ) unless( OTCmdStatus( $cmd_sts ) );
539 3 50       14 return( $FALSE ) unless( OTMsgType( $msg_type ) );
540 3 50       16 return( $FALSE ) unless( OTCommand( $cmd_id ) );
541 3 50       14 return( $FALSE ) unless( $self->_get_request_command( $req_id ) );
542              
543 3         14 return( $TRUE );
544             }
545              
546             # Handle the body of a response message through subclassed handlers
547             # XXX: This may have concurrency issues...
548             sub _process_body
549             {
550 3     3   10 my( $self, $body, $req_id, $cmd_id ) = @_;
551              
552             # my $handler = $self->_get_state_handler( $cmd_id );
553 3         29 my( $leftover, $results )
554             = $self->{state_obj}->process_body( $body, $req_id, $cmd_id );
555              
556 3         12 return( $leftover, $results );
557             }
558              
559             # Stash some data in the object for next loop
560             sub _set_partial_data
561             {
562 3     3   8 my( $self, $data ) = @_;
563              
564 3 50       16 $self->{partial_data} = defined( $data ) ? $data : '';
565              
566 3 50       53 return( defined( $data ) ? length( $data ) : 0 );
567             }
568              
569             # Retrieve (but keep) partial data from the object
570             sub _get_partial_data
571             {
572 3     3   7 my( $self, $data ) = @_;
573              
574 3         15 return( $self->{partial_data} );
575             }
576              
577             # Cancel entries from our request list if appropriate
578             sub _cancel_commands
579             {
580 3     3   8 my( $self, $req_id, $cmd_id ) = @_;
581              
582             # Bail out if this isn't a cancel command.
583 3 50       13 return unless( OTCancel( $cmd_id ) );
584              
585 0         0 my $cancel_id = $self->_get_request_cancel_id( $req_id );
586 0         0 my $cancelled = $self->_prune_request( $cancel_id );
587 0         0 $cancelled = $self->_prune_request( $req_id );
588              
589 0         0 O_DEBUG( "_cancel_commands( $req_id, $cmd_id ), cid=$cancel_id = $cancelled" );
590              
591 0         0 return( $cancelled );
592             }
593              
594             ### Outstanding request list processing
595              
596             # Generate an ID and add a request to the outstanding request list
597             sub _add_request
598             {
599 4     4   10 my( $self, $sender_id, $cmd_id ) = @_;
600              
601 4         13 my $id = $self->_get_next_request_id();
602              
603             # Don't save heartbeat requests in outstanding request queue.
604 4 100       15 unless( $cmd_id == OTConstant( 'OT_HEARTBEAT' ) )
605             {
606 3         14 $self->_update_request_time( $id );
607 3         12 $self->_update_request_sender( $id, $sender_id );
608 3         17 $self->_update_request_command( $id, $cmd_id );
609 3         10 $self->_update_request_respcount( $id, 0 );
610             }
611              
612 4         11 return( $id );
613             }
614              
615             # Remove request from catalog if appropriate
616             sub _update_requests
617             {
618 1     1   3 my( $self, $object ) = @_;
619              
620 1         5 my $packets_expected = OTResponses( $object->get_command_id() );
621 1         5 my $req_id = $object->get_request_id();
622              
623 1 50 33     4 if( $packets_expected <= OTConstant( 'OT_RESPONSES_ONE' ) ||
624             is_error( $object ) )
625             {
626 1         5 $self->_prune_request( $req_id );
627             }
628             else
629             {
630 0         0 $self->_update_request_time( $req_id );
631 0         0 $self->_update_request_respcount( $req_id );
632             }
633              
634             # Clean up ListSymbols and ListExchange requests while we're at it.
635 1         5 $self->_prune_old_requests();
636              
637 1         4 return;
638             }
639              
640             # Set the request_id that this command will cancel upon server confirmation
641             sub _set_request_cancel_id
642             {
643 0     0   0 my( $self, $req_id, $cancel_id ) = @_;
644              
645 0         0 O_DEBUG( "_set_request_cancel_id( $req_id, $cancel_id )" );
646              
647 0         0 return( $self->{requests}->{$cancel_id}->{cancel_rqid} = $req_id );
648             }
649              
650             # Update a request timestamp
651             sub _update_request_time
652             {
653 3     3   7 my( $self, $req_id ) = @_;
654              
655 3         13 $self->{requests}->{$req_id}->{stamp} = time;
656              
657 3         6 return;
658             }
659              
660             # Update or increment a request response count
661             sub _update_request_respcount
662             {
663 3     3   8 my( $self, $req_id, $new_count ) = @_;
664              
665 3 50       8 if( defined( $new_count ) )
666             {
667 3         11 $self->{requests}->{$req_id}->{respcount} = $new_count;
668             }
669             else
670             {
671 0         0 $self->{requests}->{$req_id}->{respcount}++;
672             }
673              
674 3         4 return;
675             }
676              
677             # Update a request POE session sender ID
678             sub _update_request_sender
679             {
680 3     3   5 my( $self, $req_id, $sender_id ) = @_;
681              
682 3         12 $self->{requests}->{$req_id}->{sender} = $sender_id;
683              
684 3         7 return;
685             }
686              
687             # Update a request command
688             sub _update_request_command
689             {
690 3     3   7 my( $self, $req_id, $cmd_id ) = @_;
691              
692 3         12 $self->{requests}->{$req_id}->{command} = $cmd_id;
693              
694 3         5 return;
695             }
696              
697             # Prune specified request, returning true if pruned.
698             sub _prune_request
699             {
700 1     1   2 my( $self, $req_id ) = @_;
701              
702 1 50       4 return unless( $req_id );
703              
704 1         4 my $pruned = delete( $self->{requests}->{$req_id} );
705              
706 1 50       6 return( $pruned ? $TRUE : $FALSE );
707             }
708              
709             # Remove outdated requests
710             sub _prune_old_requests
711             {
712 1     1   2 my( $self ) = @_;
713 1         4 my $timeout = $self->{request_timeout};
714              
715 1         4 for my $req_id ( $self->_get_requests() )
716             {
717 1         4 my $cmd_id = $self->_get_request_command( $req_id );
718              
719 1 0 0     5 if( ( time >
      33        
720             $self->_get_request_time( $req_id ) + $timeout ) and
721             ( $cmd_id == OTConstant( 'OT_REQUEST_LIST_EXCHANGES' ) or
722             $cmd_id == OTConstant( 'OT_REQUEST_LIST_SYMBOLS' ) or
723             $cmd_id == OTConstant( 'OT_REQUEST_LIST_SYMBOLS_EX' ) ) )
724             {
725 0         0 O_DEBUG( "pruning $req_id!" );
726 0         0 $self->_prune_request( $req_id );
727             }
728             }
729              
730 1         3 return;
731             }
732              
733             # Return list of all outstanding requests
734             sub _get_requests
735             {
736 1     1   2 my( $self ) = @_;
737              
738 1         3 return( keys( %{ $self->{requests} } ) );
  1         6  
739             }
740              
741             # Return boolean if request exists
742             sub _request_exists
743             {
744 0     0   0 my( $self, $req_id ) = @_;
745              
746 0 0       0 return( exists( $self->{requests}->{$req_id} ) ? $TRUE : $FALSE );
747             }
748              
749             # Return target ID for cancellation, if present
750             sub _get_request_cancel_id
751             {
752 0     0   0 my( $self, $req_id ) = @_;
753              
754 0         0 return( $self->{requests}->{$req_id}->{cancel_rqid} );
755             }
756              
757             # Get the sender of a request
758             sub _get_request_sender
759             {
760 1     1   6 my( $self, $req_id ) = @_;
761              
762 1         5 return( $self->{requests}->{$req_id}->{sender} );
763             }
764              
765             # Get the number of responses for this request
766             sub _get_request_respcount
767             {
768 0     0   0 my( $self, $req_id ) = @_;
769              
770 0   0     0 return( $self->{requests}->{$req_id}->{respcount} || 0 );
771             }
772              
773             # Return command of particular request
774             sub _get_request_command
775             {
776 4     4   10 my( $self, $req_id ) = @_;
777              
778 4 50       31 return( exists( $self->{requests}->{$req_id} )
779             ? $self->{requests}->{$req_id}->{command}
780             : undef
781             );
782             }
783              
784             # Return timestamp of particular request
785             sub _get_request_time
786             {
787 1     1   4 my( $self, $req_id ) = @_;
788              
789 1 50       10 return( exists( $self->{requests}->{$req_id} )
790             ? $self->{requests}->{$req_id}->{stamp}
791             : undef );
792             }
793              
794             # Generate and return a new, unique request ID number
795             { # CLOSURE
796             my $id;
797             sub _get_next_request_id
798             {
799 4     4   9 my( $self, $newid ) = @_;
800 4   100     28 $id = $newid || $id || 0;
801              
802 4         7 do {
803 4 50       27 $id = 1 if (++$id > 0xFFFFFFFF);
804 4 50       22 $id++ unless $id;
805             } while( exists( $self->{requests}->{ $id } ) );
806              
807 4         13 return $id;
808             }
809             } # /CLOSURE
810              
811             ### Statistical junk
812              
813             sub _inc_messages_sent
814             {
815 4     4   6 my( $self, $value ) = @_;
816              
817 4   50     31 return( $self->{messages_sent} += $value || 1 );
818             }
819              
820             sub _inc_messages_recv
821             {
822 3     3   5 my( $self, $value ) = @_;
823              
824 3   50     32 return( $self->{messages_recv} += $value || 1 );
825             }
826              
827             sub _inc_records_recv
828             {
829 0     0     my( $self, $value ) = @_;
830              
831 0   0       return( $self->{records_recv} += $value || 1 );
832             }
833              
834             sub _inc_errors_recv
835             {
836 0     0     my( $self, $value ) = @_;
837              
838 0   0       return( $self->{errors_recv} += $value || 1 );
839             }
840              
841             1;
842              
843             __END__