File Coverage

blib/lib/Eixo/Queue/SocketPair.pm
Criterion Covered Total %
statement 56 74 75.6
branch 5 12 41.6
condition n/a
subroutine 16 18 88.8
pod 0 4 0.0
total 77 108 71.3


line stmt bran cond sub pod time code
1             package Eixo::Queue::SocketPair;
2              
3 1     1   61046 use strict;
  1         2  
  1         48  
4 1     1   632 use Eixo::Base::Clase qw(Eixo::QueueInmediate);
  1         15237  
  1         8  
5              
6              
7 1     1   787 use Socket;
  1         4417  
  1         595  
8 1     1   797 use IO::Handle;
  1         7757  
  1         62  
9 1     1   699 use IO::Select;
  1         2026  
  1         57  
10 1     1   533 use Eixo::Queue::Job;
  1         8  
  1         6  
11              
12 1     1   508 use Eixo::Queue::SocketPairDriver;
  1         2  
  1         3  
13              
14             has(
15              
16             backend=>undef,
17              
18             pid_c=>undef,
19              
20             initiated=>undef,
21              
22             jobSent=>undef,
23              
24             input=>undef,
25              
26             output=>undef,
27              
28             );
29              
30             sub init{
31 1     1 0 204 my ($self) = @_;
32              
33 1         2 my ($driver) = $self->__openCommunications;
34            
35 1         53 $self->__startBackend($driver);
36              
37 1         15 $self->initiated(1);
38              
39             }
40              
41             sub DESTROY{
42 1     1   667 my ($self) = @_;
43              
44 1 50       5 if($self->pid_c){
45              
46 1         8 kill(10, $self->pid_c);
47              
48 1         1282 waitpid($self->pid_c, 0);
49              
50             }
51              
52             }
53              
54              
55             sub __openCommunications{
56 3     3   37 my ($self) = @_;
57              
58 3         7 &Eixo::Queue::SocketPairDriver::open();
59             }
60              
61             sub __startBackend{
62 1     1   2 my ($self, $driver) = @_;
63              
64 1         2 my ($a, $b) = ($self->__openCommunications(), $self->__openCommunications);
65              
66              
67 1 50       992 if(my $pid = fork){
68            
69 1         45 $a->A;
70 1         9 $b->A;
71              
72 1         4 $self->{input} = $a;
73 1         5 $self->{output} = $b;
74              
75 1         15 $self->pid_c($pid);
76              
77             }
78             else{
79 0         0 $a->B;
80 0         0 $b->B;
81              
82 0         0 $self->{input} = $b;
83 0         0 $self->{output} = $a;
84              
85 0         0 eval{
86              
87 0         0 $self->__backendLoop();
88              
89             };
90 0 0       0 if($@){
91            
92 1     1   226 use Data::Dumper;
  1         2  
  1         359  
93              
94 0         0 print Dumper($@);
95              
96 0         0 exit 1;
97             }
98              
99 0         0 exit 0;
100            
101             }
102            
103             }
104              
105             sub __backendLoop{
106 0     0   0 my ($self) = @_;
107              
108 0         0 while(my $job = $self->input->receive){
109              
110 0         0 $job = Eixo::Queue::Job->unserialize($job);
111              
112 0         0 $self->backend->($job);
113              
114 0         0 $self->output->send($job->serialize);
115             }
116             }
117              
118             sub add{
119 1000     1000 0 717 my ($self, $job) = @_;
120              
121 1000 50       1417 unless($self->initiated){
122 0         0 die(ref($self) . '::add Queue not initiated');
123             }
124              
125 1000 50       3706 return undef if($self->jobSent);
126              
127 1000         3712 $self->__toBackend($job);
128              
129 1000         3663 $self->jobSent(1);
130              
131 1000         4369 return 1;
132             }
133              
134             sub __toBackend{
135 1000     1000   807 my ($self, $job) = @_;
136              
137 1000         1372 $self->output->send($job->serialize);
138             }
139              
140             sub wait{
141 1000     1000 0 735 my ($self) = @_;
142              
143 1000 50       1410 if($self->jobSent){
144              
145 1000         4334 $self->__waitBackend;
146              
147             }
148             }
149             sub __waitBackend{
150              
151 1000     1000   1508 my $data = $_[0]->input->receive;
152              
153 1000         1992 $_[0]->jobSent(0);
154              
155 1000         4473 return Eixo::Queue::Job->unserialize($data);
156              
157            
158             }
159              
160              
161             sub status{
162 0     0 0   my ($self) = @_;
163              
164 0           $self->jobSent;
165             }
166              
167             1;