File Coverage

blib/lib/IPC/Message/Minivan.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             package IPC::Message::Minivan;
2 1     1   55743 use warnings;
  1         3  
  1         80  
3 1     1   6 use strict;
  1         2  
  1         38  
4 1     1   32 use 5.008;
  1         8  
  1         54  
5 1     1   636 use IPC::Messaging;
  0            
  0            
6             use JSON::XS;
7             use Time::HiRes;
8             use Regexp::Common;
9              
10             use vars '$VERSION';
11             $VERSION = '0.01_08';
12              
13             my $DEF_PORT = 6826;
14              
15             sub new
16             {
17             my ($class, %p) = @_;
18             die "want host\n" unless $p{host};
19             $p{port} ||= $DEF_PORT;
20             eval { $p{sock} = IPC::Messaging->tcp_client($p{host}, $p{port}, by_line => 1); };
21             $p{connected} = 0;
22             $p{queue} = [];
23             $p{chan} = {};
24             $p{json} = JSON::XS->new->ascii->allow_nonref;
25             my $me = bless \%p, $class;
26             $me->_poll;
27             $me;
28             }
29              
30             sub subscribe
31             {
32             my ($me, @chan) = @_;
33             for my $chan (@chan) {
34             $me->{chan}{$chan} = 1;
35             }
36             $me->_need_connect;
37             return 0 unless $me->{connected};
38             for my $chan (@chan) {
39             syswrite $me->{sock}, "subscribe $chan\n";
40             }
41             return 1;
42             }
43              
44             sub msg
45             {
46             my ($me, $chan, $msg) = @_;
47             $me->_need_connect;
48             return 0 unless $me->{connected};
49             my $json = $me->{json}->encode($msg);
50             syswrite $me->{sock}, "put $chan $json\n";
51             return 1;
52             }
53              
54             sub get
55             {
56             my ($me, @chan) = @_;
57              
58             my $classify = 0;
59             my $want_one = !wantarray;
60             my $timeout = 0;
61             if (@chan && $chan[0] =~ /^$RE{num}{real}$/ && $chan[0] >= 0) {
62             $timeout = shift @chan;
63             }
64             if (@chan == 1 && ref $chan[0]) {
65             @chan = @{$chan[0]};
66             $classify = 1;
67             }
68              
69             my @r = $me->_scan(want_one => $want_one, classify => $classify, chan => \@chan);
70             if (@r) {
71             return @r unless $want_one;
72             return $r[0] if defined $r[0];
73             }
74              
75             $me->_poll($timeout);
76             return $me->_scan(want_one => $want_one, classify => $classify, chan => \@chan);
77             }
78              
79             sub _scan
80             {
81             my ($me, %p) = @_;
82              
83             my (@r, $r, $v, $found);
84             unless (@{$p{chan}}) {
85             my @m = @{$me->{queue}};
86             $me->{queue} = [];
87             for my $m (@m) {
88             if (!$found) {
89             $v = $me->{json}->decode($m->[1]);
90             $v = [$m->[0], $v] if $p{classify};
91             if ($p{want_one}) {
92             $r = $v;
93             $found = 1;
94             } else {
95             push @r, $v;
96             }
97             } else {
98             push @{$me->{queue}}, $m;
99             }
100             }
101             } else {
102             my %c = map { $_ => 1 } @{$p{chan}};
103             my @m = @{$me->{queue}};
104             $me->{queue} = [];
105             for my $m (@m) {
106             if ($c{$m->[0]} && !$found) {
107             $v = $me->{json}->decode($m->[1]);
108             $v = [$m->[0], $v] if $p{classify};
109             if ($p{want_one}) {
110             $r = $v;
111             $found = 1;
112             } else {
113             push @r, $v;
114             }
115             } else {
116             push @{$me->{queue}}, $m;
117             }
118             }
119             }
120             return $p{want_one} ? ($r) : @r;
121             }
122              
123             sub _poll
124             {
125             my ($me, $to) = @_;
126             $to ||= 0;
127             my $timeout = 0;
128             if (!$me->{sock}) {
129             eval { $me->{sock} = IPC::Messaging->tcp_client($me->{host}, $me->{port}, by_line => 1); };
130             return unless $me->{sock};
131             }
132             while (1) {
133             receive {
134             got tcp_connected => $me->{sock} => then {
135             $me->{connected} = 1;
136             for my $chan (keys %{$me->{chan}}) {
137             syswrite $me->{sock}, "subscribe $chan\n";
138             }
139             };
140             got tcp_line => $me->{sock} => then {
141             my $s = $_[1]->{line};
142             $s =~ s/\r?\n?$//;
143             if ($s =~ /^msg\s+(\S+)\s+(.*)$/) {
144             push @{$me->{queue}}, [$1, $2];
145             }
146             };
147             got tcp_error => $me->{sock} => then {
148             $me->{connected} = 0;
149             $me->{sock} = undef;
150             };
151             got tcp_disconnect => $me->{sock} => then {
152             $me->{connected} = 0;
153             $me->{sock} = undef;
154             };
155             after $to => then { $timeout = 1; };
156             };
157             $to = 0; # XXX tricky, this
158             return if $timeout;
159             return unless $me->{sock};
160             }
161             }
162              
163             sub _need_connect
164             {
165             my $me = shift;
166             if ($me->{connected}) {
167             $me->_poll;
168             } else {
169             $me->_poll(5);
170             }
171             }
172              
173             1;
174             __END__