Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/**
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
using System;
21
using System.Collections.Generic;
22
using System.Threading;
23
using Thrift.Collections;
24
using Thrift.Protocol;
25
using Thrift.Transport;
26
 
27
namespace Thrift.Server
28
{
29
	/// <summary>
	/// Server that uses C# threads (as opposed to the ThreadPool) when handling requests.
	/// <see cref="Serve"/> accepts connections and queues them; a single dispatcher thread
	/// (<see cref="Execute"/>) hands each queued client to its own dedicated thread, keeping
	/// at most <c>maxThreads</c> client threads registered at any time.
	/// </summary>
	public class TThreadedServer : TServer
	{
		private const int DEFAULT_MAX_THREADS = 100;

		// Set by Stop() and polled by the accept and dispatcher loops.
		private volatile bool stop = false;

		// Upper bound on the number of concurrently registered client threads.
		private readonly int maxThreads;

		// Accepted-but-not-yet-dispatched clients. Guarded by clientLock.
		private readonly Queue<TTransport> clientQueue;

		// Threads currently registered for a client. Guarded by clientLock.
		private readonly THashSet<Thread> clientThreads;

		// Monitor protecting clientQueue and clientThreads; also used for Wait/Pulse signalling.
		private readonly object clientLock;

		// Dispatcher thread running Execute(); null until Serve() has started it.
		private Thread workerThread;

		/// <summary>
		/// Creates a server with default transport factories, binary protocol factories,
		/// the default thread limit and the default log delegate.
		/// </summary>
		/// <param name="processor">Processor that handles each request.</param>
		/// <param name="serverTransport">Transport on which connections are accepted.</param>
		public TThreadedServer(TProcessor processor, TServerTransport serverTransport)
			: this(processor, serverTransport,
				 new TTransportFactory(), new TTransportFactory(),
				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
				 DEFAULT_MAX_THREADS, DefaultLogDelegate)
		{
		}

		/// <summary>
		/// Creates a server with default transport factories, binary protocol factories
		/// and the default thread limit, logging through <paramref name="logDelegate"/>.
		/// </summary>
		/// <param name="processor">Processor that handles each request.</param>
		/// <param name="serverTransport">Transport on which connections are accepted.</param>
		/// <param name="logDelegate">Delegate that receives log messages.</param>
		public TThreadedServer(TProcessor processor, TServerTransport serverTransport, LogDelegate logDelegate)
			: this(processor, serverTransport,
				 new TTransportFactory(), new TTransportFactory(),
				 new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory(),
				 DEFAULT_MAX_THREADS, logDelegate)
		{
		}

		/// <summary>
		/// Creates a server that uses the same transport factory and the same protocol
		/// factory for both input and output, with the default thread limit and log delegate.
		/// </summary>
		/// <param name="processor">Processor that handles each request.</param>
		/// <param name="serverTransport">Transport on which connections are accepted.</param>
		/// <param name="transportFactory">Factory used for input and output transports.</param>
		/// <param name="protocolFactory">Factory used for input and output protocols.</param>
		public TThreadedServer(TProcessor processor,
								 TServerTransport serverTransport,
								 TTransportFactory transportFactory,
								 TProtocolFactory protocolFactory)
			: this(processor, serverTransport,
				 transportFactory, transportFactory,
				 protocolFactory, protocolFactory,
				 DEFAULT_MAX_THREADS, DefaultLogDelegate)
		{
		}

		/// <summary>
		/// Creates a fully configured server.
		/// </summary>
		/// <param name="processor">Processor that handles each request.</param>
		/// <param name="serverTransport">Transport on which connections are accepted.</param>
		/// <param name="inputTransportFactory">Factory wrapping each client for reading.</param>
		/// <param name="outputTransportFactory">Factory wrapping each client for writing.</param>
		/// <param name="inputProtocolFactory">Factory creating the input protocol.</param>
		/// <param name="outputProtocolFactory">Factory creating the output protocol.</param>
		/// <param name="maxThreads">
		/// Maximum number of client threads registered at once. The dispatcher waits while
		/// the registered count is at or above this value, so a value of zero or less means
		/// no client is ever dispatched.
		/// </param>
		/// <param name="logDel">Delegate that receives log messages.</param>
		public TThreadedServer(TProcessor processor,
								 TServerTransport serverTransport,
								 TTransportFactory inputTransportFactory,
								 TTransportFactory outputTransportFactory,
								 TProtocolFactory inputProtocolFactory,
								 TProtocolFactory outputProtocolFactory,
								 int maxThreads, LogDelegate logDel)
			: base(processor, serverTransport, inputTransportFactory, outputTransportFactory,
				  inputProtocolFactory, outputProtocolFactory, logDel)
		{
			this.maxThreads = maxThreads;
			clientQueue = new Queue<TTransport>();
			clientLock = new object();
			clientThreads = new THashSet<Thread>();
		}

		/// <summary>
		/// Listens on the server transport and accepts clients until <see cref="Stop"/> is
		/// called. Each accepted client is queued for the dispatcher thread, which starts a
		/// new thread per client and blocks while the number of client threads is at or
		/// above maxThreads.
		/// </summary>
		public override void Serve()
		{
			try
			{
				// Listen first: if it fails we return without having started a dispatcher
				// thread that would otherwise stay blocked in Monitor.Wait forever.
				serverTransport.Listen();
			}
			catch (TTransportException ttx)
			{
				logDelegate("Error, could not listen on ServerTransport: " + ttx);
				return;
			}

			//start worker (dispatcher) thread
			workerThread = new Thread(new ThreadStart(Execute));
			workerThread.Start();

			while (!stop)
			{
				try
				{
					TTransport client = serverTransport.Accept();
					lock (clientLock)
					{
						clientQueue.Enqueue(client);
						// Wake the dispatcher, which may be waiting for a queued client.
						Monitor.Pulse(clientLock);
					}
				}
				catch (TTransportException ttx)
				{
					if (stop)
					{
						logDelegate("TThreadedServer was shutting down, caught " + ttx);
					}
					else
					{
						logDelegate(ttx.ToString());
					}
				}
			}

			if (stop)
			{
				try
				{
					serverTransport.Close();
				}
				catch (TTransportException ttx)
				{
					logDelegate("TServerTransport failed on close: " + ttx.Message);
				}
				// Reset the flag so Serve() can be called again after a Stop().
				stop = false;
			}
		}

		/// <summary>
		/// Dispatcher loop run on the worker thread: waits until a client thread slot is
		/// free and a client is queued, then dequeues the client and starts a new thread
		/// running <see cref="ClientWorker"/> for it.
		/// </summary>
		private void Execute()
		{
			while (!stop)
			{
				TTransport client;
				Thread t;
				lock (clientLock)
				{
					//don't dequeue if too many connections
					while (clientThreads.Count >= maxThreads)
					{
						Monitor.Wait(clientLock);
					}

					while (clientQueue.Count == 0)
					{
						Monitor.Wait(clientLock);
					}

					client = clientQueue.Dequeue();
					t = new Thread(new ParameterizedThreadStart(ClientWorker));
					// Register before starting so the slot is counted immediately.
					clientThreads.Add(t);
				}
				//start processing requests from client on new thread
				t.Start(client);
			}
		}

		/// <summary>
		/// Thread body for a single client: wraps the client in input/output transports and
		/// protocols, processes requests until the processor returns false or an exception
		/// ends the loop, then closes the transports and unregisters the current thread.
		/// </summary>
		/// <param name="context">The accepted client; must be a <c>TTransport</c>.</param>
		private void ClientWorker(Object context)
		{
			TTransport client = (TTransport)context;
			TTransport inputTransport = null;
			TTransport outputTransport = null;
			TProtocol inputProtocol = null;
			TProtocol outputProtocol = null;
			try
			{
				inputTransport = inputTransportFactory.GetTransport(client);
				outputTransport = outputTransportFactory.GetTransport(client);
				inputProtocol = inputProtocolFactory.GetProtocol(inputTransport);
				outputProtocol = outputProtocolFactory.GetProtocol(outputTransport);
				while (processor.Process(inputProtocol, outputProtocol))
				{
					//keep processing requests until client disconnects
				}
			}
			catch (TTransportException)
			{
				// Transport errors are treated as the end of the connection and not logged.
			}
			catch (Exception x)
			{
				logDelegate("Error: " + x);
			}
			finally
			{
				// Cleanup lives in finally so it also runs when this thread is aborted by
				// Stop() and so a failing Close() cannot skip unregistering the thread.
				try
				{
					if (inputTransport == null)
					{
						// The input factory failed before wrapping the client, so nothing
						// below would close it; close the raw client instead.
						CloseTransport(client);
					}
					CloseTransport(inputTransport);
					CloseTransport(outputTransport);
				}
				finally
				{
					lock (clientLock)
					{
						clientThreads.Remove(Thread.CurrentThread);
						// Wake the dispatcher, which may be waiting for a free slot.
						Monitor.Pulse(clientLock);
					}
				}
			}
		}

		/// <summary>
		/// Closes a transport if it is not null, logging instead of propagating any failure
		/// so that the remaining cleanup steps still run.
		/// </summary>
		/// <param name="transport">Transport to close; may be null.</param>
		private void CloseTransport(TTransport transport)
		{
			if (transport == null)
			{
				return;
			}
			try
			{
				transport.Close();
			}
			catch (Exception x)
			{
				logDelegate("Error: " + x);
			}
		}

		/// <summary>
		/// Signals the accept loop to stop, closes the server transport, and aborts the
		/// dispatcher thread and every registered client thread.
		/// </summary>
		public override void Stop()
		{
			stop = true;
			serverTransport.Close();

			//clean up all the threads myself
			// workerThread is null when Serve() was never called or failed to listen.
			Thread worker = workerThread;
			if (worker != null)
			{
				worker.Abort();
			}

			// Copy the set under the lock: client threads remove themselves from it
			// concurrently, which would invalidate a live enumeration. Abort outside the
			// lock because an aborted client thread needs clientLock to unregister itself.
			List<Thread> activeThreads = new List<Thread>();
			lock (clientLock)
			{
				foreach (Thread registered in clientThreads)
				{
					activeThreads.Add(registered);
				}
			}
			foreach (Thread active in activeThreads)
			{
				active.Abort();
			}
		}
	}
234
}