File Coverage

blib/lib/Paws/Kinesis/MemoryCaller.pm
Criterion Covered Total %
statement 183 184 99.4
branch 18 22 81.8
condition 3 3 100.0
subroutine 40 41 97.5
pod 1 3 33.3
total 245 253 96.8


line stmt bran cond sub pod time code
1             package Paws::Kinesis::MemoryCaller;
2 9     9   8467775 use 5.008001;
  9         37  
3              
4             our $VERSION = "0.09";
5              
6             =head1 NAME
7              
8             Paws::Kinesis::MemoryCaller - A Paws Caller with in-memory Kinesis.
9              
10             =head1 SYNOPSIS
11              
12             my $kinesis = Paws->service('Kinesis',
13             region => 'N/A',
14             caller => Paws::Kinesis::MemoryCaller->new(),
15             credentials => Paws::Credential::Environment->new(),
16             );
17              
18             # or simply...
19              
20             my $kinesis = Paws::Kinesis::MemoryCaller->new_kinesis();
21              
22             # Then use $kinesis as you would normally, for example:
23              
24             # Put multiple records on a stream...
25             $kinesis->PutRecords(%args);
26              
27             # Get records from a stream...
28             $kinesis->GetRecords(%args);
29              
30             =head1 DESCRIPTION
31              
32             Paws::Kinesis::MemoryCaller implements Paws::Net::CallerRole which simulates its
33             own streams, shards and records in memory.
34              
35             The following methods have been implemented:
36              
37             =over
38              
39             =item *
40              
41             CreateStream
42              
43             =item *
44              
45             DescribeStream
46              
47             =item *
48              
49             GetRecords
50              
51             =item *
52              
53             GetShardIterator
54              
55             =item *
56              
57             PutRecord
58              
59             =item *
60              
61             PutRecords
62              
63             =back
64              
65             =cut
66              
67 9     9   1797 use Moose;
  9         1514276  
  9         82  
68             with "Paws::Net::CallerRole";
69              
70 9     9   83140 use namespace::autoclean;
  9         31760  
  9         77  
71 9     9   5913 use Data::UUID;
  9         5675  
  9         618  
72 9     9   5526 use List::AllUtils qw(first_index);
  9         94828  
  9         943  
73 9     9   4623 use MIME::Base64 qw(decode_base64);
  9         5336  
  9         545  
74              
75 9     9   1746 use Paws;
  9         2065979  
  9         293  
76 9     9   2359 use Paws::Credential::Environment;
  9         69815  
  9         321  
77              
78 9     9   5142 use Paws::Kinesis::DescribeStreamOutput;
  9         148194  
  9         366  
79 9     9   5429 use Paws::Kinesis::GetRecordsOutput;
  9         267926  
  9         441  
80 9     9   5657 use Paws::Kinesis::GetShardIteratorOutput;
  9         146558  
  9         444  
81 9     9   5535 use Paws::Kinesis::PutRecordOutput;
  9         203483  
  9         389  
82 9     9   6030 use Paws::Kinesis::PutRecordsOutput;
  9         243368  
  9         411  
83              
84 9     9   5172 use Paws::Kinesis::PutRecord;
  9         370033  
  9         413  
85              
86 9     9   4823 use Paws::Kinesis::PutRecordsResultEntry;
  9         136754  
  9         388  
87              
88 9     9   4721 use Paws::Kinesis::HashKeyRange;
  9         112707  
  9         398  
89 9     9   4625 use Paws::Kinesis::Record;
  9         164537  
  9         384  
90 9     9   5287 use Paws::Kinesis::Shard;
  9         213708  
  9         431  
91 9     9   5251 use Paws::Kinesis::SequenceNumberRange;
  9         103858  
  9         382  
92 9     9   4966 use Paws::Kinesis::StreamDescription;
  9         460070  
  9         17418  
