Skip to content

Instantly share code, notes, and snippets.

@renatoaraujoc
Last active January 15, 2024 16:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save renatoaraujoc/571feb5272825c690aec52574b25a149 to your computer and use it in GitHub Desktop.
Save renatoaraujoc/571feb5272825c690aec52574b25a149 to your computer and use it in GitHub Desktop.
GCP GCS FileSystem - Emulates the Local File System
/**
 * Error raised when a directory that an operation depends on does not exist.
 *
 * Exposes the Node.js-style `ENOENT` code through `code` and a class-specific
 * `name`, so log output and stack traces identify the failure instead of
 * showing a generic `Error`.
 */
export class DirectoryNotFoundException extends Error {
    code = 'ENOENT';

    /**
     * @param directoryName Name of the missing directory.
     * @param parentDirectoryPath Path of the directory expected to contain it.
     */
    constructor(directoryName: string, parentDirectoryPath: string) {
        super(
            `Directory '${directoryName}' at parent path '${parentDirectoryPath}' does not exist`
        );
        // Re-attach the prototype so `instanceof` keeps working for down-leveled targets.
        Object.setPrototypeOf(this, DirectoryNotFoundException.prototype);
        this.name = 'DirectoryNotFoundException';
    }
}
/**
 * Error raised when a directory cannot be removed because it still has content.
 *
 * Exposes the Node.js-style `ENOTEMPTY` code through `code` and a
 * class-specific `name`, so log output and stack traces identify the failure
 * instead of showing a generic `Error`.
 */
export class DirectoryNotEmptyException extends Error {
    code = 'ENOTEMPTY';

    /**
     * @param directoryName Name of the non-empty directory.
     * @param parentDirectoryPath Path of the directory that contains it.
     */
    constructor(directoryName: string, parentDirectoryPath: string) {
        super(
            `Directory '${directoryName}' at parent path '${parentDirectoryPath}' is not empty`
        );
        // Re-attach the prototype so `instanceof` keeps working for down-leveled targets.
        Object.setPrototypeOf(this, DirectoryNotEmptyException.prototype);
        this.name = 'DirectoryNotEmptyException';
    }
}
/**
 * Error raised when a write targets a file that already exists and
 * overwriting was not requested.
 *
 * Exposes the Node.js-style `EEXIST` code through `code` and a class-specific
 * `name`, so log output and stack traces identify the failure instead of
 * showing a generic `Error`.
 */
export class FileAlreadyExistsException extends Error {
    code = 'EEXIST';

    /**
     * @param fileName Base name of the file that already exists.
     * @param parentDirectoryPath Path of the directory that contains it.
     */
    constructor(fileName: string, parentDirectoryPath: string) {
        super(
            `File '${fileName}' at path '${parentDirectoryPath}' already exists, if you want to overwrite it, set the 'overwrite' option to true`
        );
        // Re-attach the prototype so `instanceof` keeps working for down-leveled targets.
        Object.setPrototypeOf(this, FileAlreadyExistsException.prototype);
        this.name = 'FileAlreadyExistsException';
    }
}
import { FileSystemService } from './FileSystemService';
import { GCPGCSFileSystem } from './GCPGCSFileSystem';
import { LocalFileSystem } from './LocalFileSystem';
import { StorageOptions } from '@google-cloud/storage';
import { ConfigurableModuleBuilder, Module } from '@nestjs/common';
/** Variant selecting the file system backed by local storage. */
type LocalFileSystemType = {
    value: 'LOCAL';
};

/** Variant selecting the Google Cloud Storage backed file system. */
type GcpGcsFileSystemType = {
    value: 'GCP_GCS';
    // Bucket that backs the file system.
    bucketName: string;
    // Forwarded to the GCS file system constructor when provided.
    storageOptions?: StorageOptions;
};

/**
 * Module options: the `type.value` discriminant decides which
 * FileSystemService implementation gets instantiated.
 */
type ApiFileSystemConfig = {
    type: LocalFileSystemType | GcpGcsFileSystemType;
};
// Build the configurable-module definition once, then re-export the base class
// and the options injection token under their original names.
const fileSystemModuleDefinition =
    new ConfigurableModuleBuilder<ApiFileSystemConfig>().build();
