Subversion Repositories SmartDukaan

Rev

Rev 30 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
30 ashish 1
/*
2
 * Licensed to the Apache Software Foundation (ASF) under one
3
 * or more contributor license agreements. See the NOTICE file
4
 * distributed with this work for additional information
5
 * regarding copyright ownership. The ASF licenses this file
6
 * to you under the Apache License, Version 2.0 (the
7
 * "License"); you may not use this file except in compliance
8
 * with the License. You may obtain a copy of the License at
9
 *
10
 *   http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing,
13
 * software distributed under the License is distributed on an
14
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15
 * KIND, either express or implied. See the License for the
16
 * specific language governing permissions and limitations
17
 * under the License.
18
 */
19
 
20
#include <algorithm>
21
#include <iostream>
22
 
23
#include "TSocketPool.h"
24
 
25
namespace apache { namespace thrift { namespace transport {
26
 
27
using namespace std;
28
 
29
using boost::shared_ptr;
30
 
31
/**
32
 * TSocketPoolServer implementation
33
 *
34
 */
35
/**
 * Default constructor.
 *
 * Produces an entry with an empty host, port 0, a socket handle of -1
 * and both failure-tracking fields set to 0.
 */
TSocketPoolServer::TSocketPoolServer()
  : host_(""), port_(0),
    socket_(-1),
    lastFailTime_(0), consecutiveFailures_(0) {
}
41
 
42
/**
43
 * Constructor for TSocketPool server
44
 */
45
TSocketPoolServer::TSocketPoolServer(const string &host, int port)
46
  : host_(host),
47
    port_(port),
48
    socket_(-1),
49
    lastFailTime_(0),
50
    consecutiveFailures_(0) {}
51
 
52
/**
53
 * TSocketPool implementation.
54
 *
55
 */
56
 
57
/**
 * Creates a pool with no servers and the default tuning values:
 * 1 retry, a retry interval of 60, at most 1 consecutive failure,
 * randomization enabled and "always try last" enabled.
 */
TSocketPool::TSocketPool()
  : TSocket(),
    numRetries_(1),
    retryInterval_(60),
    maxConsecutiveFailures_(1),
    randomize_(true),
    alwaysTryLast_(true)
{}
64
 
65
/**
 * Creates a pool from parallel host and port lists.
 *
 * @param hosts  host names; hosts[i] is paired with ports[i]
 * @param ports  port numbers, one per host
 * @throws TTransportException (BAD_ARGS) when the two lists differ in size
 *
 * Tuning values are initialized to the same defaults as the default
 * constructor.
 */
TSocketPool::TSocketPool(const vector<string> &hosts,
                         const vector<int> &ports)
  : TSocket(),
    numRetries_(1),
    retryInterval_(60),
    maxConsecutiveFailures_(1),
    randomize_(true),
    alwaysTryLast_(true)
{
  const bool sizesMatch = (hosts.size() == ports.size());
  if (!sizesMatch) {
    GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size");
    throw TTransportException(TTransportException::BAD_ARGS);
  }

  // Both lists have the same length here, so walking them in lock-step is safe.
  vector<string>::const_iterator hostIt = hosts.begin();
  vector<int>::const_iterator portIt = ports.begin();
  for (; hostIt != hosts.end(); ++hostIt, ++portIt) {
    addServer(*hostIt, *portIt);
  }
}
82
 
83
/**
 * Creates a pool from a list of (host, port) pairs.
 *
 * @param servers  each element is added via addServer(first, second),
 *                 in list order
 *
 * Tuning values are initialized to the same defaults as the default
 * constructor.
 */
TSocketPool::TSocketPool(const vector<pair<string, int> >& servers)
  : TSocket(),
    numRetries_(1),
    retryInterval_(60),
    maxConsecutiveFailures_(1),
    randomize_(true),
    alwaysTryLast_(true)
{
  typedef vector<pair<string, int> >::const_iterator EndpointIter;
  for (EndpointIter endpoint = servers.begin(); endpoint != servers.end(); ++endpoint) {
    addServer(endpoint->first, endpoint->second);
  }
}
94
 
95
/**
 * Creates a pool that uses an existing list of server entries.
 *
 * @param servers  copied into servers_; the pointed-to entries are shared
 *                 with the caller, not duplicated
 *
 * Tuning values are initialized to the same defaults as the default
 * constructor.
 */
TSocketPool::TSocketPool(const vector< shared_ptr<TSocketPoolServer> >& servers)
  : TSocket(),
    servers_(servers),
    numRetries_(1),
    retryInterval_(60),
    maxConsecutiveFailures_(1),
    randomize_(true),
    alwaysTryLast_(true)
{}
104
 
105
/**
 * Creates a pool containing a single server.
 *
 * @param host  host of the only server
 * @param port  port of the only server
 *
 * Tuning values are initialized to the same defaults as the default
 * constructor.
 */
TSocketPool::TSocketPool(const string& host, int port)
  : TSocket(),
    numRetries_(1),
    retryInterval_(60),
    maxConsecutiveFailures_(1),
    randomize_(true),
    alwaysTryLast_(true)
{
  addServer(host, port);
}
114
 
115
/**
 * Destructor: closes the socket of every server entry in the pool.
 *
 * Each entry is made current in turn and then closed with the pool's own
 * close(), which is called with explicit qualification rather than through
 * virtual dispatch.
 */
TSocketPool::~TSocketPool() {
  typedef vector< shared_ptr<TSocketPoolServer> >::const_iterator ServerIter;
  for (ServerIter iter = servers_.begin(); iter != servers_.end(); ++iter) {
    if (!*iter) {
      // The server list can be supplied by callers (constructor / setServers)
      // and may contain null pointers; there is nothing to close for those,
      // and dereferencing them would be undefined behavior.
      continue;
    }
    setCurrentServer(*iter);
    TSocketPool::close();
  }
}
123
 
124
void TSocketPool::addServer(const string& host, int port) {
125
  servers_.push_back(shared_ptr<TSocketPoolServer>(new TSocketPoolServer(host, port)));
126
}
127
 
128
void TSocketPool::setServers(const vector< shared_ptr<TSocketPoolServer> >& servers) {
129
  servers_ = servers;
130
}
131
 
132
/**
 * Copies the pool's current server list into the caller's vector.
 *
 * @param servers  output; overwritten with the contents of servers_
 */
void TSocketPool::getServers(vector< shared_ptr<TSocketPoolServer> >& servers) {
  servers = this->servers_;
}
135
 
136
void TSocketPool::setNumRetries(int numRetries) {
137
  numRetries_ = numRetries;
138
}
139
 
140
void TSocketPool::setRetryInterval(int retryInterval) {
141
  retryInterval_ = retryInterval;
142
}
143
 
144
 
145
void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) {
146
  maxConsecutiveFailures_ = maxConsecutiveFailures;
147
}
148
 
149
void TSocketPool::setRandomize(bool randomize) {
150
  randomize_ = randomize;
151
}
152
 
153
void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) {
154
  alwaysTryLast_ = alwaysTryLast;
155
}
156
 
157
void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server) {
158
  currentServer_ = server;
159
  host_ = server->host_;
160
  port_ = server->port_;
161
  socket_ = server->socket_;
162
}
163
 
164
/* TODO: without apc we ignore a lot of functionality from the php version */
165
void TSocketPool::open() {
166
  if (randomize_) {
167
    random_shuffle(servers_.begin(), servers_.end());
168
  }
169
 
170
  unsigned int numServers = servers_.size();
171
  for (unsigned int i = 0; i < numServers; ++i) {
172
 
173
    shared_ptr<TSocketPoolServer> &server = servers_[i];
174
    bool retryIntervalPassed = (server->lastFailTime_ == 0);
175
    bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
176
 
177
    // Impersonate the server socket
178
    setCurrentServer(server);
179
 
180
    if (isOpen()) {
181
      // already open means we're done
182
      return;
183
    }
184
 
185
    if (server->lastFailTime_ > 0) {
186
      // The server was marked as down, so check if enough time has elapsed to retry
187
      int elapsedTime = time(NULL) - server->lastFailTime_;
188
      if (elapsedTime > retryInterval_) {
189
        retryIntervalPassed = true;
190
      }
191
    }
192
 
193
    if (retryIntervalPassed || isLastServer) {
194
      for (int j = 0; j < numRetries_; ++j) {
195
        try {
196
          TSocket::open();
197
 
198
          // Copy over the opened socket so that we can keep it persistent
199
          server->socket_ = socket_;
200
 
201
          // reset lastFailTime_ is required
202
          if (server->lastFailTime_) {
203
            server->lastFailTime_ = 0;
204
          }
205
 
206
          // success
207
          return;
208
        } catch (TException e) {
209
          string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what();
210
          GlobalOutput(errStr.c_str());
211
          // connection failed
212
        }
213
      }
214
 
215
      ++server->consecutiveFailures_;
216
      if (server->consecutiveFailures_ > maxConsecutiveFailures_) {
217
        // Mark server as down
218
        server->consecutiveFailures_ = 0;
219
        server->lastFailTime_ = time(NULL);
220
      }
221
    }
222
  }
223
 
224
  GlobalOutput("TSocketPool::open: all connections failed");
225
  throw TTransportException(TTransportException::NOT_OPEN);
226
}
227
 
228
void TSocketPool::close() {
229
  if (isOpen()) {
230
    TSocket::close();
231
    currentServer_->socket_ = -1;
232
  }
233
}
234
 
235
}}} // apache::thrift::transport