File Coverage

blib/lib/Sys/Async/Virt/Stream.pm
Criterion Covered Total %
statement 29 140 20.7
branch 0 40 0.0
condition 0 3 0.0
subroutine 10 24 41.6
pod 9 11 81.8
total 48 218 22.0


line stmt bran cond sub pod time code
1             ####################################################################
2             #
3             # This file was generated using XDR::Parse version v1.0.1
4             # and LibVirt version v12.1.0
5             #
6             # Don't edit this file, use the source template instead
7             #
8             # ANY CHANGES HERE WILL BE LOST !
9             #
10             ####################################################################
11              
12              
13 1     1   15 use v5.26;
  1         6  
14 1     1   5 use warnings;
  1         1  
  1         45  
15 1     1   4 use experimental 'signatures';
  1         1  
  1         5  
16 1     1   123 use Future::AsyncAwait;
  1         2  
  1         5  
17 1     1   42 use Object::Pad 0.821;
  1         6  
  1         34  
18              
19             class Sys::Async::Virt::Stream v0.6.1;
20              
21 1     1   209 use Carp qw(croak);
  1         3  
  1         73  
22 1     1   8 use Future;
  1         2  
  1         37  
23 1     1   6 use Future::Queue;
  1         2  
  1         57  
24 1     1   8 use Log::Any qw($log);
  1         2  
  1         9  
25              
26 1     1   263 use Protocol::Sys::Virt::Remote::XDR v12.1.0;
  1         16  
  1         4084  
27             my $remote = 'Protocol::Sys::Virt::Remote::XDR';
28              
29 0     0 0   field $_rpc_id :param :reader;
  0            
30 0     0 1   field $_proc :param :reader;
  0            
31 0     0 1   field $_client :param :reader;
  0            
32 0     0 1   field $_direction :param :reader;
  0            
33 0     0 1   field $_max_items :param :reader = 5;
34 0           field $_pending_error = undef;
35             field $_finished = Future->new;
36             field $_queue = Future::Queue->new;
37              
38 0     0 1   async method receive() {
  0            
  0            
  0            
39 0 0         if ($_direction eq 'send') {
40 0           die "Receive called on sending stream (id: $self->{id})";
41             }
42 0 0         if (my $e = $_pending_error) {
43 0           $_pending_error = undef;
44 0           die $e;
45             }
46              
47 0           return await $_queue->shift;
48             }
49              
50 0     0     async method _dispatch_receive($data, $hole, $eof, $final) {
  0            
  0            
  0            
  0            
  0            
  0            
  0            
51 0 0         if ($final) {
52 0           $_finished->done;
53 0           return;
54             }
55 0 0         if ($eof) {
56 0           $_queue->finish;
57 0           return;
58             }
59 0 0         if ($_direction eq 'send') {
60 0           die;
61             }
62              
63             # throttle receiving if the queue gets too long
64 0           await $_queue->push({ data => $data, hole => $hole });
65 0           return;
66             }
67              
68 0     0     async method _dispatch_error($error) {
  0            
  0            
  0            
  0            
69 0 0         return if $_finished->is_ready; # discard all input
70 0           $_finished->done;
71 0 0         unless ($_pending_error) {
72 0           $_pending_error = $error;
73             }
74 0           return;
75             }
76              
77 0     0 1   async method send($data, $offset = 0, $length = undef) {
  0            
  0            
  0            
  0            
  0            
  0            
78 0 0         if ($_direction eq 'receive') {
79 0           die "'send' called on receiving stream (id: $self->{id}";
80             }
81 0 0         if (my $e = $_pending_error) {
82 0           $_pending_error = undef;
83 0           die $e;
84             }
85 0 0         return if $_finished->is_ready; # discard all transfers
86              
87 0 0 0       my $chunk = ($offset or $length) ? substr($data, $offset, $length) : $data;
88 0           return await $_client->_send(
89             $_proc, $_rpc_id,
90             data => $chunk );
91             }
92              
93 0     0 1   async method send_hole($length, $flags = 0) {
  0            
  0            
  0            
  0            
  0            
94 0 0         if ($_direction eq 'receive') {
95 0           die "'send_hole' called on receiving stream (id: $self->{id}";
96             }
97 0 0         if (my $e = $_pending_error) {
98 0           $_pending_error = undef;
99 0           die $e;
100             }
101              
102 0 0         return if $_finished->is_ready; # discard all transfers
103 0           return await $_client->_send(
104             $_proc, $_rpc_id,
105             hole => { length => $length, flags => $flags } );
106             }
107              
108 0     0 1   async method abort() {
  0            
  0            
  0            
109 0 0         return if $_finished->is_ready;
110 0           $_client->_send_finish( $_proc, $_rpc_id, 1 );
111 0           await $_finished;
112              
113 0           $self->cleanup;
114 0 0         if (my $e = $_pending_error) {
115 0           $_pending_error = undef;
116 0           die $e;
117             }
118 0           return;
119             }
120              
121 0     0 0   method cleanup() {
  0            
  0            
122 0           $_queue = undef;
123 0 0         $_finished->done
124             unless $_finished->is_ready;
125              
126 0           return;
127             }
128              
129 0     0 1   async method finish() {
  0            
  0            
  0            
130 0 0         return if $_finished->is_ready;
131 0           $_client->_send_finish( $_proc, $_rpc_id, 0 );
132 0           await $_finished;
133              
134 0           $self->cleanup;
135 0 0         if (my $e = $_pending_error) {
136 0           $_pending_error = undef;
137 0           die $e;
138             }
139 0           return;
140             }
141              
142 0     0     method DESTROY() {
  0            
  0            
143 0 0         if (not $_finished->is_ready) {
144             # abort the stream
145 0           $_client->_send_finish( $_proc, $_rpc_id, 1 );
146             }
147             }
148              
149             1;
150              
151             __END__