export const ConfigurableModuleClass =
    fileSystemModuleDefinition.ConfigurableModuleClass;
export const MODULE_OPTIONS_TOKEN =
    fileSystemModuleDefinition.MODULE_OPTIONS_TOKEN;
/**
 * Module that provides a FileSystemService whose concrete implementation is
 * chosen from the module options at start-up.
 */
@Module({
    providers: [
        {
            provide: FileSystemService,
            inject: [MODULE_OPTIONS_TOKEN],
            // Instantiates the implementation selected by the `type.value` discriminant.
            useFactory: async ({ type: target }: ApiFileSystemConfig) => {
                switch (target.value) {
                    // Local fileSystem, uses the local storage.
                    case 'LOCAL':
                        // Still being re-worked!!
                        return new LocalFileSystem();

                    // GCP's Google Cloud Storage fileSystem.
                    case 'GCP_GCS': {
                        const gcsFileSystem = new GCPGCSFileSystem(
                            target.bucketName,
                            target.storageOptions
                        );

                        // Sanity check: refuse to start against a bucket that does not exist.
                        const bucketExists =
                            await gcsFileSystem.isBucketExists();
                        if (!bucketExists) {
                            throw new Error(
                                `FileSystem [GCP_GCS] bucket '${target.bucketName}' does not exist!`
                            );
                        }

                        return gcsFileSystem;
                    }

                    default:
                        throw new Error(`Could not resolve the fileSystem!`);
                }
            }
        }
    ],
    exports: [FileSystemService]
})
export class FileSystemModule extends ConfigurableModuleClass {}
import { FileSystemReadDirResult } from './types';
/**
 * Storage-agnostic contract for file and directory operations.
 * Concrete subclasses decide where the data actually lives.
 */
export abstract class FileSystemService {
    /**
     * Writes data to a file at the specified path.
     *
     * Both overloads accept the same `options`, so string, number, boolean
     * and JSON payloads can also overwrite an existing file or create missing
     * parent directories.
     *
     * @param path The path of the file to write.
     * @param data The data to write to the file.
     * @param options Optional parameters.
     *  - overwrite: if true, overwrites the file if it already exists. Defaults to false.
     *  - createDirsRecursivelyIfNotExists: if true, creates directories recursively if they
     *    don't exist. Defaults to false.
     * @returns A promise that resolves when the file is successfully written.
     * @throws {FileAlreadyExistsException} If the file already exists and the overwrite option is
     * set to false.
     * @throws {DirectoryNotFoundException} If the parent directory of the file does not exist and
     * the createDirsRecursivelyIfNotExists option is set to false.
     */
    abstract writeFile(
        path: string,
        data: Buffer,
        options?: {
            overwrite?: boolean;
            createDirsRecursivelyIfNotExists?: boolean;
        }
    ): Promise<void>;
    abstract writeFile(
        path: string,
        data: string | number | boolean | Record<string, unknown>,
        options?: {
            overwrite?: boolean;
            createDirsRecursivelyIfNotExists?: boolean;
        }
    ): Promise<void>;

    /**
     * Create a directory on the filesystem.
     *
     * @param path The target path where to create the directory
     */
    abstract createDir(path: string): Promise<void>;

    /**
     * Read a file from the filesystem.
     *
     * @param path The path to read from
     * @param as The type to read as: raw `buffer`, decoded `string`, or parsed `json`
     */
    abstract readFile(path: string, as: 'buffer'): Promise<Buffer>;
    abstract readFile(path: string, as: 'string'): Promise<string>;
    abstract readFile<T>(path: string, as: 'json'): Promise<T>;

    /**
     * Reads the contents of a directory.
     *
     * @param path The path of the directory to read.
     * @param options The options for reading the directory.
     *  - includeDirectories: a number to cap how many directories are included, or 'ALL' to
     *    include all directories.
     *  - includeFiles: a number to cap how many files are included, or 'ALL' to include all files.
     * @returns A promise that resolves with the contents of the directory.
     */
    abstract readDir(
        path: string,
        options?: {
            includeDirectories?: number | 'ALL';
            includeFiles?: number | 'ALL';
        }
    ): Promise<FileSystemReadDirResult>;

    /**
     * Checks if a file exists at the given path.
     *
     * @param path The path to the file.
     * @returns A promise that resolves to true when the file exists.
     */
    abstract fileExists(path: string): Promise<boolean>;

    /**
     * Checks if a directory exists at the given path.
     *
     * @param path The path to the directory.
     * @returns A promise that resolves to true when the directory exists.
     */
    abstract dirExists(path: string): Promise<boolean>;

    /**
     * Remove a file from the filesystem.
     *
     * @param path The path to remove
     */
    abstract rmFile(path: string): Promise<void>;

    /**
     * Remove a directory from the filesystem. If the directory is not empty, it will throw an
     * error unless the force option is set to true. Deleting a directory means that everything
     * inside it will be deleted, including the directory itself.
     *
     * @param path The dirPath to remove
     * @param force If true, will remove the directory even if it's not empty, default is false
     */
    abstract rmDir(path: string, force?: boolean): Promise<void>;
}
import { FileSystemService } from './FileSystemService';
import {
GetFilesResponse,
Storage,
StorageOptions
} from '@google-cloud/storage';
import {
DirectoryNotEmptyException,
DirectoryNotFoundException,
FileAlreadyExistsException,
FileSystemReadDirResult
} from '@rcambiental/api/shared-services/filesystem';
import * as path from 'path';
import { EMPTY, expand, from, lastValueFrom, map, of, scan, skip } from 'rxjs';
import * as stream from 'stream';
/**
 * FileSystemService implementation backed by a Google Cloud Storage bucket.
 *
 * Paths are accepted in local-file-system form (with a leading slash) and
 * normalized to object names without one. A directory is represented by an
 * empty placeholder object whose name ends with '/' (see createDir), and
 * directory existence is checked against that placeholder.
 */
export class GCPGCSFileSystem extends FileSystemService {
    private static MAX_GCS_RESULTS_PER_REQUEST = 1000;

    // Base names that are never reported as files when listing a directory.
    private static IGNORED_FILE_BASENAMES = ['.DS_Store'];

    // Matches paths like 'dawd/adwd/.', a single dot after a directory.
    private static SINGLE_DOT_SEGMENT_PATTERN = /[a-zA-Z0-9_/\-. ]+\/\.$/;

    // Matches paths like 'dawd/adwd/..[any_string]', two or more dots after a directory.
    private static MULTI_DOT_SEGMENT_PATTERN =
        /[a-zA-Z0-9_/\-. ]+\/[.]{2,}[a-zA-Z0-9\-_ ]*$/;

    private storage: Storage;

    /**
     * @param bucketName Name of the bucket that backs this file system.
     * @param storageOptions Options forwarded to the GCS `Storage` client.
     */
    constructor(
        public bucketName: string,
        storageOptions?: StorageOptions
    ) {
        super();
        this.storage = new Storage(storageOptions);
    }

    /**
     * Returns a boolean indicating if the bucket exists.
     */
    async isBucketExists(): Promise<boolean> {
        const [exists] = await this.storage.bucket(this.bucketName).exists();
        return exists;
    }

    /**
     * Checks whether the placeholder object of a directory exists.
     * The root directory (empty path after normalization) always exists.
     */
    override async dirExists(path: string): Promise<boolean> {
        path = this.__fixDirPath(path);

        // Empty path here means we're checking if the root directory exists, it always does
        if (path === '') {
            return true;
        }

        const [exists] = await this.storage
            .bucket(this.bucketName)
            .file(path)
            .exists();
        return exists;
    }

    /**
     * Checks whether an object exists at the given file path.
     *
     * @throws {Error} If the path ends with '/', which would denote a directory.
     */
    override async fileExists(path: string): Promise<boolean> {
        // Fixes the path, removing the leading slash
        path = this.__fixPath(path);

        // A trailing slash denotes a directory, which is not a valid file path
        if (path.endsWith('/')) {
            throw new Error(
                `GCPFileSystem.fileExists: path '${path}' shouldn't end with a '/' as this would represent a directory!`
            );
        }

        const [exists] = await this.storage
            .bucket(this.bucketName)
            .file(path)
            .exists();
        return exists;
    }

