File Coverage

blib/lib/Paws/Kinesis/MemoryCaller.pm
Criterion Covered Total %
statement 3 5 60.0
branch n/a
condition n/a
subroutine 2 2 100.0
pod n/a
total 5 7 71.4


line stmt bran cond sub pod time code
1             package Paws::Kinesis::MemoryCaller;
2 1     1   476 use 5.008001;
  1         2  
3              
4             our $VERSION = "0.01";
5              
6 1     1   289 use Moose;
  0            
  0            
7             with "Paws::Net::CallerRole";
8              
9             use Data::UUID;
10             use List::AllUtils qw(first_index);
11              
12             use Paws::Kinesis::DescribeStreamOutput;
13             use Paws::Kinesis::GetRecordsOutput;
14             use Paws::Kinesis::GetShardIteratorOutput;
15             use Paws::Kinesis::PutRecordOutput;
16              
17             use Paws::Kinesis::Record;
18              
19             has store => (is => 'rw', default => sub { +{} });
20             has shard_iterator__address => (is => 'rw', default => sub { +{} });
21              
22             sub caller_to_response {}
23              
24             sub do_call {
25             my $self = shift;
26             my ($kinesis, $action) = @_;
27              
28             my $action_class = ref $action;
29              
30             my $method = {
31             "Paws::Kinesis::CreateStream" => "create_stream",
32             "Paws::Kinesis::DescribeStream" => "describe_stream",
33             "Paws::Kinesis::GetRecords" => "get_records",
34             "Paws::Kinesis::GetShardIterator" => "get_shard_iterator",
35             "Paws::Kinesis::PutRecord" => "put_record",
36             }->{$action_class} or die "unknown action ($action_class)";
37              
38             $self->$method($action);
39             }
40              
41             sub create_stream {
42             my $self = shift;
43             my ($action) = @_;
44              
45             my $shard_count = $action->ShardCount;
46              
47             my $shard_id__records = { map { $_ => [] } 1..$shard_count };
48              
49             $self->store->{$action->StreamName} = $shard_id__records;
50              
51             return undef;
52             }
53              
54             sub get_shard_iterator {
55             my $self = shift;
56             my ($action) = @_;
57              
58             my $shard_iterator_type = $action->ShardIteratorType;
59              
60             my $method = {
61             LATEST => "_get_shard_iterator_latest",
62             TRIM_HORIZON => "_get_shard_iterator_trim_horizon",
63             AT_SEQUENCE_NUMBER => "_get_shard_iterator_at_sequence_number",
64             }->{$shard_iterator_type}
65             or die "unknown shard_iterator_type ($shard_iterator_type)";
66              
67             my $shard_iterator = $self->$method(
68             stream_name => $action->StreamName,
69             shard_id => $action->ShardId,
70             sequence_number => $action->StartingSequenceNumber,
71             );
72              
73             return Paws::Kinesis::GetShardIteratorOutput->new(
74             ShardIterator => $shard_iterator,
75             );
76             }
77              
78             sub _get_shard_iterator_at_sequence_number {
79             my $self = shift;
80             my %args = @_;
81              
82             my $stream_name = $args{stream_name};
83             my $shard_id = $args{shard_id};
84             my $sequence_number = $args{sequence_number};
85              
86             my $records = $self->store->{$stream_name}->{$shard_id};
87             my $index = first_index {
88             $_->SequenceNumber eq $sequence_number
89             } @$records;
90              
91             return $self->_create_shard_iterator($stream_name, $shard_id, $index);
92             }
93              
94             sub _get_shard_iterator_latest {
95             my $self = shift;
96             my %args = @_;
97              
98             my $stream_name = $args{stream_name};
99             my $shard_id = $args{shard_id};
100              
101             my $records = $self->store->{$stream_name}->{$shard_id};
102             my $index = @$records ? scalar @$records : 0;
103              
104             return $self->_create_shard_iterator($stream_name, $shard_id, $index);
105             }
106              
107             sub _get_shard_iterator_trim_horizon {
108             my $self = shift;
109             my %args = @_;
110              
111             my $stream_name = $args{stream_name};
112             my $shard_id = $args{shard_id};
113              
114             return $self->_create_shard_iterator($stream_name, $shard_id, 0);
115             }
116              
117             sub _create_shard_iterator {
118             my $self = shift;
119             my ($stream_name, $shard_id, $index) = @_;
120              
121             my $shard_iterator = Data::UUID->new->create_str();
122              
123             $self->shard_iterator__address->{$shard_iterator} = {
124             stream_name => $stream_name,
125             shard_id => $shard_id,
126             index => $index,
127             };
128              
129             return $shard_iterator;
130             }
131              
132             sub describe_stream {
133             my $self = shift;
134             my ($action) = @_;
135              
136             my $stream_name = $action->StreamName;
137              
138             return Paws::Kinesis::DescribeStreamOutput->new();
139              
140             return {
141             name => $stream_name,
142             shard_ids => $self->_get_shard_ids_from_stream_name($stream_name),
143             };
144             }
145              
146             sub get_records {
147             my $self = shift;
148             my ($action) = @_;
149              
150             my $shard_iterator = $action->ShardIterator;
151             my $limit = $action->Limit;
152              
153             my $address =
154             $self->shard_iterator__address->{$shard_iterator}
155             or die "shard_iterator ($shard_iterator) not found";
156              
157             my $stream_name = $address->{stream_name};
158             my $shard_id = $address->{shard_id};
159             my $index = $address->{index};
160              
161             my @records = @{$self->store->{$stream_name}->{$shard_id}};
162              
163             return Paws::Kinesis::GetRecordsOutput->new(
164             Records => [
165             defined $limit
166             ? splice(@records, $index, $limit)
167             : splice(@records, $index)
168             ],
169             NextShardIterator => $self->_get_shard_iterator_latest(
170             stream_name => $stream_name,
171             shard_id => $shard_id,
172             ),
173             );
174             }
175              
176             sub put_record {
177             my $self = shift;
178             my ($action) = @_;
179              
180             my $shard_id =
181             $self->_get_shard_id_from_partition_key($action);
182              
183             my $records = $self->store->{$action->StreamName}->{$shard_id};
184              
185             my $sequence_number = scalar(@$records + 1);
186              
187             my $new_record = Paws::Kinesis::Record->new(
188             Data => $action->Data,
189             PartitionKey => $action->PartitionKey,
190             SequenceNumber => $sequence_number,
191             );
192              
193             push @$records, $new_record;
194              
195             return Paws::Kinesis::PutRecordOutput->new(
196             ShardId => $shard_id,
197             SequenceNumber => $sequence_number,
198             );
199             }
200              
201             sub _get_shard_id_from_partition_key {
202             my $self = shift;
203             my ($action) = @_;
204              
205             my $paritition_key = $action->PartitionKey;
206             my $stream_name = $action->StreamName;
207              
208             my $shard_ids = $self->_get_shard_ids_from_stream_name($stream_name);
209              
210             die "stream ($stream_name) has no shards" unless scalar @$shard_ids;
211              
212             my $index = length($paritition_key) % scalar(@$shard_ids);
213             return $shard_ids->[$index];
214             }
215              
216             sub _get_shard_ids_from_stream_name {
217             my $self = shift;
218             my ($stream_name) = @_;
219              
220             my $shard_id__records = $self->store->{$stream_name}
221             or die "stream ($stream_name) does not exist";
222              
223             return [ sort { $a <=> $b } keys %$shard_id__records ],
224             }
225              
226              
227             1;
228             __END__