Skip to content

Instantly share code, notes, and snippets.

@edouarda
Last active May 3, 2023 11:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save edouarda/79ef7ba0be9a45ebac4d4063ea4ab028 to your computer and use it in GitHub Desktop.
Save edouarda/79ef7ba0be9a45ebac4d4063ea4ab028 to your computer and use it in GitHub Desktop.
Better Win32 port for RocksDB - as used in Quasar (https://www.quasar.ai)
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#if defined(OS_WIN)
#include "port/win/port_win.h"
#include <assert.h>
#include <io.h>
#include <rpc.h>
#include <stdio.h>
#include <string.h>
#include <chrono>
#include <cstdlib>
#include <exception>
#include <memory>
#include "port/port_dirent.h"
#include "port/sys_time.h"
#ifdef ROCKSDB_WINDOWS_UTF8_FILENAMES
// utf8 <-> utf16
#include <string>
#include <locale>
#include <codecvt>
#endif
#include "logging/logging.h"
namespace ROCKSDB_NAMESPACE {
extern const bool kDefaultToAdaptiveMutex = false;
namespace port {
#ifdef ROCKSDB_WINDOWS_UTF8_FILENAMES
// Converts a UTF-16 wide string into its UTF-8 byte representation using
// the standard codecvt facet.
std::string utf16_to_utf8(const std::wstring& utf16) {
  using Utf8Utf16Facet = std::codecvt_utf8_utf16<wchar_t>;
  std::wstring_convert<Utf8Utf16Facet, wchar_t> transcoder;
  std::string utf8_bytes = transcoder.to_bytes(utf16);
  return utf8_bytes;
}
// Converts a UTF-8 byte string into a UTF-16 wide string using the
// standard codecvt facet.
std::wstring utf8_to_utf16(const std::string& utf8) {
  using Utf8Utf16Facet = std::codecvt_utf8_utf16<wchar_t>;
  std::wstring_convert<Utf8Utf16Facet> transcoder;
  std::wstring wide = transcoder.from_bytes(utf8);
  return wide;
}
#endif
// Fills *tv with the current std::chrono::system_clock time, split into
// whole seconds (tv_sec) and the remaining microseconds (tv_usec).
// The timezone argument is ignored.
void gettimeofday(struct timeval* tv, struct timezone* /* tz */) {
  namespace chr = std::chrono;

  const chr::microseconds total_us = chr::duration_cast<chr::microseconds>(
      chr::system_clock::now().time_since_epoch());
  const chr::seconds whole_sec = chr::duration_cast<chr::seconds>(total_us);
  // Microseconds left over once the whole seconds are removed.
  const auto remainder_us =
      total_us.count() -
      chr::duration_cast<chr::microseconds>(whole_sec).count();

  tv->tv_sec = static_cast<long>(whole_sec.count());
  tv->tv_usec = static_cast<long>(remainder_us);
}
// Initializes the underlying critical section. The `adaptive` argument is
// accepted for interface compatibility but is not consulted here.
Mutex::Mutex(bool adaptive) {
  static_cast<void>(adaptive);
  ::InitializeCriticalSection(&section_);
}
// Releases the resources held by the underlying critical section.
Mutex::~Mutex() {
  ::DeleteCriticalSection(&section_);
}
// Enters the critical section, then (debug builds only) records that the
// mutex is locked so AssertHeld() can check it.
void Mutex::Lock() {
::EnterCriticalSection(&section_);
#ifndef NDEBUG
// Set only after the critical section has been entered.
locked_ = true;
#endif
}
// Leaves the critical section. In debug builds the locked flag is cleared
// first, while the critical section is still owned.
void Mutex::Unlock() {
#ifndef NDEBUG
// Cleared before leaving so the flag is only written while holding the lock.
locked_ = false;
#endif
::LeaveCriticalSection(&section_);
}
// Attempts to enter the critical section without blocking.
// Returns true when the critical section was entered, false otherwise.
bool Mutex::TryLock() {
  const bool acquired = (::TryEnterCriticalSection(&section_) != 0);
#ifndef NDEBUG
  // Only record ownership when the attempt actually succeeded.
  if (acquired) {
    locked_ = true;
  }
#endif
  return acquired;
}
// Debug-only check that the locked flag is set. It does not verify which
// thread holds the mutex. Compiles to a no-op when NDEBUG is defined.
void Mutex::AssertHeld() {
#ifndef NDEBUG
assert(locked_);
#endif
}
// Binds this condition variable to `mu` and initializes the native
// CONDITION_VARIABLE. The mutex pointer is stored, not owned.
CondVar::CondVar(Mutex* mu) : mu_(mu) {
  ::InitializeConditionVariable(&cv_);
}
// Nothing to release: the destructor performs no work.
CondVar::~CondVar() = default;
// Waits on the condition variable with no timeout (INFINITE), using the
// associated mutex's critical section. The caller is expected to hold the
// mutex when calling this.
void CondVar::Wait() {
#ifndef NDEBUG
// The debug flag is cleared for the duration of the wait.
mu_->locked_ = false;
#endif
::SleepConditionVariableCS(&cv_, &(mu_->section_), INFINITE);
#ifndef NDEBUG
// Restore the debug flag once the wait call has returned.
mu_->locked_ = true;
#endif
}
bool CondVar::TimedWait(uint64_t abs_time_us) {
using namespace std::chrono;
// MSVC++ library implements wait_until in terms of wait_for so
// we need to convert absolute wait into relative wait.
microseconds usAbsTime(abs_time_us);
microseconds usNow(
duration_cast<microseconds>(system_clock::now().time_since_epoch()));
microseconds relTimeUs =
(usAbsTime > usNow) ? (usAbsTime - usNow) : microseconds::zero();
const BOOL cvStatus = ::SleepConditionVariableCS(
&cv_, &(mu_->section_),
static_cast<DWORD>(duration_cast<milliseconds>(relTimeUs).count()));
#ifndef NDEBUG
mu_->locked_ = true;
#endif
if ((!cvStatus) && (GetLastError() == ERROR_TIMEOUT)) {
return true;
}
return false;
}
// Wakes a single waiter of this condition variable.
void CondVar::Signal() {
  ::WakeConditionVariable(&cv_);
}
void CondVar::SignalAll() { WakeAllConditionVariable(&cv_); }
// Returns the value of GetCurrentProcessorNumber() for the calling thread,
// converted to int.
int PhysicalCoreID() {
  return static_cast<int>(GetCurrentProcessorNumber());
}
// Runs `initializer` through std::call_once, keyed on the once_flag stored
// in `*once`.
void InitOnce(OnceType* once, void (*initializer)()) {
  std::once_flag& guard = once->flag_;
  std::call_once(guard, initializer);
}
// Private structure, exposed only by pointer.
// Holds the Win32 find handle for a directory enumeration together with the
// most recent find data and the dirent handed back to callers.
struct DIR {
  // Find handle; INVALID_HANDLE_VALUE until a search has been opened.
  HANDLE handle_ = INVALID_HANDLE_VALUE;
  // True until the first readdir() call has consumed the initial entry.
  bool firstread_ = true;
  RX_WIN32_FIND_DATA data_;
  dirent entry_;

  DIR() {}

  // Owns a find handle, so copying is disabled.
  DIR(const DIR&) = delete;
  DIR& operator=(const DIR&) = delete;

  ~DIR() {
    if (handle_ != INVALID_HANDLE_VALUE) {
      ::FindClose(handle_);
    }
  }
};
// Opens a directory enumeration for `name` by starting a Win32 find on
// "<name>\*" and caching the first entry for the first readdir() call.
//
// Returns a heap-allocated DIR that must be released with closedir(), or
// nullptr on failure with errno set:
//   ENOENT  - name is null/empty, or the path does not exist
//   EACCES  - access was denied
//   ENOTDIR - the path is not a directory
//   EIO     - any other failure
DIR* opendir(const char* name) {
  if (!name || *name == 0) {
    errno = ENOENT;
    return nullptr;
  }

  std::string pattern(name);
  pattern.append("\\").append("*");

  std::unique_ptr<DIR> dir(new DIR);
  dir->handle_ =
      RX_FindFirstFileEx(RX_FN(pattern).c_str(),
                         FindExInfoBasic,  // Do not want alternative name
                         &dir->data_, FindExSearchNameMatch,
                         NULL,  // lpSearchFilter
                         0);

  if (dir->handle_ == INVALID_HANDLE_VALUE) {
    // Report why the search could not be started, so callers that inspect
    // errno after a null return do not read a stale value.
    switch (GetLastError()) {
      case ERROR_FILE_NOT_FOUND:
      case ERROR_PATH_NOT_FOUND:
        errno = ENOENT;
        break;
      case ERROR_ACCESS_DENIED:
        errno = EACCES;
        break;
      case ERROR_DIRECTORY:
        errno = ENOTDIR;
        break;
      default:
        errno = EIO;
        break;
    }
    return nullptr;
  }

  // Stash the first result; readdir() returns it on its first call.
  RX_FILESTRING x(dir->data_.cFileName, RX_FNLEN(dir->data_.cFileName));
  strcpy_s(dir->entry_.d_name, sizeof(dir->entry_.d_name),
           FN_TO_RX(x).c_str());

  return dir.release();
}
// Returns the next entry of the enumeration, or nullptr when
// RX_FindNextFile reports no further entry. A null or unopened `dirp`
// yields nullptr with errno set to EBADF.
// The returned pointer refers to storage inside `dirp` and is overwritten
// by the next call.
struct dirent* readdir(DIR* dirp) {
  if (dirp == nullptr || dirp->handle_ == INVALID_HANDLE_VALUE) {
    errno = EBADF;
    return nullptr;
  }

  if (!dirp->firstread_) {
    // Advance the Win32 search and refresh the cached entry name.
    if (RX_FindNextFile(dirp->handle_, &dirp->data_) == 0) {
      return nullptr;
    }
    RX_FILESTRING found_name(dirp->data_.cFileName,
                             RX_FNLEN(dirp->data_.cFileName));
    strcpy_s(dirp->entry_.d_name, sizeof(dirp->entry_.d_name),
             FN_TO_RX(found_name).c_str());
  } else {
    // The first entry is already stored in entry_; just hand it out once.
    dirp->firstread_ = false;
  }

  return &dirp->entry_;
}
// Destroys the DIR object (its destructor closes the find handle).
// Always returns 0; passing nullptr is harmless.
int closedir(DIR* dirp) {
  constexpr int kSuccess = 0;
  delete dirp;
  return kSuccess;
}
// C-string front end for Truncate(). A null `path` fails with errno set to
// EFAULT and a return value of -1; otherwise the call is forwarded.
int truncate(const char* path, int64_t length) {
  if (path != nullptr) {
    return ROCKSDB_NAMESPACE::port::Truncate(path, length);
  }
  errno = EFAULT;
  return -1;
}
int Truncate(std::string path, int64_t len) {
if (len < 0) {
errno = EINVAL;
return -1;
}
HANDLE hFile =
RX_CreateFile(RX_FN(path).c_str(), GENERIC_READ | GENERIC_WRITE,
FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
NULL, // Security attrs
OPEN_EXISTING, // Truncate existing file only
FILE_ATTRIBUTE_NORMAL, NULL);
if (INVALID_HANDLE_VALUE == hFile) {
auto lastError = GetLastError();
if (lastError == ERROR_FILE_NOT_FOUND) {
errno = ENOENT;
} else if (lastError == ERROR_ACCESS_DENIED) {
errno = EACCES;
} else {
errno = EIO;
}
return -1;
}
int result = 0;
FILE_END_OF_FILE_INFO end_of_file;
end_of_file.EndOfFile.QuadPart = len;
if (!SetFileInformationByHandle(hFile, FileEndOfFileInfo, &end_of_file,
sizeof(FILE_END_OF_FILE_INFO))) {
errno = EIO;
result = -1;
}
CloseHandle(hFile);
return result;
}
// Prints the crash location to stdout, flushes it, and aborts the process.
void Crash(const std::string& srcfile, int srcline) {
  const char* const file_name = srcfile.c_str();
  printf("Crashing at %s:%d\n", file_name, srcline);
  fflush(stdout);
  abort();
}
// Always returns -1; no limit is queried from the system here.
int GetMaxOpenFiles() {
  constexpr int kFixedResult = -1;
  return kFixedResult;
}
// Assume 4KB page size. The value is hard-coded (4 * 1024 bytes) rather
// than queried from the operating system.
const size_t kPageSize = 4U * 1024U;
// Not supported in this port: both arguments are ignored and the call has
// no effect.
void SetCpuPriority(ThreadId id, CpuPriority priority) {
  static_cast<void>(id);
  static_cast<void>(priority);
}
// Returns GetCurrentProcessId() widened to int64_t.
int64_t GetProcessID() {
  return static_cast<int64_t>(GetCurrentProcessId());
}
bool GenerateRfcUuid(std::string* output) {
UUID uuid;
UuidCreateSequential(&uuid);
RPC_CSTR rpc_str;
auto status = UuidToStringA(&uuid, &rpc_str);
if (status != RPC_S_OK) {
return false;
}
// rpc_str is nul-terminated
*output = reinterpret_cast<char*>(rpc_str);
status = RpcStringFreeA(&rpc_str);
assert(status == RPC_S_OK);
return true;
}
} // namespace port
} // namespace ROCKSDB_NAMESPACE
#endif
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// See port_example.h for documentation for the following types/functions.
#pragma once
// Always want minimum headers
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#include <windows.h>
#include <string>
#include <thread>
#include <string.h>
#include <mutex>
#include <limits>
#include <condition_variable>
#include <malloc.h>
#include <intrin.h>
#include <process.h>
#include <stdint.h>
#include "port/win/win_thread.h"
#include "rocksdb/options.h"
#undef min
#undef max
#undef DeleteFile
#undef GetCurrentTime
#ifndef strcasecmp
#define strcasecmp _stricmp
#endif
#undef GetCurrentTime
#undef DeleteFile
#ifndef _SSIZE_T_DEFINED
using ssize_t = SSIZE_T;
#endif
// size_t printf formatting named in the manner of C99 standard formatting
// strings such as PRIu64
// in fact, we could use that one
#ifndef ROCKSDB_PRIszt
#define ROCKSDB_PRIszt "Iu"
#endif
#ifdef _MSC_VER
#define __attribute__(A)
// Thread local storage on Linux
// There is thread_local in C++11
#ifndef __thread
#define __thread __declspec(thread)
#endif
#endif
namespace ROCKSDB_NAMESPACE {
#define PREFETCH(addr, rw, locality)
extern const bool kDefaultToAdaptiveMutex;
namespace port {
// Integer limit constants and the ROCKSDB_NOEXCEPT macro, defined in two
// variants depending on the compiler version.
// VS < 2015
#if defined(_MSC_VER) && (_MSC_VER < 1900)
// Older MSVC only offers _snprintf; newer versions provide snprintf
#define snprintf _snprintf
// noexcept is not used with these compilers, so the macro expands to nothing
#define ROCKSDB_NOEXCEPT
// std::numeric_limits<size_t>::max() is not constexpr just yet
// therefore, use the C limit macros instead
// For use at db/file_indexer.h kLevelMaxIndex
const uint32_t kMaxUint32 = UINT32_MAX;
const int kMaxInt32 = INT32_MAX;
const int kMinInt32 = INT32_MIN;
const int64_t kMaxInt64 = INT64_MAX;
const int64_t kMinInt64 = INT64_MIN;
const uint64_t kMaxUint64 = UINT64_MAX;
// size_t width follows the target: 64-bit under _WIN64, otherwise UINT_MAX
#ifdef _WIN64
const size_t kMaxSizet = UINT64_MAX;
#else
const size_t kMaxSizet = UINT_MAX;
#endif
#else // VS >= 2015 or MinGW
#define ROCKSDB_NOEXCEPT noexcept
// For use at db/file_indexer.h kLevelMaxIndex
const uint32_t kMaxUint32 = std::numeric_limits<uint32_t>::max();
const int kMaxInt32 = std::numeric_limits<int>::max();
const int kMinInt32 = std::numeric_limits<int>::min();
const uint64_t kMaxUint64 = std::numeric_limits<uint64_t>::max();
const int64_t kMaxInt64 = std::numeric_limits<int64_t>::max();
const int64_t kMinInt64 = std::numeric_limits<int64_t>::min();
const size_t kMaxSizet = std::numeric_limits<size_t>::max();
#endif  // defined(_MSC_VER) && (_MSC_VER < 1900)
// "Windows is designed to run on little-endian computer architectures."
// https://docs.microsoft.com/en-us/windows/win32/sysinfo/registry-value-types
constexpr bool kLittleEndian = true;
#undef PLATFORM_IS_LITTLE_ENDIAN
class CondVar;
// Mutual-exclusion lock backed by a Win32 CRITICAL_SECTION.
// In debug builds a boolean flag tracks whether the mutex is currently
// locked, for use by AssertHeld().
class Mutex {
 public:
  // `adaptive` is accepted for interface compatibility with other ports.
  /* implicit */ Mutex(bool adaptive = kDefaultToAdaptiveMutex);
  ~Mutex();

  void Lock();
  void Unlock();
  // Non-blocking acquire; returns true if the lock was obtained.
  bool TryLock();

  // this will assert if the mutex is not locked
  // it does NOT verify that mutex is held by a calling thread
  void AssertHeld();

  // Not copyable. No move operations are declared either, so a Mutex can
  // be neither copied nor moved.
  Mutex(const Mutex&) = delete;
  void operator=(const Mutex&) = delete;

 private:
  // CondVar needs direct access to the critical section and the debug flag.
  friend class CondVar;

  CRITICAL_SECTION section_;
#ifndef NDEBUG
  // Starts out false so AssertHeld() on a never-locked mutex reads a
  // well-defined value instead of an indeterminate one.
  bool locked_ = false;
#endif
};
class RWMutex {
public:
RWMutex() { InitializeSRWLock(&srwLock_); }
// No copying allowed
RWMutex(const RWMutex&) = delete;
void operator=(const RWMutex&) = delete;
void ReadLock() { AcquireSRWLockShared(&srwLock_); }
void WriteLock() { AcquireSRWLockExclusive(&srwLock_); }
void ReadUnlock() { ReleaseSRWLockShared(&srwLock_); }
void WriteUnlock() { ReleaseSRWLockExclusive(&srwLock_); }
// Empty as in POSIX
void AssertHeld() {}
private:
SRWLOCK srwLock_;
};
// Condition variable built on a Win32 CONDITION_VARIABLE and tied to a
// single Mutex supplied at construction.
class CondVar {
public:
// `mu` is stored as a raw pointer and is not owned by the CondVar.
explicit CondVar(Mutex* mu);
~CondVar();
// Waits with no timeout.
void Wait();
// Waits until signalled or until the given expiration time.
// NOTE(review): the definition treats the argument as an absolute time in
// microseconds and returns true on timeout - confirm against the .cc file.
bool TimedWait(uint64_t expiration_time);
void Signal();
void SignalAll();
// Condition var is not copy/move constructible
CondVar(const CondVar&) = delete;
CondVar& operator=(const CondVar&) = delete;
CondVar(CondVar&&) = delete;
CondVar& operator=(CondVar&&) = delete;
private:
CONDITION_VARIABLE cv_;
// Non-owning pointer to the mutex used for waits.
Mutex* mu_;
};
#ifdef _POSIX_THREADS
using Thread = std::thread;
#else
// Wrapper around the platform-efficient
// or otherwise preferable implementation
using Thread = WindowsThread;
#endif
// OnceType helps emulate
// Posix semantics with initialization
// adopted in the project.
// It wraps a std::once_flag and can be constructed from the empty tag type
// Init, which is what the LEVELDB_ONCE_INIT macro expands to.
struct OnceType {
// Empty tag type used only to select the constructor below.
struct Init {};
OnceType() {}
// Intentionally non-explicit so `OnceType x = LEVELDB_ONCE_INIT;` style
// initialization can convert from Init.
OnceType(const Init&) {}
// Not copyable (std::once_flag itself is not copyable).
OnceType(const OnceType&) = delete;
OnceType& operator=(const OnceType&) = delete;
// Flag handed to std::call_once by InitOnce().
std::once_flag flag_;
};
#define LEVELDB_ONCE_INIT port::OnceType::Init()
extern void InitOnce(OnceType* once, void (*initializer)());
#ifndef CACHE_LINE_SIZE
#define CACHE_LINE_SIZE 64U
#endif
#ifdef ROCKSDB_JEMALLOC
// Separate inlines so they can be replaced if needed
void* jemalloc_aligned_alloc(size_t size, size_t alignment) ROCKSDB_NOEXCEPT;
void jemalloc_aligned_free(void* p) ROCKSDB_NOEXCEPT;
#endif
// Allocates `size` bytes aligned to CACHE_LINE_SIZE, using jemalloc when
// ROCKSDB_JEMALLOC is defined and _aligned_malloc otherwise.
// Release the result with cacheline_aligned_free().
inline void *cacheline_aligned_alloc(size_t size) {
  void* block = nullptr;
#ifdef ROCKSDB_JEMALLOC
  block = jemalloc_aligned_alloc(size, CACHE_LINE_SIZE);
#else
  block = _aligned_malloc(size, CACHE_LINE_SIZE);
#endif
  return block;
}
// Frees memory obtained from cacheline_aligned_alloc(), using the
// deallocator that matches the allocator selected by ROCKSDB_JEMALLOC.
inline void cacheline_aligned_free(void *memblock) {
#ifndef ROCKSDB_JEMALLOC
  _aligned_free(memblock);
#else
  jemalloc_aligned_free(memblock);
#endif
}
extern const size_t kPageSize;
// https://gcc.gnu.org/bugzilla/show_bug.cgi?id=52991 for MINGW32
// could not be worked around with by -mno-ms-bitfields
#ifndef __MINGW32__
#define ALIGN_AS(n) __declspec(align(n))
#else
#define ALIGN_AS(n)
#endif
// Spin-wait hint: invokes YieldProcessor() on the listed x86/x64/ARM
// targets and does nothing on any other architecture.
static inline void AsmVolatilePause() {
#if defined(_M_IX86) || defined(_M_X64) || defined(_M_ARM64) || defined(_M_ARM)
YieldProcessor();
#endif
// it would be nice to get "wfe" on ARM here
}
extern int PhysicalCoreID();
// For Thread Local Storage abstraction
using pthread_key_t = DWORD;
// Allocates a TLS index with TlsAlloc and stores it in *key.
// The `destructor` argument is ignored.
// Returns 0 on success, or ENOMEM when no TLS index is available.
inline int pthread_key_create(pthread_key_t* key, void (*destructor)(void*)) {
  // Not used
  static_cast<void>(destructor);

  const pthread_key_t slot = TlsAlloc();
  if (slot != TLS_OUT_OF_INDEXES) {
    *key = slot;
    return 0;
  }
  return ENOMEM;
}
// Releases a TLS index with TlsFree.
// Returns 0 on success, or EINVAL if TlsFree reports failure.
inline int pthread_key_delete(pthread_key_t key) {
  const bool released = (TlsFree(key) != 0);
  return released ? 0 : EINVAL;
}
// Stores `value` in the calling thread's slot for `key` via TlsSetValue.
// Returns 0 on success, or ENOMEM if TlsSetValue reports failure.
inline int pthread_setspecific(pthread_key_t key, const void* value) {
  void* const slot_value = const_cast<void*>(value);
  const bool stored = (TlsSetValue(key, slot_value) != 0);
  return stored ? 0 : ENOMEM;
}
// Returns the calling thread's value for `key` as reported by TlsGetValue.
// When the result is null, errno tells the two cases apart: EINVAL if
// GetLastError() reports an error, NOERROR if a null value was stored.
// errno is left untouched for non-null results.
inline void* pthread_getspecific(pthread_key_t key) {
  void* const stored = TlsGetValue(key);
  if (stored == nullptr) {
    errno = (GetLastError() != ERROR_SUCCESS) ? EINVAL : NOERROR;
  }
  return stored;
}
// UNIX equiv although errno numbers will be off
// using C-runtime to implement. Note, this does not
// fill space with zeros in case the file is extended.
int truncate(const char* path, int64_t length);
int Truncate(std::string path, int64_t length);
void Crash(const std::string& srcfile, int srcline);
extern int GetMaxOpenFiles();
std::string utf16_to_utf8(const std::wstring& utf16);
std::wstring utf8_to_utf16(const std::string& utf8);
using ThreadId = int;
extern void SetCpuPriority(ThreadId id, CpuPriority priority);
int64_t GetProcessID();
// Uses platform APIs to generate a 36-character RFC-4122 UUID. Returns
// true on success or false on failure.
bool GenerateRfcUuid(std::string* output);
} // namespace port
#ifdef ROCKSDB_WINDOWS_UTF8_FILENAMES
#define RX_FILESTRING std::wstring
#define RX_FN(a) ROCKSDB_NAMESPACE::port::utf8_to_utf16(a)
#define FN_TO_RX(a) ROCKSDB_NAMESPACE::port::utf16_to_utf8(a)
#define RX_FNCMP(a, b) ::wcscmp(a, RX_FN(b).c_str())
#define RX_FNLEN(a) ::wcslen(a)
#define RX_DeleteFile DeleteFileW
#define RX_CreateFile CreateFileW
#define RX_CreateFileMapping CreateFileMappingW
#define RX_GetFileAttributesEx GetFileAttributesExW
#define RX_FindFirstFileEx FindFirstFileExW
#define RX_FindNextFile FindNextFileW
#define RX_WIN32_FIND_DATA WIN32_FIND_DATAW
#define RX_CreateDirectory CreateDirectoryW
#define RX_RemoveDirectory RemoveDirectoryW
#define RX_GetFileAttributesEx GetFileAttributesExW
#define RX_MoveFileEx MoveFileExW
#define RX_CreateHardLink CreateHardLinkW
#define RX_PathIsRelative PathIsRelativeW
#define RX_GetCurrentDirectory GetCurrentDirectoryW
#define RX_GetDiskFreeSpaceEx GetDiskFreeSpaceExW
#define RX_PathIsDirectory PathIsDirectoryW
#else
#define RX_FILESTRING std::string
#define RX_FN(a) a
#define FN_TO_RX(a) a
#define RX_FNCMP(a, b) strcmp(a, b)
#define RX_FNLEN(a) strlen(a)
#define RX_DeleteFile DeleteFileA
#define RX_CreateFile CreateFileA
#define RX_CreateFileMapping CreateFileMappingA
#define RX_GetFileAttributesEx GetFileAttributesExA
#define RX_FindFirstFileEx FindFirstFileExA
#define RX_CreateDirectory CreateDirectoryA
#define RX_FindNextFile FindNextFileA
#define RX_WIN32_FIND_DATA WIN32_FIND_DATAA
#define RX_CreateDirectory CreateDirectoryA
#define RX_RemoveDirectory RemoveDirectoryA
#define RX_GetFileAttributesEx GetFileAttributesExA
#define RX_MoveFileEx MoveFileExA
#define RX_CreateHardLink CreateHardLinkA
#define RX_PathIsRelative PathIsRelativeA
#define RX_GetCurrentDirectory GetCurrentDirectoryA
#define RX_GetDiskFreeSpaceEx GetDiskFreeSpaceExA
#define RX_PathIsDirectory PathIsDirectoryA
#endif
using port::pthread_key_t;
using port::pthread_key_create;
using port::pthread_key_delete;
using port::pthread_setspecific;
using port::pthread_getspecific;
using port::truncate;
} // namespace ROCKSDB_NAMESPACE
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment