// Copyright (c) 2009- Facebook
// Distributed under the Scribe Software License
//
// See accompanying file LICENSE or visit the Scribe site at:
// http://developers.facebook.com/scribe/
//

#include <limits>
#include "common.h"
#include "file.h"
#include "HdfsFile.h"

using namespace std;

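// Note: HdfsFile.h is assumed to pull in the libhdfs C API (hdfs.h), which
// supplies the hdfsFS, hdfsFile, hdfsFileInfo, tSize, and tPort types used
// throughout this file, plus the hdfs*() calls wrapped below.
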
HdfsFile::HdfsFile(const std::string& name) : FileInterface(name, false), inputBuffer_(NULL), bufferSize_(0) {
  LOG_OPER("[hdfs] Connecting to HDFS");

  // First attempt to parse the hdfs cluster from the path name specified.
  // If it fails, then use the default hdfs cluster.
  fileSys = connectToPath(name.c_str());

  if (fileSys == 0) {
    // Ideally, we should throw an exception here, but the scribe store code
    // does not handle this elegantly now.
    LOG_OPER("[hdfs] ERROR: HDFS is not configured for file: %s", name.c_str());
  }
  hfile = 0;
}

HdfsFile::~HdfsFile() {
  if (fileSys) {
    hdfsDisconnect(fileSys);
  }
  fileSys = 0;
  hfile = 0;
}

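// Usage sketch (illustrative only; paths and host names are hypothetical):
//
//   HdfsFile explicitCluster("hdfs://namenode.example.com:9000/scribe/web_00000");
//   HdfsFile defaultCluster("/scribe/web_00000");   // no hdfs:// prefix -> default cluster
//
// Construction only establishes the hdfsFS connection; the file itself is not
// opened until openRead()/openWrite()/openTruncate() is called.
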
bool HdfsFile::openRead() {
  if (fileSys) {
    hfile = hdfsOpenFile(fileSys, filename.c_str(), O_RDONLY, 0, 0, 0);
  }
  if (hfile) {
    LOG_OPER("[hdfs] opened for read %s", filename.c_str());
    return true;
  }
  return false;
}

bool HdfsFile::openWrite() {
  int flags;

  if (!fileSys) {
    return false;
  }
  if (hfile) {
    LOG_OPER("[hdfs] already opened for write %s", filename.c_str());
    return false;
  }

  if (hdfsExists(fileSys, filename.c_str()) == 0) {
    flags = O_WRONLY|O_APPEND; // file exists, append to it.
  } else {
    flags = O_WRONLY;
  }
  hfile = hdfsOpenFile(fileSys, filename.c_str(), flags, 0, 0, 0);
  if (hfile) {
    if (flags & O_APPEND) {
      LOG_OPER("[hdfs] opened for append %s", filename.c_str());
    } else {
      LOG_OPER("[hdfs] opened for write %s", filename.c_str());
    }
    return true;
  }
  return false;
}

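// Note: when the target file already exists, openWrite() appends
// (O_WRONLY|O_APPEND) instead of truncating; callers that want a fresh file
// should use openTruncate() instead.
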
bool HdfsFile::openTruncate() {
  LOG_OPER("[hdfs] truncate %s", filename.c_str());
  deleteFile();
  return openWrite();
}

bool HdfsFile::isOpen() {
  return hfile != NULL;
}

void HdfsFile::close() {
  if (fileSys) {
    if (hfile) {
      hdfsCloseFile(fileSys, hfile);
      LOG_OPER("[hdfs] closed %s", filename.c_str());
    }
    hfile = 0;
  }
}

bool HdfsFile::write(const std::string& data) {
  if (!isOpen()) {
    bool success = openWrite();

    if (!success) {
      return false;
    }
  }
  tSize bytesWritten = hdfsWrite(fileSys, hfile, data.data(),
                                 (tSize) data.length());
  // Treat a short write as failure.
  return bytesWritten == (tSize) data.length();
}

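// Usage sketch (illustrative; the path and payload are hypothetical):
//
//   HdfsFile out("hdfs://namenode.example.com:9000/scribe/web_00000");
//   if (out.openWrite()) {
//     bool ok = out.write("127.0.0.1 GET /index.html\n");  // false on a short write
//     out.flush();
//     out.close();
//   }
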
void HdfsFile::flush() {
  if (hfile) {
    hdfsFlush(fileSys, hfile);
  }
}

unsigned long HdfsFile::fileSize() {
  long size = 0L;

  if (fileSys) {
    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys, filename.c_str());
    if (pFileInfo != NULL) {
      size = pFileInfo->mSize;
      hdfsFreeFileInfo(pFileInfo, 1);
    }
  }
  return size;
}

void HdfsFile::deleteFile() {
  if (fileSys) {
    hdfsDelete(fileSys, filename.c_str());
  }
  LOG_OPER("[hdfs] deleteFile %s", filename.c_str());
}

void HdfsFile::listImpl(const std::string& path,
                        std::vector<std::string>& _return) {
  if (!fileSys) {
    return;
  }

  int value = hdfsExists(fileSys, path.c_str());
  if (value == 0) {
    int numEntries = 0;
    hdfsFileInfo* pHdfsFileInfo = 0;
    pHdfsFileInfo = hdfsListDirectory(fileSys, path.c_str(), &numEntries);
    if (pHdfsFileInfo) {
      for (int i = 0; i < numEntries; i++) {
        char* pathname = pHdfsFileInfo[i].mName;
        char* filename = rindex(pathname, '/');
        if (filename != NULL) {
          // Return just the base name, not the full HDFS path.
          _return.push_back(filename + 1);
        }
      }
      hdfsFreeFileInfo(pHdfsFileInfo, numEntries);
    }
  }
}

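// Example (hypothetical layout): listing "/scribe/web" containing
// "/scribe/web/web_00000" and "/scribe/web/web_00001" pushes the base names
// "web_00000" and "web_00001" into _return; the directory prefix is stripped
// at the last '/' found by rindex().
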
bool HdfsFile::readNext(std::string& _return) {
  return false;            // frames not yet supported
}

string HdfsFile::getFrame(unsigned data_length) {
  return std::string();    // not supported
}

bool HdfsFile::createDirectory(std::string path) {
  // Opening the file will create the directories.
  return true;
}

/**
 * HDFS currently does not support symlinks, so we create a
 * normal file and write the symlink target into it.
 */
bool HdfsFile::createSymlink(std::string oldpath, std::string newpath) {
  LOG_OPER("[hdfs] Creating symlink oldpath %s newpath %s",
           oldpath.c_str(), newpath.c_str());
  HdfsFile* link = new HdfsFile(newpath);
  if (link->openWrite() == false) {
    LOG_OPER("[hdfs] Creating symlink failed because %s already exists.",
             newpath.c_str());
    delete link;
    return false;
  }
  if (link->write(oldpath) == false) {
    LOG_OPER("[hdfs] Writing symlink %s failed", newpath.c_str());
    link->close();
    delete link;
    return false;
  }
  link->close();
  delete link;
  return true;
}

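// Example (hypothetical names): createSymlink("web_00000", "web_current")
// produces an ordinary HDFS file named "web_current" whose contents are the
// string "web_00000"; readers must resolve the pseudo-link themselves by
// reading that file.
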
/**
 * If the URI is of the form hdfs://server:port/path,
 * connect to the specified cluster; otherwise fall back
 * to the default cluster configured for libhdfs.
 */
hdfsFS HdfsFile::connectToPath(const char* uri) {
  const char proto[] = "hdfs://";

  if (strncmp(proto, uri, strlen(proto)) != 0) {
    // uri doesn't start with hdfs:// -> use default:0, which is special
    // to libhdfs.
    return hdfsConnectNewInstance("default", 0);
  }

  // Skip the hdfs:// part.
  uri += strlen(proto);
  // Find the next colon.
  const char* colon = strchr(uri, ':');
  // No ':' or ':' is the last character.
  if (!colon || !colon[1]) {
    LOG_OPER("[hdfs] Missing port specification: \"%s\"", uri);
    return NULL;
  }

  char* endptr = NULL;
  const long port = strtol(colon + 1, &endptr, 10);
  if (port < 0) {
    LOG_OPER("[hdfs] Invalid port specification (negative): \"%s\"", uri);
    return NULL;
  } else if (port > std::numeric_limits<tPort>::max()) {
    LOG_OPER("[hdfs] Invalid port specification (out of range): \"%s\"", uri);
    return NULL;
  }

  // Copy out just the host portion (between "hdfs://" and the colon).
  char* const host = (char*) malloc(colon - uri + 1);
  memcpy(host, uri, colon - uri);
  host[colon - uri] = '\0';

  LOG_OPER("[hdfs] Before hdfsConnectNewInstance(%s, %li)", host, port);
  hdfsFS fs = hdfsConnectNewInstance(host, port);
  LOG_OPER("[hdfs] After hdfsConnectNewInstance");
  free(host);
  return fs;
}
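
// Examples (host names are hypothetical):
//   connectToPath("hdfs://namenode.example.com:9000/scribe/web")
//     -> hdfsConnectNewInstance("namenode.example.com", 9000)
//   connectToPath("/scribe/web")
//     -> hdfsConnectNewInstance("default", 0)   // no hdfs:// prefix
//   connectToPath("hdfs://namenode.example.com/scribe/web")
//     -> NULL (missing port)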