File Coverage

blib/lib/Ryu/Async.pm
Criterion Covered Total %
statement 210 265 79.2
branch 20 50 40.0
condition 24 57 42.1
subroutine 47 64 73.4
pod 13 14 92.8
total 314 450 69.7


line stmt bran cond sub pod time code
1             package Ryu::Async;
2             # ABSTRACT: IO::Async support for Ryu stream management
3 5     5   532029 use strict;
  5         76  
  5         158  
4 5     5   27 use warnings;
  5         9  
  5         208  
5              
6             our $VERSION = '0.018';
7              
8 5     5   3134 use utf8;
  5         72  
  5         26  
9              
10             =encoding UTF8
11              
12             =head1 NAME
13              
14             Ryu::Async - use L with L
15              
16             =head1 SYNOPSIS
17              
18             #!/usr/bin/env perl
19             use strict;
20             use warnings;
21             use IO::Async::Loop;
22             use Ryu::Async;
23             # This will generate a lot of output, but is useful
24             # for demonstrating lifecycles. Drop this to 'info' or
25             # 'debug' to make it more realistic.
26             use Log::Any::Adapter qw(Stdout), log_level => 'trace';
27             #
28             my $loop = IO::Async::Loop->new;
29             $loop->add(
30             my $ryu = Ryu::Async->new
31             );
32             {
33             my $timer = $ryu->timer(
34             interval => 0.10,
35             )->take(10)
36             ->each(sub { print "tick\n" });
37             warn $timer->describe;
38             $timer->get;
39             }
40              
41             =head1 DESCRIPTION
42              
43             This is an L subclass for interacting with L.
44              
45             =cut
46              
47 5     5   2742 use parent qw(IO::Async::Notifier);
  5         1556  
  5         30  
48              
49 5     5   82411 use IO::Async::Handle;
  5         65489  
  5         196  
50 5     5   2629 use IO::Async::Listener;
  5         18949  
  5         180  
51 5     5   2854 use IO::Async::Process;
  5         18178  
  5         157  
52 5     5   2695 use IO::Async::Resolver;
  5         203472  
  5         203  
53 5     5   2666 use IO::Async::Signal;
  5         3516  
  5         150  
54 5     5   2497 use IO::Async::Socket;
  5         5448  
  5         161  
55 5     5   38 use IO::Async::Stream;
  5         10  
  5         104  
56 5     5   2371 use IO::Async::Timer::Absolute;
  5         2762  
  5         157  
57 5     5   33 use IO::Async::Timer::Countdown;
  5         9  
  5         121  
58 5     5   2473 use IO::Async::Timer::Periodic;
  5         5031  
  5         172  
59              
60 5     5   2324 use Ryu::Async::Client;
  5         12  
  5         159  
61 5     5   2100 use Ryu::Async::Packet;
  5         18  
  5         242  
62 5     5   2023 use Ryu::Async::Server;
  5         15  
  5         159  
63              
64 5     5   2232 use Ryu::Sink;
  5         8949  
  5         144  
65 5     5   3165 use Ryu::Source;
  5         244341  
  5         350  
66              
67 5     5   2413 use URI::udp;
  5         64223  
  5         201  
68 5     5   2297 use URI::tcp;
  5         1073  
  5         226  
69 5     5   37 use Socket qw(pack_sockaddr_in inet_pton AF_INET);
  5         13  
  5         323  
70              
71 5     5   34 use curry::weak;
  5         11  
  5         116  
72              
73 5     5   29 use Syntax::Keyword::Try;
  5         12  
  5         42  
74              
75 5     5   2800 use Ryu '2.000';
  5         2400  
  5         647  
76 5     5   37 use Ryu::Source;
  5         22  
  5         108  
77              
78 5     5   2336 use Ryu::Async::Process;
  5         13  
  5         184  
79 5     5   36 use Scalar::Util qw(blessed weaken);
  5         13  
  5         300  
80              
81 5     5   34 use Log::Any qw($log);
  5         11  
  5         52  
