Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
# encoding: ascii-8bit## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.#require 'socket'module Thriftclass Socket < BaseTransportdef initialize(host='localhost', port=9090, timeout=nil)@host = host@port = port@timeout = timeout@desc = "#{host}:#{port}"@handle = nilendattr_accessor :handle, :timeoutdef openbeginaddrinfo = ::Socket::getaddrinfo(@host, @port).first@handle = ::Socket.new(addrinfo[4], ::Socket::SOCK_STREAM, 0)sockaddr = ::Socket.sockaddr_in(addrinfo[1], addrinfo[3])begin@handle.connect_nonblock(sockaddr)rescue Errno::EINPROGRESSunless IO.select(nil, [ @handle ], nil, @timeout)raise TransportException.new(TransportException::NOT_OPEN, "Connection timeout to #{@desc}")endbegin@handle.connect_nonblock(sockaddr)rescue Errno::EISCONNendend@handlerescue StandardError => eraise TransportException.new(TransportException::NOT_OPEN, "Could not connect to #{@desc}: #{e}")endenddef open?!@handle.nil? and !@handle.closed?enddef write(str)raise IOError, "closed stream" unless open?beginif @timeout.nil? or @timeout == 0@handle.write(str)elselen = 0start = Time.nowwhile Time.now - start < @timeoutrd, wr, = IO.select(nil, [@handle], nil, @timeout)if wr and not wr.empty?len += @handle.write_nonblock(str[len..-1])break if len >= str.lengthendendif len < str.lengthraise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out writing #{str.length} bytes to #{@desc}")elselenendendrescue TransportException => e# pass this onraise erescue StandardError => e@handle.close@handle = nilraise TransportException.new(TransportException::NOT_OPEN, e.message)endenddef read(sz)raise IOError, "closed stream" unless open?beginif @timeout.nil? or @timeout == 0data = @handle.readpartial(sz)else# it's possible to interrupt select for something other than the timeout# so we need to ensure we've waited long enoughstart = Time.nowrd = nil # scopingloop dord, = IO.select([@handle], nil, nil, @timeout)break if (rd and not rd.empty?) or Time.now - start >= @timeoutendif rd.nil? or rd.empty?raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}")elsedata = @handle.readpartial(sz)endendrescue TransportException => e# don't let this get caught by the StandardError handlerraise erescue StandardError => e@handle.close unless @handle.closed?@handle = nilraise TransportException.new(TransportException::NOT_OPEN, e.message)endif (data.nil? or data.length == 0)raise TransportException.new(TransportException::UNKNOWN, "Socket: Could not read #{sz} bytes from #{@desc}")enddataenddef close@handle.close unless @handle.nil? or @handle.closed?@handle = nilenddef to_io@handleendendend