File Coverage

blib/lib/Coro/Amazon/SimpleDB.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Coro::Amazon::SimpleDB;
2 1     1   26580 use common::sense;
  0            
  0            
3              
4             $Coro::Amazon::SimpleDB::VERSION = 0.04;
5              
6             use EV;
7             use AnyEvent;
8             use Coro;
9             use Coro::AnyEvent;
10              
11             use Carp qw(croak carp);
12             use Scalar::Util qw(blessed);
13             use List::Util qw(first);
14              
15             use Amazon::SimpleDB::Client;
16             use Amazon::SimpleDB::Model::BatchPutAttributesRequest;
17             use Amazon::SimpleDB::Model::CreateDomainRequest;
18             use Amazon::SimpleDB::Model::DeleteAttributesRequest;
19             use Amazon::SimpleDB::Model::DeleteDomainRequest;
20             use Amazon::SimpleDB::Model::DomainMetadataRequest;
21             use Amazon::SimpleDB::Model::GetAttributesRequest;
22             use Amazon::SimpleDB::Model::ListDomainsRequest;
23             use Amazon::SimpleDB::Model::PutAttributesRequest;
24             use Amazon::SimpleDB::Model::SelectRequest;
25              
26              
27              
28             use Moose;
29              
# AWS credentials and the domain used when a request does not name one.
# All three are plain read/write accessors with no default.
has [qw( aws_access_key aws_secret_access_key domain_name )] => (is => 'rw');

# Amazon::SimpleDB::Client handle; lazy_build defers construction to the
# _build_sdb builder the first time the accessor is read.
has sdb => (is => 'ro', lazy_build => 1);

# Registry of request coros that have been started but not yet destroyed.
# The default is a sub so every instance gets its own hash.
has pending => (is => 'ro', default => sub { +{} });

