File Coverage

blib/lib/IO/Async/JSONStream.pm
Criterion Covered Total %
statement 9 9 100.0
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 12 12 100.0


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2014 -- leonerd@leonerd.org.uk
5              
6             package IO::Async::JSONStream;
7              
8 1     1   766 use strict;
  1         2  
  1         42  
9 1     1   6 use warnings;
  1         2  
  1         57  
10              
11             our $VERSION = '0.02';
12              
13 1     1   6 use base qw( IO::Async::Stream );
  1         10  
  1         873  
14             IO::Async::Stream->VERSION( '0.57' ); # ->read future
15              
16             use JSON qw( encode_json decode_json );
17              
18             use Carp;
19              
20             =head1 NAME
21              
22             C - send or receive lines of JSON data in C
23              
24             =head1 SYNOPSIS
25              
26             use IO::Async::JSONStream;
27              
28             use IO::Async::Loop;
29             my $loop = IO::Async::Loop->new;
30              
31             my $jsonstream = IO::Async::JSONStream->new;
32             $loop->add( $jsonstream );
33              
34             $jsonstream->connect(
35             host => "my.server",
36             service => 12345,
37             )->then( sub {
38             $jsonstream->write_json( [ data => { goes => "here" } ] );
39             $jsonstream->read_json
40             })->on_done( sub {
41             my ( $data ) = @_;
42              
43             print "Received the data $data\n";
44             })->get;
45              
46             =head1 DESCRIPTION
47              
48             This subclass of L implements a simple JSON-encoded data
49             stream, sending and receiving Perl data structures by JSON-encoded lines of
50             text.
51              
52             =cut
53              
54             =head1 EVENTS
55              
56             The following events are invoked, either using subclass methods or CODE
57             references in parameters:
58              
59             =head2 on_json $data
60              
61             Invoked when a line of JSON-encoded data is received. It is passed the decoded
62             data as a regular Perl data structure.
63              
64             =head2 on_json_error $error
65              
66             Invoked when a line is received but JSON decoding fails. It is passed the
67             failure exception from the JSON decoder.
68              
69             =cut
70              
71             =head1 PARAMETERS
72              
73             The following named parameters may be passed to C or C:
74              
75             =over 8
76              
77             =item on_json => CODE
78              
79             =item on_json_error => CODE
80              
81             CODE references for event handlers.
82              
83             =item eol => STRING
84              
85             Optional. Sets the string used for the line ending on the stream when writing
86             JSON. Defaults to C<\n> if not given.
87              
88             =back
89              
90             =cut
91              
92             sub _init
93             {
94             my $self = shift;
95             $self->SUPER::_init( @_ );
96              
97             $self->{eol} = "\n";
98             $self->{json_read_f} = [];
99             $self->{json} = JSON->new;
100             }
101              
102             sub configure
103             {
104             my $self = shift;
105             my %args = @_;
106              
107             foreach (qw( on_json on_json_error eol )) {
108             $self->{$_} = delete $args{$_} if exists $args{$_};
109             }
110              
111             if( $self->read_handle ) {
112             $self->can_event( $_ ) or croak "Expected either an $_ callback or to be able to ->$_"
113             for qw( on_json on_json_error );
114             }
115              
116             $self->SUPER::configure( %args );
117             }
118              
119             sub on_read
120             {
121             my $self = shift;
122             my ( $buffref, $eof ) = @_;
123             return if $eof;
124              
125             my $json = $self->{json};
126              
127             $json->incr_parse( $$buffref );
128             $$buffref = '';
129              
130             PARSE_ONE: {
131             my $data;
132              
133             my $fail = not eval {
134             $data = $json->incr_parse;
135             1
136             };
137             chomp( my $e = $@ );
138              
139             my $f;
140             1 while $f = shift @{ $self->{json_read_f} } and $f->is_cancelled;
141              
142             if( $data ) {
143             $f ? $f->done( $data )
144             : $self->invoke_event( on_json => $data );
145             redo PARSE_ONE;
146             }
147             elsif( $fail ) {
148             $f ? $f->fail( $e, json => )
149             : $self->invoke_event( on_json_error => $e );
150             $json->incr_skip;
151             redo PARSE_ONE;
152             }
153             # else last
154             }
155              
156             return 0;
157             }
158              
159             =head1 METHODS
160              
161             =cut
162              
163             =head2 $jsonstream->write_json( $data, %args )
164              
165             Writes a new line of JSON-encoded data from the given Perl data structure.
166              
167             Other arguments are passed to the C method. Returns a C which
168             will complete when the line is flushed.
169              
170             =cut
171              
172             sub write_json
173             {
174             my $self = shift;
175             my ( $data, @args ) = @_;
176              
177             $self->write( encode_json( $data ) . $self->{eol}, @args );
178             }
179              
180             =head2 $jsonstream->read_json ==> $data
181              
182             Returns a L that will yield the next line of JSON-encoded data to be
183             read from the stream. This takes place instead of the C event.
184              
185             If a JSON decoding error occurs it will result in a failed Future with the
186             operation name C and the line on which decoding failed as its argument.
187              
188             =cut
189              
190             sub read_json
191             {
192             my $self = shift;
193              
194             push @{ $self->{json_read_f} }, my $f = $self->loop->new_future;
195              
196             return $f;
197             }
198              
199             =head1 AUTHOR
200              
201             Paul Evans
202              
203             Incremental parsing support added by Frew Schmidt
204              
205             =cut
206              
207             0x55AA;