Skip to content

Instantly share code, notes, and snippets.

@ngrebenshikov
Last active August 13, 2019 01:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ngrebenshikov/5b7c8943f7988c5cc19741832ef12620 to your computer and use it in GitHub Desktop.
Save ngrebenshikov/5b7c8943f7988c5cc19741832ef12620 to your computer and use it in GitHub Desktop.
Very simple chunked uploading in Haxe (this only illustrates the idea; the code is not compilable)
package ;
/**
 * Server entry point: opens the MySQL connection, dispatches the incoming
 * remoting request and releases the database resources afterwards.
 */
class Server {
    /** Connection opened in `main` and shared with `sys.db.Manager`. */
    public static var dbConnection: Connection;

    /**
     * Routes `trace` output to stderr, connects to the database, serves the
     * request and finally cleans up. Failures while serving the request are
     * reported as HTTP 500 instead of escaping, so the cleanup below always runs.
     */
    static function main() {
        haxe.Log.trace = serverTrace;
        dbConnection = Mysql.connect({
            host: "127.0.0.1",
            port: 3306,
            database: Configuration.instance.getDatabaseName(),
            user: "",
            pass: "",
            socket: null
        });
        try {
            dbConnection.request("SET NAMES 'utf8';");
            sys.db.Manager.cnx = dbConnection;
            sys.db.Manager.initialize();
            // NOTE(review): the result was never used; the call itself is kept
            // in case Request.getParams() has side effects - confirm and drop.
            Request.getParams();
            requestService();
        } catch (e: Dynamic) {
            reportInternalServerError(e);
        }
        // Reached on success and on failure alike, so the connection is not leaked.
        sys.db.Manager.cleanup();
        dbConnection.close();
    }

    /**
     * Registers the remoting services and lets the remoting layer handle the
     * request; prints a greeting when the request is not a remoting call.
     */
    static function requestService() {
        var context = new Context();
        context.addObject("StorageService", new StorageService());
        if (HttpConnection.handleRequest(context)) return;
        php.Lib.print("Nice to meet you!");
    }

    /** Writes `s` to stderr, prefixed with a "[Server][<timestamp>] " tag. */
    static function dumpError(s: String) {
        var err = Sys.stderr();
        err.writeString("[Server][" + Date.now().toString() + "] ");
        err.writeString(s);
        err.flush();
    }

    /**
     * Replacement for `haxe.Log.trace`: logs "<file>:<line>: <value>" to stderr.
     *
     * @param v   value to log; converted with `Std.string`
     * @param pos call-site position, normally injected by the compiler
     */
    public static function serverTrace(v: Dynamic, ?pos: haxe.PosInfos) {
        // `pos` is optional, so guard against an explicit null from manual calls.
        var location = if (pos != null) pos.fileName + ":" + Std.string(pos.lineNumber) else "?";
        dumpError(location + ": " + Std.string(v) + "\n");
    }

    /** Logs the error and its exception stack, then sets the HTTP status to 500. */
    private static function reportInternalServerError(e: Dynamic) {
        trace(e);
        trace(CallStack.toString(CallStack.exceptionStack()));
        php.Web.setReturnCode(500);
    }
}
package storage;
import flash.events.Event;
import flash.events.EventDispatcher;
import haxe.io.Bytes;
/** Upload state of a single chunk, as tracked by the client-side Uploader. */
enum ChunkState {
/** Initial state: the chunk has not been handed to the service client yet. */
none;
/** An upload request for the chunk has been issued and has not returned yet. */
sending;
/** The upload callback for the chunk has fired. */
sent;
}
// The EventDispatcher-based event code could instead be implemented with tink_core signals: https://haxetink.github.io/tink_core/#/types/signal
/**
 * Client-side chunked uploader: splits `data` into `chunkSize`-byte pieces and
 * sends them through `StorageServiceClient`, keeping at most
 * `maxParallelRequests` requests in flight. Dispatches `PROGRESS_CHANGED`
 * while chunks complete and `COMPLETE` once every chunk has been sent.
 */
class Uploader extends EventDispatcher {
    /** Event type dispatched whenever the progress is above 0 after an update. */
    @:final public static var PROGRESS_CHANGED: String = "storage.Uploader.PROGRESS_CHANGED";
    /** Event type dispatched once all chunks have been sent. */
    @:final public static var COMPLETE: String = "storage.Uploader.COMPLETE";

    /** Payload to upload. */
    public var data(default, null): Bytes;
    /** Size in bytes of every chunk except possibly the last one. */
    public var chunkSize(default, null): Int;
    /** Upper bound for simultaneously pending chunk requests. */
    public var maxParallelRequests(default, null): Int;
    /** Name under which the chunks are sent; generated when not supplied. */
    public var name(default, null): String;
    /** Response of the most recently completed chunk request. */
    public var responseData(default, null): String;
    /** Percentage (0..100, rounded down) of chunks already sent. */
    public var progress(get, null): Int;

    private function get_progress(): Int {
        // Before start() there is no chunk list yet; report 0 instead of
        // dereferencing null or dividing by zero.
        if (chunks == null || chunks.length == 0) return 0;
        var sent = Lambda.count(chunks, function(c) { return c == ChunkState.sent; });
        return Std.int(sent * 100 / chunks.length);
    }

    private var chunks: Array<ChunkState>;

    /**
     * @param data                bytes to upload
     * @param name                upload name; when null an MD5 of the current
     *                            time plus a random number is used
     * @param chunkSize           chunk size in bytes (default 256 KiB); must be positive
     * @param maxParallelRequests maximum number of concurrently pending requests
     */
    public function new(data: Bytes, name: String = null, chunkSize: Int = 1 << 18, maxParallelRequests: Int = 5) {
        super();
        // A non-positive chunk size would make the chunk count computation meaningless.
        if (chunkSize <= 0) throw "chunkSize must be positive";
        this.data = data;
        this.chunkSize = chunkSize;
        this.maxParallelRequests = maxParallelRequests;
        this.name = if (name != null) name else haxe.crypto.Md5.encode(Std.string(Date.now().getTime()) + Std.random(1000000));
    }

    /** (Re)builds the chunk list and starts sending. */
    public function start() {
        prepareChunks();
        update();
    }

    /** Creates one `ChunkState.none` entry per chunk of `data`. */
    private function prepareChunks() {
        var count: Int = Std.int(data.length / chunkSize);
        if (data.length % chunkSize > 0) count += 1;
        // An empty payload still gets a single (empty) chunk; with zero chunks
        // the progress could never reach 100 and COMPLETE would never fire.
        if (count == 0) count = 1;
        chunks = [for (i in 0...count) ChunkState.none];
    }

    /** Marks the chunk as sending and submits its bytes to the storage service. */
    private function uploadChunk(chunkIndex: Int) {
        chunks[chunkIndex] = ChunkState.sending;
        var offset = chunkIndex * chunkSize;
        // The last chunk carries whatever is left. Using `data.length % chunkSize`
        // here would yield 0 bytes when the payload is an exact multiple of
        // chunkSize and silently drop the final chunk's content.
        var remaining = data.length - offset;
        var length = if (remaining < chunkSize) remaining else chunkSize;
        StorageServiceClient.instance.uploadChunk(
            {
                name: name,
                index: chunkIndex,
                count: chunks.length,
                data: data.sub(offset, length)
            },
            function(response) {
                chunks[chunkIndex] = ChunkState.sent;
                responseData = response;
                update();
            }
        );
    }

    /**
     * Dispatches progress/completion events and, while the upload is not
     * finished, schedules the submission of further chunks up to the
     * parallel request limit.
     */
    private function update() {
        var percent = progress;
        if (percent > 0) dispatchEvent(new Event(PROGRESS_CHANGED));
        if (percent >= 100) {
            dispatchEvent(new Event(COMPLETE));
            return;
        }
        haxe.Timer.delay(function() {
            // Recounted on every iteration because uploadChunk() changes the states.
            var pending = function() { return Lambda.count(chunks, function(c) { return c == ChunkState.sending; }); };
            var next = getChunkIndexToUpload();
            while (next >= 0 && pending() < maxParallelRequests) {
                uploadChunk(next);
                next = getChunkIndexToUpload();
            }
        }, 10);
    }

    /** Index of the first chunk that has not been submitted yet, or -1 if none. */
    private inline function getChunkIndexToUpload() {
        return Lambda.indexOf(chunks, ChunkState.none);
    }
}
package storage;
/**
 * Service object registered with the remoting context under the name
 * "StorageService"; forwards chunk uploads to a ServerUploader.
 */
class StorageService {
    public function new() {}

    /**
     * Stores one chunk by delegating to a fresh ServerUploader.
     *
     * @param chunk chunk to store
     * @return whatever ServerUploader.uploadChunk returns for the chunk
     */
    public function uploadChunk(chunk: Chunk): String {
        var uploader = new ServerUploader();
        return uploader.uploadChunk(chunk);
    }
}
package storage;
import utils.Runner;
import view.MessageBox;
import view.WaitingPopup;
import haxe.io.Bytes;
import haxe.Timer;
import auth.Session;
import haxe.remoting.HttpAsyncConnection;
import haxe.Serializer;
import haxe.Unserializer;
import strings.Strings;
import view.WaitingPopup;
import storage.FileInfo;
/**
 * Client-side access point to the remote StorageService. Holds the async
 * remoting connection and is exposed as a lazily created singleton through
 * `StorageServiceClient.instance`.
 */
class StorageServiceClient {
    private static var _instance: StorageServiceClient;

    /** Shared instance, created on first access. */
    public static var instance(get, null): StorageServiceClient;

    private static function get_instance(): StorageServiceClient {
        if (_instance != null) return _instance;
        _instance = new StorageServiceClient();
        return _instance;
    }

    /** Remoting connection; rebuilt whenever `url` is assigned. */
    private var context: HttpAsyncConnection;

    private var url(default, set): String;

    private function set_url(value: String): String {
        url = value;
        // Keep the connection in sync with the endpoint URL.
        context = haxe.remoting.HttpAsyncConnection.urlConnect(value);
        return value;
    }

    /** Endpoint URL taken from the configuration. */
    private function prepareUrl(): String {
        return Configuration.remotingUrl;
    }

    public function new() {
        url = prepareUrl();
    }

    /**
     * Creates an Uploader for `data` and starts it after a 100 ms delay, so the
     * caller can attach event listeners to the returned object first.
     *
     * @param data bytes to upload
     * @param name optional upload name passed on to the Uploader
     * @return the Uploader that will perform the upload
     */
    public function upload(data: Bytes, name: String = null): Uploader {
        var uploader: Uploader = new Uploader(data, name);
        Timer.delay(uploader.start, 100);
        return uploader;
    }

    /**
     * Sends one chunk to the remote `StorageService.uploadChunk` method.
     *
     * @param chunk chunk to send
     * @param after invoked with the remote call's result
     */
    public function uploadChunk(chunk: Chunk, after: String -> Void) {
        var remoteMethod = context.StorageService.uploadChunk;
        remoteMethod.call([chunk], function(result) {
            after(result);
        });
    }
}
package storage;
import sys.io.FileOutput;
import haxe.io.Bytes;
/**
 * Server-side chunk receiver: stores every incoming chunk as
 * "<name>-<index>" in the temp folder and, once `chunk.count` chunks of an
 * upload are present, concatenates them into "<storage folder>/<name>" and
 * removes the temporary files.
 *
 * NOTE(review): StorageService instantiates `ServerUploader`, while this class
 * is declared as `Uploader` - confirm which name is intended.
 */
class Uploader {
    /** Directory that receives the merged uploads. */
    public function getStorageFolder() {
        return php.Web.getCwd() + "/../storage/upload";
    }

    /** URL prefix under which merged uploads are reported to the client. */
    public function getStorageUrl() {
        return "/storage/upload";
    }

    /** Directory holding chunks of uploads that are not complete yet. */
    public function getStorageTempFolder() {
        return getStorageFolder() + "/tmp";
    }

    public function new() {}

    /**
     * Stores one chunk and merges the upload when all of its chunks are saved.
     *
     * @param chunk chunk with `name`, `index`, `count` and `data`
     * @return URL of the (eventually) merged file
     * @throws String when the name could escape the storage folder or the
     *         index lies outside 0...count
     */
    public function uploadChunk(chunk: Chunk): String {
        // The chunk comes from the client and its name ends up in file paths.
        assertSafeName(chunk.name);
        if (chunk.index < 0 || chunk.index >= chunk.count) {
            throw "Invalid chunk index: " + Std.string(chunk.index);
        }
        saveChunkToTemp(chunk);
        if (getNumOfSavedChunks(chunk.name) >= chunk.count) {
            mergeChunks(chunk.name, chunk.count);
            removeChunks(chunk.name);
        }
        return getStorageUrl() + "/" + chunk.name;
    }

    /** Rejects names that are empty or contain path navigation elements. */
    private function assertSafeName(name: String) {
        var unsafe = name == null || name == "" || name == "." || name == ".."
            || name.indexOf("/") >= 0 || name.indexOf("\\") >= 0;
        if (unsafe) throw "Invalid upload name: " + Std.string(name);
    }

    /** Path of the temp file holding chunk `index` of upload `name`. */
    private function getChunkPath(name: String, index: Int): String {
        return getStorageTempFolder() + "/" + name + "-" + Std.string(index);
    }

    /**
     * True when `file` is exactly "<name>-<digits>". A bare prefix test would
     * also match chunks of other uploads whose name merely starts with `name`
     * (e.g. "abc" vs "abcd-0").
     */
    private function isChunkFileOf(file: String, name: String): Bool {
        var prefix = name + "-";
        if (file.length <= prefix.length) return false;
        if (file.substr(0, prefix.length) != prefix) return false;
        return ~/^[0-9]+$/.match(file.substr(prefix.length));
    }

    private function saveChunkToTemp(chunk: Chunk) {
        sys.io.File.saveBytes(getChunkPath(chunk.name, chunk.index), chunk.data);
    }

    private function loadChunkDataFromTemp(name: String, index: Int): Bytes {
        return sys.io.File.getBytes(getChunkPath(name, index));
    }

    /** Concatenates chunks 0...length of `name` into the final file. */
    private function mergeChunks(name: String, length: Int) {
        // Truncate rather than append: a second merge of the same name (repeated
        // upload, or two requests both seeing the full chunk set) must not
        // duplicate the content.
        var out: FileOutput = sys.io.File.write(getStorageFolder() + "/" + name, true);
        try {
            for (i in 0...length) {
                out.write(loadChunkDataFromTemp(name, i));
            }
        } catch (e: Dynamic) {
            // Release the handle before propagating the failure.
            out.close();
            throw e;
        }
        out.close();
    }

    /** Number of chunk files of upload `name` currently in the temp folder. */
    private function getNumOfSavedChunks(name: String) {
        return Lambda.count(
            sys.FileSystem.readDirectory(getStorageTempFolder()),
            function(f: String) { return isChunkFileOf(f, name); });
    }

    /** Deletes all temp chunk files belonging to upload `name`. */
    private function removeChunks(name: String) {
        var tempFolder = getStorageTempFolder();
        for (file in sys.FileSystem.readDirectory(tempFolder)) {
            if (isChunkFileOf(file, name)) {
                sys.FileSystem.deleteFile(tempFolder + "/" + file);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment