File Coverage

lib/Neo4j/Driver/Net/HTTP.pm
Criterion Covered Total %
statement 130 132 98.4
branch 35 44 79.5
condition 29 33 87.8
subroutine 23 23 100.0
pod 0 1 100.0
total 217 233 93.5


line stmt bran cond sub pod time code
1 17     17   309 use 5.010;
  17         56  
2 17     17   90 use strict;
  17         39  
  17         349  
3 17     17   87 use warnings;
  17         27  
  17         433  
4 17     17   104 use utf8;
  17         26  
  17         99  
5              
6             package Neo4j::Driver::Net::HTTP;
7             # ABSTRACT: Network controller for Neo4j HTTP
8             $Neo4j::Driver::Net::HTTP::VERSION = '0.40';
9              
10             # This package is not part of the public Neo4j::Driver API.
11              
12              
13 17     17   1148 use Carp qw(carp croak);
  17         70  
  17         1674  
14             our @CARP_NOT = qw(Neo4j::Driver::Transaction Neo4j::Driver::Transaction::HTTP);
15              
16 17     17   9675 use Time::Piece 1.20 qw();
  17         168149  
  17         550  
17 17     17   142 use URI 1.31;
  17         225  
  17         396  
18              
19 17     17   6017 use Neo4j::Driver::Net::HTTP::LWP;
  17         56  
  17         646  
20 17     17   7579 use Neo4j::Driver::Result::Jolt;
  17         46  
  17         700  
21 17     17   7026 use Neo4j::Driver::Result::JSON;
  17         55  
  17         581  
22 17     17   6285 use Neo4j::Driver::Result::Text;
  17         100  
  17         650  
23 17     17   123 use Neo4j::Driver::ServerInfo;
  17         37  
  17         30122  
24              
25              
26             my $DISCOVERY_ENDPOINT = '/'; # first path that _server() puts on its discovery queue
27             my $COMMIT_ENDPOINT = 'commit'; # appended to the transaction endpoint in _set_database(); stripped again when tracking active transactions
28              
29             my @RESULT_MODULES = qw( Neo4j::Driver::Result::Jolt Neo4j::Driver::Result::JSON ); # built-in result handlers, consulted in this order
30             my $RESULT_FALLBACK = 'Neo4j::Driver::Result::Text'; # returned by _result_module_for() when no handler accepts the media type
31              
32             my $RFC5322_DATE = '%a, %d %b %Y %H:%M:%S %z'; # strftime(3) format; passed to Time::Piece->strptime after a trailing ' GMT' is rewritten to ' +0000'
33              
34              
35             sub new {
36             # uncoverable pod
               # Constructor: new($class, $driver).
               # Installs a default 'http_adapter_factory' handler on the driver's
               # plugin manager (only if none is set yet, because of //=), triggers
               # that event to obtain the HTTP adapter, and returns a blessed hash
               # holding the adapter plus settings copied from the driver config.
37 167     167 0 420 my ($class, $driver) = @_;
38            
               # Default factory: instantiate the module named by the 'net_module'
               # config option, or Neo4j::Driver::Net::HTTP::LWP if that is false.
39             $driver->{plugins}->{default_handlers}->{http_adapter_factory} //= sub {
40 11   100 11   34 my $net_module = $driver->config('net_module') || 'Neo4j::Driver::Net::HTTP::LWP';
41 11         91 return $net_module->new($driver);
42 167   100     910 };
43 167         562 my $http_adapter = $driver->{plugins}->trigger('http_adapter_factory', $driver);
44            
               # want_concurrent defaults to 0 when 'concurrent_tx' is undefined.
               # active_tx is keyed by transaction endpoint and filled in by
               # _parse_tx_status().
45             my $self = bless {
46             events => $driver->{plugins},
47             cypher_types => $driver->config('cypher_types'),
48             server_info => $driver->{server_info},
49 163   100     2721 http_agent => $http_adapter,
50             want_jolt => $driver->config('jolt'),
51             want_concurrent => $driver->config('concurrent_tx') // 0,
52             active_tx => {},
53             }, $class;
54            
55 163         848 return $self;
56             }
57              
58              
59             # Use Neo4j Discovery API to obtain both ServerInfo and the
60             # transaction endpoint templates.
               # Stores the new Neo4j::Driver::ServerInfo object in
               # $self->{server_info} and returns it. Croaks if no server version
               # could be discovered.
