File Coverage

blib/lib/Tangence/Stream.pm
Criterion Covered Total %
statement 99 112 88.3
branch 19 30 63.3
condition 14 15 93.3
subroutine 22 23 95.6
pod 5 13 38.4
total 159 193 82.3


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2011-2020 -- leonerd@leonerd.org.uk
5              
6             package Tangence::Stream 0.30;
7              
8 12     12   3346 use v5.26;
  12         42  
9 12     12   63 use warnings;
  12         25  
  12         367  
10 12     12   503 use experimental 'signatures';
  12         3376  
  12         63  
11              
12 12     12   1269 use Carp;
  12         25  
  12         785  
13              
14 12     12   81 use Tangence::Constants;
  12         24  
  12         2228  
15 12     12   4529 use Tangence::Message;
  12         34  
  12         22668  
16              
17             # A map from request codes to method names
18             # Can't use => operator because it would quote the barewords on the left, but
19             # we want them as constants
20             my %REQ_METHOD = (
21             MSG_CALL, 'handle_request_CALL',
22             MSG_SUBSCRIBE, 'handle_request_SUBSCRIBE',
23             MSG_UNSUBSCRIBE, 'handle_request_UNSUBSCRIBE',
24             MSG_EVENT, 'handle_request_EVENT',
25             MSG_GETPROP, 'handle_request_GETPROP',
26             MSG_GETPROPELEM, 'handle_request_GETPROPELEM',
27             MSG_SETPROP, 'handle_request_SETPROP',
28             MSG_WATCH, 'handle_request_WATCH',
29             MSG_UNWATCH, 'handle_request_UNWATCH',
30             MSG_UPDATE, 'handle_request_UPDATE',
31             MSG_DESTROY, 'handle_request_DESTROY',
32             MSG_WATCH_CUSR, 'handle_request_WATCH_CUSR',
33             MSG_CUSR_NEXT, 'handle_request_CUSR_NEXT',
34             MSG_CUSR_DESTROY, 'handle_request_CUSR_DESTROY',
35              
36             MSG_GETROOT, 'handle_request_GETROOT',
37             MSG_GETREGISTRY, 'handle_request_GETREGISTRY',
38             MSG_INIT, 'handle_request_INIT',
39             );
40              
41             =head1 NAME
42              
43             C<Tangence::Stream> - base class for C<Tangence> stream-handling mixins
44              
45             =head1 DESCRIPTION
46              
47             This module provides a base for L<Tangence::Client> and L<Tangence::Server>.
48             It is not intended to be used directly by C<Tangence> implementation code.
49              
50             It provides the basic layer of message serialisation, deserialisation, and
51             dispatching to methods that would handle the messages. Higher level classes
52             are used to wrap this functionality, and provide implementations of methods
53             to handle the messages received.
54              
55             When a message is received, it will be passed to a method whose name depends
56             on the code of message received. The name will be C<handle_request_>, followed
57             by the name of the message code, in uppercase; for example
58             C<handle_request_CALL>.
59              
60             =cut
61              
62             =head1 REQUIRED METHODS
63              
64             The following methods are required to be implemented by some class using this
65             mixin.
66              
67             =cut
68              
69             =head2 tangence_write
70              
71             $stream->tangence_write( $data )
72              
73             Write bytes of data to the connected peer. C<$data> will be a plain perl
74             string.
75              
76             =cut
77              
78             =head2 handle_request_$CODE
79              
80             $stream->handle_request_$CODE( $token, $message )
81              
82             Invoked on receipt of a given message code. C<$token> will be some opaque perl
83             scalar value, and C<$message> will be an instance of L<Tangence::Message>.
84              
85             The value of the token has no particular meaning, other than to be passed to
86             the C<respond> method.
87              
88             =cut
89              
90             =head1 PROVIDED METHODS
91              
92             The following methods are provided by this mixin.
93              
94             =cut
95              
96             # Accessors for Tangence::Message decoupling
97             our $BUILTIN_STRUCTIDS;
98             our %BUILTIN_ID2STRUCT;
99             our %ALWAYS_PEER_HASSTRUCT;
100              
101             sub message_state
102             {
103             shift->{message_state} ||= {
104 231   100 231 0 1177 id2struct => { %BUILTIN_ID2STRUCT },
105             next_structid => $BUILTIN_STRUCTIDS,
106             next_cursorid => 1,
107             }
108             }
109              
110 101   100 101 0 621 sub peer_hasobj { shift->{peer_hasobj} ||= {} }
111 121   100 121 0 586 sub peer_hasclass { shift->{peer_hasclass} ||= {} }
112 326   100 326 0 1174 sub peer_hasstruct { shift->{peer_hasstruct} ||= { %ALWAYS_PEER_HASSTRUCT } }
113 25   100 25 0 287 sub peer_hascursor { shift->{peer_hascursor} ||= {} }
114              
115             sub identity
116             {
117 28     28 0 407 my $self = shift;
118 28 100       133 $self->{identity} = shift if @_;
119 28         103 return $self->{identity};
120             }
121              
122             =head2 tangence_closed
123              
124             $stream->tangence_closed
125              
126             Informs the object that the underlying connection has now been closed, and any
127             attachments to C<Tangence::Object> or C<Tangence::ObjectProxy> instances
128             should now be dropped.
129              
130             =cut
131              
132             sub tangence_closed
133             {
134 2     2 1 12 my $self = shift;
135              
136 2         3 foreach my $id ( keys %{ $self->peer_hasobj } ) {
  2         10  
137 2         5 my $obj = $self->get_by_id( $id );
138 2         6 $obj->unsubscribe_event( "destroy", delete $self->peer_hasobj->{$id} );
139             }
140             }
141              
142             =head2 tangence_readfrom
143              
144             $stream->tangence_readfrom( $buffer )
145              
146             Informs the object that more data has been read from the underlying connection
147             stream. Whole messages will be removed from the beginning of the C<$buffer>,
148             which should be passed as a direct scalar (because it will be modified). This
149             method will invoke the required C<handle_request_$CODE> methods. Any bytes
150             remaining that form the start of a partial message will be left in the buffer.
151              
152             =cut
153              
154             sub tangence_readfrom
155             {
156 295     295 1 19191 my $self = shift;
157              
158 295         731 while( length $_[0] ) {
159 295 50       658 last unless length $_[0] >= 5;
160 295         918 my ( $code, $len ) = unpack( "CN", $_[0] );
161 295 50       689 last unless length $_[0] >= 5 + $len;
162              
163 295         637 substr( $_[0], 0, 5, "" );
164 295         627 my $payload = substr( $_[0], 0, $len, "" );
165              
166 295         1360 my $message = Tangence::Message->new( $self, $code, $payload );
167              
168 295 100       691 if( $code < 0x80 ) {
169 147         259 push @{ $self->{request_queue} }, undef;
  147         326  
170 147         301 my $token = \$self->{request_queue}[-1];
171              
172 147 50 66     387 if( !$self->minor_version and $code != MSG_INIT ) {
173 0         0 $self->respondERROR( $token, "Cannot accept any message except MSG_INIT before MSG_INIT" );
174 0         0 next;
175             }
176              
177 147 50       470 if( my $method = $REQ_METHOD{$code} ) {
178 147 50       809 if( $self->can( $method ) ) {
179 147         546 $self->$method( $token, $message );
180             }
181             else {
182 0         0 $self->respondERROR( $token, sprintf( "Cannot respond to request code 0x%02x", $code ) );
183             }
184             }
185             else {
186 0         0 $self->respondERROR( $token, sprintf( "Unrecognised request code 0x%02x", $code ) );
187             }
188             }
189             else {
190 148         266 my $on_response = shift @{ $self->{responder_queue} };
  148         291  
191 148         390 $on_response->( $message );
192             }
193             }
194             }
195              
196 2         3 sub object_destroyed ( $self, $obj, $startsub, $donesub )
  2         4  
  2         3  
