Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
#
2
# Licensed to the Apache Software Foundation (ASF) under one
3
# or more contributor license agreements. See the NOTICE file
4
# distributed with this work for additional information
5
# regarding copyright ownership. The ASF licenses this file
6
# to you under the Apache License, Version 2.0 (the
7
# "License"); you may not use this file except in compliance
8
# with the License. You may obtain a copy of the License at
9
#
10
#   http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing,
13
# software distributed under the License is distributed on an
14
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
# KIND, either express or implied. See the License for the
16
# specific language governing permissions and limitations
17
# under the License.
18
#
19
 
20
require 5.6.0;
21
use strict;
22
use warnings;
23
 
24
use Thrift;
25
use Thrift::BufferedTransport;
26
use Thrift::BinaryProtocol;
27
 
28
#
29
# Server base class module
30
#
31
package Thrift::Server;
32
 
33
# Construct a server. Three argument shapes are accepted:
#   1.  (processor, serverTransport)
#         two Thrift::BufferedTransportFactory and two
#         Thrift::BinaryProtocolFactory instances are created for the
#         input and output sides
#   2.  (processor, serverTransport, transportFactory, protocolFactory)
#         the same factory is used for both input and output
#   3.  (processor, serverTransport,
#        inputTransportFactory, outputTransportFactory,
#        inputProtocolFactory, outputProtocolFactory)
# Any other argument count dies. Returns the hash built by _init, blessed
# into the invoking class.
sub new
{
    my ($classname, @args) = @_;
    my $argc = scalar @args;

    # The four factories, in _init order: input transport, output transport,
    # input protocol, output protocol.
    my @factories;

    if ($argc == 2)
    {
        @factories = (
            Thrift::BufferedTransportFactory->new(),
            Thrift::BufferedTransportFactory->new(),
            Thrift::BinaryProtocolFactory->new(),
            Thrift::BinaryProtocolFactory->new(),
        );
    }
    elsif ($argc == 4)
    {
        # One transport factory and one protocol factory, each used twice.
        @factories = @args[2, 2, 3, 3];
    }
    elsif ($argc == 6)
    {
        @factories = @args[2 .. 5];
    }
    else
    {
        die "Thrift::Server expects exactly 2, 4, or 6 args";
    }

    my $self = _init(@args[0, 1], @factories);

    return bless($self, $classname);
}
69
 
70
# Build the (unblessed) hash that holds a server's collaborators.
# Positional arguments, in order: processor, serverTransport,
# inputTransportFactory, outputTransportFactory, inputProtocolFactory,
# outputProtocolFactory. Missing arguments are stored as undef.
# Returns a reference to the new hash.
sub _init
{
    my (
        $processor,
        $serverTransport,
        $inputTransportFactory,
        $outputTransportFactory,
        $inputProtocolFactory,
        $outputProtocolFactory,
    ) = @_;

    my %fields = (
        processor              => $processor,
        serverTransport        => $serverTransport,
        inputTransportFactory  => $inputTransportFactory,
        outputTransportFactory => $outputTransportFactory,
        inputProtocolFactory   => $inputProtocolFactory,
        outputProtocolFactory  => $outputProtocolFactory,
    );

    return \%fields;
}
88
 
89
# Placeholder for the serving loop: the base class provides no
# implementation, so calling it always dies with "abstract".
sub serve
{
    die("abstract");
}
93
 
94
# Call clientBegin($iprot, $oprot) on $self->{serverEventHandler} when
# that slot holds a defined value; otherwise do nothing.
sub _clientBegin
{
    my ($self, $iprot, $oprot) = @_;

    # A missing key and an explicit undef are treated the same way.
    my $handler = $self->{serverEventHandler};

    if (defined $handler)
    {
        $handler->clientBegin($iprot, $oprot);
    }
}
106
 
107
# Report an exception caught while serving a client.
#
#   $e - the value caught in $@ (an exception object or a plain string)
#
# When $e is a hash-based reference whose string form contains
# "TException" and which has a {message} key, "<code>:<message>" is built
# from its {code} and {message} and then:
#   * if the message mentions TTransportException, the sub dies with it;
#   * if the message mentions TSocket, nothing is reported;
#   * otherwise it is emitted with warn.
# Anything else (including plain strings) is passed to warn unchanged.
sub _handleException
{
    my $self = shift;
    my $e    = shift;

    require Scalar::Util;

    # Only a hash-based reference can be probed for {message}. Without
    # this guard a plain string that merely contains "TException" would be
    # dereferenced and die under "use strict" inside the handler itself.
    my $isHash = (Scalar::Util::reftype($e) || '') eq 'HASH';

    # NOTE(review): the class-name regex only matches names containing the
    # literal text "TException"; confirm whether subclasses with other
    # names should be handled via isa() instead.
    if ($isHash and $e =~ m/TException/ and exists $e->{message}) {
        # Undefined fields are rendered as empty strings so that the
        # report itself does not trigger "uninitialized" warnings.
        my $message = defined $e->{message} ? $e->{message} : '';
        my $code    = defined $e->{code}    ? $e->{code}    : '';
        my $out     = $code . ':' . $message;

        $message =~ m/TTransportException/ and die $out;
        if ($message =~ m/TSocket/) {
            # suppress TSocket messages
        } else {
            warn $out;
        }
    } else {
        warn $e;
    }
}
127
 
128
 
129
#
130
# SimpleServer: a Server subclass that handles one connection at a time
131
#
132
package Thrift::SimpleServer;
133
use base qw( Thrift::Server );
134
 
135
# Construct a SimpleServer. All arguments are forwarded unchanged to the
# parent class constructor; the result is blessed into the invoking class.
sub new
{
    my ($classname, @args) = @_;

    my $instance = $classname->SUPER::new(@args);

    return bless($instance, $classname);
}
143
 
144
# Listen on the server transport and serve accepted clients one at a
# time, forever.
#
# For each accepted client the input/output transports and protocols are
# built from the configured factories, _clientBegin is called, and the
# processor is run in a loop until it throws. The exception is passed to
# _handleException (which may re-throw), then both transports are closed
# and the next client is accepted.
sub serve
{
    my $self = shift;

    $self->{serverTransport}->listen();
    while (1)
    {
        my $client = $self->{serverTransport}->accept();
        my $itrans = $self->{inputTransportFactory}->getTransport($client);
        my $otrans = $self->{outputTransportFactory}->getTransport($client);
        my $iprot  = $self->{inputProtocolFactory}->getProtocol($itrans);
        my $oprot  = $self->{outputProtocolFactory}->getProtocol($otrans);
        eval {
            $self->_clientBegin($iprot, $oprot);
            # The only way out of this loop is an exception from process().
            while (1)
            {
                $self->{processor}->process($iprot, $oprot);
            }
        }; if($@) {
            $self->_handleException($@);
        }

        # Close each transport in its own eval: a failure while closing
        # one must neither skip closing the other nor abort the accept
        # loop. Close errors are reported with warn.
        foreach my $trans ($itrans, $otrans)
        {
            eval {
                $trans->close();
            }; if($@) {
                warn $@;
            }
        }
    }
}
170
 
171
 
172
#
173
# ForkingServer that forks a new child process for each accepted client connection
174
#
175
package Thrift::ForkingServer;
176
use base qw( Thrift::Server );
177
 
178
use POSIX ":sys_wait_h";
179
 
180
# Construct a ForkingServer. All arguments are forwarded unchanged to the
# parent class constructor; the result is blessed into the invoking class.
sub new
{
    my ($classname, @args) = @_;

    my $instance = $classname->SUPER::new(@args);

    return bless($instance, $classname);
}
188
 
189
 
190
# Listen on the server transport, then loop forever: accept a client and
# hand it to _client. This sub never returns normally.
sub serve
{
    my ($self) = @_;

    $self->{serverTransport}->listen();

    for (;;)
    {
        my $accepted = $self->{serverTransport}->accept();
        $self->_client($accepted);
    }
}
201
 