    override readFile(path: string, as: 'buffer'): Promise<Buffer>;
    override readFile(path: string, as: 'string'): Promise<string>;
    override readFile<T>(path: string, as: 'json'): Promise<T>;
    /**
     * Reads a whole object into memory and returns it in the requested form.
     *
     * @param path File path, with or without a leading slash.
     * @param as 'buffer' for raw bytes, 'string' for decoded text, 'json' for parsed JSON.
     * @throws {Error} If the path is invalid or the file does not exist.
     */
    override async readFile<T>(
        path: string,
        as: 'buffer' | 'string' | 'json'
    ): Promise<string | Buffer | T> {
        // Fixes the path, removing the leading slash
        path = this.__fixPath(path);

        // Throw if the file path is invalid
        this.__throwIfFilePathInvalid(path);

        if (!(await this.fileExists(path))) {
            throw new Error(
                `GCPFileSystem.readFile: file '${path}' does not exist!`
            );
        }

        // Named `readStream` so it does not shadow the imported `stream` module
        const readStream = this.storage
            .bucket(this.bucketName)
            .file(path)
            .createReadStream();

        const chunks: Buffer[] = [];
        // eslint-disable-next-line no-restricted-syntax
        for await (const chunk of readStream) {
            chunks.push(chunk as Buffer);
        }
        const buffer = Buffer.concat(chunks);

        switch (as) {
            case 'string':
                return buffer.toString();
            case 'json':
                return JSON.parse(buffer.toString()) as T;
            default:
                return buffer;
        }
    }

    /**
     * Removes a directory and everything below it.
     *
     * @param dirPath Directory to remove; the root directory cannot be removed.
     * @param force When true, removes the directory even if it has content. Defaults to false.
     * @throws {DirectoryNotFoundException} If the directory does not exist.
     * @throws {DirectoryNotEmptyException} If it has content and `force` is not set.
     */
    override async rmDir(dirPath: string, force?: boolean): Promise<void> {
        const forceDelete = force ?? false;
        dirPath = this.__fixDirPath(dirPath);

        if (dirPath === '') {
            throw new Error(
                'GCPFileSystem.rmDir: cannot delete root directory!'
            );
        }

        const { dir, base } = path.parse(dirPath);

        if (!(await this.dirExists(dirPath))) {
            throw new DirectoryNotFoundException(base, dir);
        }

        // The emptiness probe costs two listing requests, so it only runs when
        // its outcome can change the result (i.e. when not forcing).
        if (!forceDelete) {
            const [files, prefixes] = await Promise.all([
                this.__readFilesOrPrefixesFromDirPath('FILES', dirPath, 1),
                this.__readFilesOrPrefixesFromDirPath('DIRECTORIES', dirPath, 1)
            ]);

            if (files.length > 0 || prefixes.length > 0) {
                throw new DirectoryNotEmptyException(base, dir);
            }
        }

        // Delete every object under the prefix, placeholder included
        await this.storage.bucket(this.bucketName).deleteFiles({
            prefix: dirPath
        });
    }

    /**
     * Removes a file. A file that does not exist is not an error; any other
     * failure (permissions, network, ...) is propagated to the caller.
     *
     * @throws {Error} If the path is invalid, or the delete fails for a reason
     * other than the object being absent.
     */
    override async rmFile(path: string): Promise<void> {
        // Fixes the path, removing the leading slash
        path = this.__fixPath(path);

        // Throw if the file path is invalid
        this.__throwIfFilePathInvalid(path);

        try {
            await this.storage.bucket(this.bucketName).file(path).delete();
        } catch (e: unknown) {
            // We don't care whether the object existed, so "not found" is fine.
            // NOTE(review): assumes the client reports the HTTP status on `code` — confirm.
            const isNotFound =
                typeof e === 'object' &&
                e !== null &&
                (e as { code?: unknown }).code === 404;

            if (!isNotFound) {
                throw e;
            }
        }
    }

    /**
     * Writes data to an object.
     *
     * Buffers and strings are written as they are, plain objects are written
     * as pretty-printed JSON, and numbers/booleans as their string form.
     *
     * @param filePath File path, with or without a leading slash.
     * @param data Payload to store.
     * @param options `overwrite` (default false) and `createDirsRecursivelyIfNotExists` (default false).
     * @throws {FileAlreadyExistsException} If the file exists and `overwrite` is false.
     * @throws {DirectoryNotFoundException} If the parent directory is missing and
     * `createDirsRecursivelyIfNotExists` is false.
     */
    override async writeFile(
        filePath: string,
        data: Buffer | string | number | boolean | Record<string, unknown>,
        options?: {
            overwrite?: boolean;
            createDirsRecursivelyIfNotExists?: boolean;
        }
    ): Promise<void> {
        const overwrite = options?.overwrite ?? false;
        const createDirsRecursivelyIfNotExists =
            options?.createDirsRecursivelyIfNotExists ?? false;

        // We should never have a leading slash
        filePath = this.__fixPath(filePath);

        // Throw if the file path is invalid
        this.__throwIfFilePathInvalid(filePath);

        const { base, dir } = path.parse(filePath);

        // Check if the file exists and the overwrite flag is false, if so, throw an error
        if (!overwrite && (await this.fileExists(filePath))) {
            throw new FileAlreadyExistsException(base, dir);
        }

        // Check if the directory exists, if not, create it if the option is set to true, else throw an error
        if (!(await this.dirExists(dir))) {
            if (!createDirsRecursivelyIfNotExists) {
                throw new DirectoryNotFoundException(
                    path.basename(dir),
                    path.dirname(dir)
                );
            }
            await this.createDir(dir);
        }

        const writeStream = this.storage
            .bucket(this.bucketName)
            .file(filePath)
            .createWriteStream();

        // Streams only accept strings and Buffers, so every other payload is serialized first
        const source = new stream.PassThrough();
        source.end(this.__serializeWritePayload(data));

        await new Promise<void>((resolve, reject) => {
            source
                .pipe(writeStream)
                .on('error', (error) => reject(error))
                .on('finish', () => resolve());
        });
    }

    /**
     * Creates a directory and all of its missing ancestors by saving an empty
     * placeholder object per path segment.
     *
     * @throws {Error} If the path is empty, or if any placeholder cannot be
     * checked or created (the failure is logged and then re-thrown).
     */
    override async createDir(dirPath: string): Promise<void> {
        const argPath = dirPath;

        // We should never have a leading slash
        dirPath = this.__fixPath(dirPath);

        // Error on empty paths
        if (dirPath === '') {
            throw new Error(`GCPFileSystem.createDir: path cannot be empty!`);
        }

        // 'a/b' yields the cumulative folders ['a/', 'a/b/'], created one by one
        const segments = dirPath
            .split('/')
            .filter((segment) => segment.trim().length > 0);
        const folders = segments.map(
            (_, index) => `${segments.slice(0, index + 1).join('/')}/`
        );

        const bucket = this.storage.bucket(this.bucketName);

        try {
            for (const folder of folders) {
                const folderAsFile = bucket.file(folder);
                const [folderExists] = await folderAsFile.exists();

                if (folderExists) {
                    console.log(
                        `GCPFileSystem.createDir['${argPath}']: folder '${folder}' already exists!`
                    );
                    continue;
                }

                await folderAsFile.save('');
                console.log(
                    `GCPFileSystem.createDir['${argPath}']: folder '${folder}' created!`
                );
            }
        } catch (e: unknown) {
            console.error(
                `GCPFileSystem.createDir['${argPath}']: error creating folder '${argPath}', error: ${
                    e instanceof Error ? e.message : String(e)
                }`
            );
            // Callers must not believe the directory exists when creation failed
            throw e;
        }
    }

