File Coverage

lib/IPC/Transit/Internal.pm
Criterion Covered Total %
statement 18 105 17.1
branch 0 32 0.0
condition 0 11 0.0
subroutine 6 19 31.5
pod n/a
total 24 167 14.3


line stmt bran cond sub pod time code
1             package IPC::Transit::Internal;
2             $IPC::Transit::Internal::VERSION = '1.171860';
3 16     16   95 use strict;use warnings;
  16     16   25  
  16         363  
  16         65  
  16         25  
  16         303  
4 16     16   6179 use IPC::SysV;
  16         13542  
  16         594  
5 16     16   6086 use IPC::Msg;
  16         60567  
  16         447  
6 16     16   6837 use POSIX;
  16         77083  
  16         151  
7              
8              
9 16         15753 use vars qw(
10             $config
11 16     16   36639 );
  16         33  
12              
13             {
14             my $queue_cache = {};
15             sub _initialize_queue {
16 0     0     my %args = @_;
17 0           my $qid = _get_queue_id(%args);
18 0 0         if(not $queue_cache->{$qid}) {
19 0 0         $queue_cache->{$qid} = IPC::Msg->new($qid, _get_flags('create_ipc'))
20             or die "failed to _initialize_queue: failed to create queue_id $qid: $!\n";
21             }
22 0           return $queue_cache->{$qid};
23             }
24              
25             sub _remove {
26 0     0     my %args = @_;
27 0           my $qname = $args{qname};
28 0           my $qid = _get_queue_id(%args);
29 0 0         $queue_cache->{$qid}->remove if $queue_cache->{$qid};
30 0           unlink _get_transit_config_dir() . "/$qname";
31             }
32              
33             sub _stat {
34 0     0     my %args = @_;
35 0           my $qid = _get_queue_id(%args);
36 0           _initialize_queue(%args);
37 0           my @heads = qw(uid gid cuid cgid mode qnum qbytes lspid lrpid stime rtime ctime);
38 0           my $ret = {};
39 0           my @items = @{$queue_cache->{$qid}->stat};
  0            
40 0           foreach my $item (@items) {
41 0           $ret->{shift @heads} = $item;
42             }
43 0           $ret->{qname} = $args{qname};
44 0           $ret->{qid} = $qid;
45 0           return $ret;
46             }
47             }
48              
49             sub _drop_all_queues {
50 0     0     foreach my $qname (keys %{$config->{queues}}) {
  0            
51 0           _remove(qname => $qname);
52             }
53             }
54             sub _stats {
55 0     0     my $ret = [];
56 0           _gather_queue_info();
57 0           foreach my $queue_name (keys %{$config->{queues}}) {
  0            
58 0           push @$ret, IPC::Transit::stat(qname => $queue_name);
59             }
60 0           return $ret;
61             }
62              
63             sub _get_transit_config_dir {
64 0   0 0     my $dir = $IPC::Transit::config_dir || '/tmp/ipc_transit/';
65 0           return $dir;
66             }
67              
68             sub _lock_config_dir {
69 0     0     my $lock_file = _get_transit_config_dir() . '/.lock';
70 0           my ($have_lock, $fh);
71 0           for (1..2) {
72 0 0         if(sysopen($fh, $lock_file, _get_flags('exclusive_lock'))) {
73 0           $have_lock = 1;
74 0           last;
75             }
76 0           sleep 1;
77             }
78 0 0         if(not $have_lock) {
79 0           _unlock_config_dir();
80 0           sysopen($fh, $lock_file, _get_flags('exclusive_lock'));
81             }
82             #we have the advisory lock for sure now
83             }
84              
85              
86             sub _unlock_config_dir {
87 0     0     my $lock_file = _get_transit_config_dir() . '/.lock';
88 0 0         unlink $lock_file or die "_unlock_config_dir: failed to unlink $lock_file: $!";
89             }
90              
91             sub _gather_queue_info {
92 0     0     _mk_queue_dir();
93 0 0         $config->{queues} = {} unless $config->{queues};
94 0           foreach my $filename (glob _get_transit_config_dir() . '/*') {
95 0           my $info = {};
96 0 0         open my $fh, '<', $filename
97             or die "IPC::Transit::Internal::_gather_queue_info: failed to open $filename for reading: $!";
98 0           while(my $line = <$fh>) {
99 0           chomp $line;
100 0           my ($key, $value) = split '=', $line;
101 0           $info->{$key} = $value;
102             }
103 0 0         die 'required key "qid" not found' unless $info->{qid};
104 0 0         die 'required key "qname" not found' unless $info->{qname};
105 0           $config->{queues}->{$info->{qname}} = $info;
106             }
107             }
108              
109             sub _queue_exists {
110 0     0     my $qname = shift;
111 0           _mk_queue_dir();
112 0           return $config->{queues}->{$qname};
113             }
114              
115             sub _get_queue_id {
116 0     0     my %args = @_;
117 0           _mk_queue_dir();
118 0           my $qname = $args{qname};
119              
120             #return it if we have it
121             return $config->{queues}->{$qname}->{qid}
122 0 0 0       if $config->{queues} and $config->{queues}->{$qname};
123              
124             #we don't have it; let's load it and try again
125 0           _gather_queue_info();
126             return $config->{queues}->{$qname}->{qid}
127 0 0 0       if $config->{queues} and $config->{queues}->{$qname};
128              
129             #we still don't have it; get a lock, load it, try again, ane make
130             #it if necessary
131 0           _lock_config_dir();
132 0           eval {
133             #now re-load the config
134 0           _gather_queue_info();
135              
136             #if we now have it, unlock and return it
137 0 0 0       if($config->{queues} and $config->{queues}->{$qname}) {
138 0           _unlock_config_dir();
139 0           return $config->{queues}->{$qname}->{qid};
140             }
141              
142             #otherwise, we need to make one
143 0           { my $file = _get_transit_config_dir() . "/$qname";
  0            
144 0 0         open my $fh, '>', $file or die "IPC::Transit::Internal::_get_queue_id: failed to open $file for writing: $!";
145 0           my $new_qid = IPC::SysV::ftok($file, 1);
146 0           print $fh "qid=$new_qid\n";
147 0           print $fh "qname=$qname\n";
148 0           close $fh;
149             }
150 0           _unlock_config_dir();
151             };
152 0 0         if($@) {
153 0           _unlock_config_dir();
154             }
155 0           _gather_queue_info();
156 0           return $config->{queues}->{$qname}->{qid};
157             }
158              
159             sub _mk_queue_dir {
160 0 0   0     mkdir _get_transit_config_dir(), 0777
161             unless -d _get_transit_config_dir();
162             }
163              
164             #gnarly looking UNIX goop hidden below
165             {
166             my $flags = {
167             create_ipc => IPC::SysV::S_IRUSR() |
168             IPC::SysV::S_IWUSR() |
169             IPC::SysV::S_IRGRP() |
170             IPC::SysV::S_IWGRP() |
171             IPC::SysV::S_IROTH() |
172             IPC::SysV::S_IWOTH() |
173             IPC::SysV::IPC_CREAT(),
174              
175             nowait => IPC::SysV::IPC_NOWAIT(),
176              
177             exclusive_lock => POSIX::O_RDWR() |
178             POSIX::O_CREAT() |
179             POSIX::O_EXCL(),
180              
181             nonblock => POSIX::O_NONBLOCK(),
182             };
183              
184             sub
185             _get_flags {
186 0     0     my $name = shift;
187 0           return $flags->{$name};
188             }
189             }
190             1;