202
# Set up transports and protocols for one accepted client and fork a
# child process to serve it.
#
#   $client - the value returned by the server transport's accept()
#
# The parent continues in _parent (child bookkeeping, closing its copies
# of the transports); the child continues in _child, which does not
# return. Any exception raised here, including a failed fork, is passed
# to _handleException.
sub _client
{
    my $self   = shift;
    my $client = shift;

    eval {
        my $itrans = $self->{inputTransportFactory}->getTransport($client);
        my $otrans = $self->{outputTransportFactory}->getTransport($client);

        my $iprot  = $self->{inputProtocolFactory}->getProtocol($itrans);
        my $oprot  = $self->{outputProtocolFactory}->getProtocol($otrans);

        $self->_clientBegin($iprot, $oprot);

        my $pid = fork();

        if (!defined $pid) #fork failed
        {
            # fork() returns undef on failure. That must not be mistaken
            # for the child's 0: running _child here would end with
            # exit() in the server process itself. No child will serve
            # this client, so release the transports and report why.
            my $reason = "$!";
            $self->tryClose($itrans);
            $self->tryClose($otrans);
            die "Thrift::ForkingServer: fork failed: $reason";
        }
        elsif ($pid) #parent
        {
            $self->_parent($pid, $itrans, $otrans);
        }
        else #child
        {
            $self->_child($itrans, $otrans, $iprot, $oprot);
        }
    }; if($@) {
        $self->_handleException($@);
    }
}
228
 
229
# Parent-side work after a successful fork.
#
#   $pid             - process id of the child that was just forked
#   $itrans, $otrans - the parent's copies of the client transports
#
# Records the child, reaps any children that have already exited, and
# closes the parent's copies of the transports.
sub _parent
{
    my ($self, $pid, $itrans, $otrans) = @_;

    # Register the child before collecting; otherwise there is a race
    # with waitpid.
    $self->{children}{$pid} = 1;
    $self->_collectChildren();

    # The parent must close the socket, or the connection may not get
    # closed promptly.
    $self->tryClose($itrans);
    $self->tryClose($otrans);
}
244
 
245
# Child-side work after a fork: run the processor against the client's
# protocols until it throws, report the exception, close both transports
# and exit the process. The exit status is 1 when a true exception value
# was caught and 0 otherwise. This sub never returns.
sub _child
{
    my ($self, $itrans, $otrans, $iprot, $oprot) = @_;

    my $exitStatus = 0;

    eval {
        # The only way out of this loop is an exception from process().
        for (;;)
        {
            $self->{processor}->process($iprot, $oprot);
        }
    };
    if ($@)
    {
        $exitStatus = 1;
        $self->_handleException($@);
    }

    $self->tryClose($itrans);
    $self->tryClose($otrans);

    exit($exitStatus);
}
269
 
270
# Close $file without letting a failure propagate to the caller.
#
#   $file - an object with a close() method, or undef (ignored)
#
# If close() throws, the error is reported with warn: as
# "<code>:<message>" when the exception is a hash-based reference whose
# string form contains "TException" and which has a {message} key, and
# unchanged otherwise.
sub tryClose
{
    my $self = shift;
    my $file = shift;

    eval {
        if (defined $file)
        {
          $file->close();
        }
    }; if($@) {
        # Copy $@ immediately so nothing below can clobber it.
        my $err = $@;

        require Scalar::Util;

        # Only a hash-based reference can be probed for {message}. A
        # plain string that merely contains "TException" would otherwise
        # be dereferenced and die under "use strict", defeating the
        # purpose of this sub.
        my $isHash = (Scalar::Util::reftype($err) || '') eq 'HASH';

        if ($isHash and $err =~ m/TException/ and exists $err->{message}) {
            # Undefined fields are rendered as empty strings so that the
            # report itself does not trigger "uninitialized" warnings.
            my $message = defined $err->{message} ? $err->{message} : '';
            my $code    = defined $err->{code}    ? $err->{code}    : '';
            my $out     = $code . ':' . $message;

            warn $out;
        } else {
            warn $err;
        }
    }
}
292
 
293
# Reap exited child processes without blocking. While any pid is still
# recorded in $self->{children}, waitpid(-1, WNOHANG) is called; each
# positive pid it returns is removed from the record, and the loop stops
# as soon as waitpid returns a non-positive value.
sub _collectChildren
{
    my ($self) = @_;

    while (scalar keys %{$self->{children}})
    {
        my $reaped = waitpid(-1, WNOHANG);

        # Nothing (more) to reap right now.
        last unless $reaped > 0;

        delete $self->{children}->{$reaped};
    }
}
311
 
312
 
313
1;