Subversion Repositories SmartDukaan

Rev

Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
301 ashish 1
//  Copyright (c) 2007-2008 Facebook
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//
// See accompanying file LICENSE or visit the Scribe site at:
// http://developers.facebook.com/scribe/
//
// @author Bobby Johnson
// @author James Wang
// @author Jason Sobel
// @author Avinash Lakshman
22
 
23
 
24
#include "common.h"
25
#include "scribe_server.h"
26
#include "conn_pool.h"
27
 
28
using std::string;
29
using std::ostringstream;
30
using std::map;
31
using boost::shared_ptr;
32
using namespace apache::thrift;
33
using namespace apache::thrift::protocol;
34
using namespace apache::thrift::transport;
35
using namespace apache::thrift::server;
36
using namespace scribe::thrift;
37
 
38
 
39
// Builds an empty pool and initializes mapMutex (default attributes), the
// mutex the pool locks around every connMap access.
ConnPool::ConnPool()
{
  pthread_mutex_init(&mapMutex, NULL);
}
42
 
43
// Destroys the mutex that was initialized in the constructor.
ConnPool::~ConnPool()
{
  pthread_mutex_destroy(&mapMutex);
}
46
 
47
// Builds the connMap key for a host/port connection: "<hostname>:<port>".
string ConnPool::makeKey(const string& hostname, unsigned long port) {
  ostringstream keyStream;
  keyStream << hostname << ":" << port;
  return keyStream.str();
}
56
 
57
bool ConnPool::open(const string& hostname, unsigned long port, int timeout) {
58
	return openCommon(makeKey(hostname, port),
59
                    shared_ptr<scribeConn>(new scribeConn(hostname, port, timeout)));
60
}
61
 
62
bool ConnPool::open(const string &service, const server_vector_t &servers, int timeout) {
63
	return openCommon(service,
64
                    shared_ptr<scribeConn>(new scribeConn(service, servers, timeout)));
65
}
66
 
67
void ConnPool::close(const string& hostname, unsigned long port) {
68
  closeCommon(makeKey(hostname, port));
69
}
70
 
71
void ConnPool::close(const string &service) {
72
  closeCommon(service);
73
}
74
 
75
bool ConnPool::send(const string& hostname, unsigned long port,
76
                    shared_ptr<logentry_vector_t> messages) {
77
  return sendCommon(makeKey(hostname, port), messages);
78
}
79
 
80
bool ConnPool::send(const string &service,
81
                    shared_ptr<logentry_vector_t> messages) {
82
  return sendCommon(service, messages);
83
}
84
 
85
bool ConnPool::openCommon(const string &key, shared_ptr<scribeConn> conn) {
86
 
87
  // note on locking:
88
  // The mapMutex locks all reads and writes to the connMap.
89
  // The locks on each connection serialize writes and deletion.
90
  // The lock on a connection doesn't protect its refcount, as refcounts
91
  // are only accessed under the mapMutex.
92
  // mapMutex MUST be held before attempting to lock particular connection
93
 
94
  pthread_mutex_lock(&mapMutex);
95
  conn_map_t::iterator iter = connMap.find(key);
96
  if (iter != connMap.end()) {
97
    (*iter).second->addRef();
98
    pthread_mutex_unlock(&mapMutex);
99
    return true;
100
  } else {
101
    // don't need to lock the conn yet, because no one know about
102
    // it until we release the mapMutex
103
    if (conn->open()) {
104
      // ref count starts at one, so don't addRef here
105
      connMap[key] = conn;
106
      pthread_mutex_unlock(&mapMutex);
107
      return true;
108
    } else {
109
      // conn object that failed to open is deleted
110
      pthread_mutex_unlock(&mapMutex);
111
      return false;
112
    }
113
  }
114
}
115
 
116
void ConnPool::closeCommon(const string &key) {
117
  pthread_mutex_lock(&mapMutex);
118
  conn_map_t::iterator iter = connMap.find(key);
119
  if (iter != connMap.end()) {
120
    (*iter).second->releaseRef();
121
    if ((*iter).second->getRef() <= 0) {
122
      (*iter).second->lock();
123
      (*iter).second->close();
124
      (*iter).second->unlock();
125
      connMap.erase(iter);
126
    }
127
  } else {
128
    // This can be bad. If one client double closes then other cleints are screwed
129
    LOG_OPER("LOGIC ERROR: attempting to close connection <%s> that connPool has no entry for", key.c_str());
130
  }
131
  pthread_mutex_unlock(&mapMutex);
132
}
133
 
134
bool ConnPool::sendCommon(const string &key,
135
                          shared_ptr<logentry_vector_t> messages) {
136
  pthread_mutex_lock(&mapMutex);
137
  conn_map_t::iterator iter = connMap.find(key);
138
  if (iter != connMap.end()) {
139
    (*iter).second->lock();
140
    pthread_mutex_unlock(&mapMutex);
141
    bool result = (*iter).second->send(messages);
142
    (*iter).second->unlock();
143
    return result;
144
  } else {
145
    LOG_OPER("send failed. No connection pool entry for <%s>", key.c_str());
146
    pthread_mutex_unlock(&mapMutex);
147
    return false;
148
  }
149
}
150
 
151
// Creates a connection object aimed at a single host/port. Nothing is
// connected here; open() does that. The reference count starts at one.
scribeConn::scribeConn(const string& hostname, unsigned long port, int timeout_)
  : refCount(1),
    smcBased(false),
    remoteHost(hostname),
    remotePort(port),
    timeout(timeout_)
{
  pthread_mutex_init(&mutex, NULL);
}
159
 
