File Coverage

blib/lib/Ryu/Async.pm
Criterion Covered Total %
statement 210 265 79.2
branch 20 50 40.0
condition 24 57 42.1
subroutine 47 64 73.4
pod 13 14 92.8
total 314 450 69.7


line stmt bran cond sub pod time code
1             package Ryu::Async;
2             # ABSTRACT: IO::Async support for Ryu stream management
3 6     6   572816 use strict;
  6         68  
  6         227  
4 6     6   41 use warnings;
  6         11  
  6         239  
5              
6             our $VERSION = '0.019';
7              
8 6     6   3767 use utf8;
  6         87  
  6         33  
9              
10             =encoding UTF8
11              
12             =head1 NAME
13              
14             Ryu::Async - use L with L
15              
16             =head1 SYNOPSIS
17              
18             #!/usr/bin/env perl
19             use strict;
20             use warnings;
21             use IO::Async::Loop;
22             use Ryu::Async;
23             # This will generate a lot of output, but is useful
24             # for demonstrating lifecycles. Drop this to 'info' or
25             # 'debug' to make it more realistic.
26             use Log::Any::Adapter qw(Stdout), log_level => 'trace';
27             #
28             my $loop = IO::Async::Loop->new;
29             $loop->add(
30             my $ryu = Ryu::Async->new
31             );
32             {
33             my $timer = $ryu->timer(
34             interval => 0.10,
35             )->take(10)
36             ->each(sub { print "tick\n" });
37             warn $timer->describe;
38             $timer->get;
39             }
40              
41             =head1 DESCRIPTION
42              
43             This is an L subclass for interacting with L.
44              
45             =cut
46              
47 6     6   3197 use parent qw(IO::Async::Notifier);
  6         1812  
  6         31  
48              
49 6     6   101789 use IO::Async::Handle;
  6         77857  
  6         239  
50 6     6   3189 use IO::Async::Listener;
  6         22340  
  6         273  
51 6     6   3436 use IO::Async::Process;
  6         21304  
  6         183  
52 6     6   3263 use IO::Async::Resolver;
  6         242665  
  6         255  
53 6     6   3325 use IO::Async::Signal;
  6         4048  
  6         196  
54 6     6   3079 use IO::Async::Socket;
  6         6809  
  6         189  
55 6     6   44 use IO::Async::Stream;
  6         12  
  6         125  
56 6     6   2911 use IO::Async::Timer::Absolute;
  6         3333  
  6         233  
57 6     6   42 use IO::Async::Timer::Countdown;
  6         22  
  6         156  
58 6     6   3024 use IO::Async::Timer::Periodic;
  6         5846  
  6         203  
59              
60 6     6   2951 use Ryu::Async::Client;
  6         16  
  6         189  
61 6     6   2597 use Ryu::Async::Packet;
  6         16  
  6         285  
62 6     6   2808 use Ryu::Async::Server;
  6         17  
  6         182  
63              
64 6     6   2797 use Ryu::Sink;
  6         10655  
  6         167  
65 6     6   3926 use Ryu::Source;
  6         294849  
  6         525  
66              
67 6     6   2939 use URI::udp;
  6         75856  
  6         225  
68 6     6   2819 use URI::tcp;
  6         1242  
  6         316  
69 6     6   48 use Socket qw(pack_sockaddr_in inet_pton AF_INET);
  6         13  
  6         404  
70              
71 6     6   39 use curry::weak;
  6         12  
  6         142  
72              
73 6     6   33 use Syntax::Keyword::Try;
  6         21  
  6         56  
74              
75 6     6   3501 use Ryu '2.000';
  6         2849  
  6         839  
76 6     6   44 use Ryu::Source;
  6         13  
  6         127  
77              
78 6     6   2922 use Ryu::Async::Process;
  6         19  
  6         217  
79 6     6   44 use Scalar::Util qw(blessed weaken);
  6         11  
  6         408  
80              
81 6     6   42 use Log::Any qw($log);
  6         21  
  6         56  
