File Coverage

lib/Neo4j/Driver/Net/HTTP.pm
Criterion Covered Total %
statement 130 132 98.4
branch 35 44 79.5
condition 29 33 87.8
subroutine 23 23 100.0
pod 0 1 100.0
total 217 233 93.5


line stmt bran cond sub pod time code
1 17     17   307 use 5.010;
  17         53  
2 17     17   123 use strict;
  17         35  
  17         364  
3 17     17   87 use warnings;
  17         29  
  17         449  
4 17     17   93 use utf8;
  17         30  
  17         81  
5              
6             package Neo4j::Driver::Net::HTTP;
7             # ABSTRACT: Network controller for Neo4j HTTP
8             $Neo4j::Driver::Net::HTTP::VERSION = '0.39';
9              
10             # This package is not part of the public Neo4j::Driver API.
11              
12              
13 17     17   1116 use Carp qw(carp croak);
  17         56  
  17         1584  
14             our @CARP_NOT = qw(Neo4j::Driver::Transaction Neo4j::Driver::Transaction::HTTP);
15              
16 17     17   9158 use Time::Piece 1.20 qw();
  17         168201  
  17         623  
17 17     17   179 use URI 1.31;
  17         299  
  17         437  
18              
19 17     17   6297 use Neo4j::Driver::Net::HTTP::LWP;
  17         52  
  17         706  
20 17     17   7591 use Neo4j::Driver::Result::Jolt;
  17         42  
  17         654  
21 17     17   6890 use Neo4j::Driver::Result::JSON;
  17         50  
  17         606  
22 17     17   6224 use Neo4j::Driver::Result::Text;
  17         118  
  17         564  
23 17     17   153 use Neo4j::Driver::ServerInfo;
  17         33  
  17         29909  
