Created
December 1, 2013 04:41
-
-
Save enakai00/7728581 to your computer and use it in GitHub Desktop.
gfapi env module for LevelDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
// Use of this source code is governed by a BSD-style license that can be | |
// found in the LICENSE file. See the AUTHORS file for names of contributors. | |
#include <deque> | |
#include <set> | |
#include <dirent.h> | |
#include <errno.h> | |
#include <fcntl.h> | |
#include <pthread.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <sys/mman.h> | |
#include <sys/stat.h> | |
#include <sys/time.h> | |
#include <sys/types.h> | |
#include <time.h> | |
#include <unistd.h> | |
#if defined(LEVELDB_PLATFORM_ANDROID) | |
#include <sys/stat.h> | |
#endif | |
#include "leveldb/env.h" | |
#include "leveldb/slice.h" | |
#include "port/port.h" | |
#include "util/logging.h" | |
#include "util/mutexlock.h" | |
#include "util/posix_logger.h" | |
extern "C" { | |
#include <glusterfs/api/glfs.h> | |
} | |
namespace leveldb { | |
namespace { | |
// Builds an IOError Status whose first message part is `context` (callers in
// this file pass a file or directory name) and whose second part is the
// strerror() text for `err_number`.
static Status IOError(const std::string& context, int err_number) {
  const char* const reason = strerror(err_number);
  return Status::IOError(context, reason);
}
class GfapiSequentialFile: public SequentialFile { | |
private: | |
std::string filename_; | |
glfs_fd_t* fd_; | |
public: | |
GfapiSequentialFile(const std::string& fname, glfs_fd_t* fd) | |
: filename_(fname), fd_(fd) { } | |
virtual ~GfapiSequentialFile() { glfs_close(fd_); } | |
virtual Status Read(size_t n, Slice* result, char* scratch) { | |
// Original: | |
// size_t r = fread_unlocked(scratch, 1, n, file_); | |
ssize_t r = glfs_read(fd_, scratch, n, 0); | |
*result = Slice(scratch, r); | |
if (r < 0) | |
return IOError(filename_, errno); | |
// TODO: I'm not sure how I can check EOF with glfs_read(). | |
// The following is the original code using fread_unlocked(). | |
/* | |
if (r < n) { | |
if (feof(file_)) { | |
// We leave status as ok if we hit the end of the file | |
} else { | |
// A partial read with an error: return a non-ok status | |
s = IOError(filename_, errno); | |
} | |
} | |
*/ | |
return Status::OK(); | |
} | |
virtual Status Skip(uint64_t n) { | |
if (glfs_lseek(fd_, static_cast<off_t>(n), SEEK_CUR) < 0) | |
return IOError(filename_, errno); | |
return Status::OK(); | |
} | |
}; | |
// pread() based random-access | |
class GfapiRandomAccessFile: public RandomAccessFile { | |
private: | |
std::string filename_; | |
glfs_fd_t* fd_; | |
public: | |
GfapiRandomAccessFile(const std::string& fname, glfs_fd_t* fd) | |
: filename_(fname), fd_(fd) { } | |
virtual ~GfapiRandomAccessFile() { glfs_close(fd_); } | |
virtual Status Read(uint64_t offset, size_t n, Slice* result, | |
char* scratch) const { | |
ssize_t r = glfs_pread(fd_, scratch, n, static_cast<off_t>(offset), 0); | |
*result = Slice(scratch, (r < 0) ? 0 : r); | |
if (r < 0) { | |
// An error: return a non-ok status | |
return IOError(filename_, errno); | |
} | |
return Status::OK(); | |
} | |
}; | |
class GfapiWritableFile: public WritableFile { | |
private: | |
std::string filename_; | |
glfs_fd_t* fd_; | |
public: | |
GfapiWritableFile(const std::string& fname, glfs_fd_t* fd) | |
: filename_(fname), fd_(fd) { } | |
virtual ~GfapiWritableFile() { | |
GfapiWritableFile::Close(); | |
} | |
virtual Status Append(const Slice& data) { | |
const char* src = data.data(); | |
size_t left = data.size(); | |
size_t ret = glfs_write(fd_, src, left, 0); | |
if (ret < 0) | |
return IOError(filename_, errno); | |
return Status::OK(); | |
} | |
virtual Status Close() { | |
if (fd_ != NULL) { | |
int ret = glfs_close(fd_); | |
fd_ = NULL; | |
if (ret < 0) | |
return IOError(filename_, errno); | |
} | |
return Status::OK(); | |
} | |
virtual Status Flush() { | |
return Status::OK(); | |
} | |
virtual Status Sync() { | |
if (glfs_fsync(fd_) < 0) | |
return IOError(filename_, errno); | |
return Status::OK(); | |
} | |
}; | |
// Issues a non-blocking (F_SETLK) whole-file lock request on `fd` through
// glfs_posix_lock(): a write lock when `lock` is true, an unlock otherwise.
// errno is reset to 0 before the call. Returns whatever glfs_posix_lock()
// returns.
static int LockOrUnlock(glfs_fd_t* fd, bool lock) {
  errno = 0;

  struct flock request;
  memset(&request, 0, sizeof(request));
  request.l_whence = SEEK_SET;
  request.l_start = 0;
  request.l_len = 0;  // Zero length: the request covers the entire file.
  if (lock) {
    request.l_type = F_WRLCK;
  } else {
    request.l_type = F_UNLCK;
  }

  return glfs_posix_lock(fd, F_SETLK, &request);
}
class GfapiFileLock : public FileLock { | |
public: | |
glfs_fd_t* fd_; | |
std::string name_; | |
}; | |
// Set of locked files. We keep a separate set instead of just | |
// relying on fcntrl(F_SETLK) since fcntl(F_SETLK) does not provide | |
// any protection against multiple uses from the same process. | |
class GfapiLockTable { | |
private: | |
port::Mutex mu_; | |
std::set<std::string> locked_files_; | |
public: | |
bool Insert(const std::string& fname) { | |
MutexLock l(&mu_); | |
return locked_files_.insert(fname).second; | |
} | |
void Remove(const std::string& fname) { | |
MutexLock l(&mu_); | |
locked_files_.erase(fname); | |
} | |
}; | |
class GfapiEnv : public Env { | |
private: | |
glfs_t* fs_; | |
public: | |
GfapiEnv(); | |
virtual ~GfapiEnv() { | |
fprintf(stderr, "Destroying Env::Default()\n"); | |
exit(1); | |
} | |
virtual Status NewSequentialFile(const std::string& fname, | |
SequentialFile** result) { | |
glfs_fd_t* fd; | |
fd = glfs_open(fs_, fname.c_str(), O_RDONLY); | |
if (fd == NULL) { | |
*result = NULL; | |
return IOError(fname, errno); | |
} else { | |
*result = new GfapiSequentialFile(fname, fd); | |
return Status::OK(); | |
} | |
} | |
virtual Status NewRandomAccessFile(const std::string& fname, | |
RandomAccessFile** result) { | |
*result = NULL; | |
glfs_fd_t* fd; | |
fd = glfs_open(fs_, fname.c_str(), O_RDONLY); | |
if (fd == NULL) { | |
*result = NULL; | |
return IOError(fname, errno); | |
} else { | |
*result = new GfapiRandomAccessFile(fname, fd); | |
return Status::OK(); | |
} | |
} | |
virtual Status NewWritableFile(const std::string& fname, | |
WritableFile** result) { | |
glfs_fd_t* fd; | |
if (GfapiEnv::FileExists(fname)) | |
GfapiEnv::DeleteFile(fname); | |
fd = glfs_creat(fs_, fname.c_str(), O_RDWR | O_APPEND, 0644); | |
if (fd == NULL) { | |
*result = NULL; | |
} else { | |
*result = new GfapiWritableFile(fname, fd); | |
return Status::OK(); | |
} | |
return IOError(fname, errno); | |
} | |
virtual bool FileExists(const std::string& fname) { | |
return glfs_access(fs_, fname.c_str(), F_OK) == 0; | |
} | |
virtual Status GetChildren(const std::string& dir, | |
std::vector<std::string>* result) { | |
result->clear(); | |
glfs_fd_t* d = glfs_opendir(fs_, dir.c_str()); | |
if (d == NULL) | |
return IOError(dir, errno); | |
struct dirent entry; | |
struct dirent* res; | |
while (true) { | |
int ret = glfs_readdir_r(d, &entry, &res); | |
if (ret) | |
return Status::IOError(dir.c_str(), "failed to read dentry"); | |
if (res == NULL) | |
break; | |
result->push_back(entry.d_name); | |
} | |
glfs_closedir(d); | |
return Status::OK(); | |
} | |
virtual Status DeleteFile(const std::string& fname) { | |
if (glfs_unlink(fs_, fname.c_str()) != 0) | |
return IOError(fname, errno); | |
return Status::OK(); | |
} | |
virtual Status CreateDir(const std::string& name) { | |
if (glfs_mkdir(fs_, name.c_str(), 0755) != 0) | |
return IOError(name, errno); | |
return Status::OK(); | |
} | |
virtual Status DeleteDir(const std::string& name) { | |
if (glfs_rmdir(fs_, name.c_str()) != 0) | |
return IOError(name, errno); | |
return Status::OK(); | |
} | |
virtual Status GetFileSize(const std::string& fname, uint64_t* size) { | |
struct stat sbuf; | |
if (glfs_stat(fs_, fname.c_str(), &sbuf) != 0) { | |
*size = 0; | |
return IOError(fname, errno); | |
} else { | |
*size = sbuf.st_size; | |
} | |
return Status::OK(); | |
} | |
virtual Status RenameFile(const std::string& src, const std::string& target) { | |
if (glfs_rename(fs_, src.c_str(), target.c_str()) != 0) | |
return IOError(src, errno); | |
return Status::OK(); | |
} | |
virtual Status LockFile(const std::string& fname, FileLock** lock) { | |
*lock = NULL; | |
glfs_fd_t* fd; | |
if (FileExists(fname)) { | |
fd = glfs_open(fs_, fname.c_str(), O_RDWR); | |
} else { | |
fd = glfs_creat(fs_, fname.c_str(), O_RDWR, 0644); | |
} | |
if (fd == NULL) { | |
return IOError(fname, errno); | |
} else if (!locks_.Insert(fname)) { | |
glfs_close(fd); | |
return Status::IOError("lock " + fname, "already held by process"); | |
} else if (LockOrUnlock(fd, true) == -1) { | |
glfs_close(fd); | |
locks_.Remove(fname); | |
return IOError("lock " + fname, errno); | |
} else { | |
GfapiFileLock* my_lock = new GfapiFileLock; | |
my_lock->fd_ = fd; | |
my_lock->name_ = fname; | |
*lock = my_lock; | |
} | |
return Status::OK(); | |
} | |
virtual Status UnlockFile(FileLock* lock) { | |
GfapiFileLock* my_lock = reinterpret_cast<GfapiFileLock*>(lock); | |
if (LockOrUnlock(my_lock->fd_, false) == -1) { | |
return IOError("unlock", errno); | |
} | |
locks_.Remove(my_lock->name_); | |
glfs_close(my_lock->fd_); | |
delete my_lock; | |
return Status::OK(); | |
} | |
virtual void Schedule(void (*function)(void*), void* arg); | |
virtual void StartThread(void (*function)(void* arg), void* arg); | |
virtual Status GetTestDirectory(std::string* result) { | |
const char* env = getenv("TEST_TMPDIR"); | |
if (env && env[0] != '\0') { | |
*result = env; | |
} else { | |
char buf[100]; | |
snprintf(buf, sizeof(buf), "/tmp/leveldbtest-%d", int(geteuid())); | |
*result = buf; | |
} | |
// Directory may already exist | |
CreateDir(*result); | |
return Status::OK(); | |
} | |
static uint64_t gettid() { | |
pthread_t tid = pthread_self(); | |
uint64_t thread_id = 0; | |
memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); | |
return thread_id; | |
} | |
virtual Status NewLogger(const std::string& fname, Logger** result) { | |
FILE* f = fopen(fname.c_str(), "w"); | |
if (f == NULL) { | |
*result = NULL; | |
return IOError(fname, errno); | |
} else { | |
*result = new PosixLogger(f, &GfapiEnv::gettid); | |
return Status::OK(); | |
} | |
} | |
virtual uint64_t NowMicros() { | |
struct timeval tv; | |
gettimeofday(&tv, NULL); | |
return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec; | |
} | |
virtual void SleepForMicroseconds(int micros) { | |
usleep(micros); | |
} | |
private: | |
void PthreadCall(const char* label, int result) { | |
if (result != 0) { | |
fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); | |
exit(1); | |
} | |
} | |
// BGThread() is the body of the background thread | |
void BGThread(); | |
static void* BGThreadWrapper(void* arg) { | |
reinterpret_cast<GfapiEnv*>(arg)->BGThread(); | |
return NULL; | |
} | |
size_t page_size_; | |
pthread_mutex_t mu_; | |
pthread_cond_t bgsignal_; | |
pthread_t bgthread_; | |
bool started_bgthread_; | |
// Entry per Schedule() call | |
struct BGItem { void* arg; void (*function)(void*); }; | |
typedef std::deque<BGItem> BGQueue; | |
BGQueue queue_; | |
GfapiLockTable locks_; | |
// MmapLimiter mmap_limit_; | |
}; | |
// Connects to the GlusterFS volume named by $GF_VOLUME on the server named by
// $GF_SERVER (tcp, port 24007) and initialises the synchronisation primitives
// used by the background-work queue.
//
// Every setup failure is fatal: a message is written to stderr and the
// process exits. In particular, the handle returned by glfs_new() and the
// result of glfs_set_volfile_server() are both checked before fs_ is used any
// further.
GfapiEnv::GfapiEnv() : page_size_(getpagesize()),
                       started_bgthread_(false) {
  const char* gfvol = getenv("GF_VOLUME");
  const char* gfserver = getenv("GF_SERVER");
  int ret;

  if (gfvol == NULL || gfserver == NULL) {
    fprintf(stderr, "Need to set environmental variables GF_SERVER and GF_VOLUME\n");
    exit(-1);
  }

  fs_ = glfs_new(gfvol);
  if (fs_ == NULL) {
    // Without this check the NULL handle would be passed to the calls below.
    fprintf(stderr, "Failed to allocate GlusterFS handle for volume: %s\n", gfvol);
    exit(-1);
  }

  ret = glfs_set_volfile_server(fs_, "tcp", gfserver, 24007);
  if (ret) {
    fprintf(stderr, "Failed to set GlusterFS volfile server: %s\n", gfserver);
    exit(ret);
  }

  ret = glfs_init(fs_);
  if (ret) {
    fprintf(stderr, "Failed to connect GlusterFS server/volume: %s/%s\n", gfserver, gfvol);
    exit(ret);
  }

  PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
  PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
}
void GfapiEnv::Schedule(void (*function)(void*), void* arg) { | |
PthreadCall("lock", pthread_mutex_lock(&mu_)); | |
// Start background thread if necessary | |
if (!started_bgthread_) { | |
started_bgthread_ = true; | |
PthreadCall( | |
"create thread", | |
pthread_create(&bgthread_, NULL, &GfapiEnv::BGThreadWrapper, this)); | |
} | |
// If the queue is currently empty, the background thread may currently be | |
// waiting. | |
if (queue_.empty()) { | |
PthreadCall("signal", pthread_cond_signal(&bgsignal_)); | |
} | |
// Add to priority queue | |
queue_.push_back(BGItem()); | |
queue_.back().function = function; | |
queue_.back().arg = arg; | |
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); | |
} | |
void GfapiEnv::BGThread() { | |
while (true) { | |
// Wait until there is an item that is ready to run | |
PthreadCall("lock", pthread_mutex_lock(&mu_)); | |
while (queue_.empty()) { | |
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); | |
} | |
void (*function)(void*) = queue_.front().function; | |
void* arg = queue_.front().arg; | |
queue_.pop_front(); | |
PthreadCall("unlock", pthread_mutex_unlock(&mu_)); | |
(*function)(arg); | |
} | |
} | |
namespace {

// Heap-allocated hand-off record for StartThread(): carries the user's
// callback and its argument to the new thread, which frees the record after
// the callback returns.
struct StartThreadState {
  void (*user_function)(void*);  // Callback to run on the new thread.
  void* arg;                     // Passed to user_function unchanged.
};

}  // namespace
static void* StartThreadWrapper(void* arg) { | |
StartThreadState* state = reinterpret_cast<StartThreadState*>(arg); | |
state->user_function(state->arg); | |
delete state; | |
return NULL; | |
} | |
// Runs `function(arg)` on a new thread.
//
// The callback and its argument travel to the thread in a heap-allocated
// StartThreadState, which StartThreadWrapper deletes once the callback has
// returned. The thread handle is local to this function and never leaves it,
// so nobody could ever join the thread; it is therefore detached right after
// creation so that its resources are reclaimed when it finishes.
// A failure of either pthread call terminates the process via PthreadCall().
void GfapiEnv::StartThread(void (*function)(void* arg), void* arg) {
  pthread_t t;
  StartThreadState* state = new StartThreadState;
  state->user_function = function;
  state->arg = arg;
  PthreadCall("start thread",
              pthread_create(&t, NULL, &StartThreadWrapper, state));
  PthreadCall("detach thread", pthread_detach(t));
}
} // namespace | |
static pthread_once_t once = PTHREAD_ONCE_INIT; | |
static Env* default_env; | |
static void InitDefaultEnv() { default_env = new GfapiEnv; } | |
Env* Env::Default() { | |
pthread_once(&once, InitDefaultEnv); | |
return default_env; | |
} | |
} // namespace leveldb |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment