Created
December 21, 2016 15:01
-
-
Save firejox/4f1d1fd16adcad49b478601fd24b1867 to your computer and use it in GitHub Desktop.
async await toy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Low-level binding used by the async/await macros to locate the current
# stack frame.
lib Intrinsics
  # Exposes the external symbol "llvm.frameaddress" as
  # `Intrinsics.frameaddress`. *level* is forwarded unchanged; the macros in
  # this file always pass 0.
  fun frameaddress = "llvm.frameaddress"(level : Int32) : Pointer(Void)
end
module Async | |
# State reported by a task through `TaskInterface#status`.
enum Status
  # The task has not produced a result yet (initial state of `Task`).
  INCOMPLETE = 0
  # The task's block returned without the call being parked on an await.
  COMPLETE = 1
  # The task's block raised; the exception is stored on the task.
  FAULT = 2
end
# Contract shared by every awaitable object in this file.
module TaskInterface
  # Current state of the task.
  abstract def status : Status

  # Result of the task; read by `await` once the status is no longer
  # incomplete.
  abstract def value

  # Exception captured while running the task, if any.
  abstract def exception

  # Callable that (re)runs the task, or nil when there is nothing to run.
  abstract def proc
end
# A unit of asynchronous work that eventually yields a value of type `T`.
#
# Instances start out `Status::INCOMPLETE` with a no-op proc. The setters
# are protected so that only the async machinery fills in the proc, result,
# status and exception.
class Task(T)
  include TaskInterface

  # Result of the task; nil until a value has been stored.
  getter value : T?
  # Current lifecycle state.
  getter status : Status
  # Exception captured while running the task, if any.
  getter exception : Exception?
  # Callable that runs (or resumes) the task body.
  getter proc : ->

  def initialize
    @proc = Proc(Void).new { }
    @status = Status::INCOMPLETE
  end

  protected def value=(@value)
  end

  protected def status=(@status)
  end

  protected def exception=(@exception)
  end

  protected def proc=(@proc)
  end

  # Returns an awaitable that is incomplete the first time its status is
  # read and complete on every later read.
  def self.yield : TaskInterface
    YieldTask.new
  end

  # Awaitable backing `Task.yield`. It has no value, exception or proc.
  private class YieldTask
    include TaskInterface

    @status = Status::INCOMPLETE

    def value : Nil
    end

    def exception : Nil
    end

    def proc : Nil
    end

    # Hands back the status held before this call and flips the stored
    # status to COMPLETE, so only the first read observes INCOMPLETE.
    # The return type matches the one declared by `TaskInterface`.
    def status : Status
      tmp, @status = @status, Status::COMPLETE
      tmp
    end
  end

  # Returns an awaitable that becomes complete once *time* has elapsed
  # since the awaitable was created.
  def self.delay(time : Time::Span) : TaskInterface
    TimedTask.new time
  end

  # Awaitable backing `Task.delay`. It has no value, exception or proc.
  private class TimedTask
    include TaskInterface

    # `Task.delay` always supplies a span, so the field is not nilable.
    @delay : Time::Span
    # Creation time, used as the start of the delay.
    # NOTE(review): this is a wall-clock reading, so a system clock change
    # during the wait would shift the delay - confirm that is acceptable.
    @cur_time = Time.now
    @status = Status::INCOMPLETE

    def initialize(@delay)
    end

    def value : Nil
    end

    def exception : Nil
    end

    def proc : Nil
    end

    # Switches to COMPLETE once the delay has elapsed. A span of exactly
    # -1 ticks never completes.
    # The return type matches the one declared by `TaskInterface`.
    def status : Status
      if @delay.ticks != -1 && (Time.now - @cur_time) >= @delay
        @status = Status::COMPLETE
      end
      @status
    end
  end
end
# Bookkeeping record for one invocation of an `async` method.
#
# Records are chained through `prev`, and the innermost one is reachable
# via `AsyncCall.current`.
class AsyncCall
  # Innermost async call at the moment, if any.
  @@current : self?

  # Task driven by this call.
  getter task : TaskInterface

  # Async call that was current when this one was pushed.
  getter prev : self?

  # Address to jump to when the call is re-entered.
  property current_ip : Pointer(Void)?

  # Buffer holding the dumped local variables of the call.
  property local_vars : Pointer(Void)?

  # True while this call is waiting for another task.
  property? is_wait : Bool = false

  def initialize(@task)
  end

  # Returns the innermost async call, or nil when none is active.
  def self.current
    @@current
  end

  # Makes this call the current one, remembering the previous current call.
  protected def push
    @prev, @@current = @@current, self
  end

  # Restores the remembered call as current and forgets it.
  protected def pop
    @@current, @prev = @prev, nil
  end
end
# Builds the `Task(R)` that represents one run of *block*.
#
# The returned task's proc pushes a dedicated `AsyncCall`, runs *block* and
# stores its result. The task is marked COMPLETE unless the call ended up
# waiting on another task, and FAULT (with the exception recorded) if the
# block raised. The `AsyncCall` is popped in every case. The proc itself
# always returns nil.
def async_call_and_task_builder(block : -> R) forall R
  pending = Task(R).new
  frame = AsyncCall.new pending

  pending.proc = -> {
    begin
      frame.push
      pending.value = block.call
      if !frame.is_wait?
        pending.status = Status::COMPLETE
      end
    rescue error
      pending.exception = error
      pending.status = Status::FAULT
    ensure
      frame.pop
    end
    nil
  }

  pending
end
# Waits, inside an `async` method, for the task returned by *call* and
# evaluates to that task's `value`.
#
# *call* must be a call expression. Any other node is rejected at compile
# time, mirroring the check done by `async`, instead of silently expanding
# to nothing.
#
# The expansion records a re-entry address on the current `AsyncCall` and
# then inspects the awaited task's status:
# * INCOMPLETE: the stack region of the running proc is dumped into the
#   call's `local_vars` buffer, the call is flagged as waiting, its task is
#   pushed onto the current thread's queue and the enclosing proc returns
#   nil.
# * FAULT: the awaited task's exception, if present, is raised.
# * COMPLETE: execution simply continues.
#
# The order of the statements below is significant for the re-entry
# mechanism and must not be changed.
macro await(call)
  {% if call.is_a?(Call) %}
    begin
      %tmp = {{call}}
      %ip = Pointer(Void).null
      # `call 2f` pushes the address of the following `jmp 1b`; `popq`
      # stores that address in the output operand. Jumping to it later
      # lands back on label 1 and replays this sequence.
      asm("
        1: call 2f
        jmp 1b
        2: popq $0
        ": "=r"(%ip))
      AsyncCall.current.not_nil!.current_ip = %ip
      %current = AsyncCall.current.not_nil!
      case %tmp.status
      when Status::INCOMPLETE
        # Dump the bytes between the stack pointer and the frame address
        # into the buffer prepared by `async`, then park the call.
        %stack_top = Pointer(Void).null
        asm("movq \%rsp, $0": "=r"(%stack_top))
        %frame_addr = Intrinsics.frameaddress 0
        %current.local_vars.try &.copy_from(
          %stack_top, %frame_addr.address - %stack_top.address)
        %current.is_wait = true
        %current.task.try { |%self| Thread.current.queue.push %self }
        return nil
      when Status::FAULT
        %current.is_wait = false
        %tmp.exception.try { |%self| raise %self }
      when Status::COMPLETE
        %current.is_wait = false
      else
        %current.is_wait = false
        raise "Invalid Task Status"
      end
      %tmp.value
    end
  {% else %}
    {{ raise "await requires a call expression" }}
  {% end %}
end
# Rewrites the method definition *func* into a method with the same name and
# parameters whose body runs as a task.
#
# The generated method wraps the original body in a proc, hands it to
# `async_call_and_task_builder`, invokes the resulting task's proc once and
# returns the task. Passing anything other than a method definition is a
# compile-time error ("require Def node").
#
# NOTE(review): the generated def carries over neither a return type nor
# free variables from *func* - confirm this is intended.
macro async(func) | |
{% if func.is_a?(Def) %} | |
# Method name, qualified with the receiver when the def has one.
{% name = func.name %} | |
{% if func.receiver %} | |
{% name = "#{func.receiver.id}.#{name.id}".id %} | |
{% end %} | |
# Rebuild the parameter list: the splat parameter gets its leading star
# back, every other parameter is reused as written.
{% args = func.args.map do |arg| | |
if func.splat_index != nil && arg == func.args[func.splat_index] | |
"*#{arg.id}".id | |
else | |
arg | |
end | |
end | |
%} | |
# Append the double splat and the block parameter when present.
{% if func.double_splat %} | |
{% args = args + ["**#{func.double_splat.id}".id] %} | |
{% end %} | |
{% if func.block_arg %} | |
{% args = args + ["&#{func.block_arg.id}".id] %} | |
{% end %} | |
def {{name.id}} ({{ args.join(",").id }}) | |
%task = async_call_and_task_builder ->{ | |
%current = AsyncCall.current.not_nil! | |
# Read the stack pointer and the frame address of this proc; their
# distance is used as the size of the region that is saved and restored.
%stack_top = Pointer(Void).null | |
asm("movq \%rsp, $0": "=r"(%stack_top)) | |
%frame_addr = Intrinsics.frameaddress 0 | |
# On re-entry a dump already exists and is copied back onto the stack;
# on the first run only the buffer for later dumps is allocated.
if %current.local_vars | |
%current.local_vars.try &.copy_to(%stack_top, %frame_addr.address - %stack_top.address) | |
else | |
%current.local_vars = Pointer(Void).malloc(%frame_addr.address - %stack_top.address) | |
end | |
# When a re-entry address was recorded by an await, jump straight to it
# instead of running the body from the top.
if %ip = AsyncCall.current.not_nil!.current_ip | |
asm("jmp *$0"::"r"(%ip)) | |
end | |
{{ func.body }} | |
} | |
# Run the task once right away and hand the task back to the caller.
%task.proc.call | |
%task | |
end | |
{% else %} | |
{{ raise "require Def node" }} | |
{% end %} | |
end | |
end | |
include Async

# Demo async method: prints a start message, awaits a ten-second delay task
# and then prints the elapsed time. Output goes through `LibC.printf`.
async def qqq(x : Int32 = 1, *args : *T, &block)
  started_at = Time.now
  LibC.printf "start wait\n"
  await Task.delay(Time::Span.new(0, 0, 10))
  LibC.printf "end wait\ntime : %s\n", (Time.now - started_at).to_s
end

# Direct call left disabled; the demo is started from a worker thread below.
#qqq
# Reopens `Thread` to give every thread a queue of parked tasks and a helper
# that runs a block followed by a drain loop.
class Thread
  # Tasks waiting to be resumed on this thread.
  getter queue = Deque(TaskInterface).new

  # Starts a new thread that calls *block* and then keeps taking tasks off
  # the thread's own queue, invoking each task's proc when it has one, until
  # the queue is empty. Returns the new thread.
  def self.async_test(&block : ->)
    new do
      block.call
      while pending = current.queue.shift?
        if callback = pending.proc
          callback.call
        end
      end
    end
  end
end
# Run the demo on a dedicated thread and block until it has finished.
worker = Thread.async_test do
  qqq
end
worker.join
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
start wait | |
end wait | |
time : 00:00:10.0000538 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This has since been packaged as a library in this repo: https://github.com/firejox/crystal-async_await