61             sub _server {
62 72     72   143 my ($self) = @_;
63            
64 72         121 my ($neo4j_version, $tx_endpoint);
65 72         179 my @discovery_queue = ($DISCOVERY_ENDPOINT);
               # Work through the queue of candidate discovery paths; the loop
               # ends when the queue is empty or both values have been found.
66 72         237 while (@discovery_queue) {
67 72         172 my $events = $self->{events};
               # Minimal stand-in for a transaction: _request() only needs an
               # endpoint and an error handler here. Errors are forwarded to
               # the 'error' event.
68             my $tx = {
69 2     2   9 error_handler => sub { $events->trigger(error => shift) },
70 72         396 transaction_endpoint => shift @discovery_queue,
71             };
72 72         240 my $service = $self->_request($tx, 'GET')->_json;
73            
74 70         270 $neo4j_version = $service->{neo4j_version};
75 70         117 $tx_endpoint = $service->{transaction};
76 70 50 33     473 last if $neo4j_version && $tx_endpoint;
77            
78             # a different discovery endpoint existed in Neo4j < 4.0
               # NOTE(review): the counters on the next two rows are 0, so this
               # legacy branch is not exercised by the test run this report is from.
79 0 0       0 if ($service->{data}) {
80 0         0 push @discovery_queue, URI->new( $service->{data} )->path;
81             }
82             }
83            
84 70 50       210 croak "Neo4j server not found (ServerInfo discovery failed)" unless $neo4j_version;
85            
               # Parse the adapter's Date header; fall back to the local time if
               # it is false.
               # NOTE(review): the substitution runs before the truthiness check,
               # so an undef date_header would presumably raise an "uninitialized"
               # warning here - confirm what date_header can return.
86 70         227 my $date = $self->{http_agent}->date_header;
87 70         1052 $date =~ s/ GMT$/ +0000/;
88 70 50       396 $date = $date ? Time::Piece->strptime($date, $RFC5322_DATE) : Time::Piece->new;
89            
               # time_diff is local "now" minus the server date; _parse_tx_status()
               # adds it to server-reported expiry times.
90             $self->{server_info} = Neo4j::Driver::ServerInfo->new({
91             uri => $self->{http_agent}->uri,
92 70         6621 version => "Neo4j/$neo4j_version",
93             time_diff => Time::Piece->new - $date,
94             tx_endpoint => $tx_endpoint,
95             });
96            
97 70         501 return $self->{server_info};
98             }
99              
100              
101             # Update requested database name based on transaction endpoint templates.
               # Rebuilds $self->{endpoints} (new_transaction, new_commit) from the
               # tx_endpoint stored in server_info, then replaces the first
               # "{databaseName}" placeholder in each with the URI-escaped name.
               # Returns early, leaving the placeholders in place, if $database is
               # undefined.
102             sub _set_database {
103 155     155   316 my ($self, $database) = @_;
104            
105 155         319 my $tx_endpoint = $self->{server_info}->{tx_endpoint};
               # The endpoints hash is only (re)built when a tx_endpoint is known.
106             $self->{endpoints} = {
107 155 50       899 new_transaction => "$tx_endpoint",
108             new_commit => "$tx_endpoint/$COMMIT_ENDPOINT",
109             } if $tx_endpoint;
110            
111 155 100       352 return unless defined $database;
               # NOTE(review): URI::Escape is called fully-qualified but is not
               # among the visible "use" rows - presumably loaded via "use URI";
               # confirm.
               # NOTE(review): if {endpoints} was never populated, the two
               # substitutions below operate on undefined values - verify that
               # callers always run _server() first.
112 152         490 $database = URI::Escape::uri_escape_utf8 $database;
113 152         3823 $self->{endpoints}->{new_transaction} =~ s/\{databaseName}/$database/;
114 152         461 $self->{endpoints}->{new_commit} =~ s/\{databaseName}/$database/;
115             }
116              
117              
118             # Send statements to the Neo4j server and return a list of all results.
               # Arguments: the transaction hash and the statements to send.
               # The statements are POSTed as { statements => [...] } through
               # _request(); the return value is that result's _results.
119             sub _run {
120 254     254   599 my ($self, $tx, @statements) = @_;
121            
               # Only checked when other transactions are tracked as active and
               # concurrent transactions were not enabled. The use counts as
               # concurrent if this transaction has no commit endpoint yet or
               # more than one transaction is tracked. This warns; it does not die.
122 254 100 100     303 if ( %{$self->{active_tx}} && ! $self->{want_concurrent} ) {
  254         794  
123 36   100     135 my $is_concurrent = ! defined $tx->{commit_endpoint} || keys %{$self->{active_tx}} > 1;
124 36 100       234 $is_concurrent and carp "Concurrent transactions for HTTP are disabled; use multiple sessions or enable the concurrent_tx config option (this warning will be fatal in Neo4j::Driver 1.xx)";
125             }
126            
127 254         6514 my $json = { statements => \@statements };
128 254         777 return $self->_request($tx, 'POST', $json)->_results;
129             }
130              
131              
132             # Determine the Accept HTTP header that is appropriate for the specified
133             # request method. Accept headers are cached in $self->{accept_for}.
               # Returns the header value: each handler's _accept_header() output,
               # joined with ', '.
134             sub _accept_for {
135 209     209   414 my ($self, $method) = @_;
136            
               # If the 'jolt' option was left undefined, server info is available
               # and the version string matches Neo4j 4.2, 4.3 or 4.4, set
               # want_jolt to 'v1'. This is a lasting change to $self.
137             $self->{want_jolt} = 'v1' if ! defined $self->{want_jolt}
138 209 100 100     1321 && $self->{server_info} && $self->{server_info}->{version} =~ m{^Neo4j/4\.[234]\.};
      100        
139            
140             # GET requests may fail if Neo4j sees clients that support Jolt, see neo4j #12644
               # Handlers supplied by the adapter (if it implements result_handlers)
               # are placed ahead of the built-in ones.
141 209         491 my @modules = @RESULT_MODULES;
142 209 100       1235 unshift @modules, $self->{http_agent}->result_handlers if $self->{http_agent}->can('result_handlers');
143 209         584 my @accept = map { $_->_accept_header( $self->{want_jolt}, $method ) } @modules;
  418         1548  
144 209         1274 return $self->{accept_for}->{$method} = join ', ', @accept;
145             }
146              
147              
148             # Determine a result handler module that is appropriate for the specified
149             # media type. Result handlers are cached in $self->{result_module_for}.
               # Returns the name of the first handler whose _acceptable() accepts
               # $content_type, or $RESULT_FALLBACK if none does.
150             sub _result_module_for {
151 193     193   398 my ($self, $content_type) = @_;
152            
               # Same candidate order as _accept_for(): adapter-supplied handlers
               # first, then the built-in ones.
153 193         372 my @modules = @RESULT_MODULES;
154 193 100       900 unshift @modules, $self->{http_agent}->result_handlers if $self->{http_agent}->can('result_handlers');
155 193         504 foreach my $module (@modules) {
156 322 100       1177 if ($module->_acceptable($content_type)) {
157 182         908 return $self->{result_module_for}->{$content_type} = $module;
158             }
159             }
               # The fallback is returned without being cached, so an unrecognised
               # media type goes through the loop above on every call.
160 11         66 return $RESULT_FALLBACK;
161             }
162              
163              
164             # Send a HTTP request to the Neo4j server and return a representation
165             # of the response.
               # Arguments: the transaction hash, the HTTP method, and an optional
               # JSON payload. Returns the result object built by the selected
               # result handler module.
               # Side effects: may set $tx->{transaction_endpoint}, updates the
               # transaction state via _parse_tx_status(), and calls
               # $tx->{error_handler} if the result reports an error.
166             sub _request {
167 338     338   679 my ($self, $tx, $method, $json) = @_;
168            
               # A transaction without an endpoint starts at the path of the
               # "new transaction" endpoint set up by _set_database().
169 338 100       786 if (! defined $tx->{transaction_endpoint}) {
170 47         305 $tx->{transaction_endpoint} = URI->new( $self->{endpoints}->{new_transaction} )->path;
171             }
172 338         4695 my $tx_endpoint = "$tx->{transaction_endpoint}";
               # Use the cached Accept header for this method, computing it on
               # first use.
173 338   100     1238 my $accept = $self->{accept_for}->{$method}
174             // $self->_accept_for($method);
175            
176 338         1366 $self->{http_agent}->request($method, $tx_endpoint, $json, $accept, $tx->{mode});
177            
               # Choose the result handler by the response's content type, again
               # going through the cache first.
178 338         122246 my $header = $self->{http_agent}->http_header;
179             my $result_module = $self->{result_module_for}->{ $header->{content_type} }
180 338   66     923086 // $self->_result_module_for( $header->{content_type} );
181            
               # Requests without a payload (such as GET) pass an empty list of
               # statements.
182             my $result = $result_module->new({
183             http_agent => $self->{http_agent},
184             http_method => $method,
185             http_path => $tx_endpoint,
186             http_header => $header,
187             cypher_types => $self->{cypher_types},
188             server_info => $self->{server_info},
189 338 100       2828 statements => $json ? $json->{statements} : [],
190             });
191            
               # The transaction status is updated before the error handler runs.
               # NOTE(review): the counters (336 handler checks, 310 returns)
               # suggest the handler does not always return - presumably it dies;
               # confirm.
192 336         1363 my $info = $result->_info;
193 336         1053 $self->_parse_tx_status($tx, $header, $info);
194 336 100       886 $tx->{error_handler}->($info->{_error}) if $info->{_error};
195 310         3854 return $result;
196             }
197              
198              
199             # Update list of active transactions and update transaction endpoints.
               # Arguments: the transaction hash, the HTTP response header hash, and
               # the result's info hash. Modifies $tx (unused, closed,
               # commit_endpoint, transaction_endpoint) and $self->{active_tx}.
