File Coverage

blib/lib/Data/Stream/Bulk/AnyEvent.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1 4     4   130190 use strict;
  4         10  
  4         179  
2 4     4   23 use warnings;
  4         9  
  4         257  
3             package Data::Stream::Bulk::AnyEvent;
4              
5             # ABSTRACT: AnyEvent-friendly Data::Stream::Bulk::Callback
6             our $VERSION = 'v0.0.2'; # VERSION:
7              
8 4     4   2171 use Moose;
  0            
  0            
9             use AnyEvent;
10             use Carp;
11              
12             with qw(Data::Stream::Bulk);
13              
14             has _cv => (
15             is => 'rw',
16             isa => 'Maybe[AnyEvent::CondVar]',
17             default => undef,
18             );
19              
20             has _done => (
21             is => 'rw',
22             isa => 'Bool',
23             default => 0
24             );
25              
26             has cb => (
27             is => 'rw',
28             isa => 'Maybe[CodeRef]',
29             trigger => \&_on_cb_set,
30             default => undef,
31             );
32              
33             has callback => (
34             is => 'ro',
35             isa => 'CodeRef',
36             required => 1
37             );
38              
39             sub is_done
40             {
41             my $self = shift;
42             return $self->_done;
43             }
44              
45             sub next
46             {
47             my $self = shift;
48             return if $self->is_done;
49             $self->cb(undef) if $self->cb;
50             $self->_cv($self->callback->()) if(! $self->_cv);
51             my $ret = $self->_cv->recv;
52             $self->_cv(undef);
53             $self->_done(1) if ! defined $ret;
54             return $ret;
55             }
56              
57             sub _on_cb_set
58             {
59             my ($self, $new, $old) = @_;
60             return if !defined($new) && !defined($old);
61             if(defined($new)) {
62             my $sub; $sub = sub {
63             my $ret = shift;
64             $self->_cv(undef);
65             $self->_done(1) if(! defined $ret->recv);
66             if($new->($ret) && defined $ret->recv) {
67             $self->_cv($self->callback->());
68             $self->_cv->cb($sub);
69             }
70             };
71             $self->_cv($self->callback->()) if(! $self->_cv);
72             $self->_cv->cb($sub);
73             } else {
74             $self->_cv->croak(q{Callback `cb' was set as undef during active iteration}) if $self->_cv;
75             }
76             }
77              
78             __PACKAGE__->meta->make_immutable;
79              
80             1;
81              
82             __END__
83              
84             =pod
85              
86             =head1 NAME
87              
88             Data::Stream::Bulk::AnyEvent - AnyEvent-friendly Data::Stream::Bulk::Callback
89              
90             =head1 VERSION
91              
92             version v0.0.2
93              
94             =head1 SYNOPSIS
95              
96             # Default to blocking-mode
97             my $stream = Data::Stream::Bulk::AnyEvent->new(
98             # Producer callback has no arguments, and MUST return condition variable.
99             # Items are sent via the condition variable as array ref.
100             # If there are no more data, send undef.
101             producer => sub {
102             my $cv = AE::cv;
103             my $w; $w = AE::timer 1, 0, sub { # Useless, just an example
104             undef $w;
105             my $entry = shift @data; # defined like my @data = ([1,2], [2,3], undef);
106             $cv->send($entry);
107             };
108             return $cv;
109             }
110             );
111             # In this mode, you can use this class like other Data::Stream::Bulk subclasses, at client side
112             # NOTE that calling C<next> includes blocking wait AE::cv->recv() internally.
113             $stream->next if ! $stream->is_done;
114              
115             # Callback-mode
116             # This is natrual mode for asynchronous codes.
117             # Callback is called for each producer call.
118             # If you want to get more items, callback SHOULD return true. If not, return false.
119             my $stream = Data::Stream::Bulk::AnyEvent->new(
120             callback => sub { ... }, ...
121             )->cb(sub { my $ref = shift->recv; ... return defined $ref; });
122              
123             =head1 DESCRIPTION
124              
125             This class is like L<Data::Stream::Bulk::Callback>, but there are some differences.
126              
127             =over 4
128              
129             =item *
130              
131             Consumer side can use asynchronous callback style.
132              
133             =item *
134              
135             Producer callback does not return actual items but returns a condition variable. Items are sent via the condition variable.
136              
137             =back
138              
139             Primary purpose of this class is to make L<Net::Amazon::S3>, using L<Data::Stream::Bulk::Callback>, AnyEvent-friendly by using L<Module::AnyEvent::Helper::Filter>.
140              
141             =head1 ATTRIBUTES
142              
143             =head2 C<callback =E<gt> sub { my $cv = AE::CV; ... return $cv; }>
144              
145             Same as L<Data::Stream::Bulk::Callback>.
146              
147             Specify callback code reference called when data is requested.
148             This attribute is C<required>. Therefore, you need to specify in constructor argument.
149              
150             There is no argument of the callback. Return value MUST be a condition variable that items are sent as an array reference.
151             If there is no more items, send C<undef>.
152              
153             =head2 C<cb =E<gt> sub { my ($cv) = @_; }>
154              
155             Specify callback code reference called for each producer call.
156             A parameter of the callback is an AnyEvent condition variable.
157             If the callback returns true, iteration is continued.
158             If false, iteration is suspended.
159             If you need to resume iteration, you should call C<next> or set C<cb> again even though the same C<cb> is used.
160              
161             If you do not need callback, call C<next> or set C<cb> as C<undef>.
162             Setting C<cb> as C<undef> is succeeded only when iteration is not active, which means suspended or not started.
163             To set C<callback> as not-C<undef> means this object goes into callback mode,
164             while to set C<callback> as C<undef> means this object goes into blocking mode.
165              
166             You can change this value during lifetime of the object, except for the limitation described above.
167              
168             =head1 METHODS
169              
170             =head2 C<next()>
171              
172             Same as L<Data::Stream::Callback>.
173             If called in callback mode, the object goes into blocking mode and callback is canceled.
174              
175             =head2 C<is_done()>
176              
177             Same as L<Data::Stream::Callback>.
178              
179             =head1 AUTHOR
180              
181             Yasutaka ATARASHI <yakex@cpan.org>
182              
183             =head1 COPYRIGHT AND LICENSE
184              
185             This software is copyright (c) 2012 by Yasutaka ATARASHI.
186              
187             This is free software; you can redistribute it and/or modify it under
188             the same terms as the Perl 5 programming language system itself.
189              
190             =cut