File Coverage

blib/lib/REST/Neo4p/ParseStream.pm
Criterion Covered Total %
statement 135 149 90.6
branch 42 62 67.7
condition 12 15 80.0
subroutine 14 14 100.0
pod 0 3 0.0
total 203 243 83.5


line stmt bran cond sub pod time code
1 38     38   75179 use v5.10.1;
  38         165  
2             package REST::Neo4p::ParseStream;
3 38     38   3016 use base Exporter;
  38         81  
  38         3049  
4 38     38   25778 use HOP::Stream qw/node promise/;
  38         74367  
  38         2644  
5              
6 38     38   271 use strict;
  38         1314  
  38         5403  
7 38     38   1720 use warnings;
  38         73  
  38         1224  
8              
9             BEGIN {
10 38     38   7670 $REST::Neo4p::ParseStream::VERSION = '0.4001';
11             }
12              
13             our @EXPORT = qw/j_parse/;# j_parse_object j_parse_array /;
14              
15             # lazy linked lists
16             # qry response in txn - "data":[{ "row" : [...]},...]
17             # qry response for qry - "data":[ [{...},...],... ]
18             # so - for txn response, j_parse_query_response returns hashes,
19             # for qry response, j_parse_query_response returns arrays
20              
21             sub j_parse {
22 15     15 0 4921 my $j = shift;
23 15         25 state $state; # undef == first call?
24 15 100       469 if ($j->incr_text =~ s/^\s*\[\s*//) {
    50          
    0          
25             # batch (simple array of objects)
26 1         5 return ['BATCH', j_parse_array($j)];
27             }
28             elsif ($j->incr_text =~ s/^\s*{\s*//) {
29             # object
30 14         32 my $type;
31 38     38   1401 use experimental 'smartmatch';
  38         7542  
  38         336  
32 14         51 given ($j->incr_text) {
33 14         62 when (/^\s*"commit"/i) {
34 8         28 $type = 'TXN';
35             }
36 6         18 when (/^\s*"columns"/i) {
37 5         15 $type = 'QUERY';
38             }
39 1         2 default {
40 1         4 $type = 'OBJECT';
41             }
42             }
43 14         47 return [$type, j_parse_object($j)]
44             }
45             elsif ($j->incr_text =~ m/^\s*"[a-zA-Z_]+"/) {
46             # after a stream, next key of object
47 0         0 return ['OBJECT', j_parse_object($j)];
48             }
49             else {
50             # problem
51 0         0 return;
52             }
53             }
54              
55             # generic parse array stream
56             # return decoded json entities at the top level
57             # opening [ must be removed
58             # handle empty array too
59             sub j_parse_array {
60 17     17 0 33 my $j = shift;
61 17         30 my $po;
62             $po = sub {
63 474     474   6229 my $elt;
64 474         680 my $int_po = $po;
65 474         606 state $last_text;
66 474         673 my $done = eval {$j->incr_text =~ s/^\s*\]\s*//};
  474         16081  
67 474 100       1184 return node(undef,undef) if $done;
68 470         652 eval {
69 470         6408 $elt = $j->incr_parse;
70             };
71 470 100       1162 if (defined $elt) {
    100          
72 412         1348 $last_text = $j->incr_text;
73 412 100       1448 if ($j->incr_text =~ m/^\]}\],/) { # JSON::XS <=3.04 transaction kludge
    100          
74 5         37 $j->incr_text =~ s/(\])(}\],)//;
75 5         13 $done=1;
76             }
77             elsif ($elt eq 'transaction') { # JSON::XS 4.02 transaction kludge
78 4         22 $j->incr_text = '"transaction"'.$j->incr_text;
79 4         18 return node(undef,undef);
80             }
81             else {
82 403         10171 $j->incr_text =~ s/^(\s*,\s*)|(\s*\]\s*)//;
83 403         1245 $done = !!$2;
84             }
85              
86             }
87             elsif ($@) {
88 1 50 33     18 if ($@ =~ /already started parsing/) {
    50          
89 0         0 $elt = 'PENDING';
90             }
91             elsif ($@ =~ /must be an object or array/ &&
92             $last_text =~ /("[^"]+")/) {
93             # txn kludge
94 0         0 $j->incr_skip;
95 0         0 $j->incr_text = $1.$j->incr_text;
96 0         0 return node(undef,undef);
97             }
98             else {
99 1         7 die "j_parse: $@";
100             }
101             }
102             else {
103 57         89 $elt = 'PENDING';
104             }
105 465 100       2661 node(['ARELT'=>$elt], $done ? undef : promise { $po->() });
  429         215085  
106 17         111 };
107 17         53 return $po;
108             }
109              
110             # generic parse object to one level
111             # opening { must be removed
112             # when another stream is returned as the value,
113             # another call to j_parse_object has to wait until that stream is
114             # exhausted...
115             sub j_parse_object {
116 23     23 0 53 my $j = shift;
117 23         31 my $po;
118             $po = sub {
119 84     84   1327 state $key;
120 84         133 state $current = '';
121 84         187 my ($text, $head,$obj);
122 84         0 my $done;
123 84 50       202 unless ($current eq 'PENDING') {
124 84         113 my $m;
125 38     38   27284 use experimental 'smartmatch';
  38         124  
  38         182  
126 84         125 eval {
127 84         623 $j->incr_text =~ m/^(?:(\s*"([^"]+)"\s*:\s*)|(\s*}\s*))/; # look ahead
128 84   100     367 $m = $2||$3;
129 84 100 100     407 if ($m && ($m eq 'columns') && ($current eq 'RESULTS_STREAM')) {
      100        
130             # if this is a 'results' item, don't eat the 'columns',
131             # let the results stream do it
132 8         17 $m = undef;
133             }
134             else {
135 76         891 $j->incr_text =~ s/^(?:(\s*"([^"]+)"\s*:\s*)|(\s*}\s*))//; # consume
136             }
137              
138             };
139              
140 84 100 66     417 if ($@ =~ /already started parsing/ || !$m) {
    50          
141             # either another function instance is in the middle of parsing,
142             # or no key was found (which is the same thing)
143             # so report where this instance is:
144 25         151 return node([$key => $current], promise { $po->() });
  10         7558  
145             }
146             elsif ($@) {
147 0         0 die "j_parse: incr parser error: $@";
148             }
149 59         121 $key = $m;
150             }
151 38     38   11244 use experimental 'smartmatch';
  38         92  
  38         171  
152 59         90 given ($key) {
153 59         162 when ('columns') {
154 14         21 eval {
155 14         83 $obj = $j->incr_parse;
156             };
157 14 50       41 die "j_parse: incr parser error: $@" if $@;
158 14         32 $current = 'COMPLETE';
159             }
160 45         107 when (/commit/) {
161 8         105 $j->incr_text =~ s/^"([^"]+)"\s*,?\s*//;
162 8         23 $obj = $1;
163 8         22 $current = 'COMPLETE';
164             }
165 37         73 when (/transaction/) {
166 6         13 eval {
167 6         43 $obj = $j->incr_parse; # get txn info obj
168             };
169 6 50       20 die "j_parse: incr parser error: $@" if $@;
170 6         19 $current = 'COMPLETE';
171             }
172 31         62 when ('data') {
173 12 100       318 if ($j->incr_text =~ s/^\[\s*//) {
174 10         40 $obj = j_parse_array($j);
175 10         29 $current = 'DATA_STREAM';
176             }
177             else {
178 2         22 die "j_parse: expecting an array value for 'data' key";
179             }
180             }
181 19         42 when ('results') {
182 8 50       117 if ($j->incr_text =~ s/^\[\s*//) {
    0          
183 8         101 $j->incr_text =~ s/^\s*{\s*//;
184 8         19 eval {
185 8         22 $obj = j_parse_object($j);
186             };
187 8 50       28 die "j_parse: incr parser error: $@" if $@;
188 8         22 $current = 'RESULTS_STREAM';
189             }
190             elsif ($j->incr_text eq '') {
191 0         0 $current = 'DONE';
192 0         0 $done=1;
193             }
194             }
195 11         39 when ('errors') {
196 6 50       46 if ($j->incr_text=~ s/^\[\s*//) {
    0          
197 6         20 $obj = j_parse_array($j);
198 6         20 $current = 'ERRORS_STREAM';
199             }
200             elsif ($j->incr_text =~ s/^\s*\]\s*,?\s*//) {
201 0         0 $current = 'DONE';
202 0         0 $done=1;
203             }
204             }
205 5         23 when (/}/) {
206 3 50       14 if ($current eq 'DATA_STREAM') {
207 3 100       101 if ($j->incr_text =~ s/^\s*,\s*{//) {
    100          
    50          
208             # prepared for next results object
209 1         11 $obj = j_parse_object($j);
210 1         2 $key = 'results';
211 1         4 $current = 'RESULTS_STREAM';
212             }
213             elsif ($j->incr_text =~ s/^\s*\]\s*,?\s*//) {
214 1         4 $current = 'DONE';
215 1         3 $done=1;
216             }
217             elsif ($j->incr_text eq '') {
218 1         3 $current = 'DONE';
219 1         5 $done=1;
220             }
221             }
222             else {
223 0         0 $current = 'DONE';
224 0         0 $done=1;
225             }
226             }
227 2         6 when (undef) {
228 0         0 die "j_parse: No key found";
229             }
230 2         4 default {
231             # why am I here?
232 2         22 die "j_parse: Unexpected key '$key' in stream";
233             }
234             }
235 55 100       130 if (defined $obj) {
236 53         112 $head = [$key => $obj];
237 53         542 $j->incr_text =~ s/^(?:(\s*,\s*)|(\s*}\s*))//;
238 53         136 $done = !!$2;
239             }
240             else {
241 2 50       10 $head = $done ? undef : [$key => $current = 'PENDING'];
242             }
243 55 50       362 return node($head, $done ? undef : promise { $po->() }) if $head;
  52 100       13509  
244 2         12 return node(undef, undef);
245 23         156 };
246 23         93 return $po;
247             }
248              
249             =head1 NAME
250              
251             REST::Neo4p::ParseStream - Parse Neo4j REST responses on the fly
252              
253             =head1 SYNOPSIS
254              
255             Not for human consumption.
256             This module is ignored by the Neo4j::Driver-based agent.
257              
258             =head1 DESCRIPTION
259              
260             This module helps L exploit the L
261             server's chunked transfer encoding of its JSON REST responses. It is
262             based on the fast L incremental parser and
263             L's L
264             Perl|http://hop.perl.plover.com> ideas as implemented in
265             L.
266              
267             The goal is to be able to pull in objects from the server stream as
268             soon as they are available. In practice, this means specifically
269             finding and incrementally processing the potentially large arrays of
270             objects that are returned from cypher queries, transaction queries,
271             and batch requests.
272              
273             Because of inconsistencies among the Neo4j response formats for each
274             of these functions, this module does a significant amount of
275             "hand-parsing". Currently the code will not be very robust to changes
276             in those response formats. If you find your query handling is breaking
277             with a new server version, L
278             ticket|https://rt.cpan.org/Public/Bug/Report.html?Queue=REST-Neo4p>. In
279             the meantime, you should be able to keep things going (albeit more
280             slowly) by turning off streaming at the agent:
281              
282             use REST::Neo4p;
283             REST::Neo4p->agent->no_stream;
284             ...
285              
286             =head1 SEE ALSO
287              
288             L, L, L,
289             L, L.
290              
291             =head1 AUTHOR
292              
293             Mark A. Jensen
294             CPAN ID: MAJENSEN
295             majensen -at- cpan -dot- org
296              
297             =head1 LICENSE
298              
299             Copyright (c) 2012-2021 Mark A. Jensen. This program is free software; you
300             can redistribute it and/or modify it under the same terms as Perl
301             itself.
302              
303             =cut
304              
305             1;