Subversion Repositories SmartDukaan

Rev

Rev 301 | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
301 ashish 1
//  Copyright (c) 2007-2008 Facebook
2
//
3
//  Licensed under the Apache License, Version 2.0 (the "License");
4
//  you may not use this file except in compliance with the License.
5
//  You may obtain a copy of the License at
6
//
7
//      http://www.apache.org/licenses/LICENSE-2.0
8
//
9
//  Unless required by applicable law or agreed to in writing, software
10
//  distributed under the License is distributed on an "AS IS" BASIS,
11
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
//  See the License for the specific language governing permissions and
13
//  limitations under the License.
14
//
15
// See accompanying file LICENSE or visit the Scribe site at:
16
// http://developers.facebook.com/scribe/
17
//
18
// @author Bobby Johnson
19
// @author Jason Sobel
20
// @author Avinash Lakshman
21
 
22
#include "common.h"
23
#include "file.h"
24
#include "HdfsFile.h"
25
 
26
// INITIAL_BUFFER_SIZE must always be >= UINT_SIZE
27
#define INITIAL_BUFFER_SIZE 4096
28
#define UINT_SIZE 4
29
 
30
using namespace std;
31
using boost::shared_ptr;
32
 
33
// Factory for the concrete FileInterface implementations.
//
//   type   - backend selector: "std" builds a StdFile, "hdfs" builds an HdfsFile
//   name   - file name passed to the backend's constructor
//   framed - forwarded to StdFile only; the HdfsFile constructor call does
//            not receive it
//
// Returns an empty shared_ptr when type matches neither backend.
boost::shared_ptr<FileInterface> FileInterface::createFileInterface(const std::string& type,
                                                                    const std::string& name,
                                                                    bool framed) {
  shared_ptr<FileInterface> created;
  if (type == "std") {
    created.reset(new StdFile(name, framed));
  } else if (type == "hdfs") {
    created.reset(new HdfsFile(name));
  }
  return created;
}
44
 
45
std::vector<std::string> FileInterface::list(const std::string& path, const std::string &fsType) {
46
  std::vector<std::string> files;
47
  shared_ptr<FileInterface> concrete_file = createFileInterface(fsType, path);
48
  if (concrete_file) {
49
    concrete_file->listImpl(path, files);
50
  }
51
  return files;
52
}
53
 
54
// Stores the framing flag and the file name; performs no I/O.
FileInterface::FileInterface(const std::string& name, bool frame)
  : framed(frame),
    filename(name) {
  // Nothing further to set up.
}
57
 
58
// The base class has nothing of its own to release here.
FileInterface::~FileInterface() {
  // Intentionally empty.
}
60
 
61
// Builds a StdFile for the given name. The read buffer starts out
// unallocated (NULL, size 0); readNext() allocates it on first use.
StdFile::StdFile(const std::string& name, bool frame)
  : FileInterface(name, frame),
    inputBuffer(NULL),
    bufferSize(0) {
  // The file itself is not opened here; see openRead()/openWrite().
}
64
 
65
// Releases the read buffer allocated by readNext(), if any.
StdFile::~StdFile() {
  // delete[] on a null pointer is a no-op, so no explicit guard is needed.
  delete[] inputBuffer;
  inputBuffer = NULL;
}
71
 
72
bool StdFile::openRead() {
73
  return open(fstream::in);
74
}
75
 
76
bool StdFile::openWrite() {
77
  // open file for write in append mode
78
  ios_base::openmode mode = fstream::out | fstream::app;
79
  return open(mode);
80
}
81
 
82
bool StdFile::openTruncate() {
83
  // open an existing file for write and truncate its contents
84
  ios_base::openmode mode = fstream::out | fstream::app | fstream::trunc;
85
  return open(mode);
86
}
87
 
88
// Opens filename with the requested mode.
//
// Returns false without touching the stream when it is already open;
// otherwise returns whether the stream is good() after the open attempt.
bool StdFile::open(ios_base::openmode mode) {
  const bool alreadyOpen = file.is_open();
  if (!alreadyOpen) {
    file.open(filename.c_str(), mode);
    return file.good();
  }
  return false;
}
98
 
99
bool StdFile::isOpen() {
100
  return file.is_open();
101
}
102
 
103
void StdFile::close() {
104
  if (file.is_open()) {
105
    file.close();
106
  }
107
}
108
 
109
// Builds the frame header for a record of data_length bytes.
//
// In framed mode the result is the UINT_SIZE-byte encoding produced by
// serializeUInt(); in unframed mode it is an empty string.
string StdFile::getFrame(unsigned data_length) {
  if (!framed) {
    return string();
  }

  char header[UINT_SIZE];
  serializeUInt(data_length, header);
  return string(header, UINT_SIZE);
}
120
 
121
bool StdFile::write(const std::string& data) {
122
 
123
  if (!file.is_open()) {
124
    return false;
125
  }
126
 
127
  file << data;
128
  if (file.bad()) {
129
    return false;
130
  }
131
  return true;
132
}
133
 
134
void StdFile::flush() {
135
  if (file.is_open()) {
136
    file.flush();
137
  }
138
}
139
 
140
bool StdFile::readNext(std::string& _return) {
141
 
142
  if (!inputBuffer) {
143
    bufferSize = INITIAL_BUFFER_SIZE;
144
    inputBuffer = new char[bufferSize];
145
  }
146
 
147
  if (framed) {
148
    unsigned size;
149
    file.read(inputBuffer, UINT_SIZE);  // assumes INITIAL_BUFFER_SIZE > UINT_SIZE
150
    if (file.good() && (size = unserializeUInt(inputBuffer))) {
151
 
152
      // check if size is larger than half the max uint size
153
      if (size >= (((unsigned)1) << (UINT_SIZE*8 - 1))) {
154
        LOG_OPER("WARNING: attempting to read message of size %d bytes", size);
155
 
156
        // Do not try to make bufferSize any larger than this or you might overflow
157
        bufferSize = size;
158
      }
159
 
160
      while (size > bufferSize) {
161
        bufferSize = 2 * bufferSize;
162
        delete[] inputBuffer;
163
        inputBuffer = new char[bufferSize];
164
      }
165
      file.read(inputBuffer, size);
166
      if (file.good()) {
167
        _return.assign(inputBuffer, size);
168
        return true;
169
      } else {
170
        int offset = file.tellg();
171
        LOG_OPER("ERROR: Failed to read file %s at offset %d",
172
                 filename.c_str(), offset);
173
        return false;
174
      }
175
    }
176
  } else {
177
    file.getline(inputBuffer, bufferSize);
178
    if (file.good()) {
179
      _return = inputBuffer;
180
      return true;
181
    }
182
  }
183
  return false;
184
}
185
 
186
unsigned long StdFile::fileSize() {
187
  unsigned long size = 0;
188
  try {
189
    size = boost::filesystem::file_size(filename.c_str());
190
  } catch(std::exception const& e) {
191
    LOG_OPER("Failed to get size for file <%s> error <%s>", filename.c_str(), e.what());
192
    size = 0;
193
  }
194
  return size;
195
}
196
 
197
void StdFile::listImpl(const std::string& path, std::vector<std::string>& _return) {
198
  try {
199
    if (boost::filesystem::exists(path)) {
200
      boost::filesystem::directory_iterator dir_iter(path), end_iter;
201
 
202
      for ( ; dir_iter != end_iter; ++dir_iter) {
203
        _return.push_back(dir_iter->filename());
204
      }
205
    }
206
  } catch (std::exception const& e) {
207
    LOG_OPER("exception <%s> listing files in <%s>",
208
             e.what(), path.c_str());
209
  }
210
}
211
 
212
void StdFile::deleteFile() {
213
  boost::filesystem::remove(filename);
214
}
215
 
216
bool StdFile::createDirectory(std::string path) {
217
  try {
218
    boost::filesystem::create_directories(path);
219
  } catch(std::exception const& e) {
220
    LOG_OPER("Exception < %s > in StdFile::createDirectory for path %s ",
221
      e.what(),path.c_str());
222
    return false;
223
  }
224
 
225
  return true;
226
}
227
 
228
// Creates a symbolic link named newpath that points at oldpath.
// Returns true when symlink() reports success (0), false otherwise.
bool StdFile::createSymlink(std::string oldpath, std::string newpath) {
  const int rc = symlink(oldpath.c_str(), newpath.c_str());
  return rc == 0;
}
235
 
236
// Buffer had better be at least UINT_SIZE long!
237
unsigned FileInterface::unserializeUInt(const char* buffer) {
238
  unsigned retval = 0;
239
  int i;
240
  for (i = 0; i < UINT_SIZE; ++i) {
241
    retval |= (unsigned char)buffer[i] << (8 * i);
242
  }
243
  return retval;
244
}
245
 
246
// Encodes data into the first UINT_SIZE bytes of buffer, least-significant
// byte first. The caller must supply a buffer of at least UINT_SIZE bytes.
void FileInterface::serializeUInt(unsigned data, char* buffer) {
  for (int idx = 0; idx < UINT_SIZE; ++idx) {
    const unsigned shifted = data >> (8 * idx);
    buffer[idx] = static_cast<unsigned char>(shifted & 0xFF);
  }
}