Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
#
2
# Licensed to the Apache Software Foundation (ASF) under one
3
# or more contributor license agreements. See the NOTICE file
4
# distributed with this work for additional information
5
# regarding copyright ownership. The ASF licenses this file
6
# to you under the Apache License, Version 2.0 (the
7
# "License"); you may not use this file except in compliance
8
# with the License. You may obtain a copy of the License at
9
#
10
#   http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing,
13
# software distributed under the License is distributed on an
14
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
# KIND, either express or implied. See the License for the
16
# specific language governing permissions and limitations
17
# under the License.
18
#
19
 
20
require 5.6.0;
21
use strict;
22
use warnings;
23
 
24
use Thrift;
25
use Thrift::Transport;
26
 
27
use IO::Socket::INET;
28
use IO::Select;
29
 
30
package Thrift::Socket;
31
 
32
use base('Thrift::Transport');
33
 
34
#
# Builds a new, not-yet-connected socket transport.
#
# @param string  $host         Peer host name; a false value selects "localhost"
# @param int     $port         Peer port; a false value selects 9090
# @param coderef $debugHandler Stored as-is; open() calls it with the error text
#                              when debugging is switched on
# @return object Hash-based instance blessed into the invoking class
#
sub new {
    my ($classname, $host, $port, $debugHandler) = @_;

    # Both timeouts are divided by 1000 before being used as second-based
    # timeouts elsewhere in this package, so they are kept in milliseconds.
    my %state = (
        host         => $host || "localhost",
        port         => $port || 9090,
        debugHandler => $debugHandler,
        debug        => 0,
        sendTimeout  => 10000,
        recvTimeout  => 10000,
        handle       => undef,
    );

    return bless(\%state, $classname);
}
53
 
54
 
55
#
# Sets the send timeout.
#
# The stored value is divided by 1000 before open() and write() use it as
# a timeout in seconds, so it is expressed in milliseconds.
#
# @param int $timeout Timeout in milliseconds
#
sub setSendTimeout {
    my ($self, $timeout) = @_;

    $self->{sendTimeout} = $timeout;
}
62
 
63
#
# Sets the receive timeout.
#
# The stored value is divided by 1000 before read() and readAll() pass it
# to IO::Select::can_read, so it is expressed in milliseconds.
#
# @param int $timeout Timeout in milliseconds
#
sub setRecvTimeout {
    my ($self, $timeout) = @_;

    $self->{recvTimeout} = $timeout;
}
70
 
71
 
72
#
73
# Switches debugging output on or off.
#
# The flag is only consulted on connection/bind failure, where it decides
# whether the debug handler is called with the error text.
#
# @param bool $debug True to enable debugging
#
sub setDebug {
    my ($self, $debug) = @_;

    $self->{debug} = $debug;
}
84
 
85
#
86
# Reports whether the transport currently has a connected socket.
#
# @return bool Result of connected() on the first handle held by the
#              IO::Select stored in {handle}, or 0 when no handle is set
#
sub isOpen {
    my ($self) = @_;

    my $select = $self->{handle};
    return 0 unless defined $select;

    # The select set is populated with a single socket by open().
    my ($sock) = $select->handles();
    return $sock->connected;
}
100
 
101
#
102
# Connects the socket.
#
# Opens a TCP connection to {host}:{port} and stores it, wrapped in an
# IO::Select, in {handle}. The connect timeout is {sendTimeout} converted
# from milliseconds to seconds.
#
# Dies with a Thrift::TException describing the failure when the connection
# cannot be established. When debugging is enabled and a debug handler was
# supplied, the handler is called with the same message first.
#
sub open
{
    my $self = shift;

    my $sock = IO::Socket::INET->new(PeerAddr => $self->{host},
                                     PeerPort => $self->{port},
                                     Proto    => 'tcp',
                                     Timeout  => $self->{sendTimeout}/1000);

    unless ($sock) {
        my $error = 'TSocket: Could not connect to '.$self->{host}.':'.$self->{port}.' ('.$!.')';

        # Debugging may be switched on without a handler ever having been
        # passed to new(); calling an undefined handler would replace the
        # connection error below with an unrelated runtime error.
        if ($self->{debug} && defined $self->{debugHandler}) {
            $self->{debugHandler}->($error);
        }

        die Thrift::TException->new($error);
    }

    $self->{handle} = IO::Select->new( $sock );
}
126
 
127
#
128
# Closes the underlying socket, if there is one.
#
# Only the first handle held by the IO::Select in {handle} is closed;
# {handle} itself is left in place.
#
sub close {
    my ($self) = @_;

    my $select = $self->{handle};

    if (defined $select) {
        my ($sock) = $select->handles();
        CORE::close($sock);
    }
}
138
 
139
#
140
# Reads exactly $len bytes, looping over short reads until all have arrived.
#
# Each iteration waits up to {recvTimeout} milliseconds for the socket to
# become readable. Returns nothing when no handle is set.
#
# Dies with a Thrift::TException when a wait times out or when a receive
# yields no data.
#
# @param int $len How many bytes
# @return string Binary data
#
sub readAll {
    my ($self, $len) = @_;

    my $select = $self->{handle};
    return unless defined $select;

    my $collected = "";

    for (;;) {
        # Wait for data, honouring the receive timeout.
        my @ready = $select->can_read( $self->{recvTimeout} / 1000 );

        unless (@ready) {
            die Thrift::TException->new('TSocket: timed out reading '.$len.' bytes from '.
                                        $self->{host}.':'.$self->{port});
        }

        my $chunk;
        $ready[0]->recv($chunk, $len);

        if (!defined $chunk || $chunk eq '') {
            die Thrift::TException->new('TSocket: Could not read '.$len.' bytes from '.
                                        $self->{host}.':'.$self->{port});
        }

        $collected .= $chunk;

        my $got = length($chunk);
        return $collected if $got >= $len;

        # Short read: ask only for what is still missing next time round.
        $len -= $got;
    }
}
184
 