93              
94             has store => (is => 'ro', isa => 'HashRef', default => sub { +{} });
95             has shard_iterator__address => (
96             is => 'ro',
97             isa => 'HashRef',
98             default => sub { +{} },
99             );
100              
101             =head1 METHODS
102              
103             =head2 new_kinesis
104              
105             Shortcut method to create a new Kinesis service instance that uses this caller.
106             Equivalent to:
107              
108             Paws->service('Kinesis',
109             caller => Paws::Kinesis::MemoryCaller->new(),
110             credentials => Paws::Credential::Environment->new(),
111             region => "N/A",
112             );
113              
114             =cut
115              
116             sub new_kinesis {
117 2     2 1 161 my $class = shift;
118              
119 2         60 return Paws->service('Kinesis',
120             caller => $class->new(),
121             credentials => Paws::Credential::Environment->new(),
122             region => "N/A",
123             );
124             }
125              
126       0 0   sub caller_to_response {}
127              
128             sub do_call {
129 64     64 0 4143538 my $self = shift;
130 64         201 my ($kinesis, $action) = @_;
131              
132 64         186 my $action_class = ref $action;
133              
134             my $method = {
135             "Paws::Kinesis::CreateStream" => "_create_stream",
136             "Paws::Kinesis::DescribeStream" => "_describe_stream",
137             "Paws::Kinesis::GetRecords" => "_get_records",
138             "Paws::Kinesis::GetShardIterator" => "_get_shard_iterator",
139             "Paws::Kinesis::PutRecord" => "_put_record",
140             "Paws::Kinesis::PutRecords" => "_put_records",
141 64 50       763 }->{$action_class} or die "($action_class) is not implemented";
142              
143 64         410 $self->$method($action);
144             }
145              
146             sub _create_stream {
147 9     9   26 my $self = shift;
148 9         23 my ($action) = @_;
149              
150 9         279 my $last_shard = $action->ShardCount - 1;
151 9 100       106 $last_shard >= 0
152             or die "ShardCount must be greater than zero to CreateStream";
153              
154             my $shard_id__records = {
155 7         25 map { sprintf("shardId-%012d", $_) => [] }
  8         62  
156             0..$last_shard
157             };
158              
159 7         214 $self->store->{$action->StreamName} = $shard_id__records;
160              
161 7         95 return undef;
162             }
163              
164             sub _get_shard_iterator {
165 12     12   36 my $self = shift;
166 12         32 my ($action) = @_;
167              
168 12         402 my $shard_iterator_type = $action->ShardIteratorType;
169              
170             my $method = {
171             LATEST => "_get_shard_iterator_latest",
172             TRIM_HORIZON => "_get_shard_iterator_trim_horizon",
173             AT_SEQUENCE_NUMBER => "_get_shard_iterator_at_sequence_number",
174             AFTER_SEQUENCE_NUMBER => "_get_shard_iterator_after_sequence_number",
175 12 100       206 }->{$shard_iterator_type}
176             or die "ShardIteratorType($shard_iterator_type) is invalid or not implemented";
177              
178 11         318 my $shard_iterator = $self->$method(
179             stream_name => $action->StreamName,
180             shard_id => $action->ShardId,
181             sequence_number => $action->StartingSequenceNumber,
182             );
183              
184 9         122 return Paws::Kinesis::GetShardIteratorOutput->new(
185             ShardIterator => $shard_iterator,
186             );
187             }
188              
189             sub _get_shard_iterator_after_sequence_number {
190 1     1   63 my $self = shift;
191 1         5 my %args = @_;
192              
193 1         5 my $index = $self->_get_index_by_sequence_number(%args);
194              
195             return $self->_create_shard_iterator(
196             $args{stream_name},
197             $args{shard_id},
198 1         17 $index + 1,
199             );
200             }
201              
202             sub _get_shard_iterator_at_sequence_number {
203 2     2   139 my $self = shift;
204 2         15 my %args = @_;
205              
206 2         16 my $index = $self->_get_index_by_sequence_number(%args);
207              
208             return $self->_create_shard_iterator(
209             $args{stream_name},
210             $args{shard_id},
211 2         37 $index,
212             );
213             }
214              
215             sub _get_index_by_sequence_number {
216 3     3   10 my $self = shift;
217 3         13 my %args = @_;
218              
219 3         11 my $stream_name = $args{stream_name};
220 3         11 my $shard_id = $args{shard_id};
221             my $sequence_number = $args{sequence_number}
222 3 50       28 or die "StartingSequenceNumber is required";
223              
224 3         21 my $records = $self->_get_records_from_store($stream_name, $shard_id);
225              
226             return first_index {
227 11     11   319 $_->SequenceNumber eq $sequence_number
228 3         83 } @$records;
229             }
230              
231             sub _get_shard_iterator_latest {
232 5     5   401 my $self = shift;
233 5         28 my %args = @_;
234              
235 5         16 my $stream_name = $args{stream_name};
236 5         11 my $shard_id = $args{shard_id};
237              
238 5         18 my $records = $self->_get_records_from_store($stream_name, $shard_id);
239              
240 3 50       16 my $index = @$records ? scalar @$records : 0;
241              
242 3         19 return $self->_create_shard_iterator($stream_name, $shard_id, $index);
243             }
244              
245             sub _get_shard_id__records {
246 147     147   276 my $self = shift;
247 147         288 my ($stream_name) = @_;
248              
249 147 100       4236 my $shard_id__records = $self->store->{$stream_name}
250             or die "StreamName($stream_name) does not exist";
251              
252 143         329 return $shard_id__records;
253             }
254              
255             sub _get_records_from_store {
256 101     101   194 my $self = shift;
257 101         228 my ($stream_name, $shard_id) = @_;
258              
259 101         280 my $shard_id__records = $self->_get_shard_id__records($stream_name);
260              
261             my $records = $shard_id__records->{$shard_id}
262             or die sprintf(
263             "ShardId(%s) does not exist. ShardIds are (%s)",
264             $shard_id,
265 100 100       323 join(", ", sort { $a cmp $b } keys %$shard_id__records),
  0         0  
266             );
267              
268 99         261 return $records;
269             }
270              
271             sub _push_record_to_store {
272 38     38   81 my $self = shift;
273 38         145 my ($stream_name, $shard_id, $record) = @_;
274              
275 38         126 my $records = $self->_get_records_from_store($stream_name, $shard_id);
276 38         132 push @$records, $record;
277             }
278              
279             sub _get_shard_iterator_trim_horizon {
280 3     3   192 my $self = shift;
281 3         22 my %args = @_;
282              
283 3         11 my $stream_name = $args{stream_name};
284 3         9 my $shard_id = $args{shard_id};
285              
286 3         13 return $self->_create_shard_iterator($stream_name, $shard_id, 0);
287             }
288              
289             sub _create_shard_iterator {
290 26     26   66 my $self = shift;
291 26         76 my ($stream_name, $shard_id, $index) = @_;
292              
293 26         5721 my $shard_iterator = Data::UUID->new->create_b64();
294              
295 26         2716 $self->shard_iterator__address->{$shard_iterator} = {
296             stream_name => $stream_name,
297             shard_id => $shard_id,
298             index => $index,
299             };
300              
301 26         99 return $shard_iterator;
302             }
303              
304             sub _describe_stream {
305 6     6   15 my $self = shift;
306 6         17 my ($action) = @_;
307              
308 6         159 my $stream_name = $action->StreamName;
309              
310 6         60 my $shard_ids = $self->_get_shard_ids_from_stream_name($stream_name);
311              
312             my $shards = [
313             map {
314 5         19 Paws::Kinesis::Shard->new(
  5         68  
315             HashKeyRange => Paws::Kinesis::HashKeyRange->new(
316             EndingHashKey => "",
317             StartingHashKey => "",
318             ),
319             SequenceNumberRange => Paws::Kinesis::SequenceNumberRange->new(
320             StartingSequenceNumber => "",
321             ),
322             ShardId => $_,
323             )
324             }
325             @$shard_ids
326             ];
327              
328 5         13749 return Paws::Kinesis::DescribeStreamOutput->new(
329             StreamDescription => Paws::Kinesis::StreamDescription->new(
330             EnhancedMonitoring => [],
331             HasMoreShards => "",
332             RetentionPeriodHours => 24,
333             Shards => $shards,
334             StreamARN => "",
335             StreamCreationTimestamp => "",
336             StreamName => $stream_name,
337             StreamStatus => "",
338             ),
339             );
340             }
341              
342             sub _get_records {
343 18     18   112 my $self = shift;
344 18         102 my ($action) = @_;
345              
346 18         558 my $shard_iterator = $action->ShardIterator;
347 18         563 my $limit = $action->Limit;
348              
349             my $address =
350 18 100       655 $self->shard_iterator__address->{$shard_iterator}
351             or die "ShardIterator($shard_iterator) does not exist";
352              
353 17         54 my $stream_name = $address->{stream_name};
354 17         78 my $shard_id = $address->{shard_id};
355 17         103 my $index = $address->{index};
356              
357             my @stream_shard_records =
358 17         62 @{$self->_get_records_from_store($stream_name, $shard_id)};
  17         65  
359              
360 17 100       82 my $end_index = defined $limit
361             ? $index + $limit - 1
362             : scalar(@stream_shard_records) - 1;
363              
364 17         66 my $records = [ grep { $_ } @stream_shard_records[$index..$end_index] ];
  36         119  
365              
366 17         122 my $next_shard_iterator = $self->_create_shard_iterator(
367             $stream_name, $shard_id, $index + scalar(@$records),
368             );
369              
370 17         146 return Paws::Kinesis::GetRecordsOutput->new(
371             Records => $records,
372             NextShardIterator => $next_shard_iterator,
373             );
374             }
375              
376             sub _put_record {
377 43     43   47041 my $self = shift;
378 43         112 my ($action) = @_;
379              
380 43         1160 my $stream_name = $action->StreamName;
381 43         1293 my $data = $action->Data;
382              
383 43 100 100     693 decode_base64($data) && length($data) % 4 == 0
384             or die "Data($data) is not valid Base64";
385              
386 40         154 my $shard_id = $self->_get_shard_id_from_partition_key($action);
387 38         150 my $records = $self->_get_records_from_store($stream_name, $shard_id);
388              
389 38         89 my $sequence_number = scalar(@$records + 1);
390              
391 38         997 my $record = Paws::Kinesis::Record->new(
392             Data => $data,
393             PartitionKey => $action->PartitionKey,
394             SequenceNumber => $sequence_number,
395             );
396              
397 38         53504 $self->_push_record_to_store($stream_name, $shard_id, $record);
398              
399 38         225 return Paws::Kinesis::PutRecordOutput->new(
400             ShardId => $shard_id,
401             SequenceNumber => $sequence_number,
402             );
403             }
404              
405             sub _put_records {
406 12     12   31 my $self = shift;
407 12         32 my ($action) = @_;
408              
409 12         340 my $stream_name = $action->StreamName;
410              
411             my $records = [
412             map {
413 36         32233 my $record = $_;
414              
415 36         1060 my $data = $record->Data;
416 36         1285 my $paritition_key = $record->PartitionKey;
417              
418 36         380 my $put_record_output = $self->_put_record(
419             Paws::Kinesis::PutRecord->new(
420             PartitionKey => $paritition_key,
421             StreamName => $stream_name,
422             Data => $data,
423             ),
424             );
425              
426 34         34813 Paws::Kinesis::PutRecordsResultEntry->new(
427             ShardId => $put_record_output->ShardId,
428             SequenceNumber => $put_record_output->SequenceNumber,
429             );
430             }
431 12         98 @{$action->Records}
  12         283  
432             ];
433              
434 10         13334 return Paws::Kinesis::PutRecordsOutput->new(
435             Records => $records,
436             );
437             }
438              
439             sub _get_shard_id_from_partition_key {
440 40     40   92 my $self = shift;
441 40         91 my ($action) = @_;
442              
443 40         1115 my $paritition_key = $action->PartitionKey;
444 40         1246 my $stream_name = $action->StreamName;
445              
446 40         319 my $shard_ids = $self->_get_shard_ids_from_stream_name($stream_name);
447              
448 38 50       136 die "stream ($stream_name) has no shards" unless scalar @$shard_ids;
449              
450 38         104 my $index = length($paritition_key) % scalar(@$shard_ids);
451 38         123 return $shard_ids->[$index];
452             }
453              
454             sub _get_shard_ids_from_stream_name {
455 46     46   100 my $self = shift;
456 46         117 my ($stream_name) = @_;
457              
458 46         153 my $shard_id__records = $self->_get_shard_id__records($stream_name);
459              
460 43         223 return [ sort { $a cmp $b } keys %$shard_id__records ],
  3         20  
461             }
462              
463             __PACKAGE__->meta->make_immutable;
464              
465             =head1 LICENSE
466              
467             Copyright (C) Keith Broughton.
468              
469             This library is free software; you can redistribute it and/or modify
470             it under the same terms as Perl itself.
471              
472             =head1 DEVELOPMENT
473              
474             =head2 Author
475              
476             Keith Broughton C<< <keithbro [AT] cpan.org> >>
477              
478             =head2 Bug reports
479              
480             Please report any bugs or feature requests on GitHub:
481              
482             L<https://github.com/keithbro/Paws-Kinesis-MemoryCaller/issues>.
483              
484             =cut