    /**
     * Lists the files and direct sub-directories of a directory.
     *
     * @param dirPath Directory to read; '' or '/' reads the root.
     * @param options Caps for each kind of entry; both default to 'ALL'. A cap of
     * 0 (or less) skips that kind entirely.
     * @throws {DirectoryNotFoundException} If a non-root directory does not exist.
     */
    override async readDir(
        dirPath: string,
        options?: {
            includeDirectories?: number | 'ALL';
            includeFiles?: number | 'ALL';
        }
    ): Promise<FileSystemReadDirResult> {
        const includeDirectories = options?.includeDirectories ?? 'ALL';
        const includeFiles = options?.includeFiles ?? 'ALL';

        // No leading slash, and a trailing slash for anything but the root
        dirPath = this.__fixDirPath(dirPath);

        /**
         * When dirPath is '', means we are at the root level and this always exist,
         * so we'll only throw an error if we're not at the root level.
         */
        if (dirPath !== '' && !(await this.dirExists(dirPath))) {
            const { dir, base } = path.parse(dirPath);
            throw new DirectoryNotFoundException(base, dir);
        }

        // The two listings are independent, so they are requested concurrently
        const [files, directories] = await Promise.all([
            includeFiles === 'ALL' || includeFiles > 0
                ? this.__readFilesOrPrefixesFromDirPath(
                      'FILES',
                      dirPath,
                      includeFiles
                  )
                : [],
            includeDirectories === 'ALL' || includeDirectories > 0
                ? this.__readFilesOrPrefixesFromDirPath(
                      'DIRECTORIES',
                      dirPath,
                      includeDirectories
                  )
                : []
        ]);

        return {
            files,
            directories
        };
    }

    /**
     * Pages through `bucket.getFiles` for the direct children of `dirPath` and
     * returns either the parsed files or the parsed directories.
     *
     * `expand` re-issues the request with the next page query for as long as
     * one is returned and the `maxResults` budget is not used up; each page is
     * parsed and `scan` accumulates the pages into a single array.
     *
     * @param type Which kind of entry to return.
     * @param dirPath Directory whose children are listed.
     * @param maxResults Upper bound on returned entries, or 'ALL'.
     */
    private async __readFilesOrPrefixesFromDirPath<
        T extends 'FILES' | 'DIRECTORIES'
    >(
        type: T,
        dirPath: string,
        maxResults: 'ALL' | number
    ): Promise<
        T extends 'FILES'
            ? FileSystemReadDirResult['files']
            : FileSystemReadDirResult['directories']
    > {
        dirPath = this.__fixDirPath(dirPath);

        const bucket = this.storage.bucket(this.bucketName);

        let fetchedResultsCount = 0;

        const entries$ = of(null).pipe(
            expand((result: GetFilesResponse | null) => {
                const hasBudgetLeft =
                    maxResults === 'ALL' || fetchedResultsCount < maxResults;

                // From the second iteration on, stop once the budget is used up
                // or there is no next page query (result[1])
                if (result && !(hasBudgetLeft && result[1])) {
                    return EMPTY;
                }

                return from(
                    bucket.getFiles({
                        prefix: dirPath,
                        delimiter: '/',
                        matchGlob: type === 'FILES' ? '**?' : '**/',
                        maxResults:
                            maxResults === 'ALL'
                                ? GCPGCSFileSystem.MAX_GCS_RESULTS_PER_REQUEST
                                : Math.min(
                                      GCPGCSFileSystem.MAX_GCS_RESULTS_PER_REQUEST,
                                      // The first directory request asks for one extra
                                      // entry because dirPath itself is stripped below
                                      maxResults +
                                          (type === 'DIRECTORIES' && !result
                                              ? 1
                                              : 0) -
                                          fetchedResultsCount
                                  ),
                        autoPaginate: false,
                        pageToken: (result?.[1] as any)?.['pageToken']
                    })
                ).pipe(
                    map((res) => {
                        fetchedResultsCount +=
                            (res[2] as any)?.[
                                type === 'FILES' ? 'items' : 'prefixes'
                            ]?.length ?? 0;

                        // Only the first directory page needs the dirPath entry stripped
                        if (result || type !== 'DIRECTORIES') {
                            return res;
                        }

                        // `prefixes` may be missing from the response (the count above and
                        // the parsers already allow for that), so default to an empty list
                        // instead of crashing on `.filter`
                        const prefixes: string[] =
                            (res[2] as any)?.['prefixes'] ?? [];

                        return [
                            res[0],
                            res[1],
                            {
                                ...(res[2] as any),
                                prefixes: prefixes.filter(
                                    (prefix) => prefix !== dirPath
                                )
                            }
                        ] as GetFilesResponse;
                    })
                );
            }),
            // expand also emits its seed value (null), which is not a response
            skip(1),
            map(([, , response]) => {
                return type === 'FILES'
                    ? this.__parseFilesFromGetFilesResponse(dirPath, response)
                    : this.__parsePrefixesFromGetFilesResponse(
                          dirPath,
                          response
                      );
            }),
            scan(
                (acc, value) => acc.concat(value),
                [] as (
                    | FileSystemReadDirResult['files'][number]
                    | FileSystemReadDirResult['directories'][number]
                )[]
            ),
            // The extra entry requested for directories is only consumed when dirPath
            // is actually among the prefixes, so enforce the cap explicitly
            map((entries) =>
                typeof maxResults === 'number'
                    ? entries.slice(0, Math.max(0, maxResults))
                    : entries
            )
        );

        // Convert the observable back into a promise
        // noinspection ES6MissingAwait
        return lastValueFrom(entries$) as Promise<
            T extends 'FILES'
                ? FileSystemReadDirResult['files']
                : FileSystemReadDirResult['directories']
        >;
    }

    /**
     * Using the response object from bucket.getFiles function (third value of the array), we
     * return the prefixes (directories) parsed, if any.
     *
     * The prefixes in the response are the directories, they come with a trailing slash.
     * The trailing slash is removed to mimic the behavior of the local file system; it is
     * added back automatically whenever a directory path is normalized for reading.
     */
    private __parsePrefixesFromGetFilesResponse(
        dirPath: string,
        response: any
    ): FileSystemReadDirResult['directories'] {
        const prefixes: string[] = response?.['prefixes'] ?? [];

        return prefixes.map((prefix) => {
            const withoutTrailingSlash = prefix.endsWith('/')
                ? prefix.slice(0, -1)
                : prefix;

            return {
                // Name relative to the listed directory
                name: withoutTrailingSlash.startsWith(dirPath)
                    ? withoutTrailingSlash.slice(dirPath.length)
                    : withoutTrailingSlash,
                fullPath: `/${withoutTrailingSlash}`
            };
        });
    }

    /**
     * Using the response object from bucket.getFiles function (third value of the array), we
     * return the files parsed, if any. The directory placeholder itself and ignored base names
     * (such as '.DS_Store') are left out.
     */
    private __parseFilesFromGetFilesResponse(
        dirPath: string,
        response: any
    ): FileSystemReadDirResult['files'] {
        const items: any[] = response?.['items'] ?? [];

        return items
            .map((file) => ({ file, parsedFile: path.parse(file.name) }))
            .filter(
                ({ file, parsedFile }) =>
                    // Match on the base name derived from `name`, the same field used
                    // for every other value below (the previous `basename` lookup is
                    // not a field this listing code populates)
                    !GCPGCSFileSystem.IGNORED_FILE_BASENAMES.includes(
                        parsedFile.base
                    ) && file.name !== dirPath
            )
            .map(({ file, parsedFile }) => ({
                fullPath: `/${dirPath}${parsedFile.base}`,
                path: `/${dirPath.slice(0, -1)}`,
                basename: parsedFile.base,
                filename: parsedFile.name,
                // Extension without its leading dot
                extension: parsedFile.ext.startsWith('.')
                    ? parsedFile.ext.slice(1)
                    : parsedFile.ext,
                size: Number.parseInt(file.size, 10),
                contentType: file?.contentType ?? null
            }));
    }

    /**
     * Converts a writeFile payload into something a stream accepts.
     */
    private __serializeWritePayload(
        data: Buffer | string | number | boolean | Record<string, unknown>
    ): Buffer | string {
        if (Buffer.isBuffer(data) || typeof data === 'string') {
            return data;
        }

        if (typeof data === 'object') {
            return JSON.stringify(data, null, 4);
        }

        // Numbers and booleans: a stream would reject them as-is
        return String(data);
    }

    /**
     * Normalizes a path into an object name: surrounding whitespace and leading slashes are
     * removed, since object names here never start with a slash.
     */
    private __fixPath(path: string) {
        // Trim first, otherwise whitespace in front of the slash would hide it
        return path.trim().replace(/^\/+/, '').trim();
    }

    /**
     * Fixes a dirPath, basically it calls for __fixPath and then includes a trailing slash
     * if it doesn't exist. The root stays an empty string.
     */
    private __fixDirPath(path: string) {
        const fixedPath = this.__fixPath(path);

        return fixedPath.length > 0 && !fixedPath.endsWith('/')
            ? `${fixedPath}/`
            : fixedPath;
    }

