Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.#require 5.6.0;use strict;use warnings;use Thrift;use Thrift::Transport;use IO::Socket::INET;use IO::Select;package Thrift::Socket;use base('Thrift::Transport');sub new{my $classname = shift;my $host = shift || "localhost";my $port = shift || 9090;my $debugHandler = shift;my $self = {host => $host,port => $port,debugHandler => $debugHandler,debug => 0,sendTimeout => 10000,recvTimeout => 10000,handle => undef,};return bless($self,$classname);}sub setSendTimeout{my $self = shift;my $timeout = shift;$self->{sendTimeout} = $timeout;}sub setRecvTimeout{my $self = shift;my $timeout = shift;$self->{recvTimeout} = $timeout;}##Sets debugging output on or off## @param bool $debug#sub setDebug{my $self = shift;my $debug = shift;$self->{debug} = $debug;}## Tests whether this is open## @return bool true if the socket is open#sub isOpen{my $self = shift;if( defined $self->{handle} ){return ($self->{handle}->handles())[0]->connected;}return 0;}## Connects the socket.#sub open{my $self = shift;my $sock = IO::Socket::INET->new(PeerAddr => $self->{host},PeerPort => $self->{port},Proto => 'tcp',Timeout => $self->{sendTimeout}/1000)|| do {my $error = 'TSocket: Could not connect to '.$self->{host}.':'.$self->{port}.' ('.$!.')';if ($self->{debug}) {$self->{debugHandler}->($error);}die new Thrift::TException($error);};$self->{handle} = new IO::Select( $sock );}## Closes the socket.#sub close{my $self = shift;if( defined $self->{handle} ){CORE::close( ($self->{handle}->handles())[0] );}}## Uses stream get contents to do the reading## @param int $len How many bytes# @return string Binary data#sub readAll{my $self = shift;my $len = shift;return unless defined $self->{handle};my $pre = "";while (1) {#check for timeoutmy @sockets = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );if(@sockets == 0){die new Thrift::TException('TSocket: timed out reading '.$len.' bytes from '.$self->{host}.':'.$self->{port});}my $sock = $sockets[0];my ($buf,$sz);$sock->recv($buf, $len);if (!defined $buf || $buf eq '') {die new Thrift::TException('TSocket: Could not read '.$len.' bytes from '.$self->{host}.':'.$self->{port});} elsif (($sz = length($buf)) < $len) {$pre .= $buf;$len -= $sz;} else {return $pre.$buf;}}}## Read from the socket## @param int $len How many bytes# @return string Binary data#sub read{my $self = shift;my $len = shift;return unless defined $self->{handle};#check for timeoutmy @sockets = $self->{handle}->can_read( $self->{recvTimeout} / 1000 );if(@sockets == 0){die new Thrift::TException('TSocket: timed out reading '.$len.' bytes from '.$self->{host}.':'.$self->{port});}my $sock = $sockets[0];my ($buf,$sz);$sock->recv($buf, $len);if (!defined $buf || $buf eq '') {die new TException('TSocket: Could not read '.$len.' bytes from '.$self->{host}.':'.$self->{port});}return $buf;}## Write to the socket.## @param string $buf The data to write#sub write{my $self = shift;my $buf = shift;return unless defined $self->{handle};while (length($buf) > 0) {#check for timeoutmy @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 );if(@sockets == 0){die new Thrift::TException('TSocket: timed out writing to bytes from '.$self->{host}.':'.$self->{port});}my $sock = $sockets[0];my $got = $sock->send($buf);if (!defined $got || $got == 0 ) {die new Thrift::TException('TSocket: Could not write '.length($buf).' bytes '.$self->{host}.':'.$self->{host});}$buf = substr($buf, $got);}}## Flush output to the socket.#sub flush{my $self = shift;return unless defined $self->{handle};my $ret = ($self->{handle}->handles())[0]->flush;}## Build a ServerSocket from the ServerTransport base class#package Thrift::ServerSocket;use base qw( Thrift::Socket Thrift::ServerTransport );use constant LISTEN_QUEUE_SIZE => 128;sub new{my $classname = shift;my $port = shift;my $self = $classname->SUPER::new(undef, $port, undef);return bless($self,$classname);}sub listen{my $self = shift;# Listen to a new socketmy $sock = IO::Socket::INET->new(LocalAddr => undef, # any addrLocalPort => $self->{port},Proto => 'tcp',Listen => LISTEN_QUEUE_SIZE,ReuseAddr => 1)|| do {my $error = 'TServerSocket: Could not bind to ' .$self->{host} . ':' . $self->{port} . ' (' . $! . ')';if ($self->{debug}) {$self->{debugHandler}->($error);}die new Thrift::TException($error);};$self->{handle} = $sock;}sub accept{my $self = shift;if ( exists $self->{handle} and defined $self->{handle} ){my $client = $self->{handle}->accept();my $result = new Thrift::Socket;$result->{handle} = new IO::Select($client);return $result;}return 0;}1;