File Coverage

blib/lib/Tangence/Stream.pm
Criterion Covered Total %
statement 99 112 88.3
branch 19 30 63.3
condition 14 15 93.3
subroutine 22 23 95.6
pod 5 13 38.4
total 159 193 82.3


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2011-2020 -- leonerd@leonerd.org.uk
5              
6             package Tangence::Stream 0.28;
7              
8 12     12   3240 use v5.26;
  12         36  
9 12     12   57 use warnings;
  12         21  
  12         319  
10 12     12   429 use experimental 'signatures';
  12         2914  
  12         54  
11              
12 12     12   1224 use Carp;
  12         23  
  12         755  
13              
14 12     12   81 use Tangence::Constants;
  12         28  
  12         2073  
15 12     12   4214 use Tangence::Message;
  12         31  
  12         19549  
16              
# Dispatch table: request message code => name of the method that handles it.
# Every key is spelled as an explicit constant call, MSG_FOO(), because a bare
# identifier directly to the left of => would be quoted as a string rather
# than evaluated as the constant.
my %REQ_METHOD = (
   MSG_CALL()         => 'handle_request_CALL',
   MSG_SUBSCRIBE()    => 'handle_request_SUBSCRIBE',
   MSG_UNSUBSCRIBE()  => 'handle_request_UNSUBSCRIBE',
   MSG_EVENT()        => 'handle_request_EVENT',
   MSG_GETPROP()      => 'handle_request_GETPROP',
   MSG_GETPROPELEM()  => 'handle_request_GETPROPELEM',
   MSG_SETPROP()      => 'handle_request_SETPROP',
   MSG_WATCH()        => 'handle_request_WATCH',
   MSG_UNWATCH()      => 'handle_request_UNWATCH',
   MSG_UPDATE()       => 'handle_request_UPDATE',
   MSG_DESTROY()      => 'handle_request_DESTROY',
   MSG_WATCH_CUSR()   => 'handle_request_WATCH_CUSR',
   MSG_CUSR_NEXT()    => 'handle_request_CUSR_NEXT',
   MSG_CUSR_DESTROY() => 'handle_request_CUSR_DESTROY',

   MSG_GETROOT()      => 'handle_request_GETROOT',
   MSG_GETREGISTRY()  => 'handle_request_GETREGISTRY',
   MSG_INIT()         => 'handle_request_INIT',
);
40              
41             =head1 NAME
42              
43             C<Tangence::Stream> - base class for C<Tangence> stream-handling mixins
44              
45             =head1 DESCRIPTION
46              
47             This module provides a base for L<Tangence::Client> and L<Tangence::Server>.
48             It is not intended to be used directly by C<Tangence> implementation code.
49              
50             It provides the basic layer of message serialisation, deserialisation, and
51             dispatching to methods that would handle the messages. Higher level classes
52             are used to wrap this functionality, and provide implementations of methods
53             to handle the messages received.
54              
55             When a message is received, it will be passed to a method whose name depends
56             on the code of message received. The name will be C<handle_request_>, followed
57             by the name of the message code, in uppercase; for example
58             C<handle_request_CALL>.
59              
60             =cut
61              
62             =head1 REQUIRED METHODS
63              
64             The following methods are required to be implemented by some class using this
65             mixin.
66              
67             =cut
68              
69             =head2 tangence_write
70              
71             $stream->tangence_write( $data )
72              
73             Write bytes of data to the connected peer. C<$data> will be a plain perl
74             string.
75              
76             =cut
77              
78             =head2 handle_request_$CODE
79              
80             $stream->handle_request_$CODE( $token, $message )
81              
82             Invoked on receipt of a given message code. C<$token> will be some opaque perl
83             scalar value, and C<$message> will be an instance of L<Tangence::Message>.
84              
85             The value of the token has no particular meaning, other than to be passed to
86             the C<respond> method.
87              
88             =cut
89              
90             =head1 PROVIDED METHODS
91              
92             The following methods are provided by this mixin.
93              
94             =cut
95              
# Accessors for Tangence::Message decoupling
#
# These package variables only seed the lazily-created per-stream state
# below; nothing in this part of the file assigns them.
# NOTE(review): presumably populated by Tangence::Message - confirm.
our $BUILTIN_STRUCTIDS;
our %BUILTIN_ID2STRUCT;
our %ALWAYS_PEER_HASSTRUCT;

# Returns the stream's message-state hash, creating it on first use with a
# private copy of %BUILTIN_ID2STRUCT, the next struct ID taken from
# $BUILTIN_STRUCTIDS, and the cursor ID counter starting at 1.
sub message_state
{
   my $self = shift;

   return $self->{message_state} ||= {
      id2struct     => { %BUILTIN_ID2STRUCT },
      next_structid => $BUILTIN_STRUCTIDS,
      next_cursorid => 1,
   };
}

# Each of the following returns a hash reference stored on the stream under
# the key of the same name, creating it the first time it is asked for.

sub peer_hasobj
{
   my $self = shift;
   return $self->{peer_hasobj} ||= {};
}

sub peer_hasclass
{
   my $self = shift;
   return $self->{peer_hasclass} ||= {};
}