197 2     2 0 5 {
  2         4  
  2         4  
198 2         7 $startsub->();
199              
200 2         22 my $objid = $obj->id;
201              
202 2         7 delete $self->peer_hasobj->{$objid};
203              
204             $self->request(
205             request => Tangence::Message->new( $self, MSG_DESTROY )
206             ->pack_int( $objid ),
207              
208             on_response => sub {
209 2     2   9 my ( $message ) = @_;
210 2         6 my $code = $message->code;
211              
212 2 50       9 if( $code == MSG_OK ) {
    0          
213 2         7 $donesub->();
214             }
215             elsif( $code == MSG_ERROR ) {
216 0         0 my $msg = $message->unpack_str();
217 0         0 print STDERR "Cannot get connection $self to destroy object $objid - error $msg\n";
218             }
219             else {
220 0         0 print STDERR "Cannot get connection $self to destroy object $objid - code $code\n";
221             }
222             },
223 2         15 );
224             }
225              
226             =head2 request
227              
228             $stream->request( %args )
229              
230             Serialises a message object to pass to the C<tangence_write> method, then
231             enqueues a response handler to be invoked when a reply arrives. Takes the
232             following named arguments:
233              
234             =over 8
235              
236             =item request => Tangence::Message
237              
238             The message body
239              
240             =item on_response => CODE
241              
242             CODE reference to the callback to be invoked when a response to the message is
243             received. It will be passed the response message:
244              
245             $on_response->( $message )
246              
247             =back
248              
249             =head2 request (non-void)
250              
251             $response = $stream->request( request => $request )->get
252              
253             When called in non-void context, this method returns a L<Future> that will
254             yield the response instead. In this case it should not be given an
255             C<on_response> callback.
256              
257             In this form, a C<MSG_ERROR> response will automatically turn into a failed
258             Future; the subsequent C<then> or C<on_done> code will not have to handle this
259             case.
260              
261             =cut
262              
263 148         209 sub request ( $self, %args )
264 148     148 1 277 {
  148         314  
  148         209  
265 148 50       428 my $request = $args{request} or croak "Expected 'request'";
266              
267 148         237 my $f;
268             my $on_response;
269 148 100       318 if( defined wantarray ) {
270 68 50       169 $args{on_response} and croak "TODO: Can't take 'on_response' and return a Future";
271              
272 68         255 $f = $self->new_future;
273             $on_response = sub {
274 68     68   168 my ( $response ) = @_;
275 68 100       152 if( $response->code == MSG_ERROR ) {
276 7         20 $f->fail( $response->unpack_str(), tangence => );
277             }
278             else {
279 61         254 $f->done( $response );
280             }
281 68         750 };
282             }
283             else {
284 80 50       202 $on_response = $args{on_response} or croak "Expected 'on_response'";
285             }
286              
287 148         229 push @{ $self->{responder_queue} }, $on_response;
  148         371  
288              
289 148         369 my $payload = $request->payload;
290 148         366 $self->tangence_write(
291             pack "CNa*", $request->code, length($payload), $payload
292             );
293              
294 148         1593 return $f;
295             }
296              
297             =head2 respond
298              
299             $stream->respond( $token, $message )
300              
301             Serialises a message object to be sent to the C<tangence_write> method. The
302             C<$token> value that was passed to the C<handle_request_$CODE> method ensures that
303             it is sent at the correct position in the stream, to allow the peer to pair it
304             with the corresponding request.
305              
306             =cut
307              
308 147         215 sub respond ( $self, $token, $message )
  147         195  
309 147     147 1 212 {
  147         223  
  147         191  
310 147         325 my $payload = $message->payload;
311 147         391 my $response = pack "CNa*", $message->code, length($payload), $payload;
312              
313 147         319 $$token = $response;
314              
315 147         454 while( defined $self->{request_queue}[0] ) {
316 147         225 $self->tangence_write( shift @{ $self->{request_queue} } );
  147         516  
317             }
318             }
319              
320 0         0 sub respondERROR ( $self, $token, $string )
  0         0  
321 0     0 0 0 {
  0         0  
  0         0  
322 0         0 $self->respond( $token, Tangence::Message->new( $self, MSG_ERROR )
323             ->pack_str( $string )
324             );
325             }
326              
327             =head2 minor_version
328              
329             $ver = $stream->minor_version
330              
331             Returns the minor version negotiated by the C<MSG_INIT> / C<MSG_INITED>
332             initial message handshake.
333              
334             =cut
335              
336             sub minor_version
337             {
338 203     203 1 803 my $self = shift;
339 203 100       506 ( $self->{tangence_minor_version} ) = @_ if @_;
340 203   100     877 return $self->{tangence_minor_version} // 0;
341             }
342              
343             # Some (internal) methods that control new protocol features
344              
345             # wire protocol uses typed smash data
346 31     31   113 sub _ver_can_typed_smash { shift->minor_version >= 4 }
347              
348             # wire protocol understands FLOAT* types
349 19     19   63 sub _ver_can_num_float { shift->minor_version >= 4 }
350              
351             =head1 AUTHOR
352              
353             Paul Evans <leonerd@leonerd.org.uk>
354              
355             =cut
356              
357             0x55AA;