| 30 |
ashish |
1 |
/*
|
|
|
2 |
* Licensed to the Apache Software Foundation (ASF) under one
|
|
|
3 |
* or more contributor license agreements. See the NOTICE file
|
|
|
4 |
* distributed with this work for additional information
|
|
|
5 |
* regarding copyright ownership. The ASF licenses this file
|
|
|
6 |
* to you under the Apache License, Version 2.0 (the
|
|
|
7 |
* "License"); you may not use this file except in compliance
|
|
|
8 |
* with the License. You may obtain a copy of the License at
|
|
|
9 |
*
|
|
|
10 |
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
11 |
*
|
|
|
12 |
* Unless required by applicable law or agreed to in writing,
|
|
|
13 |
* software distributed under the License is distributed on an
|
|
|
14 |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
15 |
* KIND, either express or implied. See the License for the
|
|
|
16 |
* specific language governing permissions and limitations
|
|
|
17 |
* under the License.
|
|
|
18 |
*/
|
|
|
19 |
|
|
|
20 |
#ifdef HAVE_CONFIG_H
|
|
|
21 |
#include "config.h"
|
|
|
22 |
#endif
|
|
|
23 |
|
|
|
24 |
#include "TFileTransport.h"
|
|
|
25 |
#include "TTransportUtils.h"
|
|
|
26 |
|
|
|
27 |
#include <pthread.h>
|
|
|
28 |
#ifdef HAVE_SYS_TIME_H
|
|
|
29 |
#include <sys/time.h>
|
|
|
30 |
#else
|
|
|
31 |
#include <time.h>
|
|
|
32 |
#endif
|
|
|
33 |
#include <fcntl.h>
|
|
|
34 |
#include <errno.h>
|
|
|
35 |
#include <unistd.h>
|
|
|
36 |
#ifdef HAVE_STRINGS_H
|
|
|
37 |
#include <strings.h>
|
|
|
38 |
#endif
|
|
|
39 |
#include <cstdlib>
|
|
|
40 |
#include <cstring>
|
|
|
41 |
#include <iostream>
|
|
|
42 |
#include <sys/stat.h>
|
|
|
43 |
|
|
|
44 |
namespace apache { namespace thrift { namespace transport {
|
|
|
45 |
|
|
|
46 |
using boost::shared_ptr;
|
|
|
47 |
using namespace std;
|
|
|
48 |
using namespace apache::thrift::protocol;
|
|
|
49 |
|
|
|
50 |
#ifndef HAVE_CLOCK_GETTIME
|
|
|
51 |
|
|
|
52 |
/**
|
|
|
53 |
* Fake clock_gettime for systems like darwin
|
|
|
54 |
*
|
|
|
55 |
*/
|
|
|
56 |
#define CLOCK_REALTIME 0
|
|
|
57 |
static int clock_gettime(int clk_id /*ignored*/, struct timespec *tp) {
|
|
|
58 |
struct timeval now;
|
|
|
59 |
|
|
|
60 |
int rv = gettimeofday(&now, NULL);
|
|
|
61 |
if (rv != 0) {
|
|
|
62 |
return rv;
|
|
|
63 |
}
|
|
|
64 |
|
|
|
65 |
tp->tv_sec = now.tv_sec;
|
|
|
66 |
tp->tv_nsec = now.tv_usec * 1000;
|
|
|
67 |
return 0;
|
|
|
68 |
}
|
|
|
69 |
#endif
|
|
|
70 |
|
|
|
71 |
/**
 * Builds a file transport bound to `path`.
 *
 * Every tunable starts at its DEFAULT_* value, the synchronisation
 * primitives are initialised, and the log file is opened right away through
 * openLogFile() (which throws if the open fails).
 *
 * @param path     name of the log file
 * @param readOnly when true the file is opened for reading only
 */
TFileTransport::TFileTransport(string path, bool readOnly) :
  // reader state
  readState_(),
  readBuff_(NULL),
  currentEvent_(NULL),
  readBuffSize_(DEFAULT_READ_BUFF_SIZE),
  readTimeout_(NO_TAIL_READ_TIMEOUT),
  // chunking / flushing / corruption tunables
  chunkSize_(DEFAULT_CHUNK_SIZE),
  eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE),
  flushMaxUs_(DEFAULT_FLUSH_MAX_US),
  flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES),
  maxEventSize_(DEFAULT_MAX_EVENT_SIZE),
  maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS),
  eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US),
  corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US),
  // writer state
  writerThreadId_(0),
  dequeueBuffer_(NULL),
  enqueueBuffer_(NULL),
  closing_(false),
  forceFlush_(false),
  // file state
  filename_(path),
  fd_(0),
  bufferAndThreadInitialized_(false),
  offset_(0),
  lastBadChunk_(0),
  numCorruptedEventsInChunk_(0),
  readOnly_(readOnly)
{
  // set up the mutex and the three condition variables with default attributes
  pthread_mutex_init(&mutex_, NULL);
  pthread_cond_init(&notFull_, NULL);
  pthread_cond_init(&notEmpty_, NULL);
  pthread_cond_init(&flushed_, NULL);

  openLogFile();
}
|
|
|
106 |
|
|
|
107 |
void TFileTransport::resetOutputFile(int fd, string filename, int64_t offset) {
|
|
|
108 |
filename_ = filename;
|
|
|
109 |
offset_ = offset;
|
|
|
110 |
|
|
|
111 |
// check if current file is still open
|
|
|
112 |
if (fd_ > 0) {
|
|
|
113 |
// flush any events in the queue
|
|
|
114 |
flush();
|
|
|
115 |
GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str());
|
|
|
116 |
if (-1 == ::close(fd_)) {
|
|
|
117 |
int errno_copy = errno;
|
|
|
118 |
GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy);
|
|
|
119 |
throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
|
|
|
120 |
}
|
|
|
121 |
}
|
|
|
122 |
|
|
|
123 |
if (fd) {
|
|
|
124 |
fd_ = fd;
|
|
|
125 |
} else {
|
|
|
126 |
// open file if the input fd is 0
|
|
|
127 |
openLogFile();
|
|
|
128 |
}
|
|
|
129 |
}
|
|
|
130 |
|
|
|
131 |
|
|
|
132 |
/**
 * Shuts the transport down.
 *
 * When a writer thread exists its output is flushed, the transport is marked
 * as closing and the thread is joined. All buffers owned by the transport
 * are then released, the log file is closed if it is still open, and the
 * mutex and condition variables created in the constructor are destroyed.
 */
TFileTransport::~TFileTransport() {
  // flush the buffer if a writer thread is active
  if (writerThreadId_ > 0) {
    // reduce the flush timeout so that closing is quicker
    setFlushMaxUs(300*1000);

    // flush output buffer
    flush();

    // set state to closing and nudge the writer in case it is parked on
    // notEmpty_, so it notices closing_ without waiting for its deadline
    pthread_mutex_lock(&mutex_);
    closing_ = true;
    pthread_cond_signal(&notEmpty_);
    pthread_mutex_unlock(&mutex_);

    // TODO: make sure event queue is empty
    // currently only the write buffer is flushed
    // we dont actually wait until the queue is empty. This shouldn't be a big
    // deal in the common case because writing is quick

    pthread_join(writerThreadId_, NULL);
    writerThreadId_ = 0;
  }

  // deleting a null pointer is a no-op, so no guards are needed
  delete dequeueBuffer_;
  dequeueBuffer_ = NULL;

  delete enqueueBuffer_;
  enqueueBuffer_ = NULL;

  delete[] readBuff_;
  readBuff_ = NULL;

  delete currentEvent_;
  currentEvent_ = NULL;

  // close logfile
  if (fd_ > 0) {
    if(-1 == ::close(fd_)) {
      GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", errno);
    }
  }

  // the writer thread has been joined, so nothing can still be using these
  pthread_cond_destroy(&flushed_);
  pthread_cond_destroy(&notEmpty_);
  pthread_cond_destroy(&notFull_);
  pthread_mutex_destroy(&mutex_);
}
|
|
|
180 |
|
|
|
181 |
bool TFileTransport::initBufferAndWriteThread() {
|
|
|
182 |
if (bufferAndThreadInitialized_) {
|
|
|
183 |
T_ERROR("Trying to double-init TFileTransport");
|
|
|
184 |
return false;
|
|
|
185 |
}
|
|
|
186 |
|
|
|
187 |
if (writerThreadId_ == 0) {
|
|
|
188 |
if (pthread_create(&writerThreadId_, NULL, startWriterThread, (void *)this) != 0) {
|
|
|
189 |
T_ERROR("Could not create writer thread");
|
|
|
190 |
return false;
|
|
|
191 |
}
|
|
|
192 |
}
|
|
|
193 |
|
|
|
194 |
dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
|
|
|
195 |
enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_);
|
|
|
196 |
bufferAndThreadInitialized_ = true;
|
|
|
197 |
|
|
|
198 |
return true;
|
|
|
199 |
}
|
|
|
200 |
|
|
|
201 |
void TFileTransport::write(const uint8_t* buf, uint32_t len) {
|
|
|
202 |
if (readOnly_) {
|
|
|
203 |
throw TTransportException("TFileTransport: attempting to write to file opened readonly");
|
|
|
204 |
}
|
|
|
205 |
|
|
|
206 |
enqueueEvent(buf, len, false);
|
|
|
207 |
}
|
|
|
208 |
|
|
|
209 |
void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
|
|
|
210 |
// can't enqueue more events if file is going to close
|
|
|
211 |
if (closing_) {
|
|
|
212 |
return;
|
|
|
213 |
}
|
|
|
214 |
|
|
|
215 |
// make sure that event size is valid
|
|
|
216 |
if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
|
|
|
217 |
T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_);
|
|
|
218 |
return;
|
|
|
219 |
}
|
|
|
220 |
|
|
|
221 |
if (eventLen == 0) {
|
|
|
222 |
T_ERROR("cannot enqueue an empty event");
|
|
|
223 |
return;
|
|
|
224 |
}
|
|
|
225 |
|
|
|
226 |
eventInfo* toEnqueue = new eventInfo();
|
|
|
227 |
toEnqueue->eventBuff_ = (uint8_t *)std::malloc((sizeof(uint8_t) * eventLen) + 4);
|
|
|
228 |
// first 4 bytes is the event length
|
|
|
229 |
memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4);
|
|
|
230 |
// actual event contents
|
|
|
231 |
memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen);
|
|
|
232 |
toEnqueue->eventSize_ = eventLen + 4;
|
|
|
233 |
|
|
|
234 |
// lock mutex
|
|
|
235 |
pthread_mutex_lock(&mutex_);
|
|
|
236 |
|
|
|
237 |
// make sure that enqueue buffer is initialized and writer thread is running
|
|
|
238 |
if (!bufferAndThreadInitialized_) {
|
|
|
239 |
if (!initBufferAndWriteThread()) {
|
|
|
240 |
delete toEnqueue;
|
|
|
241 |
pthread_mutex_unlock(&mutex_);
|
|
|
242 |
return;
|
|
|
243 |
}
|
|
|
244 |
}
|
|
|
245 |
|
|
|
246 |
// Can't enqueue while buffer is full
|
|
|
247 |
while (enqueueBuffer_->isFull()) {
|
|
|
248 |
pthread_cond_wait(¬Full_, &mutex_);
|
|
|
249 |
}
|
|
|
250 |
|
|
|
251 |
// add to the buffer
|
|
|
252 |
if (!enqueueBuffer_->addEvent(toEnqueue)) {
|
|
|
253 |
delete toEnqueue;
|
|
|
254 |
pthread_mutex_unlock(&mutex_);
|
|
|
255 |
return;
|
|
|
256 |
}
|
|
|
257 |
|
|
|
258 |
// signal anybody who's waiting for the buffer to be non-empty
|
|
|
259 |
pthread_cond_signal(¬Empty_);
|
|
|
260 |
|
|
|
261 |
if (blockUntilFlush) {
|
|
|
262 |
pthread_cond_wait(&flushed_, &mutex_);
|
|
|
263 |
}
|
|
|
264 |
|
|
|
265 |
// this really should be a loop where it makes sure it got flushed
|
|
|
266 |
// because condition variables can get triggered by the os for no reason
|
|
|
267 |
// it is probably a non-factor for the time being
|
|
|
268 |
pthread_mutex_unlock(&mutex_);
|
|
|
269 |
}
|
|
|
270 |
|
|
|
271 |
bool TFileTransport::swapEventBuffers(struct timespec* deadline) {
|
|
|
272 |
pthread_mutex_lock(&mutex_);
|
|
|
273 |
if (deadline != NULL) {
|
|
|
274 |
// if we were handed a deadline time struct, do a timed wait
|
|
|
275 |
pthread_cond_timedwait(¬Empty_, &mutex_, deadline);
|
|
|
276 |
} else {
|
|
|
277 |
// just wait until the buffer gets an item
|
|
|
278 |
pthread_cond_wait(¬Empty_, &mutex_);
|
|
|
279 |
}
|
|
|
280 |
|
|
|
281 |
bool swapped = false;
|
|
|
282 |
|
|
|
283 |
// could be empty if we timed out
|
|
|
284 |
if (!enqueueBuffer_->isEmpty()) {
|
|
|
285 |
TFileTransportBuffer *temp = enqueueBuffer_;
|
|
|
286 |
enqueueBuffer_ = dequeueBuffer_;
|
|
|
287 |
dequeueBuffer_ = temp;
|
|
|
288 |
|
|
|
289 |
swapped = true;
|
|
|
290 |
}
|
|
|
291 |
|
|
|
292 |
// unlock the mutex and signal if required
|
|
|
293 |
pthread_mutex_unlock(&mutex_);
|
|
|
294 |
|
|
|
295 |
if (swapped) {
|
|
|
296 |
pthread_cond_signal(¬Full_);
|
|
|
297 |
}
|
|
|
298 |
|
|
|
299 |
return swapped;
|
|
|
300 |
}
|
|
|
301 |
|
|
|
302 |
|
|
|
303 |
void TFileTransport::writerThread() {
|
|
|
304 |
// open file if it is not open
|
|
|
305 |
if(!fd_) {
|
|
|
306 |
openLogFile();
|
|
|
307 |
}
|
|
|
308 |
|
|
|
309 |
// set the offset to the correct value (EOF)
|
|
|
310 |
try {
|
|
|
311 |
seekToEnd();
|
|
|
312 |
} catch (TException &te) {
|
|
|
313 |
}
|
|
|
314 |
|
|
|
315 |
// throw away any partial events
|
|
|
316 |
offset_ += readState_.lastDispatchPtr_;
|
|
|
317 |
ftruncate(fd_, offset_);
|
|
|
318 |
readState_.resetAllValues();
|
|
|
319 |
|
|
|
320 |
// Figure out the next time by which a flush must take place
|
|
|
321 |
|
|
|
322 |
struct timespec ts_next_flush;
|
|
|
323 |
getNextFlushTime(&ts_next_flush);
|
|
|
324 |
uint32_t unflushed = 0;
|
|
|
325 |
|
|
|
326 |
while(1) {
|
|
|
327 |
// this will only be true when the destructor is being invoked
|
|
|
328 |
if(closing_) {
|
|
|
329 |
// empty out both the buffers
|
|
|
330 |
if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) {
|
|
|
331 |
if (-1 == ::close(fd_)) {
|
|
|
332 |
int errno_copy = errno;
|
|
|
333 |
GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy);
|
|
|
334 |
throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy);
|
|
|
335 |
}
|
|
|
336 |
// just be safe and sync to disk
|
|
|
337 |
fsync(fd_);
|
|
|
338 |
fd_ = 0;
|
|
|
339 |
pthread_exit(NULL);
|
|
|
340 |
return;
|
|
|
341 |
}
|
|
|
342 |
}
|
|
|
343 |
|
|
|
344 |
if (swapEventBuffers(&ts_next_flush)) {
|
|
|
345 |
eventInfo* outEvent;
|
|
|
346 |
while (NULL != (outEvent = dequeueBuffer_->getNext())) {
|
|
|
347 |
if (!outEvent) {
|
|
|
348 |
T_DEBUG_L(1, "Got an empty event");
|
|
|
349 |
return;
|
|
|
350 |
}
|
|
|
351 |
|
|
|
352 |
// sanity check on event
|
|
|
353 |
if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) {
|
|
|
354 |
T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_);
|
|
|
355 |
continue;
|
|
|
356 |
}
|
|
|
357 |
|
|
|
358 |
// If chunking is required, then make sure that msg does not cross chunk boundary
|
|
|
359 |
if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) {
|
|
|
360 |
|
|
|
361 |
// event size must be less than chunk size
|
|
|
362 |
if(outEvent->eventSize_ > chunkSize_) {
|
|
|
363 |
T_ERROR("TFileTransport: event size(%u) is greater than chunk size(%u): skipping event",
|
|
|
364 |
outEvent->eventSize_, chunkSize_);
|
|
|
365 |
continue;
|
|
|
366 |
}
|
|
|
367 |
|
|
|
368 |
int64_t chunk1 = offset_/chunkSize_;
|
|
|
369 |
int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_;
|
|
|
370 |
|
|
|
371 |
// if adding this event will cross a chunk boundary, pad the chunk with zeros
|
|
|
372 |
if (chunk1 != chunk2) {
|
|
|
373 |
// refetch the offset to keep in sync
|
|
|
374 |
offset_ = lseek(fd_, 0, SEEK_CUR);
|
|
|
375 |
int32_t padding = (int32_t)((offset_/chunkSize_ + 1)*chunkSize_ - offset_);
|
|
|
376 |
|
|
|
377 |
uint8_t zeros[padding];
|
|
|
378 |
bzero(zeros, padding);
|
|
|
379 |
if (-1 == ::write(fd_, zeros, padding)) {
|
|
|
380 |
int errno_copy = errno;
|
|
|
381 |
GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy);
|
|
|
382 |
throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while padding zeros", errno_copy);
|
|
|
383 |
}
|
|
|
384 |
unflushed += padding;
|
|
|
385 |
offset_ += padding;
|
|
|
386 |
}
|
|
|
387 |
}
|
|
|
388 |
|
|
|
389 |
// write the dequeued event to the file
|
|
|
390 |
if (outEvent->eventSize_ > 0) {
|
|
|
391 |
if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) {
|
|
|
392 |
int errno_copy = errno;
|
|
|
393 |
GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy);
|
|
|
394 |
throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error while writing event", errno_copy);
|
|
|
395 |
}
|
|
|
396 |
|
|
|
397 |
unflushed += outEvent->eventSize_;
|
|
|
398 |
offset_ += outEvent->eventSize_;
|
|
|
399 |
}
|
|
|
400 |
}
|
|
|
401 |
dequeueBuffer_->reset();
|
|
|
402 |
}
|
|
|
403 |
|
|
|
404 |
bool flushTimeElapsed = false;
|
|
|
405 |
struct timespec current_time;
|
|
|
406 |
clock_gettime(CLOCK_REALTIME, ¤t_time);
|
|
|
407 |
|
|
|
408 |
if (current_time.tv_sec > ts_next_flush.tv_sec ||
|
|
|
409 |
(current_time.tv_sec == ts_next_flush.tv_sec && current_time.tv_nsec > ts_next_flush.tv_nsec)) {
|
|
|
410 |
flushTimeElapsed = true;
|
|
|
411 |
getNextFlushTime(&ts_next_flush);
|
|
|
412 |
}
|
|
|
413 |
|
|
|
414 |
// couple of cases from which a flush could be triggered
|
|
|
415 |
if ((flushTimeElapsed && unflushed > 0) ||
|
|
|
416 |
unflushed > flushMaxBytes_ ||
|
|
|
417 |
forceFlush_) {
|
|
|
418 |
|
|
|
419 |
// sync (force flush) file to disk
|
|
|
420 |
fsync(fd_);
|
|
|
421 |
unflushed = 0;
|
|
|
422 |
|
|
|
423 |
// notify anybody waiting for flush completion
|
|
|
424 |
forceFlush_ = false;
|
|
|
425 |
pthread_cond_broadcast(&flushed_);
|
|
|
426 |
}
|
|
|
427 |
}
|
|
|
428 |
}
|
|
|
429 |
|
|
|
430 |
void TFileTransport::flush() {
|
|
|
431 |
// file must be open for writing for any flushing to take place
|
|
|
432 |
if (writerThreadId_ <= 0) {
|
|
|
433 |
return;
|
|
|
434 |
}
|
|
|
435 |
// wait for flush to take place
|
|
|
436 |
pthread_mutex_lock(&mutex_);
|
|
|
437 |
|
|
|
438 |
forceFlush_ = true;
|
|
|
439 |
|
|
|
440 |
while (forceFlush_) {
|
|
|
441 |
pthread_cond_wait(&flushed_, &mutex_);
|
|
|
442 |
}
|
|
|
443 |
|
|
|
444 |
pthread_mutex_unlock(&mutex_);
|
|
|
445 |
}
|
|
|
446 |
|
|
|
447 |
|
|
|
448 |
uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) {
|
|
|
449 |
uint32_t have = 0;
|
|
|
450 |
uint32_t get = 0;
|
|
|
451 |
|
|
|
452 |
while (have < len) {
|
|
|
453 |
get = read(buf+have, len-have);
|
|
|
454 |
if (get <= 0) {
|
|
|
455 |
throw TEOFException();
|
|
|
456 |
}
|
|
|
457 |
have += get;
|
|
|
458 |
}
|
|
|
459 |
|
|
|
460 |
return have;
|
|
|
461 |
}
|
|
|
462 |
|
|
|
463 |
uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) {
|
|
|
464 |
// check if there an event is ready to be read
|
|
|
465 |
if (!currentEvent_) {
|
|
|
466 |
currentEvent_ = readEvent();
|
|
|
467 |
}
|
|
|
468 |
|
|
|
469 |
// did not manage to read an event from the file. This could have happened
|
|
|
470 |
// if the timeout expired or there was some other error
|
|
|
471 |
if (!currentEvent_) {
|
|
|
472 |
return 0;
|
|
|
473 |
}
|
|
|
474 |
|
|
|
475 |
// read as much of the current event as possible
|
|
|
476 |
int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_;
|
|
|
477 |
if (remaining <= (int32_t)len) {
|
|
|
478 |
// copy over anything thats remaining
|
|
|
479 |
if (remaining > 0) {
|
|
|
480 |
memcpy(buf,
|
|
|
481 |
currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_,
|
|
|
482 |
remaining);
|
|
|
483 |
}
|
|
|
484 |
delete(currentEvent_);
|
|
|
485 |
currentEvent_ = NULL;
|
|
|
486 |
return remaining;
|
|
|
487 |
}
|
|
|
488 |
|
|
|
489 |
// read as much as possible
|
|
|
490 |
memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len);
|
|
|
491 |
currentEvent_->eventBuffPos_ += len;
|
|
|
492 |
return len;
|
|
|
493 |
}
|
|
|
494 |
|
|
|
495 |
/**
 * Reads the next complete event from the file.
 *
 * The file is consumed through readBuff_. Each event is a 4-byte size
 * followed by that many payload bytes; a size of 0 is treated as padding.
 * A size field is never started where its 4 bytes would span a chunk
 * boundary; such bytes are skipped. Sizes that fail isEventCorrupted()
 * trigger performRecovery().
 *
 * At end of file the behaviour depends on readTimeout_: TAIL_READ_TIMEOUT
 * sleeps and retries indefinitely, NO_TAIL_READ_TIMEOUT returns NULL at once,
 * and a positive value sleeps once before giving up.
 *
 * @return a heap-allocated event the caller must delete, or NULL when no
 *         event is available
 * @throws TTransportException on a read error (or from performRecovery())
 */
