File Coverage

blib/lib/TheSchwartz/Worker/SendEmail.pm
Criterion Covered Total %
statement 21 176 11.9
branch 0 56 0.0
condition 0 25 0.0
subroutine 7 24 29.1
pod 3 12 25.0
total 31 293 10.5


line stmt bran cond sub pod time code
1             =head1 NAME
2              
3             TheSchwartz::Worker::SendEmail - sends email using SMTP
4              
5             =head1 SYNOPSIS
6              
7             use TheSchwartz;
8             use TheSchwartz::Worker::SendEmail;
9             TheSchwartz::Worker::SendEmail->set_HELO("example.com");
10             my $sclient = TheSchwartz->new(databases => \@Conf::YOUR_DBS);
11             $sclient->can_do("TheSchwartz::Worker::SendEmail");
12             $sclient->work; # main loop of program; goes forever, sending email
13              
14             =head1 DESCRIPTION
15              
16             This is a worker class for sending email (designed for B<lots> of
17             email) using L<TheSchwartz> job queue and a slightly-tweaked subclass
18             of L<Net::SMTP>. See L<TheSchwartz> for more information.
19              
20             =head1 JOB ARGUMENTS
21              
22             When constructing a SendEmail job using L<TheSchwartz>'s insert_job
23             method, construct your L<TheSchwartz::Job> instance with its
24             'argument' of the following form:
25              
26             {
27             # recipients:
28             rcpts => [ $email1, $email2, ... ],
29             env_from => $envelope_from_address,
30             data => $headers_and_body_as_big_string,
31             }
32              
33             Note that "Bcc:" headers will be removed, and a "Message-ID" header
34             will be added if not present, but nothing else is magical. This
35             module does no MIME, etc. There are other modules for that.
36              
37             =cut
38              
39             package TheSchwartz::Worker::SendEmail;
40 1     1   52629 use base 'TheSchwartz::Worker';
  1         2  
  1         608  
41 1     1   432505 use Net::DNS qw(mx);
  1         486049  
  1         168  
42 1     1   1299 use Storable;
  1         4709  
  1         16524  
43              
44             our $VERSION = '1.00';
45              
46             my $resolver;
47             my $hello_domain;
48             my $keep_exit_status_for = 0;
49             my $on_5xx = sub {};
50              
51             =head1 CLASS METHODS
52              
53             =head2 set_resolver
54              
55             TheSchwartz::Worker::SendEmail->set_resolver($net_dns_resolver_obj)
56              
57             Sets the DNS resolver object to use. By default, just uses a new L<Net::DNS::Resolver>.
58              
59             =cut
60              
61             sub set_resolver {
62 0     0 1   $resolver = $_[1];
63             }
64              
65             sub resolver {
66 0   0 0 0   return $resolver ||= Net::DNS::Resolver->new();
67             }
68              
69             =head2 set_HELO
70              
71             TheSchwartz::Worker::SendEmail->set_HELO("example.com");
72              
73             Sets the domain to announce in your HELO.
74              
75             =cut
76              
77             sub set_HELO {
78 0     0 1   $hello_domain = $_[1];
79             }
80              
81             =head2 set_on_5xx
82              
83             TheSchwartz::Worker::SendEmail->set_on_5xx(sub {
84             my ($email, $thesch_job, $smtp_code_space_message) = @_;
85             });
86              
87             Set a subref to be run upon encountering a 5xx error. Arguments to
88             your subref are the email address, L<TheSchwartz::Job> object, and a
89             scalar string of the form "SMTP_CODE SMTP_MESSAGE". The return value
90             of your subref is ignored.
91              
92             =cut
93              
94             sub set_on_5xx {
95 0     0 1   $on_5xx = $_[1];
96             }
97              
98 0     0 0   sub set_keep_exit_status { $keep_exit_status_for = $_[1] }
99              
100             sub work {
101 0     0 0   my ($class, $job) = @_;
102 0           my $args = $job->arg;
103 0           my $client = $job->handle->client;
104 0           my $rcpts = $args->{rcpts}; # arrayref of recipients
105              
106 0           my %dom_rcpts; # domain -> [ $rcpt, ... ]
107 0           foreach my $to (@$rcpts) {
108 0           my ($host) = $to =~ /\@(.+?)$/;
109 0 0         next unless $host;
110 0           $host = lc $host;
111              
112 0   0       $dom_rcpts{$host} ||= [];
113 0           push @{$dom_rcpts{$host}}, $to;
  0            
114             }
115              
116             # uh, whack.
117 0 0         unless (%dom_rcpts) {
118             # FIXME: log or something. for artur.
119 0           $job->completed;
120 0           return;
121             }
122              
123             # split into jobs per host.
124 0 0         if (scalar keys %dom_rcpts > 1) {
125 0           $0 = "send-email [splitting]";
126 0           my @new_jobs;
127 0           foreach my $dom (keys %dom_rcpts) {
128 0           my $new_args = Storable::dclone($args);
129 0           $new_args->{rcpts} = $dom_rcpts{$dom};
130 0           my $new_job = TheSchwartz::Job->new(
131             funcname => 'TheSchwartz::Worker::SendEmail',
132             arg => $new_args,
133             coalesce => "$dom\@",
134             );
135 0           push @new_jobs, $new_job;
136             }
137 0           $job->replace_with(@new_jobs);
138 0           return;
139             }
140              
141             # all rcpts on same server, proceed...
142 0           (my($host), $rcpts) = %dom_rcpts; # (there's only one key)
143 0           $0 = "send-email [$host]";
144              
145 0           my @mailhosts = mx(resolver(), $host);
146              
147 0           my @ex = map { $_->exchange } @mailhosts;
  0            
148              
149             # seen in wild: no MX records, but port 25 of domain is an SMTP server. think it's in SMTP spec too?
150 0 0         @ex = ($host) unless @ex;
151              
152 0           my $smtp = Net::SMTP::BetterConnecting->new(
153             \@ex,
154             Hello => $hello_domain,
155             PeerPort => 25,
156             ConnectTimeout => 4,
157             );
158 0 0         die "Connection failed to domain '$host', MXes: [@ex]\n" unless $smtp;
159              
160 0           $smtp->timeout(300);
161             # FIXME: need to detect timeouts to log to errors, so people with ridiculous timeouts can see that's why we're not delivering mail
162              
163 0           my $done = 0;
164 0   0       while ($job && $class->_send_job_on_connection($smtp, $job) && ++$done < 50) {
      0        
165 0           my $job1 = $job;
166 0           $job = $client->find_job_with_coalescing_prefix(__PACKAGE__, "$host\@");
167              
168 0           my $handle = '';
169 0 0         if ($job) {
170 0           $job->set_as_current;
171 0           $handle = $job->handle->as_string;
172 0 0         die "RSET failed" unless $smtp->reset;
173             }
174              
175 0           $job1->debug("sent successfully. trying another. found: " . $handle);
176             }
177              
178 0           $smtp->quit;
179             }
180              
181             sub _send_job_on_connection {
182 0     0     my ($class, $smtp, $job) = @_;
183              
184 0           my $args = $job->arg;
185 0           my $hstr = $job->handle->as_string;
186              
187 0 0         if ($ENV{DEBUG}) {
188 0           require Data::Dumper;
189 0           warn "sending email on $smtp: " . Data::Dumper::Dumper($args);
190             }
191              
192 0           my $env_from = $args->{env_from}; # Envelope From
193 0           my $rcpts = $args->{rcpts}; # arrayref of recipients
194 0           my $body = $args->{data};
195 0           my $headers;
196              
197 0           my ($this_domain) = $env_from =~ /\@(.+)/;
198              
199             # remove bcc
200 0           $body =~ s/^(.+?\r?\n\r?\n)//s;
201 0           $headers = $1;
202 0           $headers =~ s/^bcc:.+\r?\n//mig;
203              
204             # unless they specified a message ID, let's prepend our own:
205 0 0         unless ($headers =~ m!^message-id:.+!mi) {
206 0           $headers = "Message-ID: \r\n" . $headers;
207             }
208              
209             my $details = sub {
210 0     0     return eval {
211 0           $smtp->code . " " . $smtp->message;
212             }
213 0           };
214              
215             my $not_ok = sub {
216 0     0     my $cmd = shift;
217 0 0         if ($smtp->status == 5) {
218 0           $job->permanent_failure("Permanent failure during $cmd phase to [@$rcpts]: " . $details->());
219 0           return 0; # let's not re-use this connection anymore.
220             }
221 0           die "Error during $cmd phase to [@$rcpts]: " . $details->() . "\n";
222 0           };
223              
224 0 0         return $not_ok->("MAIL") unless $smtp->mail($env_from);
225              
226 0           my $got_an_okay = 0;
227 0           my %perm_fail;
228 0           foreach my $rcpt (@$rcpts) {
229 0 0         if ($smtp->to($rcpt)) {
230 0           $got_an_okay = 1;
231 0           next;
232             }
233 0 0         if ($smtp->status == 5) {
234 0           $perm_fail{$rcpt} = 1;
235 0           $class->on_5xx_rcpt($job, $rcpt, $details->());
236 0           next;
237             }
238 0           die "Error during TO phase to [@$rcpts]: " . $details->() . "\n";
239             }
240              
241 0 0         unless ($got_an_okay) {
242 0           $job->permanent_failure("Permanent failure TO [@$rcpts]: " . $details->() . "\n");
243 0           return 0;
244             }
245              
246             # have to add a fake "Received: " line in here, otherwise some
247             # stupid over-strict MTAs like bellsouth.net reject it thinking
248             # it's spam that was sent directly (it was). Called
249             # "NoHopsNoAuth".
250 0           my $mailid = $hstr;
251 0           $mailid =~ s/-/00/; # not sure if hyphen is allowed in
252 0           my $date = _rfc2822_date(time());
253 0           my $rcvd = qq{Received: from localhost (theschwartz [127.0.0.1])
254             by $this_domain (TheSchwartzMTA) with ESMTP id $mailid;
255             $date
256             };
257 0           $rcvd =~ s/\s+$//;
258 0           $rcvd =~ s/\n\s+/\r\n\t/g;
259 0           $rcvd .= "\r\n";
260              
261 0 0         return $not_ok->("DATA") unless $smtp->data;
262 0 0         return $not_ok->("DATASEND") unless $smtp->datasend($rcvd . $headers . $body);
263 0 0         return $not_ok->("DATAEND") unless $smtp->dataend;
264              
265 0           $job->completed;
266 0           return 1;
267             }
268              
269             sub on_5xx_rcpt {
270 0     0 0   my ($class, $job, $email, $details) = @_;
271 0           $on_5xx->($email, $job, $details);
272              
273             }
274              
275             sub keep_exit_status_for {
276 0 0   0 0   return 0 unless $keep_exit_status_for;
277 0 0         return $keep_exit_status_for->() if ref $keep_exit_status_for eq "CODE";
278 0           return $keep_exit_status_for;
279             }
280              
281 0     0 0   sub grab_for { 500 }
282 0     0 0   sub max_retries { 5 * 24 } # 5 days * 24 hours
283             sub retry_delay {
284 0     0 0   my ($class, $fails) = @_;
285 0   0       return ((5*60, 5*60, 15*60, 30*60)[$fails] || 3600);
286             }
287              
288             # TODO:
289             sub on_job_is_done_forever {
290 0     0 0   my ($class, $job) = @_;
291             # .... run subref to, say, put in LJ db that this email is undeliverable
292             }
293              
294             sub _rfc2822_date {
295 0     0     my $time = shift;
296 0           my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday) =
297             gmtime($time);
298 0           my @days = qw(Sun Mon Tue Wed Thu Fri Sat Sun);
299 0           my @mon = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec);
300 0           return sprintf("%s, %d %s %4d %02d:%02d:%02d +0000 (UTC)",
301             $days[$wday], $mday, $mon[$mon], $year+1900, $hour, $min, $sec);
302             }
303              
304             package Net::SMTP::BetterConnecting;
305 1     1   14 use strict;
  1         2  
  1         40  
306 1     1   5 use base 'Net::SMTP';
  1         2  
  1         3286  
307 1     1   42027 use Net::Config;
  1         3  
  1         639  
308 1     1   19 use Net::Cmd;
  1         2  
  1         684  
309              
310             # Net::SMTP's constructor could use improvement, so this is it:
311             # -- retry hosts, even if they connect and say "4xx service too busy", etc.
312             # -- let you specify different connect timeout vs. command timeout
313             sub new {
314 0     0     my $self = shift;
315 0   0       my $type = ref($self) || $self;
316 0           my ($host, %arg);
317 0 0         if (@_ % 2) {
318 0           $host = shift;
319 0           %arg = @_;
320             } else {
321 0           %arg = @_;
322 0           $host = delete $arg{Host};
323             }
324              
325 0 0         my $hosts = defined $host ? $host : $NetConfig{smtp_hosts};
326 0           my $obj;
327 0   0       my $timeout = $arg{Timeout} || 120;
328 0   0       my $connect_timeout = $arg{ConnectTimeout} || $timeout;
329              
330 0           my $h;
331 0 0         foreach $h (@{ref($hosts) ? $hosts : [ $hosts ]}) {
  0            
332 0 0 0       $obj = $type->IO::Socket::INET::new(PeerAddr => ($host = $h),
333             PeerPort => $arg{Port} || 'smtp(25)',
334             LocalAddr => $arg{LocalAddr},
335             LocalPort => $arg{LocalPort},
336             Proto => 'tcp',
337             Timeout => $connect_timeout,
338             )
339             or next;
340              
341 0           $obj->timeout($timeout); # restore the original timeout
342 0           $obj->autoflush(1);
343 0 0         $obj->debug(exists $arg{Debug} ? $arg{Debug} : undef);
344              
345 0           my $res = $obj->response();
346 0 0         unless ($res == CMD_OK) {
347 0           $obj->close();
348 0           $obj = undef;
349 0           next;
350             }
351              
352 0 0         last if $obj;
353             }
354              
355 0 0         return undef unless $obj;
356              
357 0           ${*$obj}{'net_smtp_exact_addr'} = $arg{ExactAddresses};
  0            
358 0           ${*$obj}{'net_smtp_host'} = $host;
  0            
359 0           (${*$obj}{'net_smtp_banner'}) = $obj->message;
  0            
360 0           (${*$obj}{'net_smtp_domain'}) = $obj->message =~ /\A\s*(\S+)/;
  0            
361              
362 0 0 0       unless ($obj->hello($arg{Hello} || "")) {
363 0           $obj->close();
364 0           return undef;
365             }
366              
367 0           return $obj;
368             }
369              
370             =head1 AUTHOR
371              
372             Brad Fitzpatrick -- brad@danga.com
373              
374             =head1 COPYRIGHT, LICENSE, and WARRANTY
375              
376             Copyright 2006-2007, SixApart, Ltd.
377              
378             License to use under the same terms as Perl itself.
379              
380             This software comes with no warranty of any kind.
381              
382             =head1 SEE ALSO
383              
384             L<TheSchwartz>
385              
386             =cut
387              
388             1;