160
// Creates a connection object aimed at a named service backed by a list of
// servers (smcBased mode). Nothing is connected here; open() does that. The
// reference count starts at one.
//
// remotePort is not used in smcBased mode, but it is zero-initialized so the
// member never holds an indeterminate value.
scribeConn::scribeConn(const string& service, const server_vector_t &servers, int timeout_)
  : refCount(1),
  smcBased(true),
  smcService(service),
  serverList(servers),
  remotePort(0),
  timeout(timeout_) {
  pthread_mutex_init(&mutex, NULL);
}
168
 
169
// Destroys the per-connection mutex that both constructors initialize.
scribeConn::~scribeConn()
{
  pthread_mutex_destroy(&mutex);
}
172
 
173
void scribeConn::addRef() {
174
  ++refCount;
175
}
176
 
177
void scribeConn::releaseRef() {
178
  --refCount;
179
}
180
 
181
unsigned scribeConn::getRef() {
182
  return refCount;
183
}
184
 
185
void scribeConn::lock() {
186
  pthread_mutex_lock(&mutex);
187
}
188
 
189
void scribeConn::unlock() {
190
  pthread_mutex_unlock(&mutex);
191
}
192
 
193
bool scribeConn::isOpen() {
194
  return framedTransport->isOpen();
195
}
196
 
197
bool scribeConn::open() {
198
  try {
199
 
200
    socket = smcBased ?
201
      shared_ptr<TSocket>(new TSocketPool(serverList)) :
202
      shared_ptr<TSocket>(new TSocket(remoteHost, remotePort));
203
 
204
    if (!socket) {
205
      throw std::runtime_error("Failed to create socket");
206
    }
207
 
208
    socket->setConnTimeout(timeout);
209
    socket->setRecvTimeout(timeout);
210
    socket->setSendTimeout(timeout);
211
 
212
    framedTransport = shared_ptr<TFramedTransport>(new TFramedTransport(socket));
213
    if (!framedTransport) {
214
      throw std::runtime_error("Failed to create framed transport");
215
    }
216
    protocol = shared_ptr<TBinaryProtocol>(new TBinaryProtocol(framedTransport));
217
    if (!protocol) {
218
      throw std::runtime_error("Failed to create protocol");
219
    }
220
    protocol->setStrict(false, false);
221
    resendClient = shared_ptr<scribeClient>(new scribeClient(protocol));
222
    if (!resendClient) {
223
      throw std::runtime_error("Failed to create network client");
224
    }
225
 
226
    framedTransport->open();
227
    if (smcBased) {
228
      remoteHost = socket->getPeerHost();
229
    }
230
  } catch (TTransportException& ttx) {
231
    LOG_OPER("failed to open connection to remote scribe server %s thrift error <%s>",
232
             connectionString().c_str(), ttx.what());
233
    return false;
234
  } catch (std::exception& stx) {
235
    LOG_OPER("failed to open connection to remote scribe server %s std error <%s>",
236
             connectionString().c_str(), stx.what());
237
    return false;
238
  }
239
  LOG_OPER("Opened connection to remote scribe server %s",
240
           connectionString().c_str());
241
  return true;
242
}
243
 
244
void scribeConn::close() {
245
  try {
246
    framedTransport->close();
247
  } catch (TTransportException& ttx) {
248
    LOG_OPER("error <%s> while closing connection to remote scribe server %s",
249
             ttx.what(), connectionString().c_str());
250
  }
251
}
252
 
253
// Sends a batch of log entries to the remote scribe server.
//
// Returns true when the batch is empty or the server answers OK. Returns
// false when the connection cannot be (re)opened, the server answers with any
// other result code, or an exception is raised while sending; in the latter
// two cases the connection is closed before returning.
bool scribeConn::send(boost::shared_ptr<logentry_vector_t> messages) {
  const int size = messages->size();
  if (size <= 0) {
    return true;
  }

  // Re-open on demand if the transport is not open.
  if (!isOpen() && !open()) {
    return false;
  }

  // Copy the vector of pointers to a vector of objects.
  // This is because thrift doesn't support vectors of pointers,
  // but we need to use them internally to avoid even more copies.
  std::vector<LogEntry> msgs;
  msgs.reserve(size);
  logentry_vector_t::iterator entry = messages->begin();
  while (entry != messages->end()) {
    msgs.push_back(**entry);
    ++entry;
  }

  try {
    const ResultCode result = resendClient->Log(msgs);
    if (result == OK) {
      if (g_Handler) {
        g_Handler->incrementCounter("sent", size);
      }
      LOG_OPER("Successfully sent <%d> messages to remote scribe server %s",
          size, connectionString().c_str());
      return true;
    }
    LOG_OPER("Failed to send <%d> messages, remote scribe server %s returned error code <%d>",
        size, connectionString().c_str(),
        static_cast<int>(result));
  } catch (TTransportException& ttx) {
    LOG_OPER("Failed to send <%d> messages to remote scribe server %s error <%s>",
        size, connectionString().c_str(),
        ttx.what());
  } catch (...) {
    LOG_OPER("Unknown exception sending <%d> messages to remote scribe server %s",
        size, connectionString().c_str());
  }

  // Only reached when sending failed.
  close();
  return false;
}
302
 
303
std::string scribeConn::connectionString() {
304
	if (smcBased) {
305
		return "<" + remoteHost + " SMC service: " + smcService + ">";
306
	} else {
307
		char port[10];
308
		snprintf(port, 10, "%lu", remotePort);
309
		return "<" + remoteHost + ":" + string(port) + ">";
310
	}
311
}