82              
83             =head1 Interaction with L
84              
85             On load, this module will provide a L which assigns
86             L instances from L.
87              
88             You can override this behaviour by doing this instead:
89              
90             BEGIN {
91             require Ryu::Source;
92             local $Ryu::Source::FUTURE_FACTORY = sub { };
93             require Ryu::Async;
94             }
95              
96             to ensure the original factory function is preserved.
97              
98             =cut
99              
100             $Ryu::Source::FUTURE_FACTORY = sub {
101             IO::Async::Loop->new->new_future(label => $_[1]);
102             };
103              
104             =head1 METHODS
105              
106             =cut
107              
108             =head2 from
109              
110             Creates a new L from a thing.
111              
112             The exact details of this are likely to change in future, but a few things that are expected to work:
113              
114             $ryu->from($io_async_stream_instance)
115             ->by_line
116             ->each(sub { print "Line: $_\n" });
117             $ryu->from([1..1000])
118             ->sum
119             ->each(sub { print "Total was $_\n" });
120              
121             =cut
122              
123             sub from {
124 2     2 1 14336 my $self = shift;
125              
126 2 100       15 if(my $class = blessed $_[0]) {
127 1 50       10 if($class->isa('IO::Async::Stream')) {
128 1         5 return $self->from_stream($_[0]);
129             } else {
130 0         0 die "Unable to determine appropriate source for $class";
131             }
132             }
133              
134 1         6 my $src = $self->source(label => 'from');
135 1 50       94 if(my $ref = ref $_[0]) {
136 1 50       5 if($ref eq 'ARRAY') {
137 1         2 my @pending = @{$_[0]};
  1         5  
138 1         5 weaken(my $weak_src = $src);
139 1         1 my $code;
140             $code = sub {
141 3     3   13452 my $src = $weak_src;
142 3 50 33     32 $src->emit(shift @pending) if @pending and $src;
143 3 100       1427 if(@pending) {
144 2         9 $self->loop->later($code);
145             } else {
146 1         6 $src->finish;
147 1         212 weaken $_ for $self, $code;
148             }
149 1         5 };
150 1         5 $self->loop->later($code);
151 1         41 return $src;
152             } else {
153 0         0 die "Unknown type $ref"
154             }
155             }
156              
157 0         0 my %args = @_;
158 0 0       0 if(my $dir = $args{directory}) {
159 0 0       0 opendir my $handle, $dir or die $!;
160 0         0 my $code;
161             $code = sub {
162 0 0   0   0 if(defined(my $item = readdir $handle)) {
163 0 0 0     0 $src->emit($item) unless $item eq '.' or $item eq '..';
164 0         0 $self->loop->later($code);
165             } else {
166 0         0 weaken($code);
167 0 0       0 closedir $handle or die $!;
168 0         0 $src->finish
169             }
170 0         0 };
171 0         0 $code->();
172 0         0 return $self;
173             }
174 0         0 die "unknown stuff";
175             }
176              
177             =head2 from_stream
178              
179             Create a new L from an L instance.
180              
181             Note that a stream which is not already attached to an L
182             will be added as a child of this instance.
183              
184             =cut
185              
186             sub from_stream {
187 1     1 1 3 my ($self, $stream, %args) = @_;
188              
189 1   50     9 my $src = $self->source(label => $args{label} // 'IaStream');
190              
191             # Our ->flow_control monitoring gives us a boolean
192             # value every time the state changes:
193             # 1 - we are active
194             # 0 - we are paused
195             # through sheer coïncidence, this is also what the
196             # IO::Async::Stream `->want_(read|write)ready` methods
197             # expect.
198 1         95 $src->flow_control
199             ->each($stream->curry::weak::want_readready);
200              
201             $stream->configure(
202             on_read => sub {
203 3     3   8468 my ($stream, $buffref, $eof) = @_;
204 3 100       21 $log->tracef("Have %d bytes of data, EOF = %s", length($$buffref), $eof ? 'yes' : 'no');
205 3         16 my $data = substr $$buffref, 0, length $$buffref, '';
206 3         13 $src->emit($data);
207 3 100 100     143 $src->finish if $eof && !$src->completed->is_ready;
208             }
209 1         56 );
210 1 50       77 unless($stream->parent) {
211 1         11 $self->add_child($stream);
212             $src->completed->on_ready(sub {
213 1 50   1   179 $self->remove_child($stream) if $stream->parent;
214 1         296 });
215             }
216 1         1359 return $src;
217             }
218              
219             =head2 to_stream
220              
221             Provides a L that will send data to an L instance.
222              
223             Requires the L and will return a new L instance.
224              
225             =cut
226              
227             sub to_stream {
228 0     0 1 0 my ($self, $stream, %args) = @_;
229              
230 0   0     0 my $sink = $self->sink(label => $args{label} // 'IaStream');
231              
232 0         0 $stream->configure(
233             on_writeable_start => $sink->curry::weak::resume,
234             on_writeable_stop => $sink->curry::weak::pause,
235             );
236             $sink->source
237             ->each(sub {
238 0     0   0 $stream->write($_)
239 0         0 });
240 0 0       0 unless($stream->parent) {
241 0         0 $self->add_child($stream);
242             $sink->completed->on_ready($self->$curry::weak(sub {
243 0     0   0 my ($self) = @_;
244 0 0       0 $self->remove_child($stream) if $stream->parent;
245 0         0 }));
246             }
247 0         0 return $sink;
248             }
249              
250             =head2 stdin
251              
252             Create a new L that wraps STDIN.
253              
254             As with other L wrappers, this will emit data as soon as it's available,
255             as raw bytes.
256              
257             Use L and L to split into lines and/or decode from UTF-8.
258              
259             =cut
260              
261             sub stdin {
262 0     0 1 0 my ($self) = @_;
263 0         0 return $self->from_stream(
264             IO::Async::Stream->new_for_stdin,
265             label => 'STDIN',
266             )
267             }
268              
269             =head2 stdout
270              
271             Returns a new L that wraps STDOUT.
272              
273             =cut
274              
275             sub stdout {
276 0     0 1 0 my ($self) = @_;
277 0         0 return $self->to_stream(
278             IO::Async::Stream->new_for_stdout,
279             label => 'STDOUT',
280             )
281             }
282              
283             =head2 stderr
284              
285             Returns a new L that wraps STDERR.
286              
287             =cut
288              
289             sub stderr {
290 0     0 1 0 my ($self) = @_;
291 0         0 return $self->to_stream(
292             IO::Async::Stream->new_for_stderr,
293             label => 'STDERR',
294             )
295             }
296              
297             =head2 timer
298              
299             Provides a L which emits an empty string at selected intervals.
300              
301             Takes the following named parameters:
302              
303             =over 4
304              
305             =item * interval - how often to trigger the timer, in seconds (fractional values allowed)
306              
307             =item * reschedule - type of rescheduling to use, can be C, C or C as documented
308             in L
309              
310             =back
311              
312             Example:
313              
314             $ryu->timer(interval => 1, reschedule => 'hard')
315             ->combine_latest(...)
316              
317             =cut
318              
319             sub timer {
320 1     1 1 5914 my ($self, %args) = @_;
321 1         4 my $src = $self->source(label => 'timer');
322             $self->add_child(
323             my $timer = IO::Async::Timer::Periodic->new(
324             reschedule => 'hard',
325             %args,
326 1     10   89 on_tick => $src->$curry::weak(sub { shift->emit('') }),
  10         1995420  
327             )
328             );
329 1         230 Scalar::Util::weaken($timer);
330             $src->on_ready($self->$curry::weak(sub {
331 1     1   1111 my ($self) = @_;
332 1 50       9 return unless $timer;
333 1 50       15 $timer->stop if $timer->is_running;
334 1         68 $self->remove_child($timer)
335 1         6 }));
336 1         1215 $timer->start;
337 1         5881 $src
338             }
339              
340             =head2 run
341              
342             Creates an L.
343              
344             =cut
345              
346             sub run {
347 0     0 1 0 my ($self, $code, %args) = @_;
348 0 0       0 if(ref($code) eq 'ARRAY') {
    0          
349             # Fork and exec
350 0         0 $args{command} = $code;
351             } elsif(ref($code) eq 'CODE') {
352 0         0 $args{code} = $code;
353             }
354             $self->add_child(
355 0         0 my $process = Ryu::Async::Process->new(
356             process => IO::Async::Process->new(%args)
357             )
358             );
359 0         0 $process;
360             }
361              
362             =head2 source
363              
364             Returns a new L instance.
365              
366             =cut
367              
368             sub source {
369 6     6 1 32 my ($self, %args) = @_;
370 6   66     30 my $label = delete($args{label}) // do {
371 2         17 my $label = (caller 1)[0];
372 2         7 for($label) {
373 2         6 s/^Net::Async::/Na/g;
374 2         4 s/^IO::Async::/Ia/g;
375 2         4 s/^Web::Async::/Wa/g;
376 2         4 s/^Tickit::Async::/Ta/g;
377 2         3 s/^Tickit::Widget::/TW/g;
378 2         6 s/::([^:]*)$/->$1/;
379             }
380             $label
381 2         8 };
382 6         30 Ryu::Source->new(
383             new_future => $self->loop->curry::weak::new_future,
384             apply_timeout => $self->curry::timeout,
385             label => $label,
386             %args,
387             )
388             }
389              
390             =head2 udp_client
391              
392             Creates a new UDP client.
393              
394             This provides a sink for L packets, and a source for L responses.
395              
396             =over 4
397              
398             =item * C - an optional URI of the form C<< udp://host:port >>
399              
400             =item * C - which host to listen on, defaults to C<0.0.0.0>
401              
402             =item * C - the port to listen on
403              
404             =back
405              
406             Returns a L instance.
407              
408             =cut
409              
410             sub udp_client {
411 1     1 1 7346 my ($self, %args) = @_;
412              
413 1         4 my $uri = delete $args{uri};
414 1   50     16 $uri //= 'udp://' . join ':', $args{host} // '*', $args{port} // ();
      33        
      33        
415 1 50       10 $uri = URI->new($uri) unless ref $uri;
416 1         110 $log->debugf("UDP client for %s", $uri->as_string);
417              
418             my $src = $self->source(
419 1   33     15 label => $args{label} // $uri->as_string,
420             );
421             my $sink = $self->sink(
422 1   33     72 label => $args{label} // $uri->as_string,
423             );
424             $self->add_child(
425             my $client = IO::Async::Socket->new(
426             on_recv => sub {
427 0     0   0 my ($sock, $payload, $addr) = @_;
428             try {
429             $log->tracef("Receiving [%s] from %s", $payload, $addr);
430             $src->emit(
431             Ryu::Async::Packet->new(
432             from => $addr,
433             payload => $payload
434             )
435             );
436 0         0 } catch {
437             $log->errorf("Exception when sending: %s", $@);
438             }
439             },
440             )
441 1         49 );
442 1   50     217 my $host = $uri->host || '0.0.0.0';
443 1 50       56 $host = '0.0.0.0' if $host eq '*';
444 1   50     4 my $port = $uri->port // 0;
445 1         43 my $f = $client->connect(
446             host => $host,
447             service => $port,
448             socktype => 'dgram',
449             );
450             $f->on_done(sub {
451 1     1   37 $log->debugf("UDP client connected");
452             })->on_fail(sub {
453 0     0   0 $log->errorf("UDP client failed to connect - %s", join ',', @_);
454 1         4375 });
455             $sink->source->each(sub {
456 1     1   109 my $payload = $_;
457             $f->on_done(sub {
458             try {
459             $log->tracef("Sending [%s] to %s", $payload, $uri);
460             $client->send(
461             $payload,
462             undef,
463             pack_sockaddr_in(
464             $port,
465             '' . inet_pton(AF_INET, $host)
466             )
467             );
468 1         20 } catch {
469             $log->errorf("Exception when sending: %s", $@);
470             }
471 1         8 })->retain;
472 1         28 });
473 1         45 Ryu::Async::Client->new(
474             outgoing => $sink,
475             incoming => $src,
476             );
477             }
478              
479             =head2 udp_server
480              
481             =cut
482              
483             sub udp_server {
484 1     1 1 6731 my ($self, %args) = @_;
485              
486 1         3 my $uri = delete $args{uri};
487 1   33     5 $uri //= do {
488 1   50     5 $args{host} //= '0.0.0.0';
489 1   33     12 'udp://' . join ':', $args{host}, $args{port} // ();
490             };
491 1 50       10 $uri = URI->new($uri) unless ref $uri;
492 1         312 $log->debugf("UDP server %s", $uri->as_string);
493              
494 1         71 my $src = $self->source;
495 1         98 my $sink = $self->sink;
496              
497             $self->add_child(
498             my $server = IO::Async::Socket->new(
499             on_recv => sub {
500 1     1   1149 my ($sock, $msg, $addr) = @_;
501 1         6 $log->debugf("UDP server [%s] had %s from %s", $uri->as_string, $msg, $addr);
502 1         22 $src->emit(
503             Ryu::Async::Packet->new(
504             payload => $msg,
505             from => $addr
506             )
507             )
508             },
509             on_recv_error => sub {
510 0     0   0 my ($sock, $err) = @_;
511 0         0 $src->fail($err);
512             }
513             )
514 1         65 );
515 1     0   255 $sink->source->each(sub { $server->send($_->payload, 0, $_->addr) });
  0         0  
516             my $port_f = $server->bind(
517             service => $uri->port // 0,
518             socktype => 'dgram'
519             )->then(sub {
520 1     1   21364 Future->done($server->write_handle->sockport)
521 1   50     80 });
522 1         110 Ryu::Async::Server->new(
523             port => $port_f,
524             incoming => $src,
525             outgoing => undef,
526             );
527             }
528              
529             =head2 tcp_server
530              
531             Creates a listening TCP socket, and provides a L
532             instance which will emit a new event every time a client connects.
533              
534             =cut
535              
536             sub tcp_server {
537 1     1 1 6480 my ($self, %args) = @_;
538              
539 1         3 my $uri = delete $args{uri};
540 1   33     5 $uri //= do {
541 1   50     4 $args{host} //= '0.0.0.0';
542 1   33     11 'tcp://' . join ':', $args{host}, $args{port} // ();
543             };
544 1 50       10 $uri = URI->new($uri) unless ref $uri;
545 1         326 $log->debugf("TCP server %s", $uri->as_string);
546              
547 1         80 my $src = $self->source;
548 1         119 my $sink = $self->sink;
549              
550             $self->add_child(
551             my $server = IO::Async::Listener->new(
552             on_stream => sub {
553 0     0   0 my ($sock, $msg, $addr) = @_;
554 0         0 $log->debugf("TCP server [%s] had %s from %s", $uri->as_string, $msg, $addr);
555 0         0 $src->emit(
556             Ryu::Async::Packet->new(
557             payload => $msg,
558             from => $addr
559             )
560             )
561             },
562             )
563 1         60 );
564 1     0   227 $sink->source->each(sub { $server->send($_->payload, 0, $_->addr) });
  0         0  
