File Coverage

blib/lib/Data/EventStream.pm
Criterion Covered Total %
statement 4 6 66.6
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 6 8 75.0


line stmt bran cond sub pod time code
1             package Data::EventStream;
2 1     1   18929 use 5.010;
  1         3  
  1         30  
3 1     1   419 use Moose;
  0            
  0            
4             our $VERSION = "0.13";
5             $VERSION = eval $VERSION;
6             use Carp;
7             use Data::EventStream::Window;
8              
9             =head1 NAME
10              
11             Data::EventStream - Perl extension for event processing
12              
13             =head1 VERSION
14              
15             This document describes Data::EventStream version 0.13
16              
17             =head1 SYNOPSIS
18              
19             use Data::EventStream;
20             my $es = Data::EventStream->new;
21             $es->add_aggregator( $agg, %params );
22             while ( my $event = get_event() ) {
23             $es->add_event($event);
24             }
25              
26             =head1 DESCRIPTION
27              
28             This module provides methods for analyzing a stream of events.
29              
30             Please check L<Project
31             Homepage|http://trinitum.github.io/perl-Data-EventStream/> for more information
32             about using this module and examples.
33              
34             =head1 METHODS
35              
36             =head2 $class->new(%params)
37              
38             Creates a new instance. The following parameters are accepted:
39              
40             =over 4
41              
42             =item B<time>
43              
44             Initial model time; defaults to 0.
45              
46             =item B<time_sub>
47              
48             Reference to a subroutine that returns the time associated with the event passed
49             to it as its only parameter. This argument is not required if you only use
50             count-based sliding windows and never call C<set_time>.
51              
52             =item B<filter>
53              
54             Reference to a subroutine that is invoked every time a new event is added.
55             The new event is passed as the only argument to this subroutine. If the
56             subroutine returns false, the event is ignored.
57              
58             =back
59              
60             =cut
61              
# Current model time. set_time advances it; add_event also advances it when
# time_sub is set.
has 'time' => (
    is      => 'ro',
    default => 0,
);

# Optional code ref that maps an event to its time. It is needed by set_time
# and by duration-based aggregators.
has 'time_sub' => ( is => 'ro' );

# Buffer of buffered events. It is handed to every aggregator window created
# by add_aggregator.
has 'events' => (
    is      => 'ro',
    default => sub { [] },
);

# Aggregator records: hashes of add_aggregator parameters, plus the
# aggregator object (_obj) and its window (_window).
has 'aggregators' => (
    is      => 'ro',
    default => sub { [] },
);
75              
76             =head2 $self->set_filter(\&sub)
77              
78             Set a new filter subroutine, replacing the current one (see the I<filter> parameter of C<new>).
79              
80             =head2 $self->remove_filter
81              
82             Remove the current filter, so that all subsequently added events are accepted.
83              
84             =cut
85              
# Optional event filter (a code ref). It can be replaced with set_filter and
# cleared with remove_filter.
has 'filter' => (
    isa     => 'CodeRef',
    is      => 'ro',
    clearer => 'remove_filter',
    writer  => 'set_filter',
);

# Longest duration among the added aggregators (raised by add_aggregator).
has 'time_length' => ( is => 'ro', default => 0 );

# Largest count among the added aggregators (raised by add_aggregator).
has 'length' => ( is => 'ro', default => 0 );

# Time of the nearest pending leave/reset, as stored by set_time and
# add_event. It is read through next_leave.
has '_next_leave' => ( is => 'rw' );
98              
99             =head2 $self->set_time($time)
100              
101             Set a new model time. The new time must not be less than the current model time, and the stream must have a I<time_sub>; otherwise an exception is thrown.
102              
103             =cut
104              
# Advance the model time to $time and let every aggregator react to it.
#
# Croaks if $time is earlier than the current model time, or if the stream has
# no time_sub.
#
# Aggregators with a duration:
#   - batch mode: for every full period elapsed since the window start, the
#     window is closed at start + duration, on_reset is called and the
#     aggregator is reset. A disposable aggregator is removed after its first
#     reset.
#   - sliding mode: events whose time is at or before ($time - duration) leave
#     the window, with on_leave called before each one.
# Aggregators without a duration only get their window end time updated.
#
# Afterwards the nearest next-leave time is stored. Buffered events are
# dropped from the front while the buffer is longer than the largest count
# and the oldest event is at or before ($time - longest duration).
sub set_time {
    my ( $self, $time ) = @_;
    croak "new time ($time) is less than current time ($self->{time})" if $time < $self->{time};
    $self->{time} = $time;
    my $gt = $self->{time_sub};
    croak "time_sub must be defined if you using time aggregators" unless $gt;

    my $as         = $self->aggregators;
    my $next_leave = $time + $self->{time_length};
    my @deleted;

  AGGREGATOR:
    for my $n ( 0 .. $#$as ) {
        my $aggregator = $as->[$n];
        my $win        = $aggregator->{_window};
        my $obj        = $aggregator->{_obj};
        if ( $aggregator->{duration} ) {
            # The window's start time is still in the future.
            next if $win->{start_time} > $time;
            my $period = $aggregator->{duration};
            if ( $aggregator->{batch} ) {
                # Close every full period elapsed since the window start.
                while ( $time - $win->{start_time} >= $period ) {
                    $win->{end_time} = $win->{start_time} + $period;
                    $obj->window_update($win);
                    $aggregator->{on_reset}->($obj) if $aggregator->{on_reset};
                    $win->{start_time} = $win->{end_time};
                    $win->{count}      = 0;
                    $obj->reset($win);
                    if ( $aggregator->{disposable} ) {
                        push @deleted, $n;
                        next AGGREGATOR;
                    }
                }
                $win->{end_time} = $time;
                my $nl = $win->{start_time} + $period;
                $next_leave = $nl if $nl < $next_leave;
            }
            else {
                $win->{end_time} = $time;
                if ( $win->time_length >= $period ) {
                    my $st = $time - $period;
                    # Events at or before $st have aged out of the sliding window.
                    while ( $win->{count}
                        and ( my $ev_time = $gt->( $win->get_event(0) ) ) <= $st )
                    {
                        $win->{start_time} = $ev_time;
                        $obj->window_update($win);
                        $aggregator->{on_leave}->($obj) if $aggregator->{on_leave};
                        $obj->leave( $win->_shift_event, $win );
                    }
                    $win->{start_time} = $st;
                }
                if ( $win->{count} ) {
                    my $nl = $gt->( $win->get_event(0) ) + $period;
                    $next_leave = $nl if $nl < $next_leave;
                }
            }
            $obj->window_update($win);
        }
        else {
            $win->{end_time}   = $time;
            $win->{start_time} = $time unless $win->{count};
            $obj->window_update($win);
        }
    }

    # Remove disposed aggregators, highest index first, so that the indices
    # still waiting to be spliced stay valid. Iterate over the list instead of
    # testing each popped value for truth: index 0 is false, so a
    # `while ( my $n = pop @deleted )` loop would stop there and never remove
    # the first aggregator.
    splice @$as, $_, 1 for reverse @deleted;
    $self->_next_leave($next_leave);

    my $limit = $self->{time} - $self->{time_length};
    my $ev    = $self->{events};
    while ( @$ev > $self->{length}
        and $gt->( $ev->[0] ) <= $limit )
    {
        shift @$ev;
    }
}
181              
182             =head2 $self->next_leave
183              
184             Return the time of the nearest upcoming leave or reset event, as computed by the last call to C<set_time> or C<add_event>.
185              
186             =cut
187              
# Public read-only view of the nearest pending leave/reset time, as last
# stored by set_time or add_event (undef if nothing has been stored yet).
sub next_leave {
    my ($self) = @_;
    return $self->_next_leave;
}
191              
192             =head2 $self->add_aggregator($aggregator, %params)
193              
194             Add a new aggregator object. An aggregator that is passed as the first argument
195             should implement interface described in L<Data::EventStream::Aggregator>
196             documentation. The remaining parameters can be the following:
197              
198             =over 4
199              
200             =item B<count>
201              
202             Maximum number of events the aggregator can aggregate data for. Once the number
203             of aggregated events reaches this limit, each time before a new event enters the
204             aggregator, the oldest aggregated event leaves it.
205              
206             =item B<duration>
207              
208             Maximum period of time handled by the aggregator. Each time the model time is
209             updated, events whose age exceeds the specified duration leave the aggregator.
210              
211             =item B<batch>
212              
213             If enabled, when the I<count> or I<duration> limit is reached, the aggregator is
214             reset and all events leave it at once.
215              
216             =item B<start_time>
217              
218             Time when the first period should start. Used in conjunction with I<duration>
219             and I<batch>. Defaults to the current model time.
220              
221             =item B<disposable>
222              
223             Used in conjunction with I<batch>. The aggregator aggregates the specified period
224             only once, and on reset it is removed from the list of aggregators.
225              
226             =item B<on_enter>
227              
228             Callback that is invoked after an event enters the aggregator. The aggregator
229             object is passed as the only argument to the callback.
230              
231             =item B<on_leave>
232              
233             Callback that is invoked before an event leaves the aggregator.
234             The aggregator object is passed as the only argument to the callback.
235              
236             =item B<on_reset>
237              
238             Callback that is invoked before the aggregator is reset.
239             The aggregator object is passed as the only argument to the callback.
240              
241             =back
242              
243             =cut
244              
# Register $aggregator with the stream.
#
# %params are the options documented above (count, duration, batch,
# start_time, disposable, on_enter, on_leave, on_reset). The stored record is
# %params plus:
#   _obj    - the aggregator object
#   _window - a Data::EventStream::Window over the stream's event buffer.
#     It starts at start_time, or at the current model time if start_time
#     is not given.
#
# Croaks if neither count nor duration is given, or if duration is given but
# the stream has no time_sub. All validation happens before the stream is
# modified, so a failed call leaves it untouched.
sub add_aggregator {
    my ( $self, $aggregator, %params ) = @_;

    unless ( $params{count} or $params{duration} ) {
        croak 'At least one of "count" or "duration" parameters must be provided';
    }
    if ( $params{duration} and not $self->{time_sub} ) {
        croak "time_sub must be defined for using time aggregators";
    }

    $params{_obj}    = $aggregator;
    $params{_window} = Data::EventStream::Window->new(
        events     => $self->{events},
        start_time => $params{start_time} // $self->{time},
    );

    # The stream retains enough events for the largest count and the longest
    # duration among all aggregators.
    if ( $params{count} and $params{count} > $self->{length} ) {
        $self->{length} = $params{count};
    }
    if ( $params{duration} and $params{duration} > $self->{time_length} ) {
        $self->{time_length} = $params{duration};
    }

    push @{ $self->{aggregators} }, \%params;
}
270              
271             =head2 $self->add_event($event)
272              
273             Add a new event. If a filter is set and returns false for the event, the event is ignored.
274              
275             =cut
276              
# Add $event to the stream.
#
# If a filter is set and returns false for $event, the event is ignored.
# When time_sub is set, the model time is first advanced to the event's time
# via set_time.
#
# Count-limited aggregators that are already full have their oldest event
# leave first, with on_leave called before it leaves. The event is then
# appended to the buffer and entered into every aggregator, with on_enter
# called after it enters. A batch count aggregator that becomes full is reset
# (on_reset is called first), and is removed if it is disposable. Finally the
# next-leave time is updated and the oldest buffered events are trimmed.
sub add_event {
    my ( $self, $event ) = @_;
    if ( $self->{filter} ) {
        return unless $self->{filter}->($event);
    }
    my $ev = $self->{events};
    my $as = $self->aggregators;
    my $time;
    my $gt = $self->{time_sub};
    if ($gt) {
        $time = $gt->($event);
        $self->set_time($time);
    }

    # Make room in full count-limited windows before the new event arrives.
    for my $aggregator (@$as) {
        if ( $aggregator->{count} ) {
            my $win = $aggregator->{_window};
            if ( $win->{count} and $win->{count} == $aggregator->{count} ) {
                if ($gt) {
                    $win->{start_time} = $gt->( $win->get_event(0) );
                    $aggregator->{_obj}->window_update($win);
                }
                $aggregator->{on_leave}->( $aggregator->{_obj} ) if $aggregator->{on_leave};
                my $ev_out = $win->_shift_event;
                if ($gt) {
                    if ( $win->{count} ) {
                        $win->{start_time} = $gt->( $win->get_event(0) );
                    }
                    else {
                        $win->{start_time} = $time;
                    }
                    $aggregator->{_obj}->window_update($win);
                }
                $aggregator->{_obj}->leave( $ev_out, $win );
            }
        }
    }

    push @$ev, $event;

    my $next_leave = $self->_next_leave;
    my @deleted;

  AGGREGATOR:
    for my $n ( 0 .. $#$as ) {
        my $aggregator = $as->[$n];
        my $win        = $aggregator->{_window};

        my $ev_in = $win->_push_event;
        $aggregator->{_obj}->enter( $ev_in, $win );
        $aggregator->{on_enter}->( $aggregator->{_obj} ) if $aggregator->{on_enter};

        # A batch count aggregator that just became full is reset.
        if (    $aggregator->{count}
            and $aggregator->{batch}
            and $win->{count} == $aggregator->{count} )
        {
            $aggregator->{on_reset}->( $aggregator->{_obj} ) if $aggregator->{on_reset};

            $win->{count} = 0;
            if ($gt) {
                $win->{start_time} = $gt->($ev_in);
            }
            $aggregator->{_obj}->reset($win);
            if ( $aggregator->{disposable} ) {
                push @deleted, $n;
                next AGGREGATOR;
            }
        }

        if ( $aggregator->{duration} and $win->{count} ) {
            my $nl = $gt->( $win->get_event(0) ) + $aggregator->{duration};
            $next_leave = $nl if $nl < $next_leave;
        }
    }

    # Remove disposed aggregators, highest index first. Iterate over the list
    # instead of testing popped values for truth: index 0 is false and would
    # end a `while ( my $n = pop @deleted )` loop early.
    splice @$as, $_, 1 for reverse @deleted;
    $self->_next_leave($next_leave);

    # Keep at least `length` events. With time_sub, also keep every event
    # newer than the longest duration.
    my $time_limit = $self->{time} - $self->{time_length};
    while ( @$ev > $self->{length} ) {
        if ($gt) {
            if ( $gt->( $ev->[0] ) <= $time_limit ) {
                shift @$ev;
            }
            else {
                last;
            }
        }
        else {
            shift @$ev;
        }
    }
}
374              
# Remove Moose's declarative keywords (such as has) from this package so they
# cannot be called as methods.
no Moose;

# All attributes are declared; let Moose inline the constructor and accessors.
__PACKAGE__->meta()->make_immutable();

# A module must return a true value when loaded.
1;
380              
381             __END__
382              
383             =head1 SEE ALSO
384              
385             Project homepage at L<http://trinitum.github.io/perl-Data-EventStream/>
386              
387             =head1 BUGS
388              
389             Please report any bugs or feature requests via the GitHub bug tracker at
390             L<http://github.com/trinitum/perl-Data-EventStream/issues>.
391              
392             =head1 AUTHOR
393              
394             Pavel Shaydo C<< <zwon at cpan.org> >>
395              
396             =head1 LICENSE AND COPYRIGHT
397              
398             Copyright (C) 2014 Pavel Shaydo
399              
400             This program is free software; you can redistribute it and/or modify it
401             under the terms of either: the GNU General Public License as published
402             by the Free Software Foundation; or the Artistic License.
403              
404             See http://dev.perl.org/licenses/ for more information.
405              
406             =cut