# Unlike its siblings this one starts out as a copy of
# %ALWAYS_PEER_HASSTRUCT rather than empty.
sub peer_hasstruct
{
   my $self = shift;
   return $self->{peer_hasstruct} ||= { %ALWAYS_PEER_HASSTRUCT };
}

sub peer_hascursor
{
   my $self = shift;
   return $self->{peer_hascursor} ||= {};
}
114              
# Combined getter/setter for the stream's identity value. When called with
# an argument (even an undefined one) the first argument is stored; the
# currently stored value is always returned.
sub identity
{
   my ( $self, @value ) = @_;

   $self->{identity} = $value[0] if @value;

   return $self->{identity};
}
121              
122             =head2 tangence_closed
123              
124             $stream->tangence_closed
125              
126             Informs the object that the underlying connection has now been closed, and any
127             attachments to C<Tangence::Object> or C<Tangence::ObjectProxy> instances
128             should now be dropped.
129              
130             =cut
131              
# Drops every entry of the peer_hasobj map: for each object ID it looks the
# object up with get_by_id, removes the map entry, and passes the removed
# value to the object's unsubscribe_event for the "destroy" event.
sub tangence_closed
{
   my $self = shift;

   my @objids = keys %{ $self->peer_hasobj };

   foreach my $objid ( @objids ) {
      my $obj = $self->get_by_id( $objid );

      # The map entry is removed before the unsubscribe call is made
      my $subscription = delete $self->peer_hasobj->{$objid};
      $obj->unsubscribe_event( "destroy", $subscription );
   }
}
141              
142             =head2 tangence_readfrom
143              
144             $stream->tangence_readfrom( $buffer )
145              
146             Informs the object that more data has been read from the underlying connection
147             stream. Whole messages will be removed from the beginning of the C<$buffer>,
148             which should be passed as a direct scalar (because it will be modified). This
149             method will invoke the required C<handle_request_*> methods. Any bytes
150             remaining that form the start of a partial message will be left in the buffer.
151              
152             =cut
153              
# Consumes every complete message at the front of the buffer and dispatches
# each one.
#
#   $stream->tangence_readfrom( $buffer )
#
# The buffer is read and modified through $_[0], so the caller's own scalar
# is updated in place. A message is framed as a one-byte code and a four-byte
# big-endian payload length ("CN") followed by the payload. Bytes that do not
# yet make up a whole message are left in the buffer.
#
# Codes below 0x80 are requests: a slot is reserved on request_queue and a
# reference to it is passed as the token to the handler named in
# %REQ_METHOD, or to respondERROR when the request cannot be handled. All
# other codes are responses and go to the oldest entry of responder_queue.
#
# Croaks if a response arrives while no request is awaiting one.
sub tangence_readfrom
{
   my $self = shift;

   # 5 == size of the code + length header
   while( length $_[0] >= 5 ) {
      my ( $code, $len ) = unpack( "CN", $_[0] );
      last if length $_[0] < 5 + $len;

      substr( $_[0], 0, 5, "" );
      my $payload = substr( $_[0], 0, $len, "" );

      my $message = Tangence::Message->new( $self, $code, $payload );

      if( $code >= 0x80 ) {
         # An unsolicited response would otherwise fail with an opaque
         # "Can't use an undefined value as CODE reference"
         my $on_response = shift @{ $self->{responder_queue} }
            or croak sprintf( "Received response code 0x%02x but no request is awaiting a response", $code );

         $on_response->( $message );
         next;
      }

      # Reserve this request's position in the outgoing order; the token is
      # a reference to that slot
      push @{ $self->{request_queue} }, undef;
      my $token = \$self->{request_queue}[-1];

      my $method = $REQ_METHOD{$code};

      if( !$self->minor_version and $code != MSG_INIT ) {
         $self->respondERROR( $token, "Cannot accept any message except MSG_INIT before MSG_INIT" );
      }
      elsif( !$method ) {
         $self->respondERROR( $token, sprintf( "Unrecognised request code 0x%02x", $code ) );
      }
      elsif( !$self->can( $method ) ) {
         $self->respondERROR( $token, sprintf( "Cannot respond to request code 0x%02x", $code ) );
      }
      else {
         $self->$method( $token, $message );
      }
   }
}
195              
# Tells the peer that $obj has been destroyed.
#
# Invokes $startsub immediately, removes the object's entry from the
# peer_hasobj map, then sends a MSG_DESTROY request carrying the object ID.
# $donesub is invoked when the peer answers MSG_OK; any other answer is
# reported on STDERR and $donesub is not invoked.
sub object_destroyed ( $self, $obj, $startsub, $donesub )
{
   $startsub->();

   my $objid = $obj->id;
   delete $self->peer_hasobj->{$objid};

   my $destroy = Tangence::Message->new( $self, MSG_DESTROY )->pack_int( $objid );

   my $handle_reply = sub {
      my $message = shift;
      my $code = $message->code;

      if( $code == MSG_ERROR ) {
         my $msg = $message->unpack_str();
         print STDERR "Cannot get connection $self to destroy object $objid - error $msg\n";
      }
      elsif( $code == MSG_OK ) {
         $donesub->();
      }
      else {
         print STDERR "Cannot get connection $self to destroy object $objid - code $code\n";
      }
   };

   # Deliberately the final statement with no assignment: request() inspects
   # wantarray, so it must see whatever context this sub was called in
   $self->request(
      request     => $destroy,
      on_response => $handle_reply,
   );
}
225              
226             =head2 request
227              
228             $stream->request( %args )
229              
230             Serialises a message object to pass to the C<tangence_write> method, then
231             enqueues a response handler to be invoked when a reply arrives. Takes the
232             following named arguments:
233              
234             =over 8
235              
236             =item request => Tangence::Message
237              
238             The message body
239              
240             =item on_response => CODE
241              
242             CODE reference to the callback to be invoked when a response to the message is
243             received. It will be passed the response message:
244              
245             $on_response->( $message )
246              
247             =back
248              
249             =head2 request (non-void)
250              
251             $response = $stream->request( request => $request )->get
252              
253             When called in non-void context, this method returns a L<Future> that will
254             yield the response instead. In this case it should not be given an
255             C<on_response> callback.
256              
257             In this form, a C<MSG_ERROR> response will automatically turn into a failed
258             Future; the subsequent C<then> or C<on_done> code will not have to handle this
259             case.
260              
261             =cut
262              
# Sends a request message and queues the handler for its response.
#
# Named arguments:
#   request     => message object providing ->code and ->payload (required)
#   on_response => CODE ref invoked with the response message; required in
#                  void context, not allowed otherwise
#
# In non-void context a future obtained from $self->new_future is returned;
# it fails with category "tangence" when the response code is MSG_ERROR and
# otherwise completes with the response message. In void context the return
# value is undef.
#
# Croaks if 'request' is missing, if 'on_response' is missing in void
# context, or if 'on_response' is supplied in non-void context.
sub request ( $self, %args )
{
   my $request = $args{request} or croak "Expected 'request'";

   my $f;
   my $on_response = $args{on_response};

   if( defined wantarray ) {
      croak "TODO: Can't take 'on_response' and return a Future" if $on_response;

      $f = $self->new_future;
      $on_response = sub {
         my $response = shift;

         return $response->code == MSG_ERROR
            ? $f->fail( $response->unpack_str(), "tangence" )
            : $f->done( $response );
      };
   }
   elsif( !$on_response ) {
      croak "Expected 'on_response'";
   }

   # The handler is queued before anything is written, so responses pair up
   # with requests in the order they were sent
   push @{ $self->{responder_queue} }, $on_response;

   my $payload = $request->payload;
   my $bytes = pack "CNa*", $request->code, length($payload), $payload;
   $self->tangence_write( $bytes );

   return $f;
}
296              
297             =head2 respond
298              
299             $stream->respond( $token, $message )
300              
301             Serialises a message object to be sent to the C<tangence_write> method. The
302             C<$token> value that was passed to the C<handle_request_> method ensures that
303             it is sent at the correct position in the stream, to allow the peer to pair it
304             with the corresponding request.
305              
306             =cut
307              
# Records the serialised response in the slot that $token refers to, then
# writes out every response at the head of request_queue that is ready.
#
# Writing stops at the first slot that is still undefined, so responses
# leave in the order their requests were queued even when they are answered
# out of order.
sub respond ( $self, $token, $message )
{
   my $payload = $message->payload;
   $$token = pack "CNa*", $message->code, length($payload), $payload;

   my $queue = ( $self->{request_queue} //= [] );

   $self->tangence_write( shift @$queue ) while defined $queue->[0];
}
319              
# Convenience wrapper around respond(): builds a MSG_ERROR message, packs
# $string into it with pack_str, and sends the result as the response for
# $token.
sub respondERROR ( $self, $token, $string )
{
   my $error = Tangence::Message->new( $self, MSG_ERROR );

   $self->respond( $token, $error->pack_str( $string ) );
}
326              
327             =head2 minor_version
328              
329             $ver = $stream->minor_version
330              
331             Returns the minor version negotiated by the C<MSG_INIT> / C<MSG_INITED>
332             initial message handshake.
333              
334             =cut
335              
# Combined getter/setter for the stored minor version. When called with
# arguments the first one is stored and any others are ignored. Returns the
# stored value, or 0 when it is undefined.
sub minor_version
{
   my ( $self, @new ) = @_;

   $self->{tangence_minor_version} = $new[0] if @new;

   return $self->{tangence_minor_version} // 0;
}
342              
343             # Some (internal) methods that control new protocol features
344              
# wire protocol uses typed smash data
# True when the stream's minor_version is 4 or above.
sub _ver_can_typed_smash
{
   my $self = shift;
   return $self->minor_version >= 4;
}
347              
# wire protocol understands FLOAT* types
# True when the stream's minor_version is 4 or above.
sub _ver_can_num_float
{
   my $self = shift;
   return $self->minor_version >= 4;
}
350              
351             =head1 AUTHOR
352              
353             Paul Evans
354              
355             =cut
356              
357             0x55AA;