565             my $port_f = $server->listen(
566             service => $uri->port // 0,
567             socktype => 'stream'
568             )->then(sub {
569 1     1   22347 my ($listener) = @_;
570 1         5 Future->done($listener->read_handle->sockport)
571 1   50     62 });
572 1         119 Ryu::Async::Server->new(
573             port => $port_f,
574             incoming => $src,
575             outgoing => undef,
576             );
577             }
578              
579             sub timeout {
580 0     0 0 0 my ($self, $input, $output, $delay) = @_;
581             $self->add_child(
582             my $timer = IO::Async::Timer::Countdown->new(
583             interval => $delay,
584 0     0   0 on_expire => sub { $output->fail('timeout') },
585             )
586 0         0 );
587 0     0   0 $input->each_while_source(sub { $timer->reset }, $output);
  0         0  
588 0         0 return $self;
589             }
590              
591             =head2 sink
592              
593             Returns a new L.
594              
595             The label will default to the calling package/class and method,
596             with some truncation rules:
597              
598             =over 4
599              
600             =item * A C prefix will be replaced by C.
601              
602             =item * A C prefix will be replaced by C.
603              
604             =item * A C prefix will be replaced by C.
605              
606             =item * A C prefix will be replaced by C.
607              
608             =item * A C prefix will be replaced by C.
609              
610             =item * A C prefix will be replaced by C.
611              
612             =back
613              
614             This list of truncations is subject to change, so please don't
615             rely on any of these in string matches or similar - better to set
616             your own label if you need consistency.
617              
618             =cut
619              
620             sub sink {
621 4     4 1 7509 my ($self, %args) = @_;
622 4   66     21 my $label = delete($args{label}) // do {
623 2         17 my $label = (caller 1)[3];
624 2         8 for($label) {
625 2         13 s/^Database::Async::/Da/g;
626 2         16 s/^Net::Async::/Na/g;
627 2         8 s/^IO::Async::/Ia/g;
628 2         6 s/^Web::Async::/Wa/g;
629 2         6 s/^Job::Async::/Ja/g;
630 2         103 s/^Tickit::Async::/Ta/g;
631 2         14 s/^Tickit::Widget::/TW/g;
632 2         33 s/::([^:]*)$/->$1/;
633             }
634             $label
635 2         11 };
636 4         17 Ryu::Sink->new(
637             new_future => $self->loop->curry::weak::new_future,
638             label => $label,
639             %args,
640             )
641             }
642              
643             1;
644              
645             __END__