82              
83             =head1 METHODS
84              
85             =cut
86              
87             =head2 from
88              
89             Creates a new L from a thing.
90              
91             The exact details of this are likely to change in future, but a few things that are expected to work:
92              
93             $ryu->from($io_async_stream_instance)
94             ->by_line
95             ->each(sub { print "Line: $_\n" });
96             $ryu->from([1..1000])
97             ->sum
98             ->each(sub { print "Total was $_\n" });
99              
100             =cut
101              
102             sub from {
103 2     2 1 14652 my $self = shift;
104              
105 2 100       15 if(my $class = blessed $_[0]) {
106 1 50       9 if($class->isa('IO::Async::Stream')) {
107 1         6 return $self->from_stream($_[0]);
108             } else {
109 0         0 die "Unable to determine appropriate source for $class";
110             }
111             }
112              
113 1         4 my $src = $self->source(label => 'from');
114 1 50       88 if(my $ref = ref $_[0]) {
115 1 50       4 if($ref eq 'ARRAY') {
116 1         2 my @pending = @{$_[0]};
  1         3  
117 1         5 weaken(my $weak_src = $src);
118 1         2 my $code;
119             $code = sub {
120 3     3   13041 my $src = $weak_src;
121 3 50 33     26 $src->emit(shift @pending) if @pending and $src;
122 3 100       1400 if(@pending) {
123 2         9 $self->loop->later($code);
124             } else {
125 1         5 $src->finish;
126 1         163 weaken $_ for $self, $code;
127             }
128 1         5 };
129 1         5 $self->loop->later($code);
130 1         42 return $src;
131             } else {
132 0         0 die "Unknown type $ref"
133             }
134             }
135              
136 0         0 my %args = @_;
137 0 0       0 if(my $dir = $args{directory}) {
138 0 0       0 opendir my $handle, $dir or die $!;
139 0         0 my $code;
140             $code = sub {
141 0 0   0   0 if(defined(my $item = readdir $handle)) {
142 0 0 0     0 $src->emit($item) unless $item eq '.' or $item eq '..';
143 0         0 $self->loop->later($code);
144             } else {
145 0         0 weaken($code);
146 0 0       0 closedir $handle or die $!;
147 0         0 $src->finish
148             }
149 0         0 };
150 0         0 $code->();
151 0         0 return $self;
152             }
153 0         0 die "unknown stuff";
154             }
155              
156             =head2 from_stream
157              
158             Create a new L from an L instance.
159              
160             Note that a stream which is not already attached to an L
161             will be added as a child of this instance.
162              
163             =cut
164              
165             sub from_stream {
166 1     1 1 4 my ($self, $stream, %args) = @_;
167              
168 1   50     10 my $src = $self->source(label => $args{label} // 'IaStream');
169              
170             # Our ->flow_control monitoring gives us a boolean
171             # value every time the state changes:
172             # 1 - we are active
173             # 0 - we are paused
174             # through sheer coïncidence, this is also what the
175             # IO::Async::Stream `->want_(read|write)ready` methods
176             # expect.
177 1         107 $src->flow_control
178             ->each($stream->curry::weak::want_readready);
179              
180             $stream->configure(
181             on_read => sub {
182 3     3   8590 my ($stream, $buffref, $eof) = @_;
183 3 100       30 $log->tracef("Have %d bytes of data, EOF = %s", length($$buffref), $eof ? 'yes' : 'no');
184 3         19 my $data = substr $$buffref, 0, length $$buffref, '';
185 3         15 $src->emit($data);
186 3 100 100     164 $src->finish if $eof && !$src->completed->is_ready;
187             }
188 1         59 );
189 1 50       76 unless($stream->parent) {
190 1         12 $self->add_child($stream);
191             $src->completed->on_ready(sub {
192 1 50   1   187 $self->remove_child($stream) if $stream->parent;
193 1         326 });
194             }
195 1         1418 return $src;
196             }
197              
198             =head2 to_stream
199              
200             Provides a L that will send data to an L instance.
201              
202             Requires the L and will return a new L instance.
203              
204             =cut
205              
206             sub to_stream {
207 0     0 1 0 my ($self, $stream, %args) = @_;
208              
209 0   0     0 my $sink = $self->sink(label => $args{label} // 'IaStream');
210              
211 0         0 $stream->configure(
212             on_writeable_start => $sink->curry::weak::resume,
213             on_writeable_stop => $sink->curry::weak::pause,
214             );
215             $sink->source
216             ->each(sub {
217 0     0   0 $stream->write($_)
218 0         0 });
219 0 0       0 unless($stream->parent) {
220 0         0 $self->add_child($stream);
221             $sink->completed->on_ready($self->$curry::weak(sub {
222 0     0   0 my ($self) = @_;
223 0 0       0 $self->remove_child($stream) if $stream->parent;
224 0         0 }));
225             }
226 0         0 return $sink;
227             }
228              
229             =head2 stdin
230              
231             Create a new L that wraps STDIN.
232              
233             As with other L wrappers, this will emit data as soon as it's available,
234             as raw bytes.
235              
236             Use L and L to split into lines and/or decode from UTF-8.
237              
238             =cut
239              
240             sub stdin {
241 0     0 1 0 my ($self) = @_;
242 0         0 return $self->from_stream(
243             IO::Async::Stream->new_for_stdin,
244             label => 'STDIN',
245             )
246             }
247              
248             =head2 stdout
249              
250             Returns a new L that wraps STDOUT.
251              
252             =cut
253              
254             sub stdout {
255 0     0 1 0 my ($self) = @_;
256 0         0 return $self->to_stream(
257             IO::Async::Stream->new_for_stdout,
258             label => 'STDOUT',
259             )
260             }
261              
262             =head2 stderr
263              
264             Returns a new L that wraps STDERR.
265              
266             =cut
267              
268             sub stderr {
269 0     0 1 0 my ($self) = @_;
270 0         0 return $self->to_stream(
271             IO::Async::Stream->new_for_stderr,
272             label => 'STDERR',
273             )
274             }
275              
276             =head2 timer
277              
278             Provides a L which emits an empty string at selected intervals.
279              
280             Takes the following named parameters:
281              
282             =over 4
283              
284             =item * interval - how often to trigger the timer, in seconds (fractional values allowed)
285              
286             =item * reschedule - type of rescheduling to use, can be C, C or C as documented
287             in L
288              
289             =back
290              
291             Example:
292              
293             $ryu->timer(interval => 1, reschedule => 'hard')
294             ->combine_latest(...)
295              
296             =cut
297              
298             sub timer {
299 1     1 1 6249 my ($self, %args) = @_;
300 1         4 my $src = $self->source(label => 'timer');
301             $self->add_child(
302             my $timer = IO::Async::Timer::Periodic->new(
303             reschedule => 'hard',
304             %args,
305 1     10   86 on_tick => $src->$curry::weak(sub { shift->emit('') }),
  10         1995131  
306             )
307             );
308 1         366 Scalar::Util::weaken($timer);
309             $src->on_ready($self->$curry::weak(sub {
310 1     1   985 my ($self) = @_;
311 1 50       12 return unless $timer;
312 1 50       13 $timer->stop if $timer->is_running;
313 1         34 $self->remove_child($timer)
314 1         9 }));
315 1         1312 $timer->start;
316 1         6046 $src
317             }
318              
319             =head2 run
320              
321             Creates an L.
322              
323             =cut
324              
325             sub run {
326 0     0 1 0 my ($self, $code, %args) = @_;
327 0 0       0 if(ref($code) eq 'ARRAY') {
    0          
328             # Fork and exec
329 0         0 $args{command} = $code;
330             } elsif(ref($code) eq 'CODE') {
331 0         0 $args{code} = $code;
332             }
333             $self->add_child(
334 0         0 my $process = Ryu::Async::Process->new(
335             process => IO::Async::Process->new(%args)
336             )
337             );
338 0         0 $process;
339             }
340              
341             =head2 source
342              
343             Returns a new L instance.
344              
345             =cut
346              
347             sub source {
348 6     6 1 33 my ($self, %args) = @_;
349 6   66     30 my $label = delete($args{label}) // do {
350 2         19 my $label = (caller 1)[0];
351 2         11 for($label) {
352 2         6 s/^Net::Async::/Na/g;
353 2         3 s/^IO::Async::/Ia/g;
354 2         6 s/^Web::Async::/Wa/g;
355 2         3 s/^Tickit::Async::/Ta/g;
356 2         3 s/^Tickit::Widget::/TW/g;
357 2         7 s/::([^:]*)$/->$1/;
358             }
359             $label
360 2         8 };
361 6         28 Ryu::Source->new(
362             new_future => $self->loop->curry::weak::new_future,
363             apply_timeout => $self->curry::timeout,
364             label => $label,
365             %args,
366             )
367             }
368              
369             =head2 udp_client
370              
371             Creates a new UDP client.
372              
373             This provides a sink for L packets, and a source for L responses.
374              
375             =over 4
376              
377             =item * C - an optional URI of the form C<< udp://host:port >>
378              
379             =item * C - which host to listen on, defaults to C<0.0.0.0>
380              
381             =item * C - the port to listen on
382              
383             =back
384              
385             Returns a L instance.
386              
387             =cut
388              
389             sub udp_client {
390 1     1 1 7464 my ($self, %args) = @_;
391              
392 1         4 my $uri = delete $args{uri};
393 1   50     15 $uri //= 'udp://' . join ':', $args{host} // '*', $args{port} // ();
      33        
      33        
394 1 50       12 $uri = URI->new($uri) unless ref $uri;
395 1         121 $log->debugf("UDP client for %s", $uri->as_string);
396              
397             my $src = $self->source(
398 1   33     18 label => $args{label} // $uri->as_string,
399             );
400             my $sink = $self->sink(
401 1   33     78 label => $args{label} // $uri->as_string,
402             );
403             $self->add_child(
404             my $client = IO::Async::Socket->new(
405             on_recv => sub {
406 0     0   0 my ($sock, $payload, $addr) = @_;
407             try {
408             $log->tracef("Receiving [%s] from %s", $payload, $addr);
409             $src->emit(
410             Ryu::Async::Packet->new(
411             from => $addr,
412             payload => $payload
413             )
414             );
415 0         0 } catch {
416             $log->errorf("Exception when sending: %s", $@);
417             }
418             },
419             )
420 1         47 );
421 1   50     230 my $host = $uri->host || '0.0.0.0';
422 1 50       55 $host = '0.0.0.0' if $host eq '*';
423 1   50     5 my $port = $uri->port // 0;
424 1         44 my $f = $client->connect(
425             host => $host,
426             service => $port,
427             socktype => 'dgram',
428             );
429             $f->on_done(sub {
430 1     1   23 $log->debugf("UDP client connected");
431             })->on_fail(sub {
432 0     0   0 $log->errorf("UDP client failed to connect - %s", join ',', @_);
433 1         4421 });
434             $sink->source->each(sub {
435 1     1   119 my $payload = $_;
436             $f->on_done(sub {
437             try {
438             $log->tracef("Sending [%s] to %s", $payload, $uri);
439             $client->send(
440             $payload,
441             undef,
442             pack_sockaddr_in(
443             $port,
444             '' . inet_pton(AF_INET, $host)
445             )
446             );
447 1         18 } catch {
448             $log->errorf("Exception when sending: %s", $@);
449             }
450 1         9 })->retain;
451 1         27 });
452 1         53 Ryu::Async::Client->new(
453             outgoing => $sink,
454             incoming => $src,
455             );
456             }
457              
458             =head2 udp_server
459              
460             =cut
461              
462             sub udp_server {
463 1     1 1 6878 my ($self, %args) = @_;
464              
465 1         4 my $uri = delete $args{uri};
466 1   33     6 $uri //= do {
467 1   50     4 $args{host} //= '0.0.0.0';
468 1   33     11 'udp://' . join ':', $args{host}, $args{port} // ();
469             };
470 1 50       11 $uri = URI->new($uri) unless ref $uri;
471 1         297 $log->debugf("UDP server %s", $uri->as_string);
472              
473 1         76 my $src = $self->source;
474 1         98 my $sink = $self->sink;
475              
476             $self->add_child(
477             my $server = IO::Async::Socket->new(
478             on_recv => sub {
479 1     1   1222 my ($sock, $msg, $addr) = @_;
480 1         6 $log->debugf("UDP server [%s] had %s from %s", $uri->as_string, $msg, $addr);
481 1         18 $src->emit(
482             Ryu::Async::Packet->new(
483             payload => $msg,
484             from => $addr
485             )
486             )
487             },
488             on_recv_error => sub {
489 0     0   0 my ($sock, $err) = @_;
490 0         0 $src->fail($err);
491             }
492             )
493 1         65 );
494 1     0   278 $sink->source->each(sub { $server->send($_->payload, 0, $_->addr) });
  0         0  
495             my $port_f = $server->bind(
496             service => $uri->port // 0,
497             socktype => 'dgram'
498             )->then(sub {
499 1     1   21340 Future->done($server->write_handle->sockport)
500 1   50     69 });
501 1         114 Ryu::Async::Server->new(
502             port => $port_f,
503             incoming => $src,
504             outgoing => undef,
505             );
506             }
507              
508             =head2 tcp_server
509              
510             Creates a listening TCP socket, and provides a L
511             instance which will emit a new event every time a client connects.
512              
513             =cut
514              
515             sub tcp_server {
516 1     1 1 6176 my ($self, %args) = @_;
517              
518 1         3 my $uri = delete $args{uri};
519 1   33     5 $uri //= do {
520 1   50     4 $args{host} //= '0.0.0.0';
521 1   33     11 'tcp://' . join ':', $args{host}, $args{port} // ();
522             };
523 1 50       9 $uri = URI->new($uri) unless ref $uri;
524 1         285 $log->debugf("TCP server %s", $uri->as_string);
525              
526 1         68 my $src = $self->source;
527 1         429 my $sink = $self->sink;
528              
529             $self->add_child(
530             my $server = IO::Async::Listener->new(
531             on_stream => sub {
532 0     0   0 my ($sock, $msg, $addr) = @_;
533 0         0 $log->debugf("TCP server [%s] had %s from %s", $uri->as_string, $msg, $addr);
534 0         0 $src->emit(
535             Ryu::Async::Packet->new(
536             payload => $msg,
537             from => $addr
538             )
539             )
540             },
541             )
542 1         58 );
543 1     0   246 $sink->source->each(sub { $server->send($_->payload, 0, $_->addr) });
  0         0  
