Rev 301 | Blame | Compare with Previous | Last modification | View Log | RSS feed
// Copyright (c) 2007-2008 Facebook//// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.//// See accompanying file LICENSE or visit the Scribe site at:// http://developers.facebook.com/scribe///// @author Bobby Johnson// @author Jason Sobel// @author Avinash Lakshman#include "common.h"#include "file.h"#include "HdfsFile.h"// INITIAL_BUFFER_SIZE must always be >= UINT_SIZE#define INITIAL_BUFFER_SIZE 4096#define UINT_SIZE 4using namespace std;using boost::shared_ptr;boost::shared_ptr<FileInterface> FileInterface::createFileInterface(const std::string& type,const std::string& name,bool framed) {if (0 == type.compare("std")) {return shared_ptr<FileInterface>(new StdFile(name, framed));} else if (0 == type.compare("hdfs")) {return shared_ptr<FileInterface>(new HdfsFile(name));} else {return shared_ptr<FileInterface>();}}std::vector<std::string> FileInterface::list(const std::string& path, const std::string &fsType) {std::vector<std::string> files;shared_ptr<FileInterface> concrete_file = createFileInterface(fsType, path);if (concrete_file) {concrete_file->listImpl(path, files);}return files;}FileInterface::FileInterface(const std::string& name, bool frame): framed(frame), filename(name) {}FileInterface::~FileInterface() {}StdFile::StdFile(const std::string& name, bool frame): FileInterface(name, frame), inputBuffer(NULL), bufferSize(0) {}StdFile::~StdFile() {if (inputBuffer) {delete[] inputBuffer;inputBuffer = NULL;}}bool StdFile::openRead() {return open(fstream::in);}bool StdFile::openWrite() {// open file for write in append modeios_base::openmode mode = fstream::out | fstream::app;return open(mode);}bool StdFile::openTruncate() {// open an existing file for write and truncate its contentsios_base::openmode mode = fstream::out | fstream::app | fstream::trunc;return open(mode);}bool StdFile::open(ios_base::openmode mode) {if (file.is_open()) {return false;}file.open(filename.c_str(), mode);return file.good();}bool StdFile::isOpen() {return file.is_open();}void StdFile::close() {if (file.is_open()) {file.close();}}string StdFile::getFrame(unsigned data_length) {if (framed) {char buf[UINT_SIZE];serializeUInt(data_length, buf);return string(buf, UINT_SIZE);} else {return string();}}bool StdFile::write(const std::string& data) {if (!file.is_open()) {return false;}file << data;if (file.bad()) {return false;}return true;}void StdFile::flush() {if (file.is_open()) {file.flush();}}bool StdFile::readNext(std::string& _return) {if (!inputBuffer) {bufferSize = INITIAL_BUFFER_SIZE;inputBuffer = new char[bufferSize];}if (framed) {unsigned size;file.read(inputBuffer, UINT_SIZE); // assumes INITIAL_BUFFER_SIZE > UINT_SIZEif (file.good() && (size = unserializeUInt(inputBuffer))) {// check if size is larger than half the max uint sizeif (size >= (((unsigned)1) << (UINT_SIZE*8 - 1))) {LOG_OPER("WARNING: attempting to read message of size %d bytes", size);// Do not try to make bufferSize any larger than this or you might overflowbufferSize = size;}while (size > bufferSize) {bufferSize = 2 * bufferSize;delete[] inputBuffer;inputBuffer = new char[bufferSize];}file.read(inputBuffer, size);if (file.good()) {_return.assign(inputBuffer, size);return true;} else {int offset = file.tellg();LOG_OPER("ERROR: Failed to read file %s at offset %d",filename.c_str(), offset);return false;}}} else {file.getline(inputBuffer, bufferSize);if (file.good()) {_return = inputBuffer;return true;}}return false;}unsigned long StdFile::fileSize() {unsigned long size = 0;try {size = boost::filesystem::file_size(filename.c_str());} catch(std::exception const& e) {LOG_OPER("Failed to get size for file <%s> error <%s>", filename.c_str(), e.what());size = 0;}return size;}void StdFile::listImpl(const std::string& path, std::vector<std::string>& _return) {try {if (boost::filesystem::exists(path)) {boost::filesystem::directory_iterator dir_iter(path), end_iter;for ( ; dir_iter != end_iter; ++dir_iter) {_return.push_back(dir_iter->filename());}}} catch (std::exception const& e) {LOG_OPER("exception <%s> listing files in <%s>",e.what(), path.c_str());}}void StdFile::deleteFile() {boost::filesystem::remove(filename);}bool StdFile::createDirectory(std::string path) {try {boost::filesystem::create_directories(path);} catch(std::exception const& e) {LOG_OPER("Exception < %s > in StdFile::createDirectory for path %s ",e.what(),path.c_str());return false;}return true;}bool StdFile::createSymlink(std::string oldpath, std::string newpath) {if (symlink(oldpath.c_str(), newpath.c_str()) == 0) {return true;}return false;}// Buffer had better be at least UINT_SIZE long!unsigned FileInterface::unserializeUInt(const char* buffer) {unsigned retval = 0;int i;for (i = 0; i < UINT_SIZE; ++i) {retval |= (unsigned char)buffer[i] << (8 * i);}return retval;}void FileInterface::serializeUInt(unsigned data, char* buffer) {int i;for (i = 0; i < UINT_SIZE; ++i) {buffer[i] = (unsigned char)((data >> (8 * i)) & 0xFF);}}