# When true, bug() writes diagnostics to STDERR.  Defaults to false.
has DEBUG => (is => 'rw', default => !1);
37              
38             no Moose;
39              
40              
41              
42             REPLACE_AMAZON_SIMPLEDB_CLIENT_HTTPPOST: {
43             package Amazon::SimpleDB::Client;
44             use common::sense;
45             use AnyEvent::HTTP;
46             use HTTP::Request;
47             use HTTP::Response;
48              
49             # The only mention of a time-out in Amazon::SimpleDB::Client is in
50             # reference to a select operation. I'm using the value from there
51             # (5 seconds) as the default time-out for HTTP requests, as it
52             # seems reasonable. The setting is dynamic, but is used prior to
53             # putting HTTP request coros into wait state so it should do the
54             # expected thing if it's changed. Caveat emptor.
55              
56             our $HTTP_REQUEST_TIMEOUT = 5;
57              
58             # Replace the _httpPost method in Amazon::SimpleDB::Client to use
59             # an HTTP lib which does non-blocking requests better than
60             # Coro::LWP. This dangerously violates Amazon::SimpleDB::Client's
61             # encapsulation and has some code copied from the original
62             # _httpPost. There is a chance that changes to Amazon's module
63             # could break this.
64              
65             no warnings 'redefine';
66              
# Non-blocking replacement for Amazon::SimpleDB::Client::_httpPost.
#
# POSTs the form-encoded contents of the $parameters hash ref to the
# configured ServiceURL through AnyEvent::HTTP, using
# $HTTP_REQUEST_TIMEOUT as the request time-out, and suspends only the
# calling coro until AnyEvent::HTTP invokes the completion callback.
#
# Returns an HTTP::Response built from the Status/Reason pseudo-headers,
# the lower-case response headers and the body that AnyEvent::HTTP reports.
sub _httpPost {
    my ($self, $parameters) = @_;

    my $form_body = join '&',
        map { $_ . '=' . $self->_urlencode($parameters->{$_}, 0) }
        keys %{$parameters};

    # Pass a rouse callback as the completion handler and wait on it, so
    # this coro is woken by the completion event itself rather than by
    # re-checking a flag every 100ms.
    http_request(
        POST    => $self->{_config}{ServiceURL},
        body    => $form_body,
        timeout => $HTTP_REQUEST_TIMEOUT,
        headers => { 'Content-Type' => 'application/x-www-form-urlencoded; charset=utf-8' },
        Coro::rouse_cb(),
    );
    my ($body, $headers) = Coro::rouse_wait();

    my $response = HTTP::Response->new(@{$headers}{qw( Status Reason )});
    $response->content($body);

    # Only keys without an upper-case letter are copied across; this skips
    # capitalised entries such as Status and Reason, which are consumed
    # above rather than being real response headers.
    $response->header($_, $headers->{$_})
        for grep { !/[[:upper:]]/ } keys %{$headers};

    return $response;
}
86             }
87              
88              
ADD_DISPATCH_HELPER_METHODS: {
    # Give every request model a client_request_method() that names the
    # Amazon::SimpleDB::Client method used to send it.  This lets the
    # request object itself drive dispatch, which Amazon's library does
    # not offer on its own.
    my %client_method_for = (
        BatchPutAttributesRequest => 'batchPutAttributes',
        CreateDomainRequest       => 'createDomain',
        DeleteAttributesRequest   => 'deleteAttributes',
        DeleteDomainRequest       => 'deleteDomain',
        DomainMetadataRequest     => 'domainMetadata',
        GetAttributesRequest      => 'getAttributes',
        ListDomainsRequest        => 'listDomains',
        PutAttributesRequest      => 'putAttributes',
        SelectRequest             => 'select',
    );

    # Symbolic glob assignment is needed to install the methods by name.
    no strict 'refs';
    for my $model (sort keys %client_method_for) {
        # A fresh lexical per iteration so each closure keeps its own name.
        my $client_method = $client_method_for{$model};
        *{"Amazon::SimpleDB::Model::${model}::client_request_method"}
            = sub { $client_method };
    }
}
103              
104              
105              
106             # Debugging methods.
107              
108              
# Render one debug-message fragment as a string.
#
#   undef                        -> ''
#   plain (non-reference) scalar -> returned unchanged
#   object with as_string()      -> result of as_string in scalar context
#   any other object             -> its default stringification
#   unblessed reference          -> Data::Dumper output (terse, sorted keys)
sub _bug_process_message {
    my $message = shift;

    return '' if not defined $message;
    return $message if ref $message eq q();

    if (blessed($message)) {
        return $message->can('as_string')
            ? scalar $message->as_string
            : "$message";
    }

    # Data::Dumper is loaded on demand with require, which imports nothing,
    # so Dumper has to be called by its fully qualified name; a bare
    # Dumper(...) here would be an undefined subroutine in this package.
    require Data::Dumper;
    local $Data::Dumper::Sortkeys = 1;
    local $Data::Dumper::Terse    = 1;
    return Data::Dumper::Dumper($message);
}
123              
124              
# Assemble a complete debug line.
#
# Takes an array ref of message fragments and an optional caller depth
# (default 0).  Each fragment is rendered with _bug_process_message and
# the results are joined with the output field separator $, (or the empty
# string when $, is undefined).  A missing or empty fragment list yields a
# stock message, and a non-array first argument is replaced by an empty
# list after a warning.  Unless the text already contains a newline,
# " at FILE line N\n" for the requested caller frame is appended.
sub _bug_message {
    my ($fragments, $depth) = @_;
    $fragments ||= [];
    $depth     ||= 0;

    unless (ref $fragments eq 'ARRAY') {
        warn "message_array_ref is not an array ref";
        $fragments = [];
    }
    @{$fragments} or $fragments = [ "something is interesting" ];

    my $separator = defined $, ? $, : q();
    my $text = join $separator, map { _bug_process_message($_) } @{$fragments};

    if ($text !~ /\n/xms) {
        # caller() is invoked directly in this sub so that $depth counts
        # frames from here.
        my (undef, $file, $line) = caller($depth);
        $text .= " at $file line $line\n";
    }
    return $text;
}
139              
140              
# Print a debug message to STDERR when the DEBUG attribute is true.
#
# The arguments are the message fragments handed to _bug_message; the
# caller depth of 1 makes the reported file and line those of the code
# that called bug().  Returns the result of print when debugging is on
# and undef otherwise.
sub bug {
    my ($self, @fragments) = @_;
    return undef unless $self->DEBUG;
    return print STDERR _bug_message(\@fragments, 1);
}
145              
146              
147              
# Builder for the lazily built 'sdb' attribute: constructs an
# Amazon::SimpleDB::Client from the access key and secret key currently
# stored on this object.
sub _build_sdb {
    my ($self) = @_;
    my @credentials = ($self->aws_access_key, $self->aws_secret_access_key);
    my $client = Amazon::SimpleDB::Client->new(@credentials);
    return $client;
}
156              
157              
# Turn the request forms accepted by this module into an
# Amazon::SimpleDB::Model request object.
#
#   plain scalar  - an item name; becomes a GetAttributesRequest against
#                   this object's domain_name.
#   hash ref      - constructor arguments plus a mandatory 'RequestType'
#                   key.  The type may be a short class name
#                   ('SelectRequest'), a client method name ('select'), or
#                   any other string, which is then used as the class name
#                   as given.  DomainName defaults to domain_name.  The
#                   caller's hash is not modified.
#   model object  - returned as is, except that the request types listed
#                   below get domain_name filled in when no domain is set.
#                   This modifies the object that was passed in.
#
# Croaks when RequestType is missing or the argument is any other kind of
# reference.
sub _normalize_sdb_request {
    my ($self, $request) = @_;
    my $model_ns = 'Amazon::SimpleDB::Model';

    # Bare item name.
    if (!ref $request) {
        return Amazon::SimpleDB::Model::GetAttributesRequest->new({
            DomainName => $self->domain_name,
            ItemName   => $request,
        });
    }

    # Constructor arguments with a RequestType selector.
    if (ref $request eq 'HASH') {
        # Work on a copy; the caller's own DomainName, if any, wins over
        # the default because it comes later in the list.
        my %args = (DomainName => $self->domain_name, %{$request});
        my $type = delete $args{RequestType}
            or croak "missing RequestType in request";

        # Each operation is reachable both as 'FooRequest' and as 'foo'.
        my %class_for = map {
            my $class = "${model_ns}::${_}Request";
            ("${_}Request" => $class, lcfirst($_) => $class);
        } qw(
            BatchPutAttributes
            CreateDomain
            DeleteAttributes
            DeleteDomain
            DomainMetadata
            GetAttributes
            ListDomains
            PutAttributes
            Select
        );

        my $class = $class_for{$type} || $type;
        return $class->new(\%args);
    }

    # Ready-made model object.
    if (blessed($request) and $request->isa($model_ns)) {
        # The request classes are matched one by one here, as no common
        # base class is used to identify those that get a default domain.
        my $takes_default_domain
            = first { $request->isa("${model_ns}::$_") } qw(
                BatchPutAttributesRequest
                DeleteAttributesRequest
                GetAttributesRequest
                PutAttributesRequest
                SelectRequest
            );
        $request->setDomainName($self->domain_name)
            if $takes_default_domain and not $request->isSetDomainName;
        return $request;
    }

    croak "can't normalize '".(ref $request)."' request to an Amazon::SimpleDB::Model";
}
228              
229              
# Normalize one request and send it through the SimpleDB client.
#
# The argument may be anything _normalize_sdb_request accepts.  The
# normalized request object names the client method to call through its
# client_request_method helper.  Returns whatever that client method
# returns.  Croaks when the request class has no such helper or the
# helper returns a false value.
sub _process_request {
    my ($self, $raw_request) = @_;
    my $request = $self->_normalize_sdb_request($raw_request);

    # Check with can() first: calling the helper on a class that lacks it
    # would die with a generic "Can't locate object method" error before
    # the croak below could ever be reached.
    my $method
        = $request->can('client_request_method') && $request->client_request_method
        or croak "no processing for request of type '".(ref $request)."'";

    return $self->sdb->$method($request);
}
237              
238              
# Record each given item (request coros, in this module) in the pending
# registry, keyed by its own stringification.  Returns the invocant so
# calls can be chained.
sub add_pending {
    my ($self, @coros) = @_;
    for my $coro (@coros) {
        $self->pending->{$coro} = $coro;
    }
    return $self;
}
244              
# Drop each given item from the pending registry; items that are not
# registered are ignored.  Returns the invocant so calls can be chained.
sub remove_pending {
    my ($self, @coros) = @_;
    for my $coro (@coros) {
        delete $self->pending->{$coro};
    }
    return $self;
}
250              
# True while at least one entry is registered as pending.
sub has_pending {
    my ($self) = @_;
    my $count = keys %{ $self->pending };
    return $count > 0;
}
252              
253              
# Run the EV loop until no requests are pending.
#
# A watcher coro checks the pending registry, sleeping 0.1 seconds between
# checks, and stops the EV loop as soon as the registry is empty.  The
# caller then starts EV::loop and continues once that returns.
# Returns the invocant.
sub poll {
    my ($self) = @_;

    async {
        Coro::AnyEvent::sleep(0.1) while $self->has_pending;
        EV::unloop();
    };

    EV::loop();
    return $self;
}
269              
270              
# Issue all given requests concurrently and wait for every one to finish.
#
# Each request (any form _normalize_sdb_request accepts) runs in its own
# coro, which is registered as pending until it is destroyed; poll() then
# runs the event loop until nothing is pending.
#
# Returns an array ref with one entry per request, in request order: the
# value returned by _process_request, or the exception if that request
# died.
sub async_requests {
    my ($self, @requests) = @_;

    my $debug = $self->DEBUG;

    # Timings are only gathered in debug mode.  Time::HiRes::time is called
    # by its full name because importing 'time' at run time cannot change
    # time() calls that were already compiled against the built-in, which
    # has whole-second resolution.
    require Time::HiRes if $debug;
    my $now = sub { $debug ? Time::HiRes::time() : 0 };

    my @responses;
    $self->bug("starting async enqueues");
    my $enqueue_start = $now->();

    for my $idx (0 .. $#requests) {
        my $request = $requests[$idx];
        $self->bug("adding request $request");

        my $coro = async {
            $self->bug("starting request for $request");
            my $start = $now->();

            # Judge success by the eval's own result rather than by $@, and
            # store the exception in the response slot when the request died.
            my $ok = eval {
                $responses[$idx] = $self->_process_request($request);
                1;
            };
            $responses[$idx] = $@ unless $ok;

            my $duration = $now->() - $start;
            $self->bug("completed request for $request in $duration secs");
        };

        # Track the coro until it is destroyed so poll() knows when every
        # request has finished.
        $self->add_pending($coro);
        $coro->on_destroy(sub { $self->remove_pending($coro) });
    }

    my $duration = $now->() - $enqueue_start;
    $self->bug("completed async enqueues in $duration secs, starting coro polling");
    $self->poll;

    return \@responses;
}
304              
305              
# Fetch several items concurrently and return their attributes.
#
# Every argument is passed to async_requests.  Returns a hash ref keyed by
# the arguments as given (stringified).  For a GetAttributesResponse the
# value is a hash ref of attribute name => value, skipping attributes whose
# name is undefined.  Any other response, such as an exception, is stored
# unchanged.
sub async_get_items {
    my ($self, @items) = @_;
    my $responses = $self->async_requests(@items);

    my %attributes_for;
    for my $idx (0 .. $#items) {
        my $response = $responses->[$idx];

        if (ref $response eq 'Amazon::SimpleDB::Model::GetAttributesResponse') {
            my $attribute_list = $response->getGetAttributesResult->getAttribute || [];
            $response = {
                map {
                    # scalar() keeps name/value pairs aligned even if
                    # getValue were to return an empty list.
                    defined $_->getName ? ($_->getName, scalar $_->getValue) : ()
                } @{$attribute_list}
            };
        }

        $attributes_for{ $items[$idx] } = $response;
    }

    return \%attributes_for;
}
325              
326              
327              
328             1;
329              
330             __END__