File Coverage

blib/lib/ElasticSearch/Transport.pm
Criterion Covered Total %
statement 30 287 10.4
branch 0 118 0.0
condition 0 81 0.0
subroutine 10 42 23.8
pod 1 27 3.7
total 41 555 7.3


line stmt bran cond sub pod time code
1             package ElasticSearch::Transport;
2             $ElasticSearch::Transport::VERSION = '0.68';
3 1     1   10 use strict;
  1         2  
  1         31  
4 1     1   5 use warnings FATAL => 'all';
  1         2  
  1         32  
5 1     1   554 use ElasticSearch::Util qw(throw build_error parse_params);
  1         4  
  1         87  
6 1     1   3657 use URI();
  1         7928  
  1         27  
7 1     1   1180 use JSON();
  1         16242  
  1         35  
8 1     1   11245 use Encode qw(decode_utf8);
  1         22639  
  1         115  
9 1     1   11 use Scalar::Util qw(openhandle);
  1         2  
  1         178  
10 1     1   7 use List::Util qw(shuffle min);
  1         2  
  1         106  
11 1     1   1282 use IO::Handle();
  1         10534  
  1         3236  
12              
13             our %Transport = (
14             'http' => 'ElasticSearch::Transport::HTTP',
15             'httplite' => 'ElasticSearch::Transport::HTTPLite',
16             'thrift' => 'ElasticSearch::Transport::Thrift',
17             'httptiny' => 'ElasticSearch::Transport::HTTPTiny',
18             'curl' => 'ElasticSearch::Transport::Curl',
19             'aehttp' => 'ElasticSearch::Transport::AEHTTP',
20             'aecurl' => 'ElasticSearch::Transport::AECurl',
21             );
22              
23             our %Min_Versions = (
24             'ElasticSearch::Transport::Thrift' => '0.03',
25             'ElasticSearch::Transport::Curl' => '0.07',
26             'ElasticSearch::Transport::AEHTTP' => '0.06',
27             'ElasticSearch::Transport::AECurl' => '0.06',
28             );
29              
30             #===================================
31             sub new {
32             #===================================
33 0     0 0   my $class = shift;
34 0           my $params = shift;
35 0   0       my $transport_name = delete $params->{transport} || 'http';
36 0 0         my $transport_class = $Transport{$transport_name}
37             or $class->throw(
38             'Param',
39             "Unknown transport '$transport_name'",
40             { Available => \%Transport }
41             );
42              
43 0 0         eval "require $transport_class" or $class->throw( "Internal", $@ );
44 0 0         if ( my $min = $Min_Versions{$transport_class} ) {
45 0           my $version = $transport_class->VERSION;
46 0 0         $class->throw( 'Internal',
47             "$transport_class v$min required but v$version installed." )
48             unless $version ge $min;
49             }
50              
51 0           my $self = bless {
52             _JSON => JSON->new(),
53             _timeout => 120,
54             _max_requests => 10_000,
55             _max_content_length => 104_857_600,
56             _failed => {},
57             },
58             $transport_class;
59              
60 0   0       my $servers = delete $params->{servers}
61             || '127.0.0.1:' . $transport_class->default_port;
62              
63 0 0         $self->{_default_servers}
64             = [ shuffle( ref $servers eq 'ARRAY' ? @$servers : $servers ) ];
65              
66 0           for (qw(timeout max_requests no_refresh deflate max_content_length)) {
67 0 0         next unless exists $params->{$_};
68 0           $self->$_( delete $params->{$_} );
69             }
70 0           $self->init($params);
71 0           return $self;
72             }
73              
74             #===================================
75 0     0 1   sub init { shift() }
76             #===================================
77              
78             #===================================
79             sub request {
80             #===================================
81 0     0 0   my $self = shift;
82 0           my $params = shift;
83 0           my $single_server = shift;
84              
85 0           my $args = $self->_tidy_params($params);
86 0           $self->reset_failed_servers();
87              
88 0           my $json;
89 0           while (1) {
90 0   0       my $srvr = $single_server || $self->next_server;
91 0 0         $self->log_request( $srvr, $args ) unless $single_server;
92              
93 0 0         $json = eval { $self->send_request( $srvr, $args ) || '{"ok":true}' }
  0 0          
94             and last;
95              
96 0   0       my $error = $@ || 'Unknown error';
97 0 0 0       next if !$single_server && $self->should_retry( $srvr, $error );
98 0 0         $error = $self->_handle_error( $srvr, $params, $error )
99             or return;
100 0           die $error;
101             }
102 0           return $self->_response( $json, $params, $single_server );
103              
104             }
105              
106             #===================================
107             sub _response {
108             #===================================
109 0     0     my $self = shift;
110 0           my $response_json = shift;
111 0           my $params = shift;
112 0           my $skip_log = shift;
113              
114 0           my $as_json = $params->{as_json};
115 0           my $post_process = $params->{post_process};
116              
117 0           my $result;
118 0 0 0       $result = $self->JSON->decode($response_json)
      0        
119             unless $as_json && !$post_process && $skip_log;
120              
121 0 0 0       $self->log_response( $result || $response_json )
122             unless $skip_log;
123              
124 0 0         if ($post_process) {
125 0           $result = $post_process->($result);
126 0 0         if ($as_json) {
127 0           $response_json = $self->JSON->encode($result);
128 0           $result = undef;
129             }
130             }
131              
132 0 0         return $as_json ? $response_json : $result;
133             }
134              
135             #===================================
136             sub skip_request {
137             #===================================
138 0     0 0   my $self = shift;
139 0           my $as_json = shift;
140 0           my $result = shift;
141 0 0         return $result unless $as_json;
142 0           return $self->JSON->encode($result);
143             }
144              
145             #===================================
146             sub should_retry {
147             #===================================
148 0     0 0   my $self = shift;
149 0           my $server = shift;
150 0           my $error = shift;
151              
152 0 0         return unless $error->isa('ElasticSearch::Error::Connection');
153              
154 0   0       warn "Error connecting to '$server' : "
155             . ( $error->{-text} || 'Unknown' ) . "\n\n";
156              
157 0 0 0       if ( $self->no_refresh || $error->isa('ElasticSearch::Error::NotReady') )
158             {
159 0           $self->_remove_server($server);
160             }
161             else {
162 0           $self->{_refresh_in} = 0;
163             }
164              
165 0           return 1;
166             }
167              
168             #===================================
169             sub _handle_error {
170             #===================================
171 0     0     my $self = shift;
172 0           my $server = shift;
173 0           my $params = shift;
174 0   0       my $error = shift || 'Unknown error';
175              
176 0 0         $error = build_error( $self, 'Request', $error, { request => $params } )
177             unless ref $error;
178              
179             return
180 0 0 0       if $error->isa('ElasticSearch::Error::Missing')
181             && $params->{qs}{ignore_missing};
182              
183 0           $error->{-vars}{request} = $params;
184              
185 0 0         if ( my $raw = $error->{-vars}{content} ) {
186 0 0 0       $error->{-vars}{current_version} = $1
187             if $error->isa('ElasticSearch::Error::Conflict')
188             and $raw =~ /: version conflict, current \[(\d+)\]/;
189              
190 0   0       my $content = eval { $self->JSON->decode($raw) } || $raw;
191 0           $self->log_response($content);
192 0 0 0       if ( ref $content and $content->{error} ) {
193 0           $error->{-text} = $content->{error};
194 0 0         $error->{-vars}{error_trace} = $content->{error_trace}
195             if $content->{error_trace};
196 0           delete $error->{-vars}{content};
197             }
198             }
199 0           return $error;
200             }
201              
202             #===================================
203             sub _tidy_params {
204             #===================================
205 0     0     my $self = shift;
206 0           my $params = shift;
207              
208 0   0       $params->{method} ||= 'GET';
209 0   0       $params->{cmd} ||= '/';
210 0   0       $params->{qs} ||= {};
211              
212 0           my $data = $params->{data};
213 0 0         $data
    0          
214             = ref $data eq 'SCALAR'
215             ? $$data
216             : $self->JSON->encode($data)
217             if $data;
218              
219 0           return { data => $data, map { $_ => $params->{$_} } qw(method cmd qs) };
  0            
220             }
221              
222             #===================================
223             sub refresh_servers {
224             #===================================
225 0     0 0   my $self = shift;
226              
227 0           $self->{_refresh_in} = 0;
228 0           delete $self->{_current_server};
229              
230 0           my %servers = map { $_ => 1 }
  0            
231 0           ( @{ $self->servers }, @{ $self->default_servers } );
  0            
232              
233 0           my @all_servers = keys %servers;
234 0           my $protocol = $self->protocol;
235              
236 0           foreach my $server (@all_servers) {
237 0 0         next unless $server;
238              
239 0 0         my $nodes = eval {
240 0           $self->request( { cmd => '/_cluster/nodes', qs => { http => 1 } },
241             $server );
242             }
243             or next;
244              
245 0           my @servers = grep {$_}
  0            
246 0 0 0       map {m{/([^]]+)}}
247             map {
248 0           $_->{ $protocol . '_address' }
249             || $_->{ $protocol . 'Address' }
250             || ''
251 0           } values %{ $nodes->{nodes} };
252 0 0         next unless @servers;
253              
254 0 0         if ( $protocol eq 'http' ) {
255 0           my $content_length = min( $self->max_content_length,
256 0           grep {$_} map { $_->{http}{max_content_length_in_bytes} }
  0            
257 0           values %{ $nodes->{nodes} } );
258 0           $self->max_content_length($content_length);
259             }
260              
261 0           @servers = shuffle(@servers);
262              
263 0           $self->{_refresh_in} = $self->max_requests - 1;
264 0           return $self->servers( \@servers );
265             }
266              
267             $self->throw(
268 0           'NoServers',
269             "Could not retrieve a list of active servers:\n$@",
270             { servers => \@all_servers }
271             );
272             }
273              
274             #===================================
275             sub next_server {
276             #===================================
277 0     0 0   my $self = shift;
278 0 0         unless ( $self->{_refresh_in}-- ) {
279 0 0         if ( $self->no_refresh ) {
280 0           $self->servers( $self->default_servers );
281 0           $self->{_refresh_in} = $self->max_requests - 1;
282 0           $self->reset_failed_servers();
283             }
284             else {
285 0           $self->refresh_servers;
286             }
287             }
288              
289 0           my @servers = @{ $self->servers };
  0            
290              
291 0 0         unless (@servers) {
292 0           my $failed = $self->{_failed};
293 0           @servers = grep { !$failed->{$_} } @{ $self->default_servers };
  0            
  0            
294 0 0         unless (@servers) {
295 0           $self->{_refresh_in} = 0;
296 0           $self->throw(
297             "NoServers",
298             "No servers available:\n",
299             { servers => $self->default_servers }
300             );
301             }
302              
303             }
304              
305 0           my $next = shift(@servers);
306              
307 0           $self->{_current_server} = { $$ => $next };
308 0           $self->servers( @servers, $next );
309 0           return $next;
310             }
311              
312             #===================================
313             sub _remove_server {
314             #===================================
315 0     0     my $self = shift;
316 0           my $server = shift;
317 0           $self->{_failed}{$server}++;
318 0           my @servers = grep { $_ ne $server } @{ $self->servers };
  0            
  0            
319 0           $self->servers( \@servers );
320             }
321              
322             #===================================
323             sub reset_failed_servers {
324             #===================================
325 0     0 0   my $self = shift;
326 0           $self->{_failed} = {};
327             }
328              
329             #===================================
330             sub current_server {
331             #===================================
332 0     0 0   my $self = shift;
333 0   0       return $self->{_current_server}{$$} || $self->next_server;
334             }
335              
336             #===================================
337             sub servers {
338             #===================================
339 0     0 0   my $self = shift;
340 0 0         if (@_) {
341 0 0         $self->{_servers} = ref $_[0] eq 'ARRAY' ? shift : [@_];
342             }
343 0   0       return $self->{_servers} ||= [];
344             }
345              
346             #===================================
347             sub max_requests {
348             #===================================
349 0     0 0   my $self = shift;
350 0 0         if (@_) {
351 0           $self->{_max_requests} = shift;
352             }
353 0   0       return $self->{_max_requests} || 0;
354             }
355              
356             #===================================
357             sub max_content_length {
358             #===================================
359 0     0 0   my $self = shift;
360 0 0         if (@_) {
361 0           $self->{_max_content_length} = shift;
362             }
363 0   0       return $self->{_max_content_length} || 0;
364             }
365              
366             #===================================
367             sub check_content_length {
368             #===================================
369 0     0 0   my $self = shift;
370 0           my $length = length ${ $_[0] };
  0            
371 0 0         return unless $length > $self->max_content_length;
372              
373 0           my $msg
374             = "Content length ($length) greater than max_content_length ("
375             . $self->max_content_length
376             . ") for request:\n"
377 0           . substr( ${ $_[0] }, 0, 500 ) . '...';
378 0           $self->throw( 'Request', $msg );
379             }
380              
381             #===================================
382 0     0 0   sub default_servers { shift->{_default_servers} }
383             #===================================
384              
385             #===================================
386             sub http_uri {
387             #===================================
388 0     0 0   my $self = shift;
389 0           my $server = shift;
390 0           my $cmd = shift;
391 0 0         $cmd = '' unless defined $cmd;
392 0           my $uri = URI->new( 'http://' . $server . $cmd );
393 0 0         $uri->query_form(shift) if $_[0];
394 0           return $uri->as_string;
395             }
396              
397             #===================================
398             sub inflate {
399             #===================================
400 0     0 0   my $self = shift;
401 0           my $content = shift;
402 0           my $output;
403 0           require IO::Uncompress::Inflate;
404              
405 1     1   11 no warnings 'once';
  1         2  
  1         1734  
406 0 0         IO::Uncompress::Inflate::inflate( \$content, \$output, Transparent => 0 )
407             or die "Couldn't inflate response: "
408             . $IO::Uncompress::Inflate::InflateError;
409 0           return $output;
410             }
411              
412             #===================================
413             sub timeout {
414             #===================================
415 0     0 0   my $self = shift;
416 0 0         if (@_) {
417 0           $self->{_timeout} = shift;
418 0           $self->clear_clients;
419             }
420 0   0       return $self->{_timeout} || 0;
421             }
422              
423             #===================================
424             sub deflate {
425             #===================================
426 0     0 0   my $self = shift;
427 0 0         if (@_) {
428 0           $self->{_deflate} = shift;
429 0           $self->clear_clients;
430             }
431 0   0       return $self->{_deflate} || 0;
432             }
433              
434             #===================================
435             sub no_refresh {
436             #===================================
437 0     0 0   my $self = shift;
438 0 0         if (@_) {
439 0           $self->{_no_refresh} = !!shift();
440             }
441 0   0       return $self->{_no_refresh} || 0;
442             }
443              
444             #===================================
445             sub trace_calls {
446             #===================================
447 0     0 0   my $self = shift;
448 0 0         if (@_) {
449 0           delete $self->{_log_fh};
450 0           $self->{_trace_calls} = shift;
451 0           $self->JSON->pretty( !!$self->{_trace_calls} );
452              
453             }
454 0           return $self->{_trace_calls};
455             }
456              
457             #===================================
458             sub _log_fh {
459             #===================================
460 0     0     my $self = shift;
461 0 0         unless ( exists $self->{_log_fh}{$$} ) {
462 0           my $log_fh;
463 0 0         if ( my $file = $self->trace_calls ) {
464 0 0         $file = \*STDERR if $file eq 1;
465 0           my $open_mode = '>>';
466 0 0         if ( openhandle($file) ) {
467 0           $open_mode = '>>&';
468             }
469             else {
470 0           $file .= ".$$";
471             }
472 0 0         open $log_fh, $open_mode, $file
473             or $self->throw( 'Internal',
474             "Couldn't open '$file' for trace logging: $!" );
475 0           binmode( $log_fh, ':utf8' );
476 0           $log_fh->autoflush(1);
477             }
478 0           $self->{_log_fh}{$$} = $log_fh;
479             }
480 0           return $self->{_log_fh}{$$};
481             }
482              
483             #===================================
484             sub log_request {
485             #===================================
486 0     0 0   my $self = shift;
487 0 0         my $log = $self->_log_fh or return;
488 0           my $server = shift;
489 0           my $params = shift;
490              
491 0           my $data = $params->{data};
492 0 0 0       if ( defined $data and $data ne "{}\n" ) {
493 0           $data =~ s/'/\\u0027/g;
494 0           $data = " -d '\n${data}'";
495             }
496             else {
497 0           $data = '';
498             }
499              
500 0           printf $log (
501             "# [%s] Protocol: %s, Server: %s\n",
502             scalar localtime(),
503             $self->protocol, ${server}
504             );
505 0           my %qs = ( %{ $params->{qs} }, pretty => 1 );
  0            
506 0           my $uri = $self->http_uri( '127.0.0.1:9200', $params->{cmd}, \%qs );
507              
508 0           my $method = $params->{method};
509 0           print $log "curl -X$method '$uri' ${data}\n\n";
510             }
511              
512             #===================================
513             sub log_response {
514             #===================================
515 0     0 0   my $self = shift;
516 0 0         my $log = $self->_log_fh or return;
517 0           my $content = shift;
518 0 0         my $out = ref $content ? $self->JSON->encode($content) : $content;
519 0           my @lines = split /\n/, $out;
520 0           printf $log ( "# [%s] Response:\n", scalar localtime() );
521 0           while (@lines) {
522 0           my $line = shift @lines;
523 0 0         if ( length $line > 65 ) {
524 0           my ($spaces) = ( $line =~ /^(?:> )?(\s*)/ );
525 0 0         $spaces = substr( $spaces, 0, 20 ) if length $spaces > 20;
526 0           unshift @lines, '> ' . $spaces . substr( $line, 65 );
527 0           $line = substr $line, 0, 65;
528             }
529 0           print $log "# $line\n";
530             }
531 0           print $log "\n";
532             }
533              
534             #===================================
535             sub clear_clients {
536             #===================================
537 0     0 0   my $self = shift;
538 0           delete $self->{_client};
539             }
540              
541             #===================================
542 0     0 0   sub JSON { shift()->{_JSON} }
543             #===================================
544              
545             my %Statuses = (
546             100 => 'CONT',
547             101 => 'SWITCHING_PROTOCOLS',
548             200 => 'OK',
549             201 => 'CREATED',
550             202 => 'ACCEPTED',
551             203 => 'NON_AUTHORITATIVE_INFORMATION',
552             204 => 'NO_CONTENT',
553             205 => 'RESET_CONTENT',
554             206 => 'PARTIAL_CONTENT',
555             207 => 'MULTI_STATUS',
556             300 => 'MULTIPLE_CHOICES',
557             301 => 'MOVED_PERMANENTLY',
558             302 => 'FOUND',
559             303 => 'SEE_OTHER',
560             304 => 'NOT_MODIFIED',
561             305 => 'USE_PROXY',
562             307 => 'TEMPORARY_REDIRECT',
563             400 => 'BAD_REQUEST',
564             401 => 'UNAUTHORIZED',
565             402 => 'PAYMENT_REQUIRED',
566             403 => 'FORBIDDEN',
567             404 => 'NOT_FOUND',
568             405 => 'METHOD_NOT_ALLOWED',
569             406 => 'NOT_ACCEPTABLE',
570             407 => 'PROXY_AUTHENTICATION',
571             408 => 'REQUEST_TIMEOUT',
572             409 => 'CONFLICT',
573             410 => 'GONE',
574             411 => 'LENGTH_REQUIRED',
575             412 => 'PRECONDITION_FAILED',
576             413 => 'REQUEST_ENTITY_TOO_LARGE',
577             414 => 'REQUEST_URI_TOO_LONG',
578             415 => 'UNSUPPORTED_MEDIA_TYPE',
579             416 => 'REQUESTED_RANGE_NOT_SATISFIED',
580             417 => 'EXPECTATION_FAILED',
581             422 => 'UNPROCESSABLE_ENTITY',
582             423 => 'LOCKED',
583             424 => 'FAILED_DEPENDENCY',
584             500 => 'INTERNAL_SERVER_ERROR',
585             501 => 'NOT_IMPLEMENTED',
586             502 => 'BAD_GATEWAY',
587             503 => 'SERVICE_UNAVAILABLE',
588             504 => 'GATEWAY_TIMEOUT',
589             506 => 'INSUFFICIENT_STORAGE',
590             );
591              
592             #===================================
593             sub http_status {
594             #===================================
595 0   0 0 0   my $code = $_[1] || 0;
596 0   0       return $Statuses{$code} || 'Unknown code ' . $code;
597             }
598              
599             my %Code_To_Error = (
600             409 => 'Conflict',
601             404 => 'Missing',
602             403 => 'ClusterBlocked',
603             503 => 'NotReady'
604             );
605              
606             #===================================
607             sub code_to_error {
608             #===================================
609 0     0 0   my $self = shift;
610 0   0       my $code = shift || return;
611 0           return $Code_To_Error{$code};
612             }
613              
614             #===================================
615             sub register {
616             #===================================
617 0     0 0   my $class = shift;
618 0   0       my $name = shift
619             || $class->throw( 'Param',
620             'No transport name passed to register_transport()' );
621 0   0       my $module = shift
622             || $class->throw( 'Param',
623             'No module name passed to register_transport()' );
624 0           return $Transport{$name} = $module;
625             }
626              
627             =head1 NAME
628              
629             ElasticSearch::Transport - Base class for communicating with ElasticSearch
630              
631             =head1 DESCRIPTION
632              
633             ElasticSearch::Transport is a base class for the modules which communicate
634             with the ElasticSearch server.
635              
636             It handles failover to the next node in case the current node closes
637             the connection.
638              
639             All requests are round-robin'ed to all live servers as returned by
640             C, except we C the server list when we
641             retrieve it, and thus avoid having all our instances make their first
642             request to the same server.
643              
644             On the first request and every C after that (default 10,000),
645             the list of live nodes is automatically refreshed. This can be disabled
646             by setting C to C<0>.
647              
648             Regardless of the C setting, a list of live nodes will still be
649             retrieved on the first request. This may not be desirable behaviour
650             if, for instance, you are connecting to remote servers which use internal
651             IP addresses, or which don't allow remote C requests.
652              
653             If you want to disable this behaviour completely, set C to C<1>,
654             in which case the transport module will round robin through the
655             C list only. Failed nodes will be removed from the list
656             (but added back in every C or when all nodes have failed):
657              
658             The HTTP clients check that the post body content length is not greater than the
659             L, which defaults to 104,857,600 bytes (100MB) - the default
660             that is configured in Elasticsearch. From version 0.19.12, when C
661             set to false, the HTTP transport clients will auto-detect the minimum
662             C from the cluster.
663              
664             Currently, the available backends are:
665              
666             =over
667              
668             =item * C (default)
669              
670             Uses L to communicate using HTTP. See L
671              
672             =item * C
673              
674             Uses L to communicate using HTTP.
675             See L
676              
677             =item * C
678              
679             Uses L to communicate using HTTP.
680             See L
681              
682             =item * C
683              
684             Uses L and thus L
685             to communicate using HTTP. See L
686              
687             =item * C
688              
689             Uses L to communicate asynchronously using HTTP.
690             See L
691              
692             =item * C
693              
694             Uses L (and thus L)
695             to communicate asynchronously using HTTP. See L
696              
697             =item * C
698              
699             Uses C to communicate using a compact binary protocol over sockets.
700             See L. You need to have the
701             C plugin installed on your ElasticSearch server for this
702             to work.
703              
704             =back
705              
706             You shouldn't need to talk to the transport modules directly - everything
707             happens via the main L class.
708              
709             =cut
710              
711             =head1 SYNOPSIS
712              
713              
714             use ElasticSearch;
715             my $e = ElasticSearch->new(
716             servers => 'search.foo.com:9200',
717             transport => 'httplite',
718             timeout => '10',
719             no_refresh => 0 | 1,
720             deflate => 0 | 1,
721             max_content_length => 104_857_600,
722             );
723              
724             my $t = $e->transport;
725              
726             $t->max_requests(5) # refresh_servers every 5 requests
727             $t->protocol # eg 'http'
728             $t->next_server # next node to use
729             $t->current_server # eg '127.0.0.1:9200' ie last used node
730             $t->default_servers # seed servers passed in to new()
731              
732             $t->servers # eg ['192.168.1.1:9200','192.168.1.2:9200']
733             $t->servers(@servers); # set new 'live' list
734              
735             $t->refresh_servers # refresh list of live nodes
736              
737             $t->clear_clients # clear all open clients
738              
739             $t->no_refresh(0|1) # don't retrieve the live node list
740             # instead, use just the nodes specified
741              
742             $t->deflate(0|1); # should ES deflate its responses
743             # useful if ES is on a remote network.
744             # ES needs compression enabled with
745             # http.compression: true
746              
747             $t->max_content_length(1000); # set the max HTTP body content length
748              
749             $t->register('foo',$class) # register new Transport backend
750              
751             =head1 WHICH TRANSPORT SHOULD YOU USE
752              
753             Although the C interface has the right buzzwords (binary, compact,
754             sockets), the generated Perl code is very slow. Until that is improved, I
755             recommend one of the C backends instead.
756              
757             The HTTP backends in increasing order of speed are:
758              
759             =over
760              
761             =item *
762              
763             C - L based
764              
765             =item *
766              
767             C - L based, about 30% faster than C
768              
769             =item *
770              
771             C - L based, about 1% faster than C
772              
773             =item *
774              
775             C - L based, about 60% faster than C!
776              
777             =back
778              
779             See also:
780             L
781             and L
782              
783             =head1 SUBCLASSING TRANSPORT
784              
785             If you want to add a new transport backend, then these are the methods
786             that you should subclass:
787              
788             =head2 init()
789              
790             $t->init($params)
791              
792             By default, a no-op. Receives a HASH ref with the parameters passed in to
793             C, less C, C and C.
794              
795             Any parameters specific to your module should be deleted from C<$params>
796              
797             =head2 send_request()
798              
799             $json = $t->send_request($server,$params)
800              
801             where $params = {
802             method => 'GET',
803             cmd => '/_cluster',
804             qs => { pretty => 1 },
805             data => '{ "foo": "bar"}',
806             }
807              
808             This must be overridden in the subclass - it is the method called to
809             actually talk to the server.
810              
811             See L for an example implementation.
812              
813             =head2 protocol()
814              
815             $t->protocol
816              
817             This must return the protocol in use, eg C<"http"> or C<"thrift">. It is
818             used to extract the list of bound addresses from ElasticSearch, eg
819             C or C.
820              
821             =head2 client()
822              
823             $client = $t->client($server)
824              
825             Returns the client object used in L. The server param
826             will look like C<"192.168.5.1:9200">. It should store its clients in a PID
827             specific slot in C<< $t->{_client} >> as C deletes
828             this key.
829              
830             See L and
831             L
832             for an example implementation.
833              
834             =head1 Registering your Transport backend
835              
836             You can register your Transport backend as follows:
837              
838             BEGIN {
839             ElasticSearch::Transport->register('mytransport',__PACKAGE__);
840             }
841              
842             =head1 SEE ALSO
843              
844             =over
845              
846             =item * L
847              
848             =item * L
849              
850             =item * L
851              
852             =item * L
853              
854             =item * L
855              
856             =item * L
857              
858             =item * L
859              
860             =item * L
861              
862             =back
863              
864             =head1 LICENSE AND COPYRIGHT
865              
866             Copyright 2010 - 2011 Clinton Gormley.
867              
868             This program is free software; you can redistribute it and/or modify it
869             under the terms of either: the GNU General Public License as published
870             by the Free Software Foundation; or the Artistic License.
871              
872             See http://dev.perl.org/licenses/ for more information.
873              
874              
875             =cut
876              
877             1;