File Coverage

lib/Neo4j/Driver/Net/HTTP.pm
Criterion Covered Total %
statement 126 128 98.4
branch 28 36 77.7
condition 29 33 87.8
subroutine 23 23 100.0
pod 0 1 100.0
total 206 221 93.6


line stmt bran cond sub pod time code
1 17     17   304 use 5.010;
  17         54  
2 17     17   84 use strict;
  17         28  
  17         366  
3 17     17   88 use warnings;
  17         31  
  17         481  
4 17     17   140 use utf8;
  17         31  
  17         94  
5              
6             package Neo4j::Driver::Net::HTTP;
7             # ABSTRACT: Network controller for Neo4j HTTP
8             $Neo4j::Driver::Net::HTTP::VERSION = '0.38';
9              
10             # This package is not part of the public Neo4j::Driver API.
11              
12              
13 17     17   1079 use Carp qw(carp croak);
  17         51  
  17         1492  
14             our @CARP_NOT = qw(Neo4j::Driver::Transaction Neo4j::Driver::Transaction::HTTP);
15              
16 17     17   9320 use Time::Piece 1.20 qw();
  17         164500  
  17         544  
17 17     17   120 use URI 1.31;
  17         241  
  17         444  
18              
19 17     17   6048 use Neo4j::Driver::Net::HTTP::LWP;
  17         65  
  17         655  
20 17     17   7566 use Neo4j::Driver::Result::Jolt;
  17         54  
  17         600  
21 17     17   6849 use Neo4j::Driver::Result::JSON;
  17         50  
  17         589  
22 17     17   6215 use Neo4j::Driver::Result::Text;
  17         126  
  17         564  
23 17     17   133 use Neo4j::Driver::ServerInfo;
  17         29  
  17         28357  
24              
25              
26             my $DISCOVERY_ENDPOINT = '/';
27             my $COMMIT_ENDPOINT = 'commit';
28              
29             my @RESULT_MODULES = qw( Neo4j::Driver::Result::Jolt Neo4j::Driver::Result::JSON );
30             my $RESULT_FALLBACK = 'Neo4j::Driver::Result::Text';
31              
32             my $RFC5322_DATE = '%a, %d %b %Y %H:%M:%S %z'; # strftime(3)
33              
34              
35             sub new {
36             # uncoverable pod
37 166     166 0 329 my ($class, $driver) = @_;
38            
39             $driver->{plugins}->{default_handlers}->{http_adapter_factory} //= sub {
40 11   100 11   45 my $net_module = $driver->{net_module} || 'Neo4j::Driver::Net::HTTP::LWP';
41 11         86 return $net_module->new($driver);
42 166   100     876 };
43 166         563 my $http_adapter = $driver->{plugins}->trigger('http_adapter_factory', $driver);
44            
45             my $self = bless {
46             events => $driver->{plugins},
47             cypher_types => $driver->{cypher_types},
48             server_info => $driver->{server_info},
49             http_agent => $http_adapter,
50             want_jolt => $driver->{jolt},
51 162   100     3323 want_concurrent => $driver->{concurrent_tx} // 1,
52             active_tx => {},
53             }, $class;
54            
55 162         759 return $self;
56             }
57              
58              
59             # Use Neo4j Discovery API to obtain both ServerInfo and the
60             # transaction endpoint templates.
61             sub _server {
62 72     72   128 my ($self) = @_;
63            
64 72         134 my ($neo4j_version, $tx_endpoint);
65 72         161 my @discovery_queue = ($DISCOVERY_ENDPOINT);
66 72         171 while (@discovery_queue) {
67 72         161 my $events = $self->{events};
68             my $tx = {
69 2     2   9 error_handler => sub { $events->trigger(error => shift) },
70 72         370 transaction_endpoint => shift @discovery_queue,
71             };
72 72         244 my $service = $self->_request($tx, 'GET')->_json;
73            
74 70         324 $neo4j_version = $service->{neo4j_version};
75 70         121 $tx_endpoint = $service->{transaction};
76 70 50 33     648 last if $neo4j_version && $tx_endpoint;
77            
78             # a different discovery endpoint existed in Neo4j < 4.0
79 0 0       0 if ($service->{data}) {
80 0         0 push @discovery_queue, URI->new( $service->{data} )->path;
81             }
82             }
83            
84 70 50       180 croak "Neo4j server not found (ServerInfo discovery failed)" unless $neo4j_version;
85            
86 70         245 my $date = $self->{http_agent}->date_header;
87 70         1118 $date =~ s/ GMT$/ +0000/;
88 70 50       442 $date = $date ? Time::Piece->strptime($date, $RFC5322_DATE) : Time::Piece->new;
89            
90             $self->{server_info} = Neo4j::Driver::ServerInfo->new({
91             uri => $self->{http_agent}->uri,
92 70         6410 version => "Neo4j/$neo4j_version",
93             time_diff => Time::Piece->new - $date,
94             tx_endpoint => $tx_endpoint,
95             });
96            
97 70         563 return $self->{server_info};
98             }
99              
100              
101             # Update requested database name based on transaction endpoint templates.
102             sub _set_database {
103 154     154   327 my ($self, $database) = @_;
104            
105 154         281 my $tx_endpoint = $self->{server_info}->{tx_endpoint};
106             $self->{endpoints} = {
107 154 50       939 new_transaction => "$tx_endpoint",
108             new_commit => "$tx_endpoint/$COMMIT_ENDPOINT",
109             } if $tx_endpoint;
110            
111 154 100       408 return unless defined $database;
112 151         487 $database = URI::Escape::uri_escape_utf8 $database;
113 151         3766 $self->{endpoints}->{new_transaction} =~ s/\{databaseName}/$database/;
114 151         454 $self->{endpoints}->{new_commit} =~ s/\{databaseName}/$database/;
115             }
116              
117              
118             # Send statements to the Neo4j server and return a list of all results.
119             sub _run {
120 252     252   549 my ($self, $tx, @statements) = @_;
121            
122 252 100       576 if ( ! $self->{want_concurrent} ) {
123 13   100     24 my $is_concurrent = %{$self->{active_tx}} && ! defined $tx->{commit_endpoint};
124 13   100     36 $is_concurrent ||= keys %{$self->{active_tx}} > 1;
  9         30  
125 13 100       109 $is_concurrent and carp "Concurrent transactions for HTTP are disabled; use multiple sessions or enable the concurrent_tx config option (this warning may become fatal in a future Neo4j::Driver version)";
126             }
127            
128 252         5060 my $json = { statements => \@statements };
129 252         648 return $self->_request($tx, 'POST', $json)->_results;
130             }
131              
132              
133             # Determine the Accept HTTP header that is appropriate for the specified
134             # request method. Accept headers are cached in $self->{accept_for}.
135             sub _accept_for {
136 208     208   397 my ($self, $method) = @_;
137            
138             $self->{want_jolt} = 'v1' if ! defined $self->{want_jolt}
139 208 100 100     1230 && $self->{server_info} && $self->{server_info}->{version} =~ m{^Neo4j/4\.[234]\.};
      100        
140            
141             # GET requests may fail if Neo4j sees clients that support Jolt, see neo4j #12644
142 208         483 my @modules = @RESULT_MODULES;
143 208 100       1177 unshift @modules, $self->{http_agent}->result_handlers if $self->{http_agent}->can('result_handlers');
144 208         638 my @accept = map { $_->_accept_header( $self->{want_jolt}, $method ) } @modules;
  416         1667  
145 208         1146 return $self->{accept_for}->{$method} = join ', ', @accept;
146             }
147              
148              
149             # Determine a result handler module that is appropriate for the specified
150             # media type. Result handlers are cached in $self->{result_module_for}.
151             sub _result_module_for {
152 192     192   402 my ($self, $content_type) = @_;
153            
154 192         382 my @modules = @RESULT_MODULES;
155 192 100       885 unshift @modules, $self->{http_agent}->result_handlers if $self->{http_agent}->can('result_handlers');
156 192         484 foreach my $module (@modules) {
157 320 100       1228 if ($module->_acceptable($content_type)) {
158 181         875 return $self->{result_module_for}->{$content_type} = $module;
159             }
160             }
161 11         38 return $RESULT_FALLBACK;
162             }
163              
164              
165             # Send a HTTP request to the Neo4j server and return a representation
166             # of the response.
167             sub _request {
168 336     336   686 my ($self, $tx, $method, $json) = @_;
169            
170 336 100       762 if (! defined $tx->{transaction_endpoint}) {
171 46         251 $tx->{transaction_endpoint} = URI->new( $self->{endpoints}->{new_transaction} )->path;
172             }
173 336         4376 my $tx_endpoint = "$tx->{transaction_endpoint}";
174 336   100     1230 my $accept = $self->{accept_for}->{$method}
175             // $self->_accept_for($method);
176            
177 336         1439 $self->{http_agent}->request($method, $tx_endpoint, $json, $accept, $tx->{mode});
178            
179 336         117056 my $header = $self->{http_agent}->http_header;
180 336         913954 $tx->{closed} = $header->{success}; # see _parse_tx_status() and neo4j #12651
181             my $result_module = $self->{result_module_for}->{ $header->{content_type} }
182 336   66     1451 // $self->_result_module_for( $header->{content_type} );
183            
184             my $result = $result_module->new({
185             http_agent => $self->{http_agent},
186             http_method => $method,
187             http_path => $tx_endpoint,
188             http_header => $header,
189             error_handler => $tx->{error_handler},
190             cypher_types => $self->{cypher_types},
191             server_info => $self->{server_info},
192 336 100       3368 statements => $json ? $json->{statements} : [],
193             });
194            
195 310         1400 $self->_parse_tx_status($tx, $header, $result->_info);
196 310         1624 return $result;
197             }
198              
199              
200             # Update list of active transactions and update transaction endpoints.
201             sub _parse_tx_status {
202 310     310   592 my ($self, $tx, $header, $info) = @_;
203            
204 310         496 $tx->{unused} = 0;
205 310   66     924 $tx->{closed} = ! $info->{commit} || ! $info->{transaction};
206            
207 310 100       633 if ( $tx->{closed} ) {
208 228         385 my $old_endpoint = $tx->{transaction_endpoint};
209 228         1577 $old_endpoint =~ s|/$COMMIT_ENDPOINT$||; # both endpoints may be set to /commit (for autocommit), so we need to remove that here
210 228         505 delete $self->{active_tx}->{ $old_endpoint };
211 228         416 return;
212             }
213 82 100 100     359 if ( $header->{location} && $header->{status} eq '201' ) { # Created
214 47         257 my $new_commit = URI->new( $info->{commit} )->path_query;
215 47         4844 my $new_endpoint = URI->new( $header->{location} )->path_query;
216 47         3301 $tx->{commit_endpoint} = $new_commit;
217 47         102 $tx->{transaction_endpoint} = $new_endpoint;
218             }
219 82 50       234 if ( my $expires = $info->{transaction}->{expires} ) {
220 82         325 $expires =~ s/ GMT$/ +0000/;
221 82         419 $expires = Time::Piece->strptime($expires, $RFC5322_DATE) + $self->{server_info}->{time_diff};
222 82         13226 $self->{active_tx}->{ $tx->{transaction_endpoint} } = $expires;
223             }
224             }
225              
226              
227             # Query list of active transactions, removing expired ones.
228             sub _is_active_tx {
229 47     47   100 my ($self, $tx) = @_;
230            
231 47         179 my $now = Time::Piece->new;
232 47         3613 foreach my $tx_key ( keys %{$self->{active_tx}} ) {
  47         213  
233 55         285 my $expires = $self->{active_tx}->{$tx_key};
234 55 50       225 delete $self->{active_tx}->{$tx_key} if $now > $expires;
235             }
236            
237 47         1221 my $tx_endpoint = $tx->{transaction_endpoint};
238 47         396 $tx_endpoint =~ s|/$COMMIT_ENDPOINT$||; # for tx in the (auto)commit state, both endpoints are set to commit
239 47         293 return exists $self->{active_tx}->{ $tx_endpoint };
240             }
241              
242              
243             1;