File Coverage

blib/lib/EMDIS/ECS/FileBackedMessage.pm
Criterion Covered Total %
statement 175 365 47.9
branch 71 258 27.5
condition 6 48 12.5
subroutine 24 27 88.8
pod 0 19 0.0
total 276 717 38.4


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2             #
3             # Copyright (C) 2010-2021 National Marrow Donor Program. All rights reserved.
4             #
5             # For a description of this module, please refer to the POD documentation
6             # embedded at the bottom of the file (e.g. perldoc EMDIS::ECS::FileBackedMessage).
7              
8             package EMDIS::ECS::FileBackedMessage;
9              
10 1         180 use EMDIS::ECS qw($ECS_CFG $ECS_NODE_TBL $FILEMODE $VERSION ecs_is_configured
11             format_datetime format_doc_filename format_msg_filename
12             log_debug log_info log_warn log_error log_fatal
13             send_amqp_message send_encrypted_message send_email
14 1     1   10367 dequote trim is_yes);
  1         5  
15 1     1   7 use Fcntl qw(:DEFAULT :flock);
  1         2  
  1         283  
16 1     1   6 use File::Basename;
  1         11  
  1         66  
17 1     1   7 use File::Spec::Functions qw(catdir catfile);
  1         1  
  1         44  
18 1     1   4 use File::Temp qw(tempfile);
  1         2  
  1         41  
19 1     1   6 use IO::File;
  1         7  
  1         182  
20 1     1   6 use strict;
  1         2  
  1         3372  
21              
22             # ----------------------------------------------------------------------
23             # Constructor.
24             # If error encountered, returns error message instead of object reference.
25             sub new
26             {
27 29     29 0 1100 my $arg1 = shift;
28 29         40 my $this;
29 29 50       61 if(ref $arg1)
30             {
31             # invoked as instance method
32 0         0 $this = $arg1;
33             }
34             else
35             {
36             # invoked as class method
37 29         42 $this = {};
38 29         51 bless $this, $arg1;
39             }
40              
41 29         40 my $err = '';
42 29         42 my ($sender_node_id, $seq_num, $filename);
43 29         42 my $argc = scalar(@_);
44 29 100       59 if($argc <= 1)
    50          
45             {
46 20         27 $filename = shift;
47             }
48             elsif($argc == 3)
49             {
50 9         20 ($sender_node_id, $seq_num, $filename) = @_;
51 9 100       25 $this->{sender_node_id} = $sender_node_id if $sender_node_id;
52 9 100       16 $this->{seq_num} = $seq_num if $seq_num;
53             }
54             else
55             {
56 0         0 return "Illegal usage -- expected 0, 1, or 3 parameters: " .
57             "[filename] or <sender_node_id>, <seq_num>, <filename>";
58             }
59              
60             # set presumed message type flags - can be overridden by email headers or subject
61 29         55 $this->{is_ecs_message} = 1;
62 29         42 $this->{is_meta_message} = '';
63 29         43 $this->{is_document} = '';
64 29 100 66     211 if(defined $filename and $filename =~ /(\.doc|\.doc\.xml)$/io)
65             {
66             # filename ending in .doc or .doc.xml indicates document (not message)
67 2         5 $this->{is_ecs_message} = '';
68 2         4 $this->{is_meta_message} = '';
69 2         4 $this->{is_document} = 1;
70             }
71              
72 29         66 $this->{temp_files} = [];
73 29         37 $this->{is_closed} = 0;
74              
75             # if $filename not specified, read input from stdin
76 29 50       51 if(not $filename)
77             {
78             # read from stdin, create temp file
79 0         0 my $template = sprintf('%s_XXXX', format_datetime(time,
80             '%04d%02d%02d_%02d%02d%02d'));
81 0 0       0 return "Unable to create temp file from stdin: ECS is not configured!"
82             unless ecs_is_configured();
83 0         0 my $fh;
84 0         0 ($fh, $filename) = tempfile($template,
85             DIR => catdir($ECS_CFG->ECS_TMP_DIR),
86             SUFFIX => '.msg');
87 0         0 binmode(STDIN);
88 0         0 binmode($fh);
89 0         0 while(1)
90             {
91 0         0 my $buffer;
92              
93 0         0 my $readlen = read STDIN, $buffer, 65536;
94 0 0       0 if(not defined $readlen)
95             {
96 0         0 $err = "Unexpected problem reading STDIN: $!";
97 0         0 last;
98             }
99              
100 0 0       0 last if $readlen == 0;
101              
102 0 0       0 if(not print $fh $buffer)
103             {
104 0         0 $err = "Unexpected problem writing file $filename: $!";
105 0         0 last;
106             }
107             }
108 0         0 close $fh;
109 0 0       0 if($err)
110             {
111 0         0 unlink $filename;
112 0         0 return $err;
113             }
114 0         0 push @{$this->{temp_files}}, $filename;
  0         0  
115             }
116              
117 29         39 $this->{filename} = $filename;
118 29         34 my $file_handle;
119 29 50       888 return "Unable to open input file $filename: $!"
120             unless open $file_handle, "+< $filename";
121 29         137 $this->{file_handle} = $file_handle;
122 29         63 binmode $file_handle;
123              
124             # get exclusive lock (with retry loop)
125             # protects against reading a file while another process is writing it
126 29         38 my $locked = '';
127 29         61 for my $retry (1..5)
128             {
129 29         203 $locked = flock $file_handle, LOCK_EX | LOCK_NB;
130 29 50       76 last if $locked;
131             }
132 29 50       53 if(!$locked)
133             {
134 0         0 $err = "Unable to lock input file $filename: $!";
135 0         0 close $file_handle;
136 0         0 return $err;
137             }
138              
139 29         36 my $email_headers = '';
140 29         38 my $data_offset = 0;
141              
142             # attempt to read email headers only if sender_node_id not yet defined
143 29 100       51 if(not exists $this->{sender_node_id})
144             {
145             # attempt to read email headers from file, determine data offset
146 21         22 my $buf;
147 21         23 while(1)
148             {
149 5170         6337 my $bytecount = read $file_handle, $buf, 1;
150              
151 5170 50       6757 if(not defined $bytecount)
152             {
153 0         0 $err = "Unexpected problem reading from file $filename: $!";
154 0         0 last;
155             }
156              
157 5170 50 0     6164 if($bytecount > 0)
    0          
158             {
159 5170         5250 $email_headers .= $buf;
160 5170         5072 $data_offset++;
161              
162             # first empty line ends potential email header
163 5170 100       7189 last if $email_headers =~ /\r?\n\r?\n$/so;
164             }
165             elsif($bytecount == 0 or $data_offset >= 1048576)
166             {
167             # assume file does not contain email header
168             # if EOF encountered or no empty line found in first X bytes
169 0         0 $data_offset = 0;
170 0         0 last;
171             }
172             }
173 21 50       41 if($err)
174             {
175 0         0 close $file_handle;
176 0         0 return $err;
177             }
178             }
179              
180 29 100       48 if($data_offset > 0)
181             {
182             # convert headers to more easily parseable format, store in this obj
183 21         175 $email_headers =~ s/\r?\n/\n/go;
184              
185             # look for "Subject" line
186 21 100       108 if($email_headers =~ /^Subject:\s*(.+?)$/imo)
187             {
188 20         81 $this->{subject} = $1;
189 20         33 $this->{email_headers} = $email_headers;
190 20         28 $this->{data_offset} = $data_offset;
191             }
192             }
193              
194             # absence of "Subject" line indicates file contains data only
195 29 100       49 if(not exists $this->{subject})
196             {
197 9         16 $this->{data_offset} = 0;
198 9         67 return $this;
199             }
200              
201             # parse "Subject" into MAIL_MRK:sender_node_id[:seqnum]
202 20         24 my $mail_mrk = 'EMDIS';
203 20 50       72 if(ecs_is_configured())
204             {
205 0         0 $mail_mrk = $ECS_CFG->MAIL_MRK;
206             }
207             else
208             {
209 20         628 warn "ECS not configured, using MAIL_MRK = '$mail_mrk'\n";
210             }
211 20 100       253 if($this->{subject} =~ /$mail_mrk:(\S+?):(\d+)(:(\d+)\/(\d+))?\s*$/i)
    100          
    100          
212             {
213             # regular message
214 10         19 $this->{is_ecs_message} = 1;
215 10         14 $this->{is_meta_message} = '';
216 10         13 $this->{is_document} = '';
217 10         24 $this->{sender_node_id} = $1;
218 10         36 $this->{seq_num} = $2;
219 10 100       37 $this->{part_num} = $4 if defined $4;
220 10 100       21 $this->{num_parts} = $5 if defined $5;
221 10 100 66     40 if(exists $this->{part_num} and exists $this->{num_parts}
      66        
222             and $this->{part_num} > $this->{num_parts})
223             {
224 1         14 close $file_handle;
225 1         8 return "part_num is greater than num_parts: " . $this->{subject};
226             }
227             }
228             elsif($this->{subject} =~ /$mail_mrk:(\S+?):(\d+):DOC\s*$/io) {
229             # document
230 1         4 $this->{sender_node_id} = $1;
231 1         3 $this->{is_ecs_message} = '';
232 1         2 $this->{is_meta_message} = '';
233 1         2 $this->{is_document} = 1;
234 1         3 $this->{seq_num} = $2;
235             }
236             elsif($this->{subject} =~ /$mail_mrk:(\S+)\s*$/i)
237             {
238             # meta-message
239 4         12 $this->{sender_node_id} = $1;
240 4         6 $this->{is_ecs_message} = 1;
241 4         7 $this->{is_meta_message} = 1;
242 4         8 $this->{is_document} = '';
243             }
244             else
245             {
246             # subject line indicates this is not an ECS message or document
247 5         10 $this->{is_ecs_message} = '';
248 5         9 $this->{is_meta_message} = '';
249 5         7 $this->{is_document} = '';
250             }
251              
252 19 50       33 return $err if $err;
253              
254 19         76 return $this;
255             }
256              
257             # ----------------------------------------------------------------------
258             # prepare for object destruction: close $this->{file_handle}, delete
259             # temp files
260             sub cleanup
261             {
262 29     29 0 43 my $this = shift;
263 29 50       48 die "cleanum() must only be called as an instance method!"
264             unless ref $this;
265             close $this->{file_handle}
266 29 50       335 if exists $this->{file_handle};
267 29         58 foreach my $temp_file (@{$this->{temp_files}})
  29         71  
268             {
269 0         0 unlink $temp_file;
270             }
271 29         80 $this->{is_closed} = 1;
272             }
273              
274             # ----------------------------------------------------------------------
275             # Accessor method (read-only).
276             sub data_offset
277             {
278 4     4 0 7 my $this = shift;
279 4 50       9 die "data_offset() must only be called as an instance method!"
280             unless ref $this;
281 4         12 return $this->{data_offset};
282             }
283              
284             # ----------------------------------------------------------------------
285             # Accessor method (read-only).
286             sub email_headers
287             {
288 4     4 0 158 my $this = shift;
289 4 50       11 die "email_headers() must only be called as an instance method!"
290             unless ref $this;
291 4         15 return $this->{email_headers};
292             }
293              
294             # ----------------------------------------------------------------------
295             # Accessor method (read-only).
296             sub filename
297             {
298 1     1 0 2 my $this = shift;
299 1 50       3 die "filename() must only be called as an instance method!"
300             unless ref $this;
301 1         4 return $this->{filename};
302             }
303              
304             # ----------------------------------------------------------------------
305             # Accessor method (read-only).
306             sub hub_rcv
307             {
308 2     2 0 3 my $this = shift;
309 2 50       13 die "hub_rcv() must only be called as an instance method!"
310             unless ref $this;
311 2         8 return $this->{hub_rcv};
312             }
313              
314             # ----------------------------------------------------------------------
315             # Accessor method (read-only).
316             sub hub_snd
317             {
318 2     2 0 4 my $this = shift;
319 2 50       5 die "hub_snd() must only be called as an instance method!"
320             unless ref $this;
321 2         7 return $this->{hub_snd};
322             }
323              
324             # ----------------------------------------------------------------------
325             # Accessor method (read-only).
326             sub is_ecs_message
327             {
328 20     20 0 608 my $this = shift;
329 20 50       42 die "is_ecs_message() must only be called as an instance method!"
330             unless ref $this;
331 20         64 return $this->{is_ecs_message};
332             }
333              
334             # ----------------------------------------------------------------------
335             # Accessor method (read-only).
336             sub is_meta_message
337             {
338 16     16 0 28 my $this = shift;
339 16 50       30 die "is_meta_message() must only be called as an instance method!"
340             unless ref $this;
341 16         48 return $this->{is_meta_message};
342             }
343              
344             # ----------------------------------------------------------------------
345             # Accessor method (read-only).
346             sub is_document
347             {
348 3     3 0 6 my $this = shift;
349 3 50       8 die "is_document() must only be called as an instance method!"
350             unless ref $this;
351 3         10 return $this->{is_document};
352             }
353              
354             # ----------------------------------------------------------------------
355             # Accessor method (read-only).
356             sub num_parts
357             {
358 3     3 0 4 my $this = shift;
359 3 50       8 die "num_parts() must only be called as an instance method!"
360             unless ref $this;
361 3         11 return $this->{num_parts};
362             }
363              
364             # ----------------------------------------------------------------------
365             # Accessor method (read-only).
366             sub part_num
367             {
368 3     3 0 7 my $this = shift;
369 3 50       7 die "part_num() must only be called as an instance method!"
370             unless ref $this;
371 3         10 return $this->{part_num};
372             }
373              
374             # ----------------------------------------------------------------------
375             # Accessor method (read-only).
376             sub sender
377             {
378 0     0 0 0 my $this = shift;
379 0 0       0 die "sender() must only be called as an instance method!"
380             unless ref $this;
381 0         0 return $this->{sender_node_id};
382             }
383              
384             # ----------------------------------------------------------------------
385             # Accessor method (read-only).
386             sub sender_node_id
387             {
388 14     14 0 24 my $this = shift;
389 14 50       26 die "sender_node_id() must only be called as an instance method!"
390             unless ref $this;
391 14         48 return $this->{sender_node_id};
392             }
393              
394             # ----------------------------------------------------------------------
395             # Accessor method (read-only).
396             sub seq_num
397             {
398 10     10 0 17 my $this = shift;
399 10 50       21 die "seq_num() must only be called as an instance method!"
400             unless ref $this;
401 10         38 return $this->{seq_num};
402             }
403              
404             # ----------------------------------------------------------------------
405             # Accessor method (read-only).
406             sub subject
407             {
408 8     8 0 377 my $this = shift;
409 8 50       25 die "subject() must only be called as an instance method!"
410             unless ref $this;
411 8         33 return $this->{subject};
412             }
413              
414             # ----------------------------------------------------------------------
415             # Accessor method (read only)
416             sub temp_files
417             {
418 0     0 0 0 my $this = shift;
419 0 0       0 die "temp_files() must only be called as an instance method!"
420             unless ref $this;
421 0         0 return @{$this->{temp_files}};
  0         0  
422             }
423              
424             # ----------------------------------------------------------------------
425             # object destructor, called by perl garbage collector
426             sub DESTROY
427             {
428 56     56   121 my $this = shift;
429 56 50       103 die "DESTROY() must only be called as an instance method!"
430             unless ref $this;
431 56 100       262 $this->cleanup unless $this->{is_closed};
432             }
433              
434             # ----------------------------------------------------------------------
435             # read first portion of message, attempt to extract HUB_SND and HUB_RCV
436             # (deprecated -- may be called explicitly, but is no longer used by FileBackedMessage constructor)
437             sub inspect_fml
438             {
439 6     6 0 16 my $this = shift;
440 6 50       14 return "inspect_fml() must only be called as an instance method!"
441             unless ref $this;
442             return "inspect_fml(): this FileBackedMessage object is closed!"
443 6 50       13 if $this->{is_closed};
444              
445             # read first part of FML payload, look for HUB_SND, HUB_RCV
446              
447             return "Unable to position file pointer for file $this->{filename}" .
448             " to position $this->{data_offset}: $!"
449 6 50       57 unless seek $this->{file_handle}, $this->{data_offset}, 0;
450 6         11 my $fml;
451 6         100 my $bytecount = read $this->{file_handle}, $fml, 65536;
452 6 50       14 return "Unable to read from file " . $this->{filename} . ": $!"
453             unless defined $bytecount;
454              
455             # compute is_ecs_message and is_meta_message
456 6 100       43 if($fml =~ /^\s*(BLOCK_BEGIN\s+\w+\s*;\s*)?\w+\s*:/iso)
    100          
457             {
458 4         8 $this->{is_ecs_message} = 1;
459 4         6 $this->{is_meta_message} = '';
460 4         6 $this->{is_document} = '';
461             }
462             elsif($fml =~ /^\s*msg_type\s*=\s*\S+/isom)
463             {
464 1         2 $this->{is_ecs_message} = 1;
465 1         2 $this->{is_meta_message} = 1;
466 1         3 $this->{is_document} = '';
467 1         3 return '';
468             }
469             else
470             {
471 1         3 $this->{is_ecs_message} = '';
472 1         3 $this->{is_meta_message} = '';
473 1         2 $this->{is_document} = '';
474 1         4 return '';
475             }
476              
477             # Note: this code only understands the simple forms of FML assignments
478             # (not the extended /FIELDS form)
479              
480             # look for HUB_RCV
481 4 100       16 if($fml =~ /HUB_RCV\s*=\s*([^,;]+)/iso) # presumes [^,;] in HUB_RCV
482             {
483 3         12 $this->{hub_rcv} = dequote(trim($1));
484             }
485              
486             # look for HUB_SND
487 4 100       19 if($fml =~ /HUB_SND\s*=\s*([^,;]+)/iso) # presumes [^,;] in HUB_SND
488             {
489 3         13 $this->{hub_snd} = dequote(trim($1));
490             }
491              
492 4         11 return '';
493             }
494              
495             # ----------------------------------------------------------------------
496             sub send_this_message
497             {
498 0     0 0   my $this = shift;
499 0 0         return "send_this_message() must only be called as an instance method!"
500             unless ref $this;
501             return "send_this_message(): this FileBackedMessage object is closed!"
502 0 0         if $this->{is_closed};
503             return "send_this_message(): this FileBackedMessage object represents " .
504             "only a partial message!"
505 0 0 0       if defined $this->{num_parts} and $this->{num_parts} > 1;
506              
507             # initialize
508 0           my $rcv_node_id = shift;
509 0           my $is_re_send = shift;
510 0           my $part_num = shift;
511 0 0         return "send_this_message(): ECS has not been configured."
512             unless ecs_is_configured();
513 0           my $cfg = $ECS_CFG;
514 0           my $node_tbl = $ECS_NODE_TBL;
515 0           my $err = '';
516              
517 0 0 0       return "send_this_message(): Missing \$rcv_node_id!"
518             unless defined $rcv_node_id and $rcv_node_id;
519              
520             # lock node_tbl, look up $rcv_node_id
521 0           my $was_locked = $node_tbl->LOCK;
522 0 0         if(not $was_locked)
523             {
524 0 0         $node_tbl->lock() # lock node_tbl
525             or return "send_this_message(): unable to lock node_tbl: " .
526             $node_tbl->ERROR;
527             }
528 0           my $node = $node_tbl->read($rcv_node_id);
529 0 0         if(not $node)
530             {
531 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
532 0           return "send_this_message(): node not found: $rcv_node_id";
533             }
534 0 0         if(not $node->{addr})
535             {
536 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
537 0           return "send_this_message(): addr not defined for node: $rcv_node_id";
538             }
539              
540             # compute or assign message seq_num
541 0           my $seq_num = '';
542 0 0 0       if($is_re_send and not $this->{is_document})
    0 0        
    0          
    0          
543             {
544             # sanity checks
545 0 0         if(not defined $this->{seq_num})
546             {
547 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
548 0           return "send_this_message(): seq_num not defined for RE_SEND";
549             }
550 0 0         if($this->{seq_num} > $node->{out_seq})
551             {
552 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
553             return "send_this_message(): seq_num for RE_SEND (" .
554             $this->{seq_num} . ") is greater than out_seq for node " .
555 0           "$rcv_node_id (" . $node->{out_seq} . ")!";
556             }
557 0           $seq_num = $this->{seq_num};
558             }
559             elsif($is_re_send and $this->{is_document})
560             {
561             # sanity checks
562 0 0         if(not defined $this->{seq_num})
563             {
564 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
565 0           return "send_this_message(): seq_num not defined for DOC_RE_SEND";
566             }
567 0 0         if($this->{seq_num} > $node->{doc_out_seq})
568             {
569 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
570             return "send_this_message(): seq_num for DOC_RE_SEND (" .
571             $this->{seq_num} . ") is greater than doc_out_seq for node " .
572 0           "$rcv_node_id (" . $node->{doc_out_seq} . ")!";
573             }
574 0           $seq_num = $this->{seq_num};
575             }
576             elsif($this->{is_document})
577             {
578             # automatically get next (doc) sequence number
579 0           $node->{doc_out_seq}++;
580 0           $seq_num = $node->{doc_out_seq};
581             }
582             elsif(not $this->{is_meta_message})
583             {
584             # only allow $part_num to be specified if this is a RE_SEND request
585 0 0         if($part_num)
586             {
587 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
588 0           return "send_this_message(): part_num specified ($part_num), for " .
589             "non- RE_SEND request!";
590             }
591             # automatically get next (msg) sequence number
592 0           $node->{out_seq}++;
593 0           $seq_num = $node->{out_seq};
594             }
595              
596             # compute message part size
597 0           my $msg_part_size = $cfg->MSG_PART_SIZE_DFLT;
598 0 0 0       if(defined $node->{msg_part_size} and $node->{msg_part_size} > 0)
599             {
600 0           $msg_part_size = $node->{msg_part_size};
601             }
602              
603             # compute data size
604 0           my $file_size = (stat $this->{file_handle})[7];
605 0           my $data_size = $file_size - $this->{data_offset};
606 0 0         if($data_size <= 0)
607             {
608 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
609 0           return "send_this_message(): data_size is <= 0 ($data_size)!";
610             }
611              
612             # for document, force num_parts = 1
613 0 0         if($this->{is_document})
614             {
615 0           $msg_part_size = $data_size;
616             }
617              
618             # compute num_parts
619 0           my $num_parts = int($data_size / $msg_part_size);
620 0 0         $num_parts++ if ($data_size % $msg_part_size) > 0;
621             # num_parts should be 1 for meta message
622 0 0 0       if($this->{is_meta_message} and $num_parts > 1)
623             {
624 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
625 0           return "send_this_message(): num_parts cannot be > 1 for meta message!";
626             }
627             # $part_num cannot be greater than $num_parts
628 0 0 0       if(defined $part_num and $part_num and $part_num > $num_parts)
      0        
629             {
630 0 0         $node_tbl->unlock() unless $was_locked; # unlock node_tbl if needed
631 0           return "send_this_message(): part_num ($part_num) cannot be greater " .
632             "than num_parts ($num_parts)!";
633             }
634              
635             # compute base subject
636 0           my $subject = $cfg->MAIL_MRK . ':' . $cfg->THIS_NODE;
637 0 0         $subject .= ":$seq_num" if $seq_num;
638 0 0         $subject .= ":DOC" if $this->{is_document};
639              
640 0 0         if($is_re_send)
641             {
642             # to save disk space, don't copy message to file for RE_SEND
643 0 0         log_info("send_this_message(): transmitting $rcv_node_id RE_SEND " .
644             "message $seq_num" . ($part_num ? ":$part_num" : '') . "\n");
645             }
646             else
647             {
648             # copy message to file (for non- RE_SEND)
649              
650 0           my $filename;
651              
652 0 0         if($this->{is_meta_message})
653             {
654             # copy meta message to mboxes/out subdirectory
655 0           $filename = sprintf("%s_%s_%s.msg",
656             $cfg->THIS_NODE, $rcv_node_id, "META");
657 0           my $dirname = $cfg->ECS_MBX_OUT_DIR;
658             # create directory if it doesn't already exist
659 0 0         mkdir $dirname unless -e $dirname;
660 0           $filename = catfile($dirname, $filename);
661             }
662             else
663             {
664             # copy regular message or document file to mboxes/out_NODE subdirectory
665 0 0         if($this->{is_document})
666             {
667 0           $filename = format_doc_filename($rcv_node_id, $seq_num);
668             }
669             else
670             {
671 0           $filename = format_msg_filename($rcv_node_id, $seq_num);
672             }
673             # create directory if it doesn't already exist
674 0           my $dirname = dirname($filename);
675 0 0         mkdir $dirname unless -e $dirname;
676             }
677              
678             # don't overwrite $filename file if it already exists
679 0           my $fh;
680 0 0         if(-e $filename)
681             {
682 0           my $template = $filename . "_XXXXXX";
683 0           ($fh, $filename) = tempfile($template);
684 0 0         return "send_this_message(): unable to open _XXXX file: " .
685             "$filename"
686             unless $fh;
687             }
688             else
689             {
690 0           $fh = new IO::File;
691 0 0         return "send_this_message(): unable to open file: " .
692             "$filename"
693             unless $fh->open("> $filename");
694             }
695              
696 0           print $fh "Subject: $subject\n";
697 0           print $fh "To: $node->{addr}\n";
698 0           print $fh "From: " . $cfg->SMTP_FROM . "\n\n";
699             # copy data to $fh
700             $err = "Unable to position file pointer for file $this->{filename}" .
701             " to position $this->{data_offset}: $!"
702 0 0         unless seek $this->{file_handle}, $this->{data_offset}, 0;
703 0           my $buffer;
704 0           while(1)
705             {
706 0 0         if($err)
707             {
708 0 0         $node_tbl->unlock() unless $was_locked; # unlock if needed
709 0           close $fh;
710 0           unlink $filename;
711 0           return $err;
712             }
713              
714 0           my $bytecount = read $this->{file_handle}, $buffer, 65536;
715 0 0         if(not defined $bytecount)
    0          
716             {
717 0           $err = "send_this_message(): Problem reading input file " .
718             "$this->{filename}: $!";
719             }
720             elsif($bytecount == 0)
721             {
722 0           last; # EOF
723             }
724             else
725             {
726 0 0         print $fh $buffer
727             or $err = "send_this_message(): Problem writing output " .
728             "file $filename: $!";
729             }
730             }
731 0           close $fh;
732 0           chmod $FILEMODE, $filename;
733             }
734              
735 0           my $custom_headers = {};
736 0           $custom_headers->{'x-emdis-hub-rcv'} = $rcv_node_id;
737 0           $custom_headers->{'x-emdis-hub-snd'} = $cfg->THIS_NODE;
738              
739 0 0         if($num_parts == 1)
740             {
741             # read all data, send single email message
742             $err = "send_this_message(): Unable to position file pointer for " .
743             "file $this->{filename} to position $this->{data_offset}: $!"
744 0 0         unless seek $this->{file_handle}, $this->{data_offset}, 0;
745              
746 0 0         if(not $err)
747             {
748 0           my $all_data;
749 0           my $bytecount = read $this->{file_handle}, $all_data, $data_size;
750              
751 0 0 0       if(not defined $bytecount)
    0          
    0          
752             {
753 0           $err = "send_this_message(): Problem reading input file " .
754             "$this->{filename}: $!";
755             }
756             elsif($bytecount != $data_size)
757             {
758 0           $err = "send_this_message(): Problem reading from input file " .
759             "$this->{filename}: expected $msg_part_size bytes, " .
760             "found $bytecount bytes.";
761             }
762             elsif($this->{is_meta_message}
763             and ($node->{encr_meta} !~ /true/io))
764             {
765             # don't encrypt meta-message
766 0 0 0       if(is_yes($cfg->ENABLE_AMQP) and exists $node->{amqp_addr_meta} and $node->{amqp_addr_meta}) {
    0 0        
767             # send meta-message via AMQP (if indicated by node config)
768             $err = send_amqp_message(
769             $node->{amqp_addr_meta},
770 0           $subject,
771             $node,
772             $custom_headers,
773             $all_data);
774             }
775             elsif(is_yes($node->{amqp_only})) {
776 0           $err = "send_this_message(): Unable to send email META message " .
777             "to node $rcv_node_id: amqp_only selected.";
778             }
779             else {
780 0           $err = send_email($node->{addr}, $subject, undef, $all_data);
781             }
782             }
783             else
784             {
785             # send encrypted message
786             $err = send_encrypted_message(
787             $node->{encr_typ},
788             $node->{addr_r},
789             $node->{addr},
790             $node->{encr_out_keyid},
791             $node->{encr_out_passphrase},
792 0           $node,
793             $subject,
794             $custom_headers,
795             $all_data);
796             }
797             }
798             }
799             else
800             {
801             # process message parts ...
802              
803 0           my $min_part_num = 1;
804 0           my $max_part_num = $num_parts;
805 0 0         if($part_num)
806             {
807             # if $part_num specified, limit to that $part_num
808 0           $min_part_num = $part_num;
809 0           $max_part_num = $part_num;
810             }
811              
812             # iterate through message part(s), send email message(s)
813 0           my $parts_sent = 0;
814 0           for($part_num = $min_part_num; $part_num <= $max_part_num; $part_num++)
815             {
816             my $part_offset = $this->{data_offset} +
817 0           ($part_num -1) * $msg_part_size;
818             $err = "send_this_message(): Unable to position file pointer for " .
819             "file $this->{filename} to position $this->{data_offset}: $!"
820 0 0         unless seek $this->{file_handle}, $part_offset, 0;
821              
822 0 0         if(not $err)
823             {
824 0           my $part_data;
825 0           my $bytecount = read $this->{file_handle}, $part_data,
826             $msg_part_size;
827              
828 0 0 0       if(not defined $bytecount)
    0          
    0          
829             {
830 0           $err = "send_this_message(): Problem reading input file " .
831             "$this->{filename}: $!";
832             }
833             elsif($part_num < $num_parts and $bytecount != $msg_part_size)
834             {
835 0           $err = "send_this_message(): Problem reading $rcv_node_id " .
836             "message part $part_num/$num_parts from input file " .
837             "$this->{filename}: expected $msg_part_size bytes, " .
838             "found $bytecount bytes.";
839             }
840             elsif($bytecount <= 0)
841             {
842 0           $err = "send_this_message(): Problem reading $rcv_node_id " .
843             "message part $part_num/$num_parts from input file " .
844             "$this->{filename}: found $bytecount bytes.";
845             }
846             else
847             {
848             # send encrypted email message
849             $err = send_encrypted_message(
850             $node->{encr_typ},
851             $node->{addr_r},
852             $node->{addr},
853             $node->{encr_out_keyid},
854             $node->{encr_out_passphrase},
855 0           $node,
856             "$subject:$part_num/$num_parts",
857             $custom_headers,
858             $part_data);
859             }
860             }
861              
862 0 0         if($err)
863             {
864 0 0         if($parts_sent == 0)
865             {
866             # nothing sent yet, so quit now (possible smtp problem?)
867 0           last;
868             }
869             else
870             {
871             # part of message was sent, so log error and continue
872 0           log_error($err);
873 0           $err = '';
874             }
875             }
876             else
877             {
878 0           $parts_sent++;
879             }
880             }
881             }
882              
883 0 0         if(not $err)
884             {
885             # update node last_out, possibly out_seq
886 0           $node->{last_out} = time();
887 0 0         $err = $node_tbl->ERROR
888             unless $node_tbl->write($rcv_node_id, $node);
889             }
890             $node_tbl->unlock() # unlock node_tbl if needed
891 0 0         unless $was_locked;
892              
893 0           return $err;
894             }
895              
896             1;
897              
898             __DATA__