24              
25              
26             my $DISCOVERY_ENDPOINT = '/';
27             my $COMMIT_ENDPOINT = 'commit';
28              
29             my @RESULT_MODULES = qw( Neo4j::Driver::Result::Jolt Neo4j::Driver::Result::JSON );
30             my $RESULT_FALLBACK = 'Neo4j::Driver::Result::Text';
31              
32             my $RFC5322_DATE = '%a, %d %b %Y %H:%M:%S %z'; # strftime(3)
33              
34              
35             sub new {
36             # uncoverable pod
37 167     167 0 399 my ($class, $driver) = @_;
38            
39             $driver->{plugins}->{default_handlers}->{http_adapter_factory} //= sub {
40 11   100 11   37 my $net_module = $driver->config('net_module') || 'Neo4j::Driver::Net::HTTP::LWP';
41 11         89 return $net_module->new($driver);
42 167   100     936 };
43 167         601 my $http_adapter = $driver->{plugins}->trigger('http_adapter_factory', $driver);
44            
45             my $self = bless {
46             events => $driver->{plugins},
47             cypher_types => $driver->config('cypher_types'),
48             server_info => $driver->{server_info},
49 163   100     2734 http_agent => $http_adapter,
50             want_jolt => $driver->config('jolt'),
51             want_concurrent => $driver->config('concurrent_tx') // 0,
52             active_tx => {},
53             }, $class;
54            
55 163         831 return $self;
56             }
57              
58              
59             # Use Neo4j Discovery API to obtain both ServerInfo and the
60             # transaction endpoint templates.
61             sub _server {
62 72     72   141 my ($self) = @_;
63            
64 72         166 my ($neo4j_version, $tx_endpoint);
65 72         190 my @discovery_queue = ($DISCOVERY_ENDPOINT);
66 72         190 while (@discovery_queue) {
67 72         158 my $events = $self->{events};
68             my $tx = {
69 2     2   8 error_handler => sub { $events->trigger(error => shift) },
70 72         449 transaction_endpoint => shift @discovery_queue,
71             };
72 72         251 my $service = $self->_request($tx, 'GET')->_json;
73            
74 70         269 $neo4j_version = $service->{neo4j_version};
75 70         121 $tx_endpoint = $service->{transaction};
76 70 50 33     531 last if $neo4j_version && $tx_endpoint;
77            
78             # a different discovery endpoint existed in Neo4j < 4.0
79 0 0       0 if ($service->{data}) {
80 0         0 push @discovery_queue, URI->new( $service->{data} )->path;
81             }
82             }
83            
84 70 50       183 croak "Neo4j server not found (ServerInfo discovery failed)" unless $neo4j_version;
85            
86 70         242 my $date = $self->{http_agent}->date_header;
87 70         1133 $date =~ s/ GMT$/ +0000/;
88 70 50       438 $date = $date ? Time::Piece->strptime($date, $RFC5322_DATE) : Time::Piece->new;
89            
90             $self->{server_info} = Neo4j::Driver::ServerInfo->new({
91             uri => $self->{http_agent}->uri,
92 70         6699 version => "Neo4j/$neo4j_version",
93             time_diff => Time::Piece->new - $date,
94             tx_endpoint => $tx_endpoint,
95             });
96            
97 70         579 return $self->{server_info};
98             }
99              
100              
101             # Update requested database name based on transaction endpoint templates.
102             sub _set_database {
103 155     155   332 my ($self, $database) = @_;
104            
105 155         320 my $tx_endpoint = $self->{server_info}->{tx_endpoint};
106             $self->{endpoints} = {
107 155 50       1025 new_transaction => "$tx_endpoint",
108             new_commit => "$tx_endpoint/$COMMIT_ENDPOINT",
109             } if $tx_endpoint;
110            
111 155 100       404 return unless defined $database;
112 152         559 $database = URI::Escape::uri_escape_utf8 $database;
113 152         3888 $self->{endpoints}->{new_transaction} =~ s/\{databaseName}/$database/;
114 152         461 $self->{endpoints}->{new_commit} =~ s/\{databaseName}/$database/;
115             }
116              
117              
118             # Send statements to the Neo4j server and return a list of all results.
119             sub _run {
120 254     254   600 my ($self, $tx, @statements) = @_;
121            
122 254 100 100     312 if ( %{$self->{active_tx}} && ! $self->{want_concurrent} ) {
  254         885  
123 36   100     153 my $is_concurrent = ! defined $tx->{commit_endpoint} || keys %{$self->{active_tx}} > 1;
124 36 100       212 $is_concurrent and carp "Concurrent transactions for HTTP are disabled; use multiple sessions or enable the concurrent_tx config option (this warning will be fatal in Neo4j::Driver 1.xx)";
125             }
126            
127 254         6434 my $json = { statements => \@statements };
128 254         826 return $self->_request($tx, 'POST', $json)->_results;
129             }
130              
131              
132             # Determine the Accept HTTP header that is appropriate for the specified
133             # request method. Accept headers are cached in $self->{accept_for}.
134             sub _accept_for {
135 209     209   451 my ($self, $method) = @_;
136            
137             $self->{want_jolt} = 'v1' if ! defined $self->{want_jolt}
138 209 100 100     1418 && $self->{server_info} && $self->{server_info}->{version} =~ m{^Neo4j/4\.[234]\.};
      100        
139            
140             # GET requests may fail if Neo4j sees clients that support Jolt, see neo4j #12644
141 209         524 my @modules = @RESULT_MODULES;
142 209 100       1228 unshift @modules, $self->{http_agent}->result_handlers if $self->{http_agent}->can('result_handlers');
143 209         597 my @accept = map { $_->_accept_header( $self->{want_jolt}, $method ) } @modules;
  418         1752  
144 209         1191 return $self->{accept_for}->{$method} = join ', ', @accept;
145             }
146              
147              
148             # Determine a result handler module that is appropriate for the specified
149             # media type. Result handlers are cached in $self->{result_module_for}.
150             sub _result_module_for {
151 193     193   784 my ($self, $content_type) = @_;
152            
153 193         401 my @modules = @RESULT_MODULES;
154 193 100       896 unshift @modules, $self->{http_agent}->result_handlers if $self->{http_agent}->can('result_handlers');
155 193         538 foreach my $module (@modules) {
156 322 100       1236 if ($module->_acceptable($content_type)) {
157 182         955 return $self->{result_module_for}->{$content_type} = $module;
158             }
159             }
160 11         39 return $RESULT_FALLBACK;
161             }
162              
163              
164             # Send a HTTP request to the Neo4j server and return a representation
165             # of the response.
166             sub _request {
167 338     338   767 my ($self, $tx, $method, $json) = @_;
168            
169 338 100       764 if (! defined $tx->{transaction_endpoint}) {
170 47         259 $tx->{transaction_endpoint} = URI->new( $self->{endpoints}->{new_transaction} )->path;
171             }
172 338         4876 my $tx_endpoint = "$tx->{transaction_endpoint}";
173 338   100     1339 my $accept = $self->{accept_for}->{$method}
174             // $self->_accept_for($method);
175            
176 338         1469 $self->{http_agent}->request($method, $tx_endpoint, $json, $accept, $tx->{mode});
177            
178 338         122345 my $header = $self->{http_agent}->http_header;
179             my $result_module = $self->{result_module_for}->{ $header->{content_type} }
180 338   66     918824 // $self->_result_module_for( $header->{content_type} );
181            
182             my $result = $result_module->new({
183             http_agent => $self->{http_agent},
184             http_method => $method,
185             http_path => $tx_endpoint,
186             http_header => $header,
187             cypher_types => $self->{cypher_types},
188             server_info => $self->{server_info},
189 338 100       3047 statements => $json ? $json->{statements} : [],
190             });
191            
192 336         1526 my $info = $result->_info;
193 336         1046 $self->_parse_tx_status($tx, $header, $info);
194 336 100       969 $tx->{error_handler}->($info->{_error}) if $info->{_error};
195 310         4003 return $result;
196             }
197              
198              
199             # Update list of active transactions and update transaction endpoints.
200             sub _parse_tx_status {
201 336     336   664 my ($self, $tx, $header, $info) = @_;
202            
203             # In case of errors, HTTP transaction status info is only reliable for
204             # server errors that aren't reported as network errors. (neo4j #12651)
205 336 100       845 if (my $error = $info->{_error}) {
206 28 100       91 return if $error->source ne 'Server';
207 15 50       76 do { return if $error->source eq 'Network' } while $error = $error->related;
  15         47  
208             }
209            
210 323         733 $tx->{unused} = 0;
211 323   66     1077 $tx->{closed} = ! $info->{commit} || ! $info->{transaction};
212            
213 323 100       690 if ( $tx->{closed} ) {
214 261         455 my $old_endpoint = $tx->{transaction_endpoint};
215 261         1768 $old_endpoint =~ s|/$COMMIT_ENDPOINT$||; # both endpoints may be set to /commit (for autocommit), so we need to remove that here
216 261         552 delete $self->{active_tx}->{ $old_endpoint };
217 261         509 return;
218             }
219 62 100 100     345 if ( $header->{location} && $header->{status} eq '201' ) { # Created
220 49         263 my $new_commit = URI->new( $info->{commit} )->path_query;
221 49         5315 my $new_endpoint = URI->new( $header->{location} )->path_query;
222 49         3550 $tx->{commit_endpoint} = $new_commit;
223 49         172 $tx->{transaction_endpoint} = $new_endpoint;
224             }
225 62 50       227 if ( my $expires = $info->{transaction}->{expires} ) {
226 62         209 $expires =~ s/ GMT$/ +0000/;
227 62         364 $expires = Time::Piece->strptime($expires, $RFC5322_DATE) + $self->{server_info}->{time_diff};
228 62         11307 $self->{active_tx}->{ $tx->{transaction_endpoint} } = $expires;
229             }
230             }
231              
232              
233             # Query list of active transactions, removing expired ones.
234             sub _is_active_tx {
235 47     47   132 my ($self, $tx) = @_;
236            
237 47         179 my $now = Time::Piece->new;
238 47         3651 foreach my $tx_key ( keys %{$self->{active_tx}} ) {
  47         203  
239 54         343 my $expires = $self->{active_tx}->{$tx_key};
240 54 50       233 delete $self->{active_tx}->{$tx_key} if $now > $expires;
241             }
242            
243 47         1306 my $tx_endpoint = $tx->{transaction_endpoint};
244 47         430 $tx_endpoint =~ s|/$COMMIT_ENDPOINT$||; # for tx in the (auto)commit state, both endpoints are set to commit
245 47         272 return exists $self->{active_tx}->{ $tx_endpoint };
246             }
247              
248              
249             1;