Skip to content

Instantly share code, notes, and snippets.

@codemartial
Created November 15, 2016 10:37
Show Gist options
  • Save codemartial/20f8101f4cd4ac1ce67cd7f90cc10c6d to your computer and use it in GitHub Desktop.
Save codemartial/20f8101f4cd4ac1ce67cd7f90cc10c6d to your computer and use it in GitHub Desktop.
A distributed filesystem prototype
package main
import (
"bytes"
"fmt"
"sync"
"strings"
)
// MetaServer stores the filesystem hierarchy, the data node members and
// their capacity.
//
// NewMetaServer initialises this struct with a positional (unkeyed) composite
// literal, so the order of the fields below must stay in step with it.
type MetaServer struct {
dataNodes map[int]int // NodeID: Capacity (RegisterFile decrements it once per registered file)
dataNodeInstances map[int]*DataNode // NodeID: DataNode Instance Mapping
files *FSNode // Filesystem Hierarchy (root directory)
dnMu sync.RWMutex // Data Node Mutex: held by AddNode, GetNode, GetFreeNode and RegisterFile
fsMu sync.RWMutex // File System Mutex: held by RegisterFile while it modifies the hierarchy
}
// NewMetaServer returns a MetaServer with no registered data nodes and an
// empty root directory.
func NewMetaServer() *MetaServer {
	meta := new(MetaServer)
	meta.dataNodes = make(map[int]int)
	meta.dataNodeInstances = make(map[int]*DataNode)
	meta.files = NewDirectory()
	// dnMu and fsMu need no initialisation: the zero RWMutex is unlocked.
	return meta
}
// AddNode registers a data node under nodeID with the given capacity and
// creates its DataNode instance. It returns false, leaving the server
// untouched, when nodeID is already registered.
func (meta *MetaServer) AddNode(nodeID, capacity int) bool {
	meta.dnMu.Lock()
	defer meta.dnMu.Unlock()

	_, registered := meta.dataNodes[nodeID]
	if !registered {
		meta.dataNodes[nodeID] = capacity
		meta.dataNodeInstances[nodeID] = NewDataNode(nodeID)
	}
	return !registered
}
// GetNode looks up the DataNode instance registered under nodeID. The second
// result is false when no such node exists.
func (meta *MetaServer) GetNode(nodeID int) (*DataNode, bool) {
	meta.dnMu.RLock()
	instance, found := meta.dataNodeInstances[nodeID]
	meta.dnMu.RUnlock()
	return instance, found
}
// GetFreeNode returns the data node with the largest capacity. It returns
// (nil, false) when no node has a capacity above zero. When several nodes
// share the largest capacity, whichever the map iteration visits first wins.
func (meta *MetaServer) GetFreeNode() (*DataNode, bool) {
	meta.dnMu.RLock()
	defer meta.dnMu.RUnlock()

	bestID, bestCapacity := 0, 0
	for id, free := range meta.dataNodes {
		if free <= bestCapacity {
			continue
		}
		bestID, bestCapacity = id, free
	}

	if bestCapacity > 0 {
		candidate, found := meta.dataNodeInstances[bestID]
		return candidate, found
	}
	return nil, false
}
// ValidateNode reports whether nodeID is a registered data node that still
// has capacity left.
//
// It does not acquire dnMu itself; RegisterFile calls it while already
// holding that lock.
func (meta *MetaServer) ValidateNode(nodeID int) bool {
	remaining, known := meta.dataNodes[nodeID]
	return known && remaining > 0
}
// RegisterFile adds the file at path to the directory listing, recording
// nodeID as the data node that stores it, and decrements that node's capacity.
//
// path is split on "/"; every component except the last names a directory,
// which is created if missing, and the last component is the file name. A
// path with a single component registers the file directly in the root
// directory.
//
// It returns false when:
//   - nodeID is unknown or out of capacity,
//   - the file name (last component) is empty,
//   - a component that must be a directory is an existing file, or
//   - an entry with the same name already exists in the target directory.
//
// Directories created while walking the path are kept even when the
// registration itself fails afterwards.
func (meta *MetaServer) RegisterFile(path string, nodeID int) bool {
	// Cheap rejection under the read lock before taking the write lock.
	meta.dnMu.RLock()
	valid := meta.ValidateNode(nodeID)
	meta.dnMu.RUnlock()
	if !valid {
		return false // Data node non-existent or out of capacity
	}

	// strings.Split always returns at least one element, so indexing the
	// last component is safe.
	components := strings.Split(path, "/")
	last := len(components) - 1
	dirs, fileName := components[:last], components[last]
	if fileName == "" {
		return false // Invalid path
	}

	meta.dnMu.Lock()
	defer meta.dnMu.Unlock()
	if !meta.ValidateNode(nodeID) {
		return false // Capacity may have changed since the read-locked check
	}

	meta.fsMu.Lock()
	defer meta.fsMu.Unlock()

	// Walk from the root, creating missing directories on the way. Starting
	// at the root (instead of special-casing the first component) also makes
	// single-component paths work: dirs is then empty and the file lands in
	// the root directory.
	node := meta.files
	for _, dirname := range dirs {
		if !node.IsDirectory() {
			return false // Path runs through a file
		}
		child, ok := node.Find(dirname)
		if !ok {
			child = NewDirectory()
			if !node.AddChild(dirname, child) {
				return false // Error creating path component
			}
		}
		node = child
	}

	if _, exists := node.Find(fileName); exists {
		return false // Already exists
	}
	if !node.AddChild(fileName, NewFile(nodeID)) {
		return false // Target is not a directory
	}

	meta.dataNodes[nodeID]--
	return true
}
// ListPath resolves path to its node in the filesystem hierarchy.
//
// path is split on "/" and each component is looked up in turn, starting
// from the root directory. It returns (nil, false) when a component does not
// exist or when the path continues past a file.
//
// The traversal runs under the filesystem read lock so that it cannot
// observe the tree while RegisterFile is modifying it. The returned node is
// the live tree node, not a copy; the lock is released before returning.
func (meta *MetaServer) ListPath(path string) (*FSNode, bool) {
	meta.fsMu.RLock()
	defer meta.fsMu.RUnlock()

	node := meta.files
	for _, name := range strings.Split(path, "/") {
		if !node.IsDirectory() {
			return nil, false // Path terminated in a file
		}
		child, ok := node.Find(name)
		if !ok {
			return nil, false // File not found
		}
		node = child
	}
	return node, true
}
// FSNode is a filesystem node: either a directory or a file. A node is a
// directory exactly when its listing map is non-nil.
type FSNode struct {
	listing    map[string]*FSNode // children by name; nil for a file
	dataNodeID int                // ID of the storing data node; -1 for a directory
}

