File Coverage

lib/REST/Neo4p/Batch.pm
Criterion Covered Total %
statement 30 85 35.2
branch 0 38 0.0
condition 0 12 0.0
subroutine 11 13 84.6
pod 1 1 100.0
total 42 149 28.1


line stmt bran cond sub pod time code
1             #$Id$
2 3     3   2164 use v5.10.1;
  3         12  
3             package REST::Neo4p::Batch;
4 3     3   19 use REST::Neo4p::Exceptions;
  3         5  
  3         65  
5 3     3   25 use JSON::XS;
  3         8  
  3         141  
6 3     3   25 use REST::Neo4p::ParseStream;
  3         7  
  3         173  
7 3     3   19 use HOP::Stream qw/drop head/;
  3         5  
  3         179  
8             require REST::Neo4p;
9              
10 3     3   20 use base qw(Exporter);
  3         6  
  3         242  
11 3     3   25 use strict;
  3         6  
  3         80  
12 3     3   15 use warnings;
  3         5  
  3         101  
13 3     3   18 no warnings qw(once);
  3         4  
  3         128  
14              
15             BEGIN {
16 3     3   849 $REST::Neo4p::Batch::VERSION = '0.4000';
17             }
18              
19             our @EXPORT = qw(batch);
20             our @BATCH_ACTIONS = qw(keep_objs discard_objs);
21             our $BUFSIZE = 50000;
22              
23             sub batch (&@) {
24 0     0 1   my ($coderef,$action) = @_;
25 0           my $agent = REST::Neo4p->agent;
26 0 0         if ($agent->is_version_4) {
27 0           REST::Neo4p::NotSuppException->throw("Batch mode not supported on Neo4j server v4.0+");
28             }
29 0           my @errors;
30 0 0         REST::Neo4p::CommException->throw("Not connected\n") unless REST::Neo4p->connected;
31 0 0         warn 'Agent already in batch_mode on batch() call' if ($agent->batch_mode);
32 0 0 0       REST::Neo4p::LocalException->throw("batch requires argument 'keep_objs' or 'discard_objs'\n") unless ($action && grep(/^$action$/,qw/keep_objs discard_objs/));
33 0           $agent->batch_mode(1);
34 0           $coderef->();
35 0           my $tmpfh = $agent->execute_batch_chunk;
36 0           my $jsonr = JSON::XS->new->utf8;
37 0           my $buf;
38 0           $tmpfh->read($buf, $BUFSIZE);
39 0           $jsonr->incr_parse($buf);
40 0           my $res = j_parse($jsonr);
41 0 0         die "j_parse: expecting BATCH stream" unless ($res->[0] eq 'BATCH');
42 0           my $str = $res->[1]->();
43 0           while (my $obj = drop($str)) {
44 3     3   22 use experimental qw/smartmatch/;
  3         6  
  3         35  
45 0           $obj = $obj->[1];
46 0           given ($obj) {
47 0           when (!!ref($obj)) {
48 0 0         if ($obj->{status} !~ m/^2../) {
    0          
49 0 0         warn "Error at id ".$obj->{id}." from ".$obj->{from}.": status ".$obj->{status} if $REST::Neo4p::VERBOSE;
50             push @errors, REST::Neo4p::Neo4jException->new(
51             code=>$obj->{status},
52             message => 'Server returned '.$obj->{status}.' at job id '.$obj->{id}.' from '.$obj->{from}, neo4j_message=>$obj->{message}
53 0           );
54             }
55             elsif (!$obj->{status}) {
56 0           $obj->{status} = 599;
57 0 0         warn "Error at id ".$obj->{id}." from ".$obj->{from}.": status ".$obj->{status} if $REST::Neo4p::VERBOSE;
58             push @errors, REST::Neo4p::Neo4jException->new(
59             code=>$obj->{status},
60             message => 'Server returned no status at job id '.$obj->{id}.' from '.$obj->{from}, neo4j_message=>$obj->{message}
61 0           );
62             }
63             else {
64 0 0         _register_object($obj) if $action eq 'keep_objs';
65             }
66             }
67 0           when ('PENDING') {
68 0           $tmpfh->read($buf,$BUFSIZE);
69 0           $jsonr->incr_parse($buf)
70             }
71 0           when (!defined) {
72 0           last;
73             }
74 0           default {
75 0           die "j_parse: batch response ended prematurely";
76             }
77             }
78              
79             }
80 0           $agent->batch_mode(undef);
81 0           return @errors;
82             }
83              
84             # create new nodes, relationships as they are encountered
85             #
86             # TODO: handling indexes, queries? Prevent queries in batch mode?
87             # TODO: use JSON streaming from file
88              
89             sub _register_object {
90 0     0     my $decoded_batch_resp = shift;
91 0           my ($id, $from, $body) = @{$decoded_batch_resp}{qw(id from body)};
  0            
92 0 0         return unless $body;
93 0 0         return if ($decoded_batch_resp->{status} !~ m/^2../); # ignore an error here
94 0           my $obj;
95 0 0 0       if ($body->{template}) {
    0 0        
    0 0        
    0          
96 0           $obj = REST::Neo4p::Index->new_from_json_response($body);
97             }
98             elsif ($body->{from} and $body->{from} =~ /properties/) {
99 0           1; # ignore
100             }
101             elsif ($body->{self} and $body->{self} =~ m|node/[0-9]+$|) {
102 0           $obj = REST::Neo4p::Node->new_from_json_response($body);
103             }
104             elsif ($body->{self} and $body->{self} =~ m|relationship/[0-9]+$|) {
105 0           $obj = REST::Neo4p::Relationship->new_from_json_response($body);
106             }
107             else {
108 0 0         warn "Don't understand object in batch response: id ".$id if $REST::Neo4p::VERBOSE;
109             }
110 0 0         if ($obj) {
111 0           my $batch_objs = $REST::Neo4p::Entity::ENTITY_TABLE->{batch_objs};
112 0 0         if ( my $batch_obj = delete $batch_objs->{ "{$id}" } ) {
113 0           $$batch_obj = $$obj;
114             }
115             }
116 0           return;
117             }
118              
119             =head1 NAME
120              
121             REST::Neo4p::Batch - Mixin for batch processing
122              
123             =head1 SYNOPSIS
124              
125             use REST::Neo4p;
126             use REST::Neo4p::Batch;
127             use List::MoreUtils qw(pairwise);
128              
129             my @bunch = map { "new_node_$_" } (1..100);
130             my @nodes;
131             batch {
132             my $idx = REST::Neo4p::Index->new('node','bunch');
133             @nodes = map { REST::Neo4p::Node->new({name => $_}) } @bunch;
134             pairwise { $idx->add_entry($a, name => $b) } @nodes, @bunch;
135             $nodes[$_]->relate_to($nodes[$_+1],'next_node') for (0..$#nodes-1);
136             } 'keep_objs';
137              
138             $idx = REST::Neo4p->get_index_by_name('node','bunch');
139             ($the_99th_node) = $nodes[98];
140             ($points_to_100th_node) = $the_99th_node->get_outgoing_relationships;
141             ($the_100th_node) = $idx->find_entries( name => 'new_node_100');
142              
143              
144             =head1 DESCRIPTION
145              
146             REST::Neo4p::Batch adds some syntactic sugar allowing ordinary
147             REST::Neo4p code to be processed through the Neo4j REST batch API.
148              
149             Batch mode is not supported in Neo4j version 4.0+. The methods in this
150             module will barf.
151              
152             =head1 batch {} ($action)
153              
154             To execute server calls generated by REST::Neo4p code,
155             wrap the code in a batch block:
156              
157             batch {
158             # create and manipulate REST::Neo4p objects
159             } $action;
160              
161             The C<$action> parameter B (there is no default) one of
162              
163             =over
164              
165             =item * 'keep_objs'
166              
167             If C is specified, any nodes, relationships or indexes
168             returned in the server reponse will be created in memory as
169             REST::Neo4p objects.
170              
171             =item * 'discard_objs'
172              
173             If C is specified, Neo4j entities in the server response
174             will not be automatically registered as REST::Neo4p objects. Of
175             course, these objects can be retrieved from the server through object
176             creation and other methods, outside of the batch block.
177              
178             #!perl
179             # loader...
180             use REST::Neo4p;
181             use REST::Neo4p::Batch;
182            
183             open $f, shift() or die $!;
184             batch {
185             while (<$f>) {
186             chomp;
187             ($name, $value) = split /\t/;
188             REST::Neo4p::Node->new({name => $name, value => $value});
189             } 'discard_objs';
190             exit(0);
191              
192             =back
193              
194             =head2 Errors in batch jobs
195              
196             C returns returns an array of
197             L error objects for each job that returns
198             a server-generated error. If no errors were encountered, it returns
199             undef.
200              
201             foreach ( batch { _do_stuff() } 'discard_objs' ) {
202             print STDERR $_->message, "(", $_->code, ")\n";
203             }
204              
205             C will C for each error immediately if
206             C<$REST::Neo4p::VERBOSE> is set.
207              
208             =head1 CAVEATS
209              
210             =over
211              
212             =item *
213              
214             No call to the server is made until after the block is executed. There
215             is some magic provided, but not all object functionality is available
216             to REST::Neo4p entities obtained within the C block.
217              
218             For example, this works:
219              
220             my $idx = REST::Neo4p::Index->new('node' => 'pals_of_bob');
221             my $name = 'fred'
222             batch {
223             my $node = REST::Neo4p::Node->new({name => $name});
224             $idx->add_entry($node, name => $name);
225             } 'keep_objs';
226              
227             but this does not:
228              
229             my $idx = REST::Neo4p::Index->new('node' => 'pals_of_bob');
230             my $name = 'fred';
231             batch {
232             my $node = REST::Neo4p::Node->new({name => $name});
233             $idx->add_entry($node, name => $node->get_property('name'));
234             } 'keep_objs';
235              
236             because $node has not been created on the server at the time that
237             add_entry() is executed, so C fails.
238              
239             =back
240              
241             =head1 SEE ALSO
242              
243             L, L
244              
245             =head1 AUTHOR
246              
247             Mark A. Jensen
248             CPAN ID: MAJENSEN
249             majensen -at- cpan -dot- org
250              
251             =head1 LICENSE
252              
253             Copyright (c) 2012-2020 Mark A. Jensen. This program is free software; you
254             can redistribute it and/or modify it under the same terms as Perl
255             itself.
256              
257             =cut
258              
259             1;