File Coverage

blib/lib/Thrift/Socket.pm
Criterion Covered Total %
statement 29 106 27.3
branch 0 26 0.0
condition 0 19 0.0
subroutine 10 25 40.0
pod 0 10 0.0
total 39 186 20.9


line stmt bran cond sub pod time code
1             #
2             # Licensed to the Apache Software Foundation (ASF) under one
3             # or more contributor license agreements. See the NOTICE file
4             # distributed with this work for additional information
5             # regarding copyright ownership. The ASF licenses this file
6             # to you under the Apache License, Version 2.0 (the
7             # "License"); you may not use this file except in compliance
8             # with the License. You may obtain a copy of the License at
9             #
10             # http://www.apache.org/licenses/LICENSE-2.0
11             #
12             # Unless required by applicable law or agreed to in writing,
13             # software distributed under the License is distributed on an
14             # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15             # KIND, either express or implied. See the License for the
16             # specific language governing permissions and limitations
17             # under the License.
18             #
19              
20 1     1   367 use 5.10.0;
  1         2  
21 1     1   4 use strict;
  1         2  
  1         16  
22 1     1   4 use warnings;
  1         1  
  1         19  
23              
24 1     1   3 use Thrift;
  1         1  
  1         14  
25 1     1   3 use Thrift::Exception;
  1         2  
  1         12  
26 1     1   4 use Thrift::Transport;
  1         1  
  1         24  
27              
28 1     1   401 use IO::Socket::INET;
  1         17262  
  1         5  
29 1     1   728 use IO::Select;
  1         1354  
  1         45  
30              
31             package Thrift::Socket;
32 1     1   6 use base qw( Thrift::Transport );
  1         2  
  1         90  
33 1     1   6 use version 0.77; our $VERSION = version->declare("$Thrift::VERSION");
  1         13  
  1         4  
34              
35             #
36             # Construction and usage
37             #
38             # my $opts = {}
39             # my $socket = Thrift::Socket->new(\%opts);
40             #
41             # options:
42             #
43             # host => host to connect to
44             # port => port to connect to
45             # sendTimeout => timeout used for send and for connect
46             # recvTimeout => timeout used for recv
47             #
48              
49             sub new
50             {
51 0     0 0   my $classname = shift;
52 0           my $opts = shift;
53              
54             # default settings:
55 0           my $self = {
56             host => 'localhost',
57             port => 9090,
58             recvTimeout => 10000,
59             sendTimeout => 10000,
60             proto => 'tcp',
61             handle => undef
62             };
63              
64 0 0 0       if (defined $opts and ref $opts eq ref {}) {
65              
66             # argument is a hash of options so override the defaults
67 0           $self->{$_} = $opts->{$_} for keys %$opts;
68              
69             } else {
70              
71             # older style constructor takes 3 arguments, none of which are required
72 0   0       $self->{host} = $opts || 'localhost';
73 0   0       $self->{port} = shift || 9090;
74              
75             }
76              
77 0           return bless($self,$classname);
78             }
79              
80              
81             sub setSendTimeout
82             {
83 0     0 0   my $self = shift;
84 0           my $timeout = shift;
85              
86 0           $self->{sendTimeout} = $timeout;
87             }
88              
89             sub setRecvTimeout
90             {
91 0     0 0   my $self = shift;
92 0           my $timeout = shift;
93              
94 0           $self->{recvTimeout} = $timeout;
95             }
96              
97              
98             #
99             # Tests whether this is open
100             #
101             # @return bool true if the socket is open
102             #
103             sub isOpen
104             {
105 0     0 0   my $self = shift;
106              
107 0 0         if( defined $self->{handle} ){
108 0           return ($self->{handle}->handles())[0]->connected;
109             }
110              
111 0           return 0;
112             }
113              
114             #
115             # Connects the socket.
116             #
117             sub open
118             {
119 0     0 0   my $self = shift;
120              
121 0   0       my $sock = $self->__open() || do {
122             my $error = ref($self).': Could not connect to '.$self->{host}.':'.$self->{port}.' ('.$!.')';
123             die Thrift::TTransportException->new($error, Thrift::TTransportException::NOT_OPEN);
124             };
125              
126 0           $self->{handle} = IO::Select->new( $sock );
127             }
128              
129             #
130             # Closes the socket.
131             #
132             sub close
133             {
134 0     0 0   my $self = shift;
135 0 0         if( defined $self->{handle} ) {
136 0           $self->__close();
137             }
138             }
139              
140             #
141             # Uses stream get contents to do the reading
142             #
143             # @param int $len How many bytes
144             # @return string Binary data
145             #
146             sub readAll
147             {
148 0     0 0   my $self = shift;
149 0           my $len = shift;
150              
151              
152 0 0         return unless defined $self->{handle};
153              
154 0           my $pre = "";
155 0           while (1) {
156              
157 0           my $sock = $self->__wait();
158 0           my $buf = $self->__recv($sock, $len);
159              
160 0 0 0       if (!defined $buf || $buf eq '') {
    0          
161              
162             die Thrift::TTransportException->new(ref($self).': Could not read '.$len.' bytes from '.
163 0           $self->{host}.':'.$self->{port}, Thrift::TTransportException::END_OF_FILE);
164              
165             }
166             elsif ((my $sz = length($buf)) < $len) {
167              
168 0           $pre .= $buf;
169 0           $len -= $sz;
170              
171             }
172             else {
173 0           return $pre.$buf;
174             }
175             }
176             }
177              
178             #
179             # Read from the socket
180             #
181             # @param int $len How many bytes
182             # @return string Binary data
183             #
184             sub read
185             {
186 0     0 0   my $self = shift;
187 0           my $len = shift;
188              
189 0 0         return unless defined $self->{handle};
190              
191 0           my $sock = $self->__wait();
192 0           my $buf = $self->__recv($sock, $len);
193              
194 0 0 0       if (!defined $buf || $buf eq '') {
195              
196             die Thrift::TTransportException->new(ref($self).': Could not read '.$len.' bytes from '.
197 0           $self->{host}.':'.$self->{port}, Thrift::TTransportException::END_OF_FILE);
198              
199             }
200              
201 0           return $buf;
202             }
203              
204              
205             #
206             # Write to the socket.
207             #
208             # @param string $buf The data to write
209             #
210             sub write
211             {
212 0     0 0   my $self = shift;
213 0           my $buf = shift;
214              
215 0 0         return unless defined $self->{handle};
216              
217 0           while (length($buf) > 0) {
218             #check for timeout
219 0           my @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 );
220              
221 0 0         if(@sockets == 0){
222             die Thrift::TTransportException->new(ref($self).': timed out writing to bytes from '.
223 0           $self->{host}.':'.$self->{port}, Thrift::TTransportException::TIMED_OUT);
224             }
225              
226 0           my $sent = $self->__send($sockets[0], $buf);
227              
228 0 0 0       if (!defined $sent || $sent == 0 ) {
229              
230             die Thrift::TTransportException->new(ref($self).': Could not write '.length($buf).' bytes '.
231 0           $self->{host}.':'.$self->{host}, Thrift::TTransportException::END_OF_FILE);
232              
233             }
234              
235 0           $buf = substr($buf, $sent);
236             }
237             }
238              
239             #
240             # Flush output to the socket.
241             #
242             sub flush
243             {
244 0     0 0   my $self = shift;
245              
246 0 0         return unless defined $self->{handle};
247              
248 0           my $ret = ($self->{handle}->handles())[0]->flush;
249             }
250              
251             ###
252             ### Overridable methods
253             ###
254              
255             #
256             # Open a connection to a server.
257             #
258             sub __open
259             {
260 0     0     my $self = shift;
261             return IO::Socket::INET->new(PeerAddr => $self->{host},
262             PeerPort => $self->{port},
263             Proto => $self->{proto},
264 0           Timeout => $self->{sendTimeout} / 1000);
265             }
266              
267             #
268             # Close the connection
269             #
270             sub __close
271             {
272 0     0     my $self = shift;
273 0           CORE::close(($self->{handle}->handles())[0]);
274             }
275              
276             #
277             # Read data
278             #
279             # @param[in] $sock the socket
280             # @param[in] $len the length to read
281             # @returns the data buffer that was read
282             #
283             sub __recv
284             {
285 0     0     my $self = shift;
286 0           my $sock = shift;
287 0           my $len = shift;
288 0           my $buf = undef;
289 0           $sock->recv($buf, $len);
290 0           return $buf;
291             }
292              
293             #
294             # Send data
295             #
296             # @param[in] $sock the socket
297             # @param[in] $buf the data buffer
298             # @returns the number of bytes written
299             #
300             sub __send
301             {
302 0     0     my $self = shift;
303 0           my $sock = shift;
304 0           my $buf = shift;
305 0           return $sock->send($buf);
306             }
307              
308             #
309             # Wait for data to be readable
310             #
311             # @returns a socket that can be read
312             #
313             sub __wait
314             {
315 0     0     my $self = shift;
316 0           my @sockets = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );
317              
318 0 0         if (@sockets == 0) {
319             die Thrift::TTransportException->new(ref($self).': timed out reading from '.
320 0           $self->{host}.':'.$self->{port}, Thrift::TTransportException::TIMED_OUT);
321             }
322              
323 0           return $sockets[0];
324             }
325              
326              
327             1;