    /**
     * Throws an error if the given file path is invalid.
     * The only acceptable file name formats are:
     * 1. test
     * 2. text.extension
     * 3. .extension
     */
    private __throwIfFilePathInvalid(filePath: string) {
        // Error on empty paths
        if (filePath === '') {
            throw new Error(`GCPFileSystem: path cannot be empty!`);
        }

        const { base } = path.parse(filePath);

        const isInvalid =
            // is a simple dot at root level, its not acceptable
            filePath === '.' ||
            // is a directory, its not acceptable
            filePath.endsWith('/') ||
            // a single dot after a directory, its not acceptable
            GCPGCSFileSystem.SINGLE_DOT_SEGMENT_PATTERN.test(filePath) ||
            // multiple dots after a directory (with optional string after), its not acceptable
            GCPGCSFileSystem.MULTI_DOT_SEGMENT_PATTERN.test(filePath) ||
            // no basename at all, its not acceptable
            base === '';

        if (isInvalid) {
            throw new Error(
                `GCPFileSystem: file path '${filePath}' is invalid!`
            );
        }
    }
}
import { Module } from '@nestjs/common';
import { ApiConfigModule, ApiConfigService } from '@rcambiental/api/config';
import { FileSystemModule } from '@rcambiental/api/shared-services/filesystem';
/**
 * Registers FileSystemModule with options derived from ApiConfigService and
 * re-exports it.
 */
@Module({
    imports: [
        FileSystemModule.registerAsync({
            imports: [ApiConfigModule],
            // Translates the application's file system settings into module options.
            useFactory: async (apiConfigService: ApiConfigService) => {
                const { fileSystem } = apiConfigService;

                if (fileSystem.type === 'LOCAL') {
                    // The local file system is rejected when running in production
                    if (apiConfigService.isStageProduction()) {
                        throw new Error(
                            'FileSystem[LOCAL] is not allowed in production!'
                        );
                    }

                    return {
                        type: {
                            value: 'LOCAL'
                        }
                    };
                }

                if (fileSystem.type === 'GCP_GCS') {
                    return {
                        type: {
                            value: 'GCP_GCS',
                            bucketName: fileSystem.bucketName,
                            storageOptions: {
                                // Only set when an emulator endpoint is configured
                                apiEndpoint: fileSystem.emulatorEndpoint
                                    ? `http://${fileSystem.emulatorEndpoint}`
                                    : undefined
                            }
                        }
                    } as const;
                }

                // `fileSystem` is an object, so interpolating it directly would print
                // '[object Object]'; report the unrecognized type instead
                throw new Error(
                    `Invalid fileSystem type '${String(
                        (fileSystem as { type?: unknown }).type
                    )}'!`
                );
            },
            inject: [ApiConfigService]
        })
    ],
    exports: [FileSystemModule]
})
export class ApiFileSystemModule {}
/**
 * A single file entry as returned when reading a directory.
 */
export type FileSystemFile = {
// Path to the file excluding the file itself (basename = filename + extension)
path: string;
// Full path, which is the path to the file plus the file itself (basename = filename + extension)
fullPath: string;
// Filename + extension
basename: string;
// Filename without the extension
filename: string;
// Extension; the GCS implementation reports it without the leading dot
extension: string;
// Size of the file; NOTE(review): presumably in bytes — confirm against the implementations
size: number;
// MimeType if available, otherwise null
contentType: string | null;
};
/**
 * A single directory entry as returned when reading a directory.
 */
export type FileSystemDirectory = {
// directory name, i.e: 'def'
name: string;
// Full path to the directory, i.e: '/abc/def'.
// NOTE(review): this used to say the path always contains a trailing slash, but the
// example has none and the GCS implementation strips it — confirm the intended contract.
fullPath: string;
};
/**
 * Result of reading a directory: the files and the directories found in it.
 *
 * The type has no `false` variant; the GCS implementation reports a missing
 * directory by throwing DirectoryNotFoundException instead.
 */
export type FileSystemReadDirResult = {
files: FileSystemFile[];
directories: FileSystemDirectory[];
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment