File Coverage

blib/lib/REST/Neo4p/ParseStream.pm
Criterion Covered Total %
statement 135 149 90.6
branch 42 62 67.7
condition 12 15 80.0
subroutine 14 14 100.0
pod 0 3 0.0
total 203 243 83.5


line stmt bran cond sub pod time code
1 38     38   72797 use v5.10.1;
  38         152  
2             package REST::Neo4p::ParseStream;
3 38     38   229 use base Exporter;
  38         61  
  38         2977  
4 38     38   20174 use HOP::Stream qw/node promise/;
  38         62047  
  38         2686  
5              
6 38     38   280 use strict;
  38         71  
  38         739  
7 38     38   179 use warnings;
  38         77  
  38         1192  
8              
9             BEGIN {
10 38     38   5956 $REST::Neo4p::ParseStream::VERSION = '0.4000';
11             }
12              
13             our @EXPORT = qw/j_parse/;# j_parse_object j_parse_array /;
14              
15             # lazy linked lists
16             # qry response in txn - "data":[{ "row" : [...]},...]
17             # qry response for qry - "data":[ [{...},...],... ]
18             # so - for txn response, j_parse_query_response returns hashes,
19             # for qry response, j_parse_query_response returns arrays
20              
21             sub j_parse {
22 15     15 0 4879 my $j = shift;
23 15         32 state $state; # undef == first call?
24 15 100       491 if ($j->incr_text =~ s/^\s*\[\s*//) {
    50          
    0          
25             # batch (simple array of objects)
26 1         5 return ['BATCH', j_parse_array($j)];
27             }
28             elsif ($j->incr_text =~ s/^\s*{\s*//) {
29             # object
30 14         30 my $type;
31 38     38   1516 use experimental 'smartmatch';
  38         7230  
  38         304  
32 14         52 given ($j->incr_text) {
33 14         64 when (/^\s*"commit"/i) {
34 8         30 $type = 'TXN';
35             }
36 6         18 when (/^\s*"columns"/i) {
37 5         14 $type = 'QUERY';
38             }
39 1         3 default {
40 1         3 $type = 'OBJECT';
41             }
42             }
43 14         55 return [$type, j_parse_object($j)]
44             }
45             elsif ($j->incr_text =~ m/^\s*"[a-zA-Z_]+"/) {
46             # after a stream, next key of object
47 0         0 return ['OBJECT', j_parse_object($j)];
48             }
49             else {
50             # problem
51 0         0 return;
52             }
53             }
54              
55             # generic parse array stream
56             # return decoded json entities at the top level
57             # opening [ must be removed
58             # handle empty array too
59             sub j_parse_array {
60 17     17 0 49 my $j = shift;
61 17         31 my $po;
62             $po = sub {
63 474     474   6298 my $elt;
64 474         762 my $int_po = $po;
65 474         615 state $last_text;
66 474         719 my $done = eval {$j->incr_text =~ s/^\s*\]\s*//};
  474         15362  
67 474 100       1281 return node(undef,undef) if $done;
68 470         702 eval {
69 470         6307 $elt = $j->incr_parse;
70             };
71 470 100       1141 if (defined $elt) {
    100          
72 412         1373 $last_text = $j->incr_text;
73 412 100       1630 if ($j->incr_text =~ m/^\]}\],/) { # JSON::XS <=3.04 transaction kludge
    100          
74 5         39 $j->incr_text =~ s/(\])(}\],)//;
75 5         14 $done=1;
76             }
77             elsif ($elt eq 'transaction') { # JSON::XS 4.02 transaction kludge
78 4         24 $j->incr_text = '"transaction"'.$j->incr_text;
79 4         18 return node(undef,undef);
80             }
81             else {
82 403         10260 $j->incr_text =~ s/^(\s*,\s*)|(\s*\]\s*)//;
83 403         1190 $done = !!$2;
84             }
85              
86             }
87             elsif ($@) {
88 1 50 33     15 if ($@ =~ /already started parsing/) {
    50          
89 0         0 $elt = 'PENDING';
90             }
91             elsif ($@ =~ /must be an object or array/ &&
92             $last_text =~ /("[^"]+")/) {
93             # txn kludge
94 0         0 $j->incr_skip;
95 0         0 $j->incr_text = $1.$j->incr_text;
96 0         0 return node(undef,undef);
97             }
98             else {
99 1         7 die "j_parse: $@";
100             }
101             }
102             else {
103 57         103 $elt = 'PENDING';
104             }
105 465 100       2628 node(['ARELT'=>$elt], $done ? undef : promise { $po->() });
  429         213451  
106 17         97 };
107 17         55 return $po;
108             }
109              
110             # generic parse object to one level
111             # opening { must be removed
112             # when another stream is returned as the value,
113             # another call to j_parse_object has to wait until that stream is
114             # exhausted...
115             sub j_parse_object {
116 23     23 0 46 my $j = shift;
117 23         39 my $po;
118             $po = sub {
119 84     84   1268 state $key;
120 84         140 state $current = '';
121 84         225 my ($text, $head,$obj);
122 84         0 my $done;
123 84 50       258 unless ($current eq 'PENDING') {
124 84         122 my $m;
125 38     38   28243 use experimental 'smartmatch';
  38         84  
  38         192  
126 84         139 eval {
127 84         646 $j->incr_text =~ m/^(?:(\s*"([^"]+)"\s*:\s*)|(\s*}\s*))/; # look ahead
128 84   100     357 $m = $2||$3;
129 84 100 100     409 if ($m && ($m eq 'columns') && ($current eq 'RESULTS_STREAM')) {
      100        
130             # if this is a 'results' item, don't eat the 'columns',
131             # let the results stream do it
132 8         19 $m = undef;
133             }
134             else {
135 76         861 $j->incr_text =~ s/^(?:(\s*"([^"]+)"\s*:\s*)|(\s*}\s*))//; # consume
136             }
137              
138             };
139              
140 84 100 66     399 if ($@ =~ /already started parsing/ || !$m) {
    50          
141             # either another function instance is in the middle of parsing,
142             # or no key was found (which is the same thing)
143             # so report where this instance is:
144 25         169 return node([$key => $current], promise { $po->() });
  10         7445  
145             }
146             elsif ($@) {
147 0         0 die "j_parse: incr parser error: $@";
148             }
149 59         120 $key = $m;
150             }
151 38     38   10945 use experimental 'smartmatch';
  38         132  
  38         183  
152 59         88 given ($key) {
153 59         200 when ('columns') {
154 14         27 eval {
155 14         78 $obj = $j->incr_parse;
156             };
157 14 50       49 die "j_parse: incr parser error: $@" if $@;
158 14         37 $current = 'COMPLETE';
159             }
160 45         103 when (/commit/) {
161 8         110 $j->incr_text =~ s/^"([^"]+)"\s*,?\s*//;
162 8         26 $obj = $1;
163 8         23 $current = 'COMPLETE';
164             }
165 37         82 when (/transaction/) {
166 6         14 eval {
167 6         39 $obj = $j->incr_parse; # get txn info obj
168             };
169 6 50       24 die "j_parse: incr parser error: $@" if $@;
170 6         23 $current = 'COMPLETE';
171             }
172 31         61 when ('data') {
173 12 100       216 if ($j->incr_text =~ s/^\[\s*//) {
174 10         46 $obj = j_parse_array($j);
175 10         33 $current = 'DATA_STREAM';
176             }
177             else {
178 2         24 die "j_parse: expecting an array value for 'data' key";
179             }
180             }
181 19         45 when ('results') {
182 8 50       161 if ($j->incr_text =~ s/^\[\s*//) {
    0          
183 8         106 $j->incr_text =~ s/^\s*{\s*//;
184 8         24 eval {
185 8         23 $obj = j_parse_object($j);
186             };
187 8 50       28 die "j_parse: incr parser error: $@" if $@;
188 8         31 $current = 'RESULTS_STREAM';
189             }
190             elsif ($j->incr_text eq '') {
191 0         0 $current = 'DONE';
192 0         0 $done=1;
193             }
194             }
195 11         29 when ('errors') {
196 6 50       41 if ($j->incr_text=~ s/^\[\s*//) {
    0          
197 6         21 $obj = j_parse_array($j);
198 6         20 $current = 'ERRORS_STREAM';
199             }
200             elsif ($j->incr_text =~ s/^\s*\]\s*,?\s*//) {
201 0         0 $current = 'DONE';
202 0         0 $done=1;
203             }
204             }
205 5         24 when (/}/) {
206 3 50       9 if ($current eq 'DATA_STREAM') {
207 3 100       55 if ($j->incr_text =~ s/^\s*,\s*{//) {
    100          
    50          
208             # prepared for next results object
209 1         6 $obj = j_parse_object($j);
210 1         3 $key = 'results';
211 1         3 $current = 'RESULTS_STREAM';
212             }
213             elsif ($j->incr_text =~ s/^\s*\]\s*,?\s*//) {
214 1         3 $current = 'DONE';
215 1         4 $done=1;
216             }
217             elsif ($j->incr_text eq '') {
218 1         3 $current = 'DONE';
219 1         5 $done=1;
220             }
221             }
222             else {
223 0         0 $current = 'DONE';
224 0         0 $done=1;
225             }
226             }
227 2         6 when (undef) {
228 0         0 die "j_parse: No key found";
229             }
230 2         4 default {
231             # why am I here?
232 2         23 die "j_parse: Unexpected key '$key' in stream";
233             }
234             }
235 55 100       127 if (defined $obj) {
236 53         120 $head = [$key => $obj];
237 53         553 $j->incr_text =~ s/^(?:(\s*,\s*)|(\s*}\s*))//;
238 53         146 $done = !!$2;
239             }
240             else {
241 2 50       10 $head = $done ? undef : [$key => $current = 'PENDING'];
242             }
243 55 50       363 return node($head, $done ? undef : promise { $po->() }) if $head;
  52 100       13210  
244 2         10 return node(undef, undef);
245 23         159 };
246 23         103 return $po;
247             }
248              
249             =head1 NAME
250              
251             REST::Neo4p::ParseStream - Parse Neo4j REST responses on the fly
252              
253             =head1 SYNOPSIS
254              
255             Not for human consumption.
256             This module is ignored by the Neo4j::Driver-based agent.
257              
258             =head1 DESCRIPTION
259              
260             This module helps L exploit the L
261             server's chunked transfer encoding of its JSON REST responses. It is
262             based on the fast L incremental parser and
263             L's L
264             Perl|http://hop.perl.plover.com> ideas as implemented in
265             L.
266              
267             The goal is to be able to pull in objects from the server stream as
268             soon as they are available. In practice, this means specifically
269             finding and incrementally processing the potentially large arrays of
270             objects that are returned from cypher queries, transaction queries,
271             and batch requests.
272              
273             Because of inconsistencies among the Neo4j response formats for each
274             of these functions, this module does a significant amount of
275             "hand-parsing". Currently the code will not be very robust to changes
276             in those response formats. If you find your query handling is breaking
277             with a new server version, L
278             ticket|https://rt.cpan.org/Public/Bug/Report.html?Queue=REST-Neo4p>. In
279             the meantime, you should be able to keep things going (albeit more
280             slowly) by turning off streaming at the agent:
281              
282             use REST::Neo4p;
283             REST::Neo4p->agent->no_stream;
284             ...
285              
286             =head1 SEE ALSO
287              
288             L, L, L,
289             L, L.
290              
291             =head1 AUTHOR
292              
293             Mark A. Jensen
294             CPAN ID: MAJENSEN
295             majensen -at- cpan -dot- org
296              
297             =head1 LICENSE
298              
299             Copyright (c) 2012-2020 Mark A. Jensen. This program is free software; you
300             can redistribute it and/or modify it under the same terms as Perl
301             itself.
302              
303             =cut
304              
305             1;