Subversion Repositories SmartDukaan

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/**
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 *
19
 * Contains some contributions under the Thrift Software License.
20
 * Please see doc/old-thrift-license.txt in the Thrift distribution for
21
 * details.
22
 */
23
 
24
using System;
25
using System.Threading;
26
using Thrift.Protocol;
27
using Thrift.Transport;
28
 
29
namespace Thrift.Server
30
{
31
	/// <summary>
32
	/// Server that uses C# built-in ThreadPool to spawn threads when handling requests
33
	/// </summary>
34
	public class TThreadPoolServer : TServer
35
	{
36
		private const int DEFAULT_MIN_THREADS = 10;
37
		private const int DEFAULT_MAX_THREADS = 100;
38
		private volatile bool stop = false;
39
 
40
		public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport)
41
			:this(processor, serverTransport,
42
				 new TTransportFactory(), new TTransportFactory(),
43
				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
44
				 DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate)
45
		{
46
		}
47
 
48
		public TThreadPoolServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
49
			: this(processor, serverTransport,
50
				 new TTransportFactory(), new TTransportFactory(),
51
				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
52
				 DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, logDelegate)
53
		{
54
		}
55
 
56
 
57
		public TThreadPoolServer(TProcessor processor,
58
								 TServerTransport serverTransport,
59
								 TTransportFactory transportFactory,
60
								 TProtocolFactory protocolFactory)
61
			:this(processor, serverTransport,
62
				 transportFactory, transportFactory,
63
				 protocolFactory, protocolFactory,
64
				 DEFAULT_MIN_THREADS, DEFAULT_MAX_THREADS, DefaultLogDelegate)
65
		{
66
		}
67
 
68
		public TThreadPoolServer(TProcessor processor,
69
								 TServerTransport serverTransport,
70
								 TTransportFactory inputTransportFactory,
71
								 TTransportFactory outputTransportFactory,
72
								 TProtocolFactory inputProtocolFactory,
73
								 TProtocolFactory outputProtocolFactory,
74
								 int minThreadPoolThreads, int maxThreadPoolThreads, LogDelegate logDel)
75
			:base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
76
				  inputProtocolFactory, outputProtocolFactory, logDel)
77
		{
78
			if (!ThreadPool.SetMinThreads(minThreadPoolThreads, minThreadPoolThreads))
79
			{
80
				throw new Exception("Error: could not SetMinThreads in ThreadPool");
81
			}
82
			if (!ThreadPool.SetMaxThreads(maxThreadPoolThreads, maxThreadPoolThreads))
83
			{
84
				throw new Exception("Error: could not SetMaxThreads in ThreadPool");
85
			}
86
		}
87
 
88
		/// <summary>
89
		/// Use new ThreadPool thread for each new client connection
90
		/// </summary>
91
		public override void Serve()
92
		{
93
			try
94
			{
95
				serverTransport.Listen();
96
			}
97
			catch (TTransportException ttx)
98
			{
99
				logDelegate("Error, could not listen on ServerTransport: " + ttx);
100
				return;
101
			}
102
 
103
			while (!stop)
104
			{
105
				int failureCount = 0;
106
				try
107
				{
108
					TTransport client = serverTransport.Accept();
109
					ThreadPool.QueueUserWorkItem(this.Execute, client);
110
				}
111
				catch (TTransportException ttx)
112
				{
113
					if (stop)
114
					{
115
						logDelegate("TThreadPoolServer was shutting down, caught " + ttx.GetType().Name);
116
					}
117
					else
118
					{
119
						++failureCount;
120
						logDelegate(ttx.ToString());
121
					}
122
 
123
				}
124
			}
125
 
126
			if (stop)
127
			{
128
				try
129
				{
130
					serverTransport.Close();
131
				}
132
				catch (TTransportException ttx)
133
				{
134
					logDelegate("TServerTransport failed on close: " + ttx.Message);
135
				}
136
				stop = false;
137
			}
138
		}
139
 
140
		/// <summary>
141
		/// Loops on processing a client forever
142
		/// threadContext will be a TTransport instance
143
		/// </summary>
144
		/// <param name="threadContext"></param>
145
		private void Execute(Object threadContext)
146
		{
147
			TTransport client = (TTransport)threadContext;
148
			TTransport inputTransport = null;
149
			TTransport outputTransport = null;
150
			TProtocol inputProtocol = null;
151
			TProtocol outputProtocol = null;
152
			try
153
			{
154
				inputTransport = inputTransportFactory.GetTransport(client);
155
				outputTransport = outputTransportFactory.GetTransport(client);
156
				inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
157
				outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
158
				while (processor.Process(inputProtocol, outputProtocol))
159
				{
160
					//keep processing requests until client disconnects
161
				}
162
			}
163
			catch (TTransportException)
164
			{
165
				// Assume the client died and continue silently
166
				//Console.WriteLine(ttx);
167
			}
168
 
169
			catch (Exception x)
170
			{
171
				logDelegate("Error: " + x);
172
			}
173
 
174
			if (inputTransport != null)
175
			{
176
				inputTransport.Close();
177
			}
178
			if (outputTransport != null)
179
			{
180
				outputTransport.Close();
181
			}
182
		}
183
 
184
		public override void Stop()
185
		{
186
			stop = true;
187
			serverTransport.Close();
188
		}
189
	}
190
}