185
#
186
# Reads up to $len bytes from the socket with a single receive call.
#
# Waits up to {recvTimeout} milliseconds for the socket to become readable.
# Unlike readAll() this may return fewer than $len bytes. Returns nothing
# when no handle is set.
#
# Dies with a Thrift::TException when the wait times out or when the
# receive yields no data.
#
# @param int $len How many bytes
# @return string Binary data
#
sub read
{
    my $self = shift;
    my $len  = shift;

    return unless defined $self->{handle};

    #check for timeout
    my @sockets = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );

    if(@sockets == 0){
        die Thrift::TException->new('TSocket: timed out reading '.$len.' bytes from '.
                                    $self->{host}.':'.$self->{port});
    }

    my $sock = $sockets[0];

    my $buf;
    $sock->recv($buf, $len);

    if (!defined $buf || $buf eq '') {

        # The exception class lives in the Thrift namespace; the bare
        # TException name used previously does not exist.
        die Thrift::TException->new('TSocket: Could not read '.$len.' bytes from '.
                                    $self->{host}.':'.$self->{port});

    }

    return $buf;
}
220
 
221
 
222
#
223
# Writes the whole buffer to the socket, looping over partial sends.
#
# Before every send it waits up to {sendTimeout} milliseconds for the
# socket to become writable. Does nothing when no handle is set.
#
# Dies with a Thrift::TException when a wait times out or when a send
# reports that nothing was written.
#
# @param string $buf The data to write
#
sub write
{
    my $self = shift;
    my $buf  = shift;

    return unless defined $self->{handle};

    while (length($buf) > 0) {

        #check for timeout
        my @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 );

        if(@sockets == 0){
            die Thrift::TException->new('TSocket: timed out writing to bytes from '.
                                        $self->{host}.':'.$self->{port});
        }

        my $sock = $sockets[0];

        my $sent = $sock->send($buf);

        if (!defined $sent || $sent == 0 ) {
            # Report host:port (the message previously repeated the host).
            die Thrift::TException->new('TSocket: Could not write '.length($buf).' bytes '.
                                        $self->{host}.':'.$self->{port});
        }

        # Drop what was sent and go round again for the remainder.
        $buf = substr($buf, $sent);
    }
}
258
 
259
#
260
# Flushes pending output on the socket.
#
# Calls flush() on the first handle held by the IO::Select in {handle}
# and returns its result. Returns nothing when no handle is set.
#
sub flush {
    my ($self) = @_;

    my $select = $self->{handle};
    return unless defined $select;

    my ($sock) = $select->handles();
    my $status = $sock->flush;

    return $status;
}
270
 
271
 
272
#
273
# Build a ServerSocket from the ServerTransport base class
274
#
275
package  Thrift::ServerSocket;
276
 
277
use base qw( Thrift::Socket Thrift::ServerTransport );
278
 
279
use constant LISTEN_QUEUE_SIZE => 128;
280
 
281
#
# Builds a server socket for the given port.
#
# Delegates to the parent constructor with no host and no debug handler,
# then blesses the result into the invoking class.
#
# @param int $port Port to listen on
#
sub new {
    my ($classname, $port) = @_;

    my $instance = $classname->SUPER::new(undef, $port, undef);

    return bless($instance, $classname);
}
289
 
290
#
# Binds a listening TCP socket on {port} (any local address) and stores it
# in {handle}. Unlike the client transport, the handle here is the raw
# IO::Socket::INET object, not an IO::Select.
#
# Dies with a Thrift::TException describing the failure when the socket
# cannot be created. When debugging is enabled and a debug handler is
# present, the handler is called with the same message first.
#
sub listen
{
    my $self = shift;

    # Listen to a new socket
    my $sock = IO::Socket::INET->new(LocalAddr => undef, # any addr
                                     LocalPort => $self->{port},
                                     Proto     => 'tcp',
                                     Listen    => LISTEN_QUEUE_SIZE,
                                     ReuseAddr => 1);

    unless ($sock) {
        my $error = 'TServerSocket: Could not bind to ' .
                    $self->{host} . ':' . $self->{port} . ' (' . $! . ')';

        # The constructor of this class passes no debug handler, so the
        # handler is normally undefined; calling it blindly would replace
        # the bind error below with an unrelated runtime error.
        if ($self->{debug} && defined $self->{debugHandler}) {
            $self->{debugHandler}->($error);
        }

        die Thrift::TException->new($error);
    }

    $self->{handle} = $sock;
}
313
 
314
#
# Accepts one incoming connection on the listening socket.
#
# @return object A new Thrift::Socket whose {handle} is an IO::Select
#                wrapping the accepted connection, or 0 when this server
#                socket has no handle
#
sub accept {
    my ($self) = @_;

    my $listener = $self->{handle};
    return 0 unless defined $listener;

    # Scalar assignment on purpose: in list context accept() would also
    # return the peer address.
    my $client = $listener->accept();

    my $transport = Thrift::Socket->new;
    $transport->{handle} = IO::Select->new($client);

    return $transport;
}
328
 
329
 
330
1;