line |
l |
!l&&r |
!l&&!r |
condition |
209
|
0 |
0 |
0 |
$self->{'in'} //= do {
my $sink = $self->db->new_sink;
die 'already have streaming input but no original ->{in} sink' if $self->{'streaming_input'};
$sink->source->completed->on_ready(sub {
$Database::Async::Query::log->debugf('Sink for %s completed with %s', $self, (shift())->state);
}
);
$self->{'streaming_input'} = $sink->source->buffer->pause;
$self->ready_to_stream->on_done(sub {
$Database::Async::Query::log->debugf('Ready to stream, resuming streaming input');
$self->streaming_input->resume;
}
);
$sink
} |
305
|
0 |
0 |
0 |
$self->{'row_hashrefs'} //= $self->row_data->map(sub {
my($row) = @_;
+{map({$self->{'field_names'}[$_], $row->[$_];} 0 .. $#$row)};
}
) |
310
|
0 |
0 |
0 |
$self->{'row_arrayrefs'} //= $self->row_data |
321
|
0 |
0 |
0 |
$self->{'queued'} //= $self->db->queue_query($self)->retain |
346
|
0 |
0 |
0 |
$self->{'row_data'} //= do {
my $row_data = $self->db->new_source;
$self->completed->on_ready(sub {
my($f) = @_;
return if $row_data->is_ready;
return $row_data->finish if $f->is_done;
return if $f->is_cancelled;
return $row_data->fail($f->failure);
}
);
$row_data->completed->on_ready(sub {
my $f = $self->completed;
(shift())->on_ready($f) unless $f->is_ready;
}
);
$self->start;
$row_data
} |
366
|
0 |
0 |
0 |
$self->{'completed'} //= do {
my $f = $self->db->new_future;
$self->start->on_fail(sub {
$f->fail(@_) unless $f->is_ready;
}
);
$f
} |
383
|
0 |
0 |
0 |
$self->{'ready_to_stream'} //= $self->db->new_future |
388
|
0 |
0 |
0 |
$self->{'input_stream'} //= $self->db->new_future |
424
|
0 |
0 |
0 |
$self->{'rows'} //= $self->row_data->map(sub {
my($row) = @_;
'Database::Async::Row'->new('index_by_name', {map({$row->[$_]{'description'}->name, $_;} 0 .. $#$row)}, 'data', $row);
}
) |
448
|
0 |
0 |
0 |
$self->{'single'} //= $self->row_data->first->flat_map(sub {
[@id ? @{$_;}{@id} : @$_];
}
)->as_list |