200             sub _parse_tx_status {
201 336     336   636 my ($self, $tx, $header, $info) = @_;
202            
203             # In case of errors, HTTP transaction status info is only reliable for
204             # server errors that aren't reported as network errors. (neo4j #12651)
               # So: leave everything untouched unless the error's source is
               # 'Server', and also if any error in the chain reached through
               # ->related has the source 'Network'.
205 336 100       810 if (my $error = $info->{_error}) {
206 28 100       80 return if $error->source ne 'Server';
207 15 50       82 do { return if $error->source eq 'Network' } while $error = $error->related;
  15         45  
208             }
209            
               # The transaction counts as closed when the response info lacks
               # either a commit URL or a transaction entry.
210 323         656 $tx->{unused} = 0;
211 323   66     1003 $tx->{closed} = ! $info->{commit} || ! $info->{transaction};
212            
213 323 100       694 if ( $tx->{closed} ) {
214 261         430 my $old_endpoint = $tx->{transaction_endpoint};
215 261         1662 $old_endpoint =~ s|/$COMMIT_ENDPOINT$||; # both endpoints may be set to /commit (for autocommit), so we need to remove that here
216 261         576 delete $self->{active_tx}->{ $old_endpoint };
217 261         494 return;
218             }
               # A 201 response with a Location header provides new endpoints for
               # this transaction. Both are stored as path plus query.
219 62 100 100     299 if ( $header->{location} && $header->{status} eq '201' ) { # Created
220 49         216 my $new_commit = URI->new( $info->{commit} )->path_query;
221 49         4906 my $new_endpoint = URI->new( $header->{location} )->path_query;
222 49         3508 $tx->{commit_endpoint} = $new_commit;
223 49         132 $tx->{transaction_endpoint} = $new_endpoint;
224             }
               # Record the expiry time, shifted by the time_diff from _server(),
               # under the transaction's endpoint.
225 62 50       185 if ( my $expires = $info->{transaction}->{expires} ) {
226 62         169 $expires =~ s/ GMT$/ +0000/;
227 62         346 $expires = Time::Piece->strptime($expires, $RFC5322_DATE) + $self->{server_info}->{time_diff};
228 62         10531 $self->{active_tx}->{ $tx->{transaction_endpoint} } = $expires;
229             }
230             }
231              
232              
233             # Query list of active transactions, removing expired ones.
               # Returns true if $tx's endpoint is still a key of
               # $self->{active_tx} after the purge.
234             sub _is_active_tx {
235 47     47   123 my ($self, $tx) = @_;
236            
               # Drop every tracked transaction whose stored expiry time is
               # earlier than the current local time.
237 47         164 my $now = Time::Piece->new;
238 47         3597 foreach my $tx_key ( keys %{$self->{active_tx}} ) {
  47         229  
239 54         274 my $expires = $self->{active_tx}->{$tx_key};
240 54 50       225 delete $self->{active_tx}->{$tx_key} if $now > $expires;
241             }
242            
               # The substitution works on a copy, so $tx itself is not modified.
243 47         1244 my $tx_endpoint = $tx->{transaction_endpoint};
244 47         399 $tx_endpoint =~ s|/$COMMIT_ENDPOINT$||; # for tx in the (auto)commit state, both endpoints are set to commit
245 47         262 return exists $self->{active_tx}->{ $tx_endpoint };
246             }
247              
248              
249             1;