Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
#
2
# Licensed to the Apache Software Foundation (ASF) under one
3
# or more contributor license agreements. See the NOTICE file
4
# distributed with this work for additional information
5
# regarding copyright ownership. The ASF licenses this file
6
# to you under the Apache License, Version 2.0 (the
7
# "License"); you may not use this file except in compliance
8
# with the License. You may obtain a copy of the License at
9
#
10
#   http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing,
13
# software distributed under the License is distributed on an
14
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
# KIND, either express or implied. See the License for the
16
# specific language governing permissions and limitations
17
# under the License.
18
#
19
"""Implementation of non-blocking server.
20
 
21
The main idea of the server is reciving and sending requests
22
only from main thread.
23
 
24
It also makes thread pool server in tasks terms, not connections.
25
"""
26
import threading
27
import socket
28
import Queue
29
import select
30
import struct
31
import logging
32
 
33
from thrift.transport import TTransport
34
from thrift.protocol.TBinaryProtocol import TBinaryProtocolFactory
35
 
36
__all__ = ['TNonblockingServer']
37
 
38
class Worker(threading.Thread):
39
    """Worker is a small helper to process incoming connection."""
40
    def __init__(self, queue):
41
        threading.Thread.__init__(self)
42
        self.queue = queue
43
 
44
    def run(self):
45
        """Process queries from task queue, stop if processor is None."""
46
        while True:
47
            try:
48
                processor, iprot, oprot, otrans, callback = self.queue.get()
49
                if processor is None:
50
                    break
51
                processor.process(iprot, oprot)
52
                callback(True, otrans.getvalue())
53
            except Exception:
54
                logging.exception("Exception while processing request")
55
                callback(False, '')
56
 
57
WAIT_LEN = 0
58
WAIT_MESSAGE = 1
59
WAIT_PROCESS = 2
60
SEND_ANSWER = 3
61
CLOSED = 4
62
 
63
def locked(func):
64
    "Decorator which locks self.lock."
65
    def nested(self, *args, **kwargs):
66
        self.lock.acquire()
67
        try:
68
            return func(self, *args, **kwargs)
69
        finally:
70
            self.lock.release()
71
    return nested
72
 
73
def socket_exception(func):
74
    "Decorator close object on socket.error."
75
    def read(self, *args, **kwargs):
76
        try:
77
            return func(self, *args, **kwargs)
78
        except socket.error:
79
            self.close()
80
    return read
81
 
82
class Connection:
83
    """Basic class is represented connection.
84
 
85
    It can be in state:
86
        WAIT_LEN --- connection is reading request len.
87
        WAIT_MESSAGE --- connection is reading request.
88
        WAIT_PROCESS --- connection has just read whole request and 
89
            waits for call ready routine.
90
        SEND_ANSWER --- connection is sending answer string (including length
91
            of answer).
92
        CLOSED --- socket was closed and connection should be deleted.
93
    """
94
    def __init__(self, new_socket, wake_up):
95
        self.socket = new_socket
96
        self.socket.setblocking(False)
97
        self.status = WAIT_LEN
98
        self.len = 0
99
        self.message = ''
100
        self.lock = threading.Lock()
101
        self.wake_up = wake_up
102
 
103
    def _read_len(self):
104
        """Reads length of request.
105
 
106
        It's really paranoic routine and it may be replaced by 
107
        self.socket.recv(4)."""
108
        read = self.socket.recv(4 - len(self.message))
109
        if len(read) == 0:
110
            # if we read 0 bytes and self.message is empty, it means client close 
111
            # connection
112
            if len(self.message) != 0:
113
                logging.error("can't read frame size from socket")
114
            self.close()
115
            return
116
        self.message += read
117
        if len(self.message) == 4:
118
            self.len, = struct.unpack('!i', self.message)
119
            if self.len < 0:
120
                logging.error("negative frame size, it seems client"\
121
                    " doesn't use FramedTransport")
122
                self.close()
123
            elif self.len == 0:
124
                logging.error("empty frame, it's really strange")
125
                self.close()
126
            else:
127
                self.message = ''
128
                self.status = WAIT_MESSAGE
129
 
130
    @socket_exception
131
    def read(self):
132
        """Reads data from stream and switch state."""
133
        assert self.status in (WAIT_LEN, WAIT_MESSAGE)
134
        if self.status == WAIT_LEN:
135
            self._read_len()
136
            # go back to the main loop here for simplicity instead of
137
            # falling through, even though there is a good chance that
138
            # the message is already available
139
        elif self.status == WAIT_MESSAGE:
140
            read = self.socket.recv(self.len - len(self.message))
141
            if len(read) == 0:
142
                logging.error("can't read frame from socket (get %d of %d bytes)" %
143
                    (len(self.message), self.len))
144
                self.close()
145
                return
146
            self.message += read
147
            if len(self.message) == self.len:
148
                self.status = WAIT_PROCESS
149
 
150
    @socket_exception
151
    def write(self):
152
        """Writes data from socket and switch state."""
153
        assert self.status == SEND_ANSWER
154
        sent = self.socket.send(self.message)
155
        if sent == len(self.message):
156
            self.status = WAIT_LEN
157
            self.message = ''
158
            self.len = 0
159
        else:
160
            self.message = self.message[sent:]
161
 
162
    @locked
163
    def ready(self, all_ok, message):
164
        """Callback function for switching state and waking up main thread.
165
 
166
        This function is the only function witch can be called asynchronous.
167
 
168
        The ready can switch Connection to three states:
169
            WAIT_LEN if request was oneway.
170
            SEND_ANSWER if request was processed in normal way.
171
            CLOSED if request throws unexpected exception.
172
 
173
        The one wakes up main thread.
174
        """
175
        assert self.status == WAIT_PROCESS
176
        if not all_ok:
177
            self.close()
178
            self.wake_up()
179
            return
180
        self.len = ''
181
        if len(message) == 0:
182
            # it was a oneway request, do not write answer
183
            self.message = ''
184
            self.status = WAIT_LEN
185
        else:
186
            self.message = struct.pack('!i', len(message)) + message
187
            self.status = SEND_ANSWER
188
        self.wake_up()
189
 
190
    @locked
191
    def is_writeable(self):
192
        "Returns True if connection should be added to write list of select."
193
        return self.status == SEND_ANSWER
194
 
195
    # it's not necessary, but...
196
    @locked
197
    def is_readable(self):
198
        "Returns True if connection should be added to read list of select."
199
        return self.status in (WAIT_LEN, WAIT_MESSAGE)
200
 
201
    @locked
202
    def is_closed(self):
203
        "Returns True if connection is closed."
204
        return self.status == CLOSED
205
 
206
    def fileno(self):
207
        "Returns the file descriptor of the associated socket."
208
        return self.socket.fileno()
209
 
210
    def close(self):
211
        "Closes connection"
212
        self.status = CLOSED
213
        self.socket.close()
214
 
215
class TNonblockingServer:
216
    """Non-blocking server."""
217
    def __init__(self, processor, lsocket, inputProtocolFactory=None, 
218
            outputProtocolFactory=None, threads=10):
219
        self.processor = processor
220
        self.socket = lsocket
221
        self.in_protocol = inputProtocolFactory or TBinaryProtocolFactory()
222
        self.out_protocol = outputProtocolFactory or self.in_protocol
223
        self.threads = int(threads)
224
        self.clients = {}
225
        self.tasks = Queue.Queue()
226
        self._read, self._write = socket.socketpair()
227
        self.prepared = False
228
 
229
    def setNumThreads(self, num):
230
        """Set the number of worker threads that should be created."""
231
        # implement ThreadPool interface
232
        assert not self.prepared, "You can't change number of threads for working server"
233
        self.threads = num
234
 
235
    def prepare(self):
236
        """Prepares server for serve requests."""
237
        self.socket.listen()
238
        for _ in xrange(self.threads):
239
            thread = Worker(self.tasks)
240
            thread.setDaemon(True)
241
            thread.start()
242
        self.prepared = True
243
 
244
    def wake_up(self):
245
        """Wake up main thread.
246
 
247
        The server usualy waits in select call in we should terminate one.
248
        The simplest way is using socketpair.
249
 
250
        Select always wait to read from the first socket of socketpair.
251
 
252
        In this case, we can just write anything to the second socket from
253
        socketpair."""
254
        self._write.send('1')
255
 
256
    def _select(self):
257
        """Does select on open connections."""
258
        readable = [self.socket.handle.fileno(), self._read.fileno()]
259
        writable = []
260
        for i, connection in self.clients.items():
261
            if connection.is_readable():
262
                readable.append(connection.fileno())
263
            if connection.is_writeable():
264
                writable.append(connection.fileno())
265
            if connection.is_closed():
266
                del self.clients[i]
267
        return select.select(readable, writable, readable)
268
 
269
    def handle(self):
270
        """Handle requests.
271
 
272
        WARNING! You must call prepare BEFORE calling handle.
273
        """
274
        assert self.prepared, "You have to call prepare before handle"
275
        rset, wset, xset = self._select()
276
        for readable in rset:
277
            if readable == self._read.fileno():
278
                # don't care i just need to clean readable flag
279
                self._read.recv(1024) 
280
            elif readable == self.socket.handle.fileno():
281
                client = self.socket.accept().handle
282
                self.clients[client.fileno()] = Connection(client, self.wake_up)
283
            else:
284
                connection = self.clients[readable]
285
                connection.read()
286
                if connection.status == WAIT_PROCESS:
287
                    itransport = TTransport.TMemoryBuffer(connection.message)
288
                    otransport = TTransport.TMemoryBuffer()
289
                    iprot = self.in_protocol.getProtocol(itransport)
290
                    oprot = self.out_protocol.getProtocol(otransport)
291
                    self.tasks.put([self.processor, iprot, oprot, 
292
                                    otransport, connection.ready])
293
        for writeable in wset:
294
            self.clients[writeable].write()
295
        for oob in xset:
296
            self.clients[oob].close()
297
            del self.clients[oob]
298
 
299
    def close(self):
300
        """Closes the server."""
301
        for _ in xrange(self.threads):
302
            self.tasks.put([None, None, None, None, None])
303
        self.socket.close()
304
        self.prepared = False
305
 
306
    def serve(self):
307
        """Serve forever."""
308
        self.prepare()
309
        while True:
310
            self.handle()