File Coverage

blib/lib/IO/Lambda/Thread.pm
Criterion Covered Total %
statement 21 77 27.2
branch 0 26 0.0
condition 0 9 0.0
subroutine 7 16 43.7
pod 4 5 80.0
total 32 133 24.0


line stmt bran cond sub pod time code
1             # $Id: Thread.pm,v 1.25 2010/03/27 19:51:24 dk Exp $
2             package IO::Lambda::Thread;
3 1     1   896 use base qw(IO::Lambda);
  1         1  
  1         105  
4 1     1   5 use strict;
  1         2  
  1         22  
5 1     1   4 use warnings;
  1         2  
  1         23  
6 1     1   5 use Exporter;
  1         13  
  1         32  
7 1     1   898 use Socket;
  1         3843  
  1         642  
8 1     1   913 use IO::Handle;
  1         6818  
  1         46  
9 1     1   6 use IO::Lambda qw(:all :dev swap_frame);
  1         2  
  1         1234  
10              
11             our $DISABLED;
12             eval { require threads; };
13             $DISABLED = $@ if $@;
14              
15             our $DEBUG = $IO::Lambda::DEBUG{thread};
16              
17             our @EXPORT_OK = qw(threaded new_thread);
18             our %EXPORT_TAGS = (all => \@EXPORT_OK);
19              
20 0     0     sub _d { "threaded(" . _o($_[0]) . ")" }
21              
22             sub thread_init
23             {
24 0     0 0   my ( $r, $cb, $pass_handle, @param) = @_;
25              
26 0     0     $SIG{KILL} = sub { threads-> exit(0) };
  0            
27 0           $SIG{PIPE} = 'IGNORE';
28 0           eval "END { IO::Lambda::__end(); };";
29 0 0         warn "thread(", threads->tid, ") started\n" if $DEBUG;
30              
31 0           my @ret;
32 0 0         eval { @ret = $cb-> (( $pass_handle ? $r : ()), @param) if $cb };
  0 0          
33              
34 0 0         warn "thread(", threads->tid, ") ended: [@ret]\n" if $DEBUG;
35 0           close($r);
36 0           undef $r;
37 0 0         die $@ if $@;
38              
39 0           return @ret;
40             }
41              
42             sub new_thread
43             {
44 0 0   0 1   return undef, $DISABLED if $DISABLED;
45              
46 0           my ( @args, $cb, $pass_handle, @param);
47 0 0 0       @args = shift if $_[0] and ref($_[0]) and ref($_[0]) eq 'HASH';
      0        
48 0           ( $cb, $pass_handle, @param) = @_;
49            
50 0           my $r = IO::Handle-> new;
51 0           my $w = IO::Handle-> new;
52 0           socketpair( $r, $w, AF_UNIX, SOCK_STREAM, PF_UNSPEC);
53 0           $w-> blocking(0);
54              
55 0           my ($t) = threads-> create(
56             @args,
57             \&thread_init,
58             $r, $cb, $pass_handle, @param
59             );
60              
61 0           close($r);
62              
63 0 0         warn "new thread(", $t->tid, ")\n" if $DEBUG;
64 0           return ($t, $w);
65             }
66              
67             # overridden IO::Lambda methods
68              
69             sub DESTROY
70             {
71 0     0     my $self = shift;
72              
73 0 0 0       return if defined($self->{tid}) and $self->{tid} != threads-> tid;
74              
75 0 0         close($self->{socket}) if $self-> {socket};
76 0           delete @{$self}{qw(socket thread)};
  0            
77              
78 0           $self-> SUPER::DESTROY;
79             }
80              
81 0     0 1   sub thread { $_[0]-> {thread} }
82 0     0 1   sub socket { $_[0]-> {socket} }
83              
84             sub threaded(&)
85             {
86 0     0 1   my $cb = shift;
87              
88             # use overridden IO::Lambda, because we need
89             # give the caller a chance to join
90             # for it, if the lambda gets terminated
91             __PACKAGE__-> new( sub {
92             # Save context. This is needed because the caller
93             # may have his own this. lambda(&) does the same
94             # protection
95 0     0     my $this = shift;
96 0           my @frame = swap_frame($this);
97              
98 0 0         warn _d($this), " started\n" if $DEBUG;
99              
100             # can start a thread?
101 0           my ( $t, $r) = new_thread( $cb, 1 );
102 0 0         return $r unless $t;
103              
104             # save this
105 0           $this-> {tid} = threads-> tid;
106 0           $this-> {thread} = $t;
107 0           $this-> {socket} = $r;
108              
109             # now wait
110 0           context $this-> {socket};
111             readable {
112 0           my $this = this;
113 0           delete $this-> {thread};
114 0           close($this-> {socket});
115 0           delete @{$this}{qw(socket thread)};
  0            
116 0           $this-> clear;
117 0 0         warn _d($this), " joining\n" if $DEBUG;
118 0           $t-> join;
119 0           };
120              
121             # restore context
122 0           swap_frame(@frame);
123 0           });
124             }
125              
126             1;
127              
128             __DATA__