File Coverage

lib/UR/Service/RPC/Message.pm
Criterion Covered Total %
statement 58 72 80.5
branch 11 18 61.1
condition 3 6 50.0
subroutine 8 8 100.0
pod 3 3 100.0
total 83 107 77.5


line stmt bran cond sub pod time code
1             package UR::Service::RPC::Message;
2              
3 2     2   72 use UR;
  2         2  
  2         20  
4 2     2   7 use FreezeThaw;
  2         3  
  2         97  
5 2     2   438 use IO::Select;
  2         1379  
  2         61  
6              
7 2     2   8 use strict;
  2         2  
  2         44  
8 2     2   8 use warnings;
  2         2  
  2         1062  
9             our $VERSION = "0.46"; # UR $VERSION;
10              
11             UR::Object::Type->define(
12             class_name => 'UR::Service::RPC::Message',
13             has => [
14             target_class => { is => 'String' },
15             method_name => { is => 'String' },
16             ],
17             has_optional => [
18             #arg_list => { is => 'ARRAY' },
19             params => { is => 'Object', is_many => 1 },
20             return_values => { is => 'Object', is_many => 1 },
21             'wantarray' => { is => 'Integer' },
22             fh => { is => 'IO::Handle' },
23             exception => { is => 'String' },
24             ],
25             is_transactional => 0,
26             );
27              
28              
29             sub create {
30 29     29 1 332 my($class,%params) = @_;
31              
32 29         78 foreach my $key ( 'params', 'return_values' ) {
33 58 100       220 if (!$params{$key}) {
    50          
34 23         57 $params{$key} = [];
35             } elsif (ref($params{$key}) ne 'ARRAY') {
36 0         0 $params{$key} = [ $params{$key} ];
37             }
38             }
39              
40 29         184 return $class->SUPER::create(%params);
41             }
42              
43            
44              
45              
46             sub send {
47 15     15 1 4674 my $self = shift;
48 15         28 my $fh = shift;
49              
50 15   66     69 $fh ||= $self->fh;
51              
52 15         24 my %struct;
53 15         36 foreach my $key ( qw (target_class method_name params wantarray return_values exception) ) {
54 90         150 $struct{$key} = $self->{$key};
55             }
56              
57 15         81 my $string = FreezeThaw::freeze(\%struct);
58 15         1950 $string = pack('N', length($string)) . $string;
59              
60 15         25 my $len = length($string);
61 15         30 my $sent = 0;
62 15         48 while($sent < $len) {
63 15         119 my $wrote = $fh->syswrite($string, $len - $sent, $sent);
64              
65 15 50       6196 if ($wrote) {
66 15         48 $sent += $wrote;
67             } else {
68             # The filehandle closed for some reason
69 0         0 $fh->close;
70 0         0 return undef;
71             }
72             }
73              
74 15         90 return $sent;
75             }
76              
77              
78              
79             sub recv {
80 15     15 1 4401 my($class, $fh, $timeout) = @_;
81              
82             # You can also call recv on a message object previously created
83 15 50 33     76 if (ref($class) && $class->isa('UR::Service::RPC::Message')) {
84 0         0 my $fh = $class->fh;
85 0         0 $class = ref($class);
86 0         0 return $class->recv($fh);
87             }
88              
89 15 100       47 if (@_ < 3) { # # if they didn't specify a timeout
90 5         9 $timeout = 5; # Default wait 5 sec
91             }
92              
93 15         95 my $select = IO::Select->new($fh);
94              
95             # read in the message len, 4 chars
96 15         785 my $msglen;
97 15         38 my $numchars = 0;
98 15         51 while ($numchars < 4) {
99 15 50       59 unless ($select->can_read($timeout)) {
100 0         0 $class->warning_message("Can't get message length, timed out");
101 0         0 return;
102             }
103              
104 15         117365 my $read = $fh->sysread($msglen, 4-$numchars, $numchars);
105              
106 15 50       235 unless ($read) {
107 0         0 $class->warning_message("Can't get message length: $!");
108 0         0 return;
109             }
110              
111 15         43 $numchars += $read;
112             }
113              
114 15         93 $msglen = unpack('N', $msglen);
115              
116 15         26 my $string = '';
117 15         26 $numchars = 0;
118 15         50 while ($numchars < $msglen) {
119 15 50       43 unless ($select->can_read($timeout)) {
120 0         0 $class->warning_message("Timed out reading message after $numchars bytes");
121 0         0 return;
122             }
123              
124 15         285 my $read = $fh->sysread($string, $msglen - $numchars, $numchars);
125              
126 15 50       136 unless($read) {
127 0         0 $class->warning_message("Error reading message after $numchars bytes: $!");
128 0         0 return;
129             }
130              
131 15         46 $numchars += $read;
132             }
133              
134 15         83 my($struct) = FreezeThaw::thaw($string);
135              
136 15         3747 my $obj = $class->create(%$struct, fh => $fh);
137              
138 15         112 return $obj;
139             }
140            
141            
142              
143             1;
144              
145             =pod
146              
147             =head1 NAME
148              
149             UR::Service::RPC::Message - Serializable object appropriate for sending RPC messages
150              
151             =head1 SYNOPSIS
152              
153             my $msg = UR::Service::RPC::Message->create(
154             target_class => 'URT::RPC::Thingy',
155             method_name => 'join',
156             params => ['-', @join_args],
157             'wantarray' => 0,
158             );
159             $msg->send($fh);
160              
161             my $resp = UR::Service::RPC::Message->recv($fh, 5);
162              
163             =head1 DESCRIPTION
164              
165             This class is used as a message-passing interface by the RPC service modules.
166              
167             =head1 PROPERTIES
168              
169             These properties should be filled in by the initiating caller
170              
171             =over 4
172              
173             =item method_name => Text
174              
175             The name of the subroutine the initiator whishes to call.
176              
177             =item target_class => Text
178              
179             The namespace the initiator wants the subroutine to be called in
180              
181             =item params => ARRAY
182              
183             List of parameters to pass to the subroutine
184              
185             =item wantarray => Boolean
186              
187             What wantarray() context the subroutine should be called in.
188              
189             =back
190              
191             These properties are assigned after the RPC call to the subroutine
192              
193             =over 4
194              
195             =item return_values => ARRAY
196              
197             List of values returned by the subroutine
198              
199             =item exception
200              
201             On the receiving side, the subroutine is called within an eval. If there
202             was an exception, C stores the value of $@, or the empty string.
203             The receiving side should also fill-in C if there was an
204             authentication failure.
205              
206             =item fh
207              
208             C fills this in with the file handle the message was read from.
209              
210             =back
211              
212             =head1 METHODS
213              
214             =over 4
215              
216             =item send
217              
218             $bytes = $msg->send($fh);
219              
220             Serializes the Message object with FreezeThaw and writes the data to the
221             filehandle $fh. Returns the number of bytes written. $bytes will be
222             false if there was an error.
223              
224             =item recv
225              
226             $response = UR::Service::RPC::Message->recv($fh,$timeout);
227              
228             $response = $msg->recv();
229              
230             Reads a serialized Message from the filehandle and constructs a Message
231             object that is then returned to the caller. In the first case, it reads
232             from the given filehandle, waiting a maximum of $timeout seconds with
233             select before giving up. In the second case, it reads from whatever
234             filehandle is stored in $msg to read data from.
235              
236             =back
237              
238             =head1 SEE ALSO
239              
240             UR::Service::RPC::Server, UR::Service::RPC::Executor
241              
242             =cut
243              
244