eventInfo* TFileTransport::readEvent() {
  int readTries = 0;

  if (!readBuff_) {
    readBuff_ = new uint8_t[readBuffSize_];
  }

  while (1) {
    // read from the file if read buffer is exhausted
    if (readState_.bufferPtr_ == readState_.bufferLen_) {
      // advance the offset pointer
      offset_ += readState_.bufferLen_;
      readState_.bufferLen_ = ::read(fd_, readBuff_, readBuffSize_);
      //       if (readState_.bufferLen_) {
      //         T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_);
      //       }
      readState_.bufferPtr_ = 0;
      readState_.lastDispatchPtr_ = 0;

      // read error
      if (readState_.bufferLen_ == -1) {
        readState_.resetAllValues();
        GlobalOutput("TFileTransport: error while reading from file");
        throw TTransportException("TFileTransport: error while reading from file");
      } else if (readState_.bufferLen_ == 0) {  // EOF
        // wait indefinitely if there is no timeout
        if (readTimeout_ == TAIL_READ_TIMEOUT) {
          usleep(eofSleepTime_);
          continue;
        } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) {
          // reset state
          readState_.resetState(0);
          return NULL;
        } else if (readTimeout_ > 0) {
          // timeout already expired once
          if (readTries > 0) {
            readState_.resetState(0);
            return NULL;
          } else {
            // readTimeout_ is multiplied by 1000 to get usleep()'s microseconds
            usleep(readTimeout_ * 1000);
            readTries++;
            continue;
          }
        }
      }
    }

    readTries = 0;

    // attempt to read an event from the buffer
    while(readState_.bufferPtr_ < readState_.bufferLen_) {
      if (readState_.readingSize_) {
        if(readState_.eventSizeBuffPos_ == 0) {
          if ( (offset_ + readState_.bufferPtr_)/chunkSize_ !=
               ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) {
            // skip one byte towards chunk boundary
            //            T_DEBUG_L(1, "Skipping a byte");
            readState_.bufferPtr_++;
            continue;
          }
        }

        readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] =
          readBuff_[readState_.bufferPtr_++];
        if (readState_.eventSizeBuffPos_ == 4) {
          // decode the size with memcpy rather than through a uint32_t*
          // cast: the byte buffer carries no alignment guarantee
          uint32_t eventSize = 0;
          memcpy(&eventSize, readState_.eventSizeBuff_, sizeof(eventSize));

          // 0 length event indicates padding
          if (eventSize == 0) {
            //            T_DEBUG_L(1, "Got padding");
            readState_.resetState(readState_.lastDispatchPtr_);
            continue;
          }
          // got a valid event
          readState_.readingSize_ = false;
          if (readState_.event_) {
            delete(readState_.event_);
          }
          readState_.event_ = new eventInfo();
          readState_.event_->eventSize_ = eventSize;

          // check if the event is corrupted and perform recovery if required
          if (isEventCorrupted()) {
            performRecovery();
            // start from the top
            break;
          }
        }
      } else {
        // payload phase: allocate the event buffer on first entry
        if (!readState_.event_->eventBuff_) {
          readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_];
          readState_.event_->eventBuffPos_ = 0;
        }
        // take either the entire event or the remaining bytes in the buffer
        int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_),
                                readState_.event_->eventSize_ - readState_.event_->eventBuffPos_);

        // copy data from read buffer into event buffer
        memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_,
               readBuff_ + readState_.bufferPtr_,
               reclaimBuffer);

        // increment position ptrs
        readState_.event_->eventBuffPos_ += reclaimBuffer;
        readState_.bufferPtr_ += reclaimBuffer;

        // check if the event has been read in full
        if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) {
          // set the completed event to the current event
          eventInfo* completeEvent = readState_.event_;
          completeEvent->eventBuffPos_ = 0;

          // ownership moves to the caller
          readState_.event_ = NULL;
          readState_.resetState(readState_.bufferPtr_);

          // exit criteria
          return completeEvent;
        }
      }
    }

  }
}
|
|
|
616 |
|
|
|
617 |
bool TFileTransport::isEventCorrupted() {
|
|
|
618 |
// an error is triggered if:
|
|
|
619 |
if ( (maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) {
|
|
|
620 |
// 1. Event size is larger than user-speficied max-event size
|
|
|
621 |
T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)",
|
|
|
622 |
readState_.event_->eventSize_, maxEventSize_);
|
|
|
623 |
return true;
|
|
|
624 |
} else if (readState_.event_->eventSize_ > chunkSize_) {
|
|
|
625 |
// 2. Event size is larger than chunk size
|
|
|
626 |
T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)",
|
|
|
627 |
readState_.event_->eventSize_, chunkSize_);
|
|
|
628 |
return true;
|
|
|
629 |
} else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) !=
|
|
|
630 |
((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) {
|
|
|
631 |
// 3. size indicates that event crosses chunk boundary
|
|
|
632 |
T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%ld",
|
|
|
633 |
readState_.event_->eventSize_, offset_ + readState_.bufferPtr_ + 4);
|
|
|
634 |
return true;
|
|
|
635 |
}
|
|
|
636 |
|
|
|
637 |
return false;
|
|
|
638 |
}
|
|
|
639 |
|
|
|
640 |
void TFileTransport::performRecovery() {
|
|
|
641 |
// perform some kickass recovery
|
|
|
642 |
uint32_t curChunk = getCurChunk();
|
|
|
643 |
if (lastBadChunk_ == curChunk) {
|
|
|
644 |
numCorruptedEventsInChunk_++;
|
|
|
645 |
} else {
|
|
|
646 |
lastBadChunk_ = curChunk;
|
|
|
647 |
numCorruptedEventsInChunk_ = 1;
|
|
|
648 |
}
|
|
|
649 |
|
|
|
650 |
if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) {
|
|
|
651 |
// maybe there was an error in reading the file from disk
|
|
|
652 |
// seek to the beginning of chunk and try again
|
|
|
653 |
seekToChunk(curChunk);
|
|
|
654 |
} else {
|
|
|
655 |
|
|
|
656 |
// just skip ahead to the next chunk if we not already at the last chunk
|
|
|
657 |
if (curChunk != (getNumChunks() - 1)) {
|
|
|
658 |
seekToChunk(curChunk + 1);
|
|
|
659 |
} else if (readTimeout_ == TAIL_READ_TIMEOUT) {
|
|
|
660 |
// if tailing the file, wait until there is enough data to start
|
|
|
661 |
// the next chunk
|
|
|
662 |
while(curChunk == (getNumChunks() - 1)) {
|
|
|
663 |
usleep(DEFAULT_CORRUPTED_SLEEP_TIME_US);
|
|
|
664 |
}
|
|
|
665 |
seekToChunk(curChunk + 1);
|
|
|
666 |
} else {
|
|
|
667 |
// pretty hosed at this stage, rewind the file back to the last successful
|
|
|
668 |
// point and punt on the error
|
|
|
669 |
readState_.resetState(readState_.lastDispatchPtr_);
|
|
|
670 |
currentEvent_ = NULL;
|
|
|
671 |
char errorMsg[1024];
|
|
|
672 |
sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu",
|
|
|
673 |
offset_ + readState_.lastDispatchPtr_);
|
|
|
674 |
GlobalOutput(errorMsg);
|
|
|
675 |
throw TTransportException(errorMsg);
|
|
|
676 |
}
|
|
|
677 |
}
|
|
|
678 |
|
|
|
679 |
}
|
|
|
680 |
|
|
|
681 |
void TFileTransport::seekToChunk(int32_t chunk) {
|
|
|
682 |
if (fd_ <= 0) {
|
|
|
683 |
throw TTransportException("File not open");
|
|
|
684 |
}
|
|
|
685 |
|
|
|
686 |
int32_t numChunks = getNumChunks();
|
|
|
687 |
|
|
|
688 |
// file is empty, seeking to chunk is pointless
|
|
|
689 |
if (numChunks == 0) {
|
|
|
690 |
return;
|
|
|
691 |
}
|
|
|
692 |
|
|
|
693 |
// negative indicates reverse seek (from the end)
|
|
|
694 |
if (chunk < 0) {
|
|
|
695 |
chunk += numChunks;
|
|
|
696 |
}
|
|
|
697 |
|
|
|
698 |
// too large a value for reverse seek, just seek to beginning
|
|
|
699 |
if (chunk < 0) {
|
|
|
700 |
T_DEBUG("Incorrect value for reverse seek. Seeking to beginning...", chunk)
|
|
|
701 |
chunk = 0;
|
|
|
702 |
}
|
|
|
703 |
|
|
|
704 |
// cannot seek past EOF
|
|
|
705 |
bool seekToEnd = false;
|
|
|
706 |
uint32_t minEndOffset = 0;
|
|
|
707 |
if (chunk >= numChunks) {
|
|
|
708 |
T_DEBUG("Trying to seek past EOF. Seeking to EOF instead...");
|
|
|
709 |
seekToEnd = true;
|
|
|
710 |
chunk = numChunks - 1;
|
|
|
711 |
// this is the min offset to process events till
|
|
|
712 |
minEndOffset = lseek(fd_, 0, SEEK_END);
|
|
|
713 |
}
|
|
|
714 |
|
|
|
715 |
off_t newOffset = off_t(chunk) * chunkSize_;
|
|
|
716 |
offset_ = lseek(fd_, newOffset, SEEK_SET);
|
|
|
717 |
readState_.resetAllValues();
|
|
|
718 |
currentEvent_ = NULL;
|
|
|
719 |
if (offset_ == -1) {
|
|
|
720 |
GlobalOutput("TFileTransport: lseek error in seekToChunk");
|
|
|
721 |
throw TTransportException("TFileTransport: lseek error in seekToChunk");
|
|
|
722 |
}
|
|
|
723 |
|
|
|
724 |
// seek to EOF if user wanted to go to last chunk
|
|
|
725 |
if (seekToEnd) {
|
|
|
726 |
uint32_t oldReadTimeout = getReadTimeout();
|
|
|
727 |
setReadTimeout(NO_TAIL_READ_TIMEOUT);
|
|
|
728 |
// keep on reading unti the last event at point of seekChunk call
|
|
|
729 |
while (readEvent() && ((offset_ + readState_.bufferPtr_) < minEndOffset)) {};
|
|
|
730 |
setReadTimeout(oldReadTimeout);
|
|
|
731 |
}
|
|
|
732 |
|
|
|
733 |
}
|
|
|
734 |
|
|
|
735 |
/**
 * Seeks to the end of the file by asking seekToChunk() for the chunk index
 * one past the last chunk.
 */
void TFileTransport::seekToEnd() {
  const uint32_t onePastLastChunk = getNumChunks();
  seekToChunk(onePastLastChunk);
}
|
|
|
738 |
|
|
|
739 |
uint32_t TFileTransport::getNumChunks() {
|
|
|
740 |
if (fd_ <= 0) {
|
|
|
741 |
return 0;
|
|
|
742 |
}
|
|
|
743 |
|
|
|
744 |
struct stat f_info;
|
|
|
745 |
int rv = fstat(fd_, &f_info);
|
|
|
746 |
|
|
|
747 |
if (rv < 0) {
|
|
|
748 |
int errno_copy = errno;
|
|
|
749 |
throw TTransportException(TTransportException::UNKNOWN,
|
|
|
750 |
"TFileTransport::getNumChunks() (fstat)",
|
|
|
751 |
errno_copy);
|
|
|
752 |
}
|
|
|
753 |
|
|
|
754 |
if (f_info.st_size > 0) {
|
|
|
755 |
return ((f_info.st_size)/chunkSize_) + 1;
|
|
|
756 |
}
|
|
|
757 |
|
|
|
758 |
// empty file has no chunks
|
|
|
759 |
return 0;
|
|
|
760 |
}
|
|
|
761 |
|
|
|
762 |
uint32_t TFileTransport::getCurChunk() {
|
|
|
763 |
return offset_/chunkSize_;
|
|
|
764 |
}
|
|
|
765 |
|
|
|
766 |
// Utility Functions
|
|
|
767 |
void TFileTransport::openLogFile() {
|
|
|
768 |
mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH;
|
|
|
769 |
int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND;
|
|
|
770 |
fd_ = ::open(filename_.c_str(), flags, mode);
|
|
|
771 |
offset_ = 0;
|
|
|
772 |
|
|
|
773 |
// make sure open call was successful
|
|
|
774 |
if(fd_ == -1) {
|
|
|
775 |
int errno_copy = errno;
|
|
|
776 |
GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy);
|
|
|
777 |
throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy);
|
|
|
778 |
}
|
|
|
779 |
|
|
|
780 |
}
|
|
|
781 |
|
|
|
782 |
/**
 * Computes the absolute time of the next flush: now plus flushMaxUs_
 * microseconds.
 *
 * @param ts_next_flush receives the deadline; tv_nsec is kept within
 *                      [0, 1000000000)
 */
void TFileTransport::getNextFlushTime(struct timespec* ts_next_flush) {
  clock_gettime(CLOCK_REALTIME, ts_next_flush);
  // add the sub-second part first, then carry into the seconds
  ts_next_flush->tv_nsec += (flushMaxUs_ % 1000000) * 1000;
  // >= rather than >: exactly one second's worth of nanoseconds is already
  // out of range for a timespec and must be carried too
  if (ts_next_flush->tv_nsec >= 1000000000) {
    ts_next_flush->tv_nsec -= 1000000000;
    ts_next_flush->tv_sec += 1;
  }
  ts_next_flush->tv_sec += flushMaxUs_ / 1000000;
}
|
|
|
791 |
|
|
|
792 |
/**
 * Creates an empty buffer in WRITE mode with room for `size` event pointers.
 */
TFileTransportBuffer::TFileTransportBuffer(uint32_t size) :
  bufferMode_(WRITE),
  writePoint_(0),
  readPoint_(0),
  size_(size)
{
  // slots are filled by addEvent(); nothing is stored in them yet
  buffer_ = new eventInfo*[size];
}
|
|
|
800 |
|
|
|
801 |
/**
 * Deletes every event stored so far (slots below writePoint_) and then the
 * slot array itself.
 */
TFileTransportBuffer::~TFileTransportBuffer() {
  if (!buffer_) {
    return;
  }

  uint32_t slot = 0;
  while (slot < writePoint_) {
    delete buffer_[slot];
    ++slot;
  }

  delete[] buffer_;
  buffer_ = NULL;
}
|
|
|
810 |
|
|
|
811 |
/**
 * Appends an event pointer to the buffer.
 *
 * Adding while the buffer is in READ mode is reported through GlobalOutput
 * but not prevented.
 *
 * @return true if the event was stored, false if the buffer is full
 */
bool TFileTransportBuffer::addEvent(eventInfo *event) {
  if (bufferMode_ == READ) {
    GlobalOutput("Trying to write to a buffer in read mode");
  }

  const bool hasRoom = (writePoint_ < size_);
  if (hasRoom) {
    buffer_[writePoint_] = event;
    ++writePoint_;
  }
  // false here means the buffer is full
  return hasRoom;
}
|
|
|
823 |
|
|
|
824 |
/**
 * Returns the next unread event, switching the buffer from WRITE to READ
 * mode on the way.
 *
 * @return the next stored event, or NULL when all entries have been read
 */
eventInfo* TFileTransportBuffer::getNext() {
  if (bufferMode_ == WRITE) {
    bufferMode_ = READ;
  }

  // no more entries
  if (!(readPoint_ < writePoint_)) {
    return NULL;
  }

  eventInfo* nextEvent = buffer_[readPoint_];
  ++readPoint_;
  return nextEvent;
}
|
|
|
835 |
|
|
|
836 |
void TFileTransportBuffer::reset() {
|
|
|
837 |
if (bufferMode_ == WRITE || writePoint_ > readPoint_) {
|
|
|
838 |
T_DEBUG("Resetting a buffer with unread entries");
|
|
|
839 |
}
|
|
|
840 |
// Clean up the old entries
|
|
|
841 |
for (uint32_t i = 0; i < writePoint_; i++) {
|
|
|
842 |
delete buffer_[i];
|
|
|
843 |
}
|
|
|
844 |
bufferMode_ = WRITE;
|
|
|
845 |
writePoint_ = 0;
|
|
|
846 |
readPoint_ = 0;
|
|
|
847 |
}
|
|
|
848 |
|
|
|
849 |
bool TFileTransportBuffer::isFull() {
|
|
|
850 |
return writePoint_ == size_;
|
|
|
851 |
}
|
|
|
852 |
|
|
|
853 |
bool TFileTransportBuffer::isEmpty() {
|
|
|
854 |
return writePoint_ == 0;
|
|
|
855 |
}
|
|
|
856 |
|
|
|
857 |
/**
 * Builds a processor that uses one protocol factory for both input and
 * output and sends output to a TNullTransport.
 */
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
                               shared_ptr<TProtocolFactory> protocolFactory,
                               shared_ptr<TFileReaderTransport> inputTransport)
  : processor_(processor)
  , inputProtocolFactory_(protocolFactory)
  , outputProtocolFactory_(protocolFactory)
  , inputTransport_(inputTransport)
{
  // default the output transport to a null transport (common case)
  outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
}
|
|
|
868 |
|
|
|
869 |
/**
 * Builds a processor with separate input and output protocol factories;
 * output goes to a TNullTransport.
 */
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
                               shared_ptr<TProtocolFactory> inputProtocolFactory,
                               shared_ptr<TProtocolFactory> outputProtocolFactory,
                               shared_ptr<TFileReaderTransport> inputTransport)
  : processor_(processor)
  , inputProtocolFactory_(inputProtocolFactory)
  , outputProtocolFactory_(outputProtocolFactory)
  , inputTransport_(inputTransport)
{
  // default the output transport to a null transport (common case)
  outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport());
}
|
|
|
881 |
|
|
|
882 |
/**
 * Builds a processor that uses one protocol factory for both directions and
 * writes output to the supplied transport.
 */
TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor,
                               shared_ptr<TProtocolFactory> protocolFactory,
                               shared_ptr<TFileReaderTransport> inputTransport,
                               shared_ptr<TTransport> outputTransport)
  : processor_(processor)
  , inputProtocolFactory_(protocolFactory)
  , outputProtocolFactory_(protocolFactory)
  , inputTransport_(inputTransport)
  , outputTransport_(outputTransport)
{
}
|
|
|
891 |
|
|
|
892 |
void TFileProcessor::process(uint32_t numEvents, bool tail) {
|
|
|
893 |
shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
|
|
|
894 |
shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
|
|
|
895 |
|
|
|
896 |
// set the read timeout to 0 if tailing is required
|
|
|
897 |
int32_t oldReadTimeout = inputTransport_->getReadTimeout();
|
|
|
898 |
if (tail) {
|
|
|
899 |
// save old read timeout so it can be restored
|
|
|
900 |
inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT);
|
|
|
901 |
}
|
|
|
902 |
|
|
|
903 |
uint32_t numProcessed = 0;
|
|
|
904 |
while(1) {
|
|
|
905 |
// bad form to use exceptions for flow control but there is really
|
|
|
906 |
// no other way around it
|
|
|
907 |
try {
|
|
|
908 |
processor_->process(inputProtocol, outputProtocol);
|
|
|
909 |
numProcessed++;
|
|
|
910 |
if ( (numEvents > 0) && (numProcessed == numEvents)) {
|
|
|
911 |
return;
|
|
|
912 |
}
|
|
|
913 |
} catch (TEOFException& teof) {
|
|
|
914 |
if (!tail) {
|
|
|
915 |
break;
|
|
|
916 |
}
|
|
|
917 |
} catch (TException &te) {
|
|
|
918 |
cerr << te.what() << endl;
|
|
|
919 |
break;
|
|
|
920 |
}
|
|
|
921 |
}
|
|
|
922 |
|
|
|
923 |
// restore old read timeout
|
|
|
924 |
if (tail) {
|
|
|
925 |
inputTransport_->setReadTimeout(oldReadTimeout);
|
|
|
926 |
}
|
|
|
927 |
|
|
|
928 |
}
|
|
|
929 |
|
|
|
930 |
void TFileProcessor::processChunk() {
|
|
|
931 |
shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_);
|
|
|
932 |
shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_);
|
|
|
933 |
|
|
|
934 |
uint32_t curChunk = inputTransport_->getCurChunk();
|
|
|
935 |
|
|
|
936 |
while(1) {
|
|
|
937 |
// bad form to use exceptions for flow control but there is really
|
|
|
938 |
// no other way around it
|
|
|
939 |
try {
|
|
|
940 |
processor_->process(inputProtocol, outputProtocol);
|
|
|
941 |
if (curChunk != inputTransport_->getCurChunk()) {
|
|
|
942 |
break;
|
|
|
943 |
}
|
|
|
944 |
} catch (TEOFException& teof) {
|
|
|
945 |
break;
|
|
|
946 |
} catch (TException &te) {
|
|
|
947 |
cerr << te.what() << endl;
|
|
|
948 |
break;
|
|
|
949 |
}
|
|
|
950 |
}
|
|
|
951 |
}
|
|
|
952 |
|
|
|
953 |
}}} // apache::thrift::transport
|