544             my $port_f = $server->listen(
545             service => $uri->port // 0,
546             socktype => 'stream'
547             )->then(sub {
548 1     1   21299 my ($listener) = @_;
549 1         5 Future->done($listener->read_handle->sockport)
550 1   50     49 });
551 1         108 Ryu::Async::Server->new(
552             port => $port_f,
553             incoming => $src,
554             outgoing => undef,
555             );
556             }
557              
558             sub timeout {
559 0     0 0 0 my ($self, $input, $output, $delay) = @_;
560             $self->add_child(
561             my $timer = IO::Async::Timer::Countdown->new(
562             interval => $delay,
563 0     0   0 on_expire => sub { $output->fail('timeout') },
564             )
565 0         0 );
566 0     0   0 $input->each_while_source(sub { $timer->reset }, $output);
  0         0  
567 0         0 return $self;
568             }
569              
570             =head2 sink
571              
572             Returns a new L.
573              
574             The label will default to the calling package/class and method,
575             with some truncation rules:
576              
577             =over 4
578              
579             =item * A C prefix will be replaced by C.
580              
581             =item * A C prefix will be replaced by C.
582              
583             =item * A C prefix will be replaced by C.
584              
585             =item * A C prefix will be replaced by C.
586              
587             =item * A C prefix will be replaced by C.
588              
589             =item * A C prefix will be replaced by C.
590              
591             =back
592              
593             This list of truncations is subject to change, so please don't
594             rely on any of these in string matches or similar - better to set
595             your own label if you need consistency.
596              
597             =cut
598              
599             sub sink {
600 3     3 1 18 my ($self, %args) = @_;
601 3   66     24 my $label = delete($args{label}) // do {
602 2         17 my $label = (caller 1)[3];
603 2         19 for($label) {
604 2         11 s/^Database::Async::/Da/g;
605 2         6 s/^Net::Async::/Na/g;
606 2         5 s/^IO::Async::/Ia/g;
607 2         7 s/^Web::Async::/Wa/g;
608 2         4 s/^Job::Async::/Ja/g;
609 2         8 s/^Tickit::Async::/Ta/g;
610 2         98 s/^Tickit::Widget::/TW/g;
611 2         37 s/::([^:]*)$/->$1/;
612             }
613             $label
614 2         12 };
615 3         14 Ryu::Sink->new(
616             new_future => $self->loop->curry::weak::new_future,
617             label => $label,
618             %args,
619             )
620             }
621              
622             1;
623              
624             __END__