Rev 30 | Blame | Compare with Previous | Last modification | View Log | RSS feed
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.#from cStringIO import StringIOfrom struct import pack,unpackfrom thrift.Thrift import TExceptionclass TTransportException(TException):"""Custom Transport Exception class"""UNKNOWN = 0NOT_OPEN = 1ALREADY_OPEN = 2TIMED_OUT = 3END_OF_FILE = 4def __init__(self, type=UNKNOWN, message=None):TException.__init__(self, message)self.type = typeclass TTransportBase:"""Base class for Thrift transport layer."""def isOpen(self):passdef open(self):passdef close(self):passdef read(self, sz):passdef readAll(self, sz):buff = ''have = 0while (have < sz):chunk = self.read(sz-have)have += len(chunk)buff += chunkif len(chunk) == 0:raise EOFError()return buffdef write(self, buf):passdef flush(self):pass# This class should be thought of as an interface.class CReadableTransport:"""base class for transports that are readable from C"""# TODO(dreiss): Think about changing this interface to allow us to use# a (Python, not c) StringIO instead, because it allows# you to write after reading.# NOTE: This is a classic class, so properties will NOT work# correctly for setting.@propertydef cstringio_buf(self):"""A cStringIO buffer that contains the current chunk we are reading."""passdef cstringio_refill(self, partialread, reqlen):"""Refills cstringio_buf.Returns the currently used buffer (which can but need not be the same asthe old cstringio_buf). partialread is what the C code has read from thebuffer, and should be inserted into the buffer before any more reads. Thereturn value must be a new, not borrowed reference. Something along thelines of self._buf should be fine.If reqlen bytes can't be read, throw EOFError."""passclass TServerTransportBase:"""Base class for Thrift server transports."""def listen(self):passdef accept(self):passdef close(self):passclass TTransportFactoryBase:"""Base class for a Transport Factory"""def getTransport(self, trans):return transclass TBufferedTransportFactory:"""Factory transport that builds buffered transports"""def getTransport(self, trans):buffered = TBufferedTransport(trans)return bufferedclass TBufferedTransport(TTransportBase,CReadableTransport):"""Class that wraps another transport and buffers its I/O."""DEFAULT_BUFFER = 4096def __init__(self, trans):self.__trans = transself.__wbuf = StringIO()self.__rbuf = StringIO("")def isOpen(self):return self.__trans.isOpen()def open(self):return self.__trans.open()def close(self):return self.__trans.close()def read(self, sz):ret = self.__rbuf.read(sz)if len(ret) != 0:return retself.__rbuf = StringIO(self.__trans.read(max(sz, self.DEFAULT_BUFFER)))return self.__rbuf.read(sz)def write(self, buf):self.__wbuf.write(buf)def flush(self):out = self.__wbuf.getvalue()# reset wbuf before write/flush to preserve state on underlying failureself.__wbuf = StringIO()self.__trans.write(out)self.__trans.flush()# Implement the CReadableTransport interface.@propertydef cstringio_buf(self):return self.__rbufdef cstringio_refill(self, partialread, reqlen):retstring = partialreadif reqlen < self.DEFAULT_BUFFER:# try to make a read of as much as we can.retstring += self.__trans.read(self.DEFAULT_BUFFER)# but make sure we do read reqlen bytes.if len(retstring) < reqlen:retstring += self.__trans.readAll(reqlen - len(retstring))self.__rbuf = StringIO(retstring)return self.__rbufclass TMemoryBuffer(TTransportBase, CReadableTransport):"""Wraps a cStringIO object as a TTransport.NOTE: Unlike the C++ version of this class, you cannot write to itthen immediately read from it. If you want to read from aTMemoryBuffer, you must either pass a string to the constructor.TODO(dreiss): Make this work like the C++ version."""def __init__(self, value=None):"""value -- a value to read from for stringioIf value is set, this will be a transport for reading,otherwise, it is for writing"""if value is not None:self._buffer = StringIO(value)else:self._buffer = StringIO()def isOpen(self):return not self._buffer.closeddef open(self):passdef close(self):self._buffer.close()def read(self, sz):return self._buffer.read(sz)def write(self, buf):self._buffer.write(buf)def flush(self):passdef getvalue(self):return self._buffer.getvalue()# Implement the CReadableTransport interface.@propertydef cstringio_buf(self):return self._bufferdef cstringio_refill(self, partialread, reqlen):# only one shot at reading...raise EOFError()class TFramedTransportFactory:"""Factory transport that builds framed transports"""def getTransport(self, trans):framed = TFramedTransport(trans)return framedclass TFramedTransport(TTransportBase, CReadableTransport):"""Class that wraps another transport and frames its I/O when writing."""def __init__(self, trans,):self.__trans = transself.__rbuf = StringIO()self.__wbuf = StringIO()def isOpen(self):return self.__trans.isOpen()def open(self):return self.__trans.open()def close(self):return self.__trans.close()def read(self, sz):ret = self.__rbuf.read(sz)if len(ret) != 0:return retself.readFrame()return self.__rbuf.read(sz)def readFrame(self):buff = self.__trans.readAll(4)sz, = unpack('!i', buff)self.__rbuf = StringIO(self.__trans.readAll(sz))def write(self, buf):self.__wbuf.write(buf)def flush(self):wout = self.__wbuf.getvalue()wsz = len(wout)# reset wbuf before write/flush to preserve state on underlying failureself.__wbuf = StringIO()# N.B.: Doing this string concatenation is WAY cheaper than making# two separate calls to the underlying socket object. Socket writes in# Python turn out to be REALLY expensive, but it seems to do a pretty# good job of managing string buffer operations without excessive copiesbuf = pack("!i", wsz) + woutself.__trans.write(buf)self.__trans.flush()# Implement the CReadableTransport interface.@propertydef cstringio_buf(self):return self.__rbufdef cstringio_refill(self, prefix, reqlen):# self.__rbuf will already be empty here because fastbinary doesn't# ask for a refill until the previous buffer is empty. Therefore,# we can start reading new frames immediately.while len(prefix) < reqlen:self.readFrame()prefix += self.__rbuf.getvalue()self.__rbuf = StringIO(prefix)return self.__rbufclass TFileObjectTransport(TTransportBase):"""Wraps a file-like object to make it work as a Thrift transport."""def __init__(self, fileobj):self.fileobj = fileobjdef isOpen(self):return Truedef close(self):self.fileobj.close()def read(self, sz):return self.fileobj.read(sz)def write(self, buf):self.fileobj.write(buf)def flush(self):self.fileobj.flush()