File Coverage

blib/lib/Email/Sender/Server/Manager.pm
Criterion Covered Total %
statement 33 144 22.9
branch 0 36 0.0
condition 0 8 0.0
subroutine 11 17 64.7
pod 4 5 80.0
total 48 210 22.8


line stmt bran cond sub pod time code
1             # ABSTRACT: Email Server Manager
2              
3             package Email::Sender::Server::Manager;
4              
5 1     1   7 use Moo;
  1         2  
  1         8  
6 1     1   362 use utf8;
  1         3  
  1         7  
7              
8             with 'Email::Sender::Server::Base';
9              
10 1     1   45 use Carp 'confess';
  1         2  
  1         81  
11 1     1   1700 use Data::Dumper 'Dumper';
  1         13438  
  1         124  
12 1     1   1552 use File::Copy 'move';
  1         2739  
  1         72  
13 1     1   6 use File::Path 'mkpath';
  1         1  
  1         57  
14 1     1   1357 use File::Spec::Functions 'splitdir', 'splitpath';
  1         1109  
  1         94  
15 1     1   1061 use File::Slurp 'write_file', 'read_file';
  1         18596  
  1         84  
16 1     1   986 use Class::Date;
  1         17303  
  1         20  
17              
18 1     1   916 use Email::Sender::Server::Message;
  1         6  
  1         39  
19 1     1   694 use Email::Sender::Server::Worker;
  1         4  
  1         3016  