// NewFile returns a file node stored on the data node with the given ID.
func NewFile(nodeID int) *FSNode {
	return &FSNode{dataNodeID: nodeID}
}

// NewDirectory returns an empty directory node.
func NewDirectory() *FSNode {
	return &FSNode{listing: make(map[string]*FSNode), dataNodeID: -1}
}

// ListDir returns the directory's children keyed by name. For a file it
// returns (nil, false). The returned map is the node's own map, not a copy.
func (node *FSNode) ListDir() (map[string]*FSNode, bool) {
	if !node.IsDirectory() {
		return nil, false
	}
	return node.listing, true
}

// GetStorageNodeID returns the ID of the data node holding this file. For a
// directory it returns (0, false).
func (node *FSNode) GetStorageNodeID() (int, bool) {
	if node.listing != nil {
		return 0, false
	}
	return node.dataNodeID, true
}

// Find looks up the child called name. It returns (nil, false) when the node
// is a file or has no such child.
func (node *FSNode) Find(name string) (*FSNode, bool) {
	// A file has a nil listing, and indexing a nil map simply reports a
	// miss, so no separate directory check is required.
	entry, found := node.listing[name]
	return entry, found
}

// IsDirectory reports whether the node is a directory.
func (node *FSNode) IsDirectory() bool {
	return node.listing != nil
}

// AddChild inserts ch under name. It returns false, without modifying the
// node, when the node is a file or already has a child with that name.
func (node *FSNode) AddChild(name string, ch *FSNode) bool {
	if node.listing == nil {
		return false
	}
	if _, taken := node.listing[name]; taken {
		return false
	}
	node.listing[name] = ch
	return true
}
// DataNode stores the full contents of each file placed on it, keyed by the
// file's path.
type DataNode struct {
	ID    int                     // Node ID
	files map[string]bytes.Buffer // File path: contents mapping
	mu    sync.RWMutex            // Protects files
}

// NewDataNode returns an empty data node with the given ID.
func NewDataNode(nodeID int) *DataNode {
	return &DataNode{ID: nodeID, files: make(map[string]bytes.Buffer)}
}

// StoreFile saves contents under path. It returns false and keeps the
// existing contents when the path is already stored on this node.
func (dn *DataNode) StoreFile(path string, contents bytes.Buffer) bool {
	dn.mu.Lock()
	defer dn.mu.Unlock()

	_, exists := dn.files[path]
	if !exists {
		dn.files[path] = contents
	}
	return !exists
}

// FetchFile returns the contents stored under path. The second result is
// false, and the buffer is empty, when the path is not stored on this node.
func (dn *DataNode) FetchFile(path string) (bytes.Buffer, bool) {
	dn.mu.RLock()
	stored, found := dn.files[path]
	dn.mu.RUnlock()
	return stored, found
}

// DeleteFile removes path from this node and reports whether it was present.
func (dn *DataNode) DeleteFile(path string) bool {
	dn.mu.Lock()
	defer dn.mu.Unlock()

	_, exists := dn.files[path]
	delete(dn.files, path) // no-op when the path is absent
	return exists
}
// Client is the client interface for creating, fetching and listing files.
// Every operation goes through the MetaServer the client was created with.
type Client struct {
meta *MetaServer // metadata server used to resolve paths and pick data nodes
}
// NewClient returns a Client that talks to the given MetaServer.
func NewClient(meta *MetaServer) *Client {
	client := &Client{meta: meta}
	return client
}
// Put stores contents under path. It asks the meta server for a free data
// node, stores the file there, and then registers the path with the meta
// server. If registration fails, the stored copy is deleted again. It
// returns true only when both the store and the registration succeed.
func (c *Client) Put(path string, contents bytes.Buffer) bool {
	target, found := c.meta.GetFreeNode()
	if !found {
		return false
	}

	if !target.StoreFile(path, contents) {
		return false
	}
	fmt.Println("Client.Put: (", path, ") File stored on data node ", target.ID)

	if !c.meta.RegisterFile(path, target.ID) {
		// Roll back the stored copy so the data node holds no
		// unregistered file.
		target.DeleteFile(path)
		return false
	}
	return true
}
// ListPath resolves path through the meta server and returns the matching
// filesystem node. The second result is false when the path does not resolve.
func (c *Client) ListPath(path string) (*FSNode, bool) {
	return c.meta.ListPath(path)
}
// GetFile returns the contents of the file at path. It resolves the path,
// looks up the data node recorded for the file, and fetches the contents
// from that node. On any failure it prints the reason and returns an empty
// buffer and false.
func (c *Client) GetFile(path string) (bytes.Buffer, bool) {
	// fail prints the reason for this path and produces the failure result.
	fail := func(reason string) (bytes.Buffer, bool) {
		fmt.Println("Client.GetFile: (", path, reason)
		return bytes.Buffer{}, false
	}

	entry, found := c.ListPath(path)
	if !found {
		return fail(") File not found")
	}
	if entry.IsDirectory() {
		return fail(") Found directory instead of file")
	}

	nodeID, located := entry.GetStorageNodeID()
	if !located {
		return fail(") Invalid file location")
	}

	dn, reachable := c.meta.GetNode(nodeID)
	if !reachable {
		return fail(") Couldn't access data node")
	}

	data, fetched := dn.FetchFile(path)
	if !fetched {
		return fail(") Couldn't fetch file data")
	}
	return data, true
}
// bootstrapServer builds the demo meta server: data nodes 1, 2 and 3, each
// with a capacity of 2.
func bootstrapServer() *MetaServer {
	const perNodeCapacity = 2

	meta := NewMetaServer()
	for id := 1; id <= 3; id++ {
		meta.AddNode(id, perNodeCapacity)
	}
	return meta
}
// main runs a small demo against one meta server shared by three clients.
// Two goroutines create files concurrently (both try "/foo/bar.txt", so one
// of those puts is expected to fail) and one of them reads "/frob/test".
// After both finish, the main goroutine reads files back, adds one more file
// and lists the "/foo" directory.
func main() {
	server := bootstrapServer()
	fmt.Println("Server up")

	c1 := NewClient(server)
	c2 := NewClient(server)
	c3 := NewClient(server)
	fmt.Println("Clients up")

	// Buffered to the number of workers so that neither blocks when
	// signalling completion.
	done := make(chan struct{}, 2)

	go func() {
		if !c1.Put("/frob/test", *bytes.NewBufferString("Test")) {
			fmt.Println("Failed to create file with contents: ", "Test")
		}
		if !c1.Put("/foo/bar.txt", *bytes.NewBufferString("Hello")) {
			fmt.Println("Failed to create file with contents: ", "Hello")
		}
		done <- struct{}{}
	}()

	go func() {
		if !c2.Put("/foo/bar.txt", *bytes.NewBufferString("Hi")) {
			fmt.Println("Failed to create file with contents: ", "Hi")
		}
		if contents, ok := c2.GetFile("/frob/test"); ok {
			fmt.Println("File contents: ", contents.String())
		} else {
			fmt.Println("File not found: /frob/test")
		}
		done <- struct{}{}
	}()

	// Wait for both workers before inspecting the results.
	for i := 0; i < 2; i++ {
		<-done
	}

	if contents, ok := c3.GetFile("/foo/bar.txt"); ok {
		fmt.Println("File contents: ", contents.String())
	} else {
		fmt.Println("File not found: /foo/bar.txt")
	}

	if !c2.Put("/foo/baz.txt", *bytes.NewBufferString("World")) {
		// The message names the contents that were actually written.
		fmt.Println("Failed to create file with contents: ", "World")
	}

	if contents, ok := c3.GetFile("/foo/baz.txt"); ok {
		fmt.Println("File contents: ", contents.String())
	} else {
		fmt.Println("File not found: /foo/baz.txt")
	}

	if node, ok := c2.ListPath("/foo"); ok {
		fmt.Println(node.ListDir())
	} else {
		fmt.Println("Path not found: /foo")
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment