File Coverage

blib/lib/DS/Transformer/Buffer.pm
Criterion Covered Total %
statement 76 77 98.7
branch 17 18 94.4
condition 6 9 66.6
subroutine 11 11 100.0
pod 2 7 28.5
total 112 122 91.8


line stmt bran cond sub pod time code
1             #!perl
2            
3             # ########################################################################## #
4             # Title: Batch-of-rows processor
5             # Creation date: 2007-03-05
6             # Author: Michael Zedeler
7             # Henrik Andreasen
8             # Description: Process batches of rows in a data stream
9             # Data Stream class
10             # Data transformer
11             # Buffers rows
12             # File: $Source: /data/cvs/lib/DSlib/lib/DS/Transformer/Buffer.pm,v $
13             # Repository: kronhjorten
14             # State: $State: Exp $
15             # Documentation: inline
16             # Recipient: -
17             # ########################################################################## #
18            
19             package DS::Transformer::Buffer;
20            
21 1     1   1053 use base qw{ DS::Transformer::TypePassthrough };
  1         3  
  1         687  
22            
23 1     1   7 use strict;
  1         2  
  1         30  
24 1     1   5 use Carp;
  1         2  
  1         53  
25 1     1   7 use Carp::Assert;
  1         2  
  1         6  
26            
27             our ($VERSION) = $DS::VERSION;
28             our ($REVISION) = '$Revision: 1.1 $' =~ /(\d+\.\d+)/;
29            
30            
31             # new
32             #
33             # Class constructor
34             #
35             sub new {
36 1     1 1 1304 my( $class, $source, $target ) = @_;
37            
38 1         15 my $self = $class->SUPER::new( $source, $target );
39            
40 1         16 $self->{buffer} = []; # Holds copies (?) of the currently buffered rows (stream is N elements long, indexed 0 .. N-1)
41 1         4 $self->{first} = 0; # Range: [0..N] Inv: Always at first buffered element (one past "last" when buffer is empty)
42 1         4 $self->{last} = -1; # Range: [-1..N-1] Inv: Always at last element (one before "first" when buffer is empty)
43 1         2 $self->{current} = -1; # Range: [-1..N] Inv: Always at current element, initially -1, finally N (past end)
44            
45 1         5 return $self;
46             }
47            
48             # process
49             #
50             # Processes a row in the stream.
51             # Returns undef when no more rows are available.
52             # It is allowed to call process after the stream has ended, each call returning undef.
53             #
54             sub process {
55 15     15 1 18 my ($self, $row) = @_;
56 15         49 $self->push( $row );
57 15         36 return $self->shift;
58             }
59            
60             sub shift {
61 15     15 0 21 my( $self ) = @_;
62            
63 15         25 my $last = $self->{last};
64 15         22 my $current = $self->{current};
65 15         18 my $result = undef;
66            
67             # Find and return the "next" element, if any
68 15 100       37 ++$current if ($current <= $last);
69 15 100       38 $result = ${$self->{buffer}}[$current] if ($current <= $last);
  13         25  
70            
71 15         25 $self->{current} = $current;
72            
73 15         61 return $result;
74             }
75            
76             sub push {
77 15     15 0 22 my( $self, $row ) = @_;
78            
79 15         24 my $last = $self->{last};
80            
81             # Put row in buffer if not EOF or EOF not already registered in buffer
82 15 100 100     45 if ( $row or ${$self->{buffer}}[$last] ) {
  5         27  
83 11         13 ++$last;
84 11 100       51 $row = {%$row} if $row; # Make a copy if not EOF
85 11         2430 ${$self->{buffer}}[$last] = $row;
  11         33  
86             }
87            
88 15         25 $self->{last} = $last;
89            
90 15         29 return;
91             }
92            
93             # fetch
94             #
95             # Re-fetches rows that have been unfetched.
96             # It is a fatal error to try fetching beyond the last row in the buffer.
97             #TODO Implement some kind of end of stream indicator that will allow fetch to return undef indicating end of stream
98             sub fetch {
99 10     10 0 4383 my ($self) = @_;
100            
101 10         20 my $last = $self->{last};
102 10         20 my $current = $self->{current};
103 10         15 my $result = undef;
104            
105             # Make sure we're not beyond end of buffer
106 10 100       26 if( $current < $last ) {
107             # Find and return the "next" element
108 6         9 ++$current;
109 6         10 $result = ${$self->{buffer}}[$current];
  6         18  
110             } else {
111 4         1105 croak("Can't fetch past buffer end.");
112             }
113            
114 6         12 $self->{last} = $last;
115 6         11 $self->{current} = $current;
116            
117 6         30 return $result;
118             }
119            
120            
121             # unfetch
122             #
123             # Moves the "current" position one step backwards within the buffered rows.
124             # It is a fatal error to try to move before the start of the currently buffered rows.
125             #
126             sub unfetch {
127 13     13 0 4306 my ($self) = @_;
128            
129 13         25 my $first = $self->{first};
130 13         22 my $last = $self->{last};
131 13         18 my $current = $self->{current};
132            
133             # Validate the request
134 13 100       44 if ($current < $first) {
    100          
135 3         21 die "Cannot unfetch beyond buffer start (frame starts at row number $first and current record is $current)\n";
136             } elsif( $current > $last ) {
137             # If EOF reached, return to the element that contains EOF
138 1         10 assert( $current == $last + 1, '$current must never be more than one past $last' );
139 1         5 --$current;
140             }
141            
142             # Move back one step
143 10         12 --$current;
144            
145 10         18 $self->{current} = $current;
146 10         23 return 1;
147             }
148            
149            
150             # flush
151             #
152             # Clears the buffered rows up to and including the given point.
153             # If the "current" position is flushed, it is moved forward so that the
154             # next fetch will return the first available element, if any.
155             # It is a fatal error to flush non-existent rows.
156             #
157             sub flush {
158 2     2 0 3105 my ($self, $point) = @_;
159            
160 2         5 my $first = $self->{first};
161 2         5 my $last = $self->{last};
162 2         5 my $current = $self->{current};
163            
164             # Use current position to flush if no point is provided
165 2   66     13 $point ||= $current;
166            
167             # Validate the request
168 2 50 33     15 if ($point < $first || $point > $last) {
169 0         0 croak("Cannot flush non-existent elements. ($point is not within valid range: $first .. $last)");
170             }
171            
172             # Delete buffer elements and adjust pointers
173 2         8 for (my $i = $first; $i <= $point; ++$i) {
174 3         4 delete ${$self->{buffer}}[$i];
  3         18  
175             }
176 2         5 $self->{first} = $point + 1;
177 2 100       8 $self->{current} = $point if ($self->{current} < $point);
178            
179 2         5 return 1;
180             }
181            
182             1;