20              
21             $Data::Dumper::Useperl = 1;
22              
23             our $VERSION = '1.000001'; # VERSION
24              
25              
26              
27             has spawn => (
28             is => 'rw',
29             default => 3
30             );
31              
32              
33             has workers => (
34             is => 'rw',
35             default => sub {[
36             # list of workers process IDs
37             ]}
38             );
39              
40              
41             has workspace => (
42             is => 'rw',
43             default => sub {
44             shift->directory('queued');
45             }
46             );
47              
48              
49             sub BUILD {
50 0     0 0   my ($self) = @_;
51              
52 0           my $queue = $self->directory('queued');
53              
54 0 0 0       confess "Couldn't find or access (write-to) the server's queue $queue"
55             unless -d $queue && -w $queue;
56              
57 0           return $self;
58             }
59              
60              
61             sub cleanup {
62 0     0 1   my ($self) = @_;
63              
64             # re-queue imcompleted work orders
65              
66 0           opendir my $directory, $self->directory('worker');
67              
68 0           my @workers = grep { !/^\./ } readdir $directory;
  0            
69              
70 0           foreach my $worker (@workers) {
71 0           my @filelist = @{ $self->message_filelist('worker', $worker) };
  0            
72              
73 0           foreach my $filepath (@filelist) {
74 0           my $filename = $self->message_filename($filepath);
75              
76 0           move $filepath, $self->filepath('queued', $filename);
77              
78 0           unlink $filepath;
79             }
80              
81 0           rmdir $self->directory('worker', $worker);
82             }
83              
84 0           closedir $directory;
85              
86             # remove shutdown flag
87              
88 0           my $killer = $self->filepath('shutdown') ;
89              
90 0 0         unlink $killer if -e $killer;
91              
92 0           return $self;
93             }
94              
95              
96             sub create_config {
97 0     0 1   my ($self) = @_;
98 0           my $cfg = $self->filepath("config");
99              
100 0 0         unless (-e $cfg) {
101             # write config file template
102             write_file $cfg, {binmode => ':utf8'}, join "\n\n",
103             '# see the Emailesque module for configuration options',
104             'use utf8;',
105             Dumper {
106             message => {
107             to => undef,
108             from => undef,
109             subject => undef,
110             cc => undef,
111             bcc => undef,
112             reply_to => undef,
113             message => undef,
114             type => 'text'
115             },
116             transport => {
117             Sendmail => {
118 0           sendmail => do {
119 0           my $path = `which sendmail`;
120 0           $path =~ s/[\r\n]//g;
121 0 0         $path || '/usr/sbin/sendmail';
122             }
123             }
124             }
125             };
126             }
127              
128 0           return $self;
129             }
130              
131              
132             sub create_work {
133 0     0 1   my ($self, %input) = @_;
134 0           my $messenger = Email::Sender::Server::Message->new;
135              
136 0           while (my($name, $value) = each(%input)) {
137 0           $messenger->$name($value);
138             }
139              
140 0           my $message = $messenger->to_hash;
141              
142 0 0         if ($message) {
143 0           my $filename = do {
144 0           my $n = $message->{created}; $n =~ s/\W//g;
  0            
145 0           my $x = join "-", $n, $$;
146 0           my $i = do {
147 0           my @chars = ('a'..'z','0'..'9');
148 0           join '',
149             $chars[rand @chars],
150             $chars[rand @chars],
151             $chars[rand @chars],
152             $chars[rand @chars]
153             };
154 0           "$x" . "-" . $i . ".msg"
155             };
156              
157 0           my $filepath = $self->filepath('queued', $filename);
158              
159 0           my $pid = fork();
160              
161 0 0         if ($pid == 0) {
162 0           write_file $filepath, {binmode => ':utf8'}, join "\n\n",
163             '# see the Emailesque module for configuration options',
164             'use utf8;', Dumper $message;
165              
166 0           exit; # zombies will self-destruct
167             }
168              
169 0 0         return $filepath if $pid;
170             }
171              
172 0           return;
173             }
174              
175              
176             sub delegate_workload {
177 0     0 1   my ($self) = @_;
178              
179             # delegate to workers (minions)
180              
181 0   0       my $i = $self->spawn || 1;
182              
183 0           for(1..$i) {
184 0           my $pid = fork;
185              
186 0 0         if ($pid == 0) {
    0          
187 0           my $worker = Email::Sender::Server::Worker->new(id => $$);
188              
189 0           while (1) {
190 0           foreach my $filepath (@{ $worker->message_filelist }) {
  0            
191 0           my $data = do $filepath;
192              
193 0           my $next_filepath;
194              
195 0           my $message = $worker->process_message($data);
196              
197 0 0 0       if ($message && ref($message->result) =~ /success/i) {
198             # move message to passed
199              
200 0           my $filename = $self->message_filename($filepath);
201              
202             # segment storage in attempt to avoid filesystem
203             # directory size error
204              
205 0           my @directory = ('passed');
206              
207 0           push @directory, ($filename =~ /\W?(\d{4})(\d{4})/);
208 0           push @directory, $filename;
209              
210 0           move $filepath,
211             $next_filepath = $self->filepath(@directory);
212             }
213             else {
214             # move message to failed
215              
216 0           my $filename = $self->message_filename($filepath);
217              
218             # segment storage in attempt to avoid filesystem
219             # directory size error
220              
221 0           my @directory = ('failed');
222              
223 0           push @directory, ($filename =~ /\W?(\d{4})(\d{4})/);
224 0           push @directory, $filename;
225              
226 0           move $filepath,
227             $next_filepath = $self->filepath(@directory);
228             }
229              
230             # log outcome (real quick)
231              
232 0 0         if ($next_filepath) {
233             # ridiculously simple stupid logging
234              
235 0           my @audit = ();
236              
237 0           push @audit, "Date: " . Class::Date::now->string;
238 0           push @audit, "To: " . $message->to;
239 0           push @audit, "From: " . $message->from;
240 0           push @audit, "Subject: " . $message->subject;
241 0           push @audit, "File: " . $next_filepath;
242 0           push @audit, "Status: " . ref $message->result;
243              
244 0 0         if (ref($message->result) =~ /failure/i) {
245 0 0         if (ref($message->result) =~ /multi/i) {
246 0           push @audit, "Errors: " . join ", ",
247 0           map { $_->message }
248             $message->result->failure;
249             }
250             else {
251 0           push @audit, "Errors: " .
252             $message->result->message;
253             }
254             }
255              
256 0           write_file $self->filepath('delivery.log'),
257             {binmode => ':utf8', append => 1},
258             join "\n", "", @audit;
259             }
260             }
261              
262 0 0         last if $worker->stop_polling;
263             }
264              
265 0           exit(0);
266             }
267             elsif ($pid) {
268 0           push @{ $self->workers }, $pid;
  0            
269             }
270             else {
271 0           die # die you!
272             }
273             }
274              
275 0           my $pid = fork;
276              
277 0 0         if ($pid == 0) {
    0          
278             # delegate and process queued messages
279              
280 0           while (1) {
281 0           foreach my $filepath (@{ $self->message_filelist }) {
  0            
282             # find suitable worker bee (currently at-random)
283              
284 0           my $random_worker =
285 0           $self->workers->[rand(@{ $self->workers })];
286              
287 0           my $filename = $self->message_filename($filepath);
288              
289 0 0         if ($filename) {
290 0           move $filepath,
291             $self->filepath('worker', $random_worker, $filename);
292             }
293             }
294              
295 0 0         last if $self->stop_polling;
296             }
297              
298 0           exit(0);
299             }
300              
301             elsif (! $pid) {
302 0           confess "Couldn't fork the manager's delegator, $!" ;
303             }
304              
305             # always cleanup behind yourself !!!
306 0     0     $SIG{INT} = sub { $self->cleanup; exit };
  0            
  0            
307              
308 0           waitpid $_, 0 for (@{$self->workers}, $pid); # blocking
  0            
309              
310 0           $self->cleanup; # cleanup server environment
311              
312             exit # return $self;
313 0           }
314              
315             1;
316              
317             __END__