Skip to content

Instantly share code, notes, and snippets.

@firejox
Created December 21, 2016 15:01
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save firejox/4f1d1fd16adcad49b478601fd24b1867 to your computer and use it in GitHub Desktop.
Save firejox/4f1d1fd16adcad49b478601fd24b1867 to your computer and use it in GitHub Desktop.
async await toy
# Low-level binding used by the `await` and `async` macros in this file to
# obtain a frame address.
lib Intrinsics
# Exposes the symbol "llvm.frameaddress" as `Intrinsics.frameaddress`.
# `level` is forwarded unchanged; every call site in this file passes 0.
# NOTE(review): per the LLVM reference, level 0 presumably refers to the
# frame of the function that contains the call -- confirm.
fun frameaddress = "llvm.frameaddress"(level : Int32) : Void*
end
module Async
# State reported by a task's `status` method.
enum Status
# Not finished yet. This is the state a new Task starts in, and the state
# that makes `await` hand control back to its caller.
INCOMPLETE
# Finished normally. `async_call_and_task_builder` sets it once the wrapped
# block has returned without flagging itself as waiting.
COMPLETE
# The wrapped block raised. The exception is stored on the task and `await`
# raises it again.
FAULT
end
# Contract shared by everything that can be passed to `await` or placed on a
# thread's task queue.
module TaskInterface
# Result of the task. `await` evaluates to this value.
abstract def value
# Current Status of the task. Implementations in this file may update their
# state as a side effect of being asked.
abstract def status : Status
# Exception captured while running the task, or nil.
abstract def exception
# Callable that runs or resumes the task. The helper tasks in this file
# return nil here, so callers go through `try`.
abstract def proc
end
# A unit of asynchronous work that eventually produces a value of type T.
#
# A new task is INCOMPLETE and carries a do-nothing `proc`. The protected
# setters allow the builder code in this namespace to fill in the result,
# the failure and the callable that drives the task.
class Task(T)
  include TaskInterface

  # Value produced by the task; nil until one has been stored.
  getter value : T?
  # Current lifecycle state.
  getter status : Status
  # Exception captured while running the task, if any.
  getter exception : Exception?
  # Callable that runs (or resumes) the task body.
  getter proc : ->

  def initialize
    @status = Status::INCOMPLETE
    @proc = Proc(Void).new {}
  end

  protected def value=(@value); end

  protected def status=(@status); end

  protected def exception=(@exception); end

  protected def proc=(@proc); end

  # Returns a task that reports INCOMPLETE the first time its status is
  # read and COMPLETE on every later read.
  def self.yield : TaskInterface
    YieldTask.new
  end

  # Returns a task that becomes COMPLETE once *time* has elapsed since the
  # task was created.
  def self.delay(time : Time::Span) : TaskInterface
    TimedTask.new(time)
  end

  # Task behind `Task.yield`. It has no value, exception or proc.
  private class YieldTask
    include TaskInterface

    @status = Status::INCOMPLETE

    def value : Nil
      nil
    end

    def exception : Nil
      nil
    end

    def proc : Nil
      nil
    end

    # Hands back the state held before this call and switches the stored
    # state to COMPLETE.
    def status
      previous = @status
      @status = Status::COMPLETE
      previous
    end
  end

  # Task behind `Task.delay`. It has no value, exception or proc.
  private class TimedTask
    include TaskInterface

    @delay : Time::Span?
    @cur_time = Time.now
    @status = Status::INCOMPLETE

    def initialize(@delay)
    end

    def value : Nil
      nil
    end

    def exception : Nil
      nil
    end

    def proc : Nil
      nil
    end

    # Flips to COMPLETE once the time elapsed since creation reaches the
    # delay. A delay whose tick count is -1 is never completed here.
    def status
      limit = @delay.not_nil!
      return @status if limit.ticks == -1

      @status = Status::COMPLETE if (Time.now - @cur_time) >= limit
      @status
    end
  end
end
# One entry in the chain of async calls that are currently executing.
#
# The class remembers the innermost active call; `push` and `pop` link and
# unlink entries through `prev`.
class AsyncCall
  # Innermost async call currently running, if any.
  @@current : self?

  # The call that was current when this one was pushed.
  getter prev : self?

  # Address at which execution re-enters the function body.
  property current_ip : Void*?

  # Buffer holding a dump of this call's local variables.
  property local_vars : Void*?

  # True while this call is waiting for another task.
  property? is_wait : Bool = false

  # Task associated with this call.
  getter task : TaskInterface

  def initialize(@task)
  end

  # Returns the innermost async call currently running, or nil.
  def self.current
    @@current
  end

  # Makes this call the current one, remembering the one it replaces.
  protected def push
    @prev = AsyncCall.current
    @@current = self
  end

  # Restores the call that was current before `push` and drops the link.
  protected def pop
    @@current = prev
    @prev = nil
  end
end
# Wraps *block* in a fresh Task(R) together with the AsyncCall that tracks
# it, and returns the task without running anything.
#
# Each time the task's proc is called it:
# * makes the AsyncCall current for the duration of the call,
# * stores the block's result as the task value,
# * marks the task COMPLETE unless the call flagged itself as waiting,
# * on a raised exception, records it and marks the task FAULT instead,
# * always restores the previously current AsyncCall and returns nil.
def async_call_and_task_builder(block : -> R) forall R
  task = Task(R).new
  call = AsyncCall.new(task)

  task.proc = -> {
    begin
      call.push
      task.value = block.call
      if !call.is_wait?
        task.status = Status::COMPLETE
      end
    rescue error
      task.exception = error
      task.status = Status::FAULT
    ensure
      call.pop
    end
    nil
  }

  task
end
# Suspends the surrounding async body until the task produced by *call* is
# no longer INCOMPLETE, and evaluates to that task's value.
#
# *call* must be a call expression. Any other node is rejected with a
# compile-time error, matching what the `async` macro does for a bad node.
# Before this check existed such an argument expanded to nothing, so the
# surrounding code silently carried on without waiting.
#
# The expansion must run inside a body started through `async`: it needs a
# current AsyncCall and fails with a nil assertion otherwise.
#
# Steps of the expansion, in order:
# 1. Evaluate *call* once and keep the resulting task in a local.
# 2. Capture a code address with the call/pop sequence (x86-64, AT&T
#    syntax): the pushed return address is popped into a register. That
#    address is stored as the current AsyncCall's re-entry point, so a
#    resumed body continues from here with the same task local.
# 3. Branch on the task status:
#    - INCOMPLETE: read the stack pointer and the frame address, copy the
#      bytes between them into the AsyncCall's local-variable buffer (when
#      one exists), flag the call as waiting, put the AsyncCall's own task
#      on the current thread's queue and return nil from the enclosing body.
#    - FAULT: clear the waiting flag and raise the task's stored exception,
#      if it has one.
#    - COMPLETE: clear the waiting flag.
#    - anything else: clear the waiting flag and raise.
# 4. Evaluate to the task's value.
macro await(call)
  {% if call.is_a?(Call) %}
    begin
      %tmp = {{call}}
      %ip = Pointer(Void).null
      asm("
1: call 2f
jmp 1b
2: popq $0
": "=r"(%ip))
      AsyncCall.current.not_nil!.current_ip = %ip
      %current = AsyncCall.current.not_nil!
      case %tmp.status
      when Status::INCOMPLETE
        %stack_top = Pointer(Void).null
        asm("movq \%rsp, $0": "=r"(%stack_top))
        %frame_addr = Intrinsics.frameaddress 0
        %current.local_vars.try &.copy_from(
          %stack_top, %frame_addr.address - %stack_top.address)
        %current.is_wait = true
        %current.task.try { |%self| Thread.current.queue.push %self }
        return nil
      when Status::FAULT
        %current.is_wait = false
        %tmp.exception.try { |%self| raise %self }
      when Status::COMPLETE
        %current.is_wait = false
      else
        %current.is_wait = false
        raise "Invaild Task Status"
      end
      %tmp.value
    end
  {% else %}
    {{ raise "require Call node" }}
  {% end %}
end
# Rewrites a method definition so that calling it starts the body as a
# resumable task and returns that task.
#
# *func* must be a def node; anything else fails at compile time with
# "require Def node".
#
# The macro first rebuilds the method's name (prefixed with its receiver,
# when it has one) and its argument list, putting back the `*` on the splat
# argument, the `**` on a double splat and the `&` on a block argument.
# Only the name and the arguments are carried over to the emitted def.
#
# The emitted def wraps the following in a proc, hands that proc to
# `async_call_and_task_builder`, calls the resulting task's proc once and
# returns the task. The wrapped proc, on every invocation:
# 1. Looks up the current AsyncCall (nil assertion if there is none).
# 2. Reads the stack pointer and the frame address.
# 3. If the AsyncCall already owns a local-variable buffer, copies it over
#    the bytes between the stack pointer and the frame address; otherwise
#    allocates a buffer of that size (nothing is copied on this path).
# 4. If a re-entry address has been recorded, jumps straight to it;
#    otherwise falls through.
# 5. Runs the original method body.
#
# NOTE(review): steps 3 and 4 appear to rely on the frame having the same
# size and layout on every invocation -- confirm before changing anything
# in the generated proc.
macro async(func)
{% if func.is_a?(Def) %}
{% name = func.name %}
{% if func.receiver %}
{% name = "#{func.receiver.id}.#{name.id}".id %}
{% end %}
{% args = func.args.map do |arg|
if func.splat_index != nil && arg == func.args[func.splat_index]
"*#{arg.id}".id
else
arg
end
end
%}
{% if func.double_splat %}
{% args = args + ["**#{func.double_splat.id}".id] %}
{% end %}
{% if func.block_arg %}
{% args = args + ["&#{func.block_arg.id}".id] %}
{% end %}
def {{name.id}} ({{ args.join(",").id }})
%task = async_call_and_task_builder ->{
%current = AsyncCall.current.not_nil!
%stack_top = Pointer(Void).null
asm("movq \%rsp, $0": "=r"(%stack_top))
%frame_addr = Intrinsics.frameaddress 0
if %current.local_vars
%current.local_vars.try &.copy_to(%stack_top, %frame_addr.address - %stack_top.address)
else
%current.local_vars = Pointer(Void).malloc(%frame_addr.address - %stack_top.address)
end
if %ip = AsyncCall.current.not_nil!.current_ip
asm("jmp *$0"::"r"(%ip))
end
{{ func.body }}
}
%task.proc.call
%task
end
{% else %}
{{ raise "require Def node" }}
{% end %}
end
end
include Async
# Demo method for the `async` and `await` macros.
#
# It prints "start wait", awaits a delay task built from
# `Time::Span.new(0, 0, 10)`, then prints "end wait" followed by the time
# elapsed since the method started. The `x`, `args` and `block` parameters
# are not used by the body.
async def qqq(x : Int32 = 1, *args : *T, &block)
time = Time.now
LibC.printf "start wait\n"
await Task.delay(Time::Span.new(0, 0, 10))
LibC.printf "end wait\ntime : %s\n", (Time.now - time).to_s
end
#qqq
# Extends Thread with a per-thread task queue and a helper that runs a block
# and then drains that queue.
class Thread
  # Tasks waiting to be run or resumed on this thread.
  getter queue = Deque(TaskInterface).new

  # Starts a new thread that calls *block* once and then keeps taking tasks
  # off the front of the current thread's queue, invoking each task's proc
  # when it has one, until the queue is empty. Returns the new thread.
  def self.async_test(&block : ->)
    new do
      block.call
      while pending = current.queue.shift?
        pending.proc.try { |resume| resume.call }
      end
    end
  end
end
# Run the demo on its own thread and wait for that thread to finish.
worker = Thread.async_test { qqq }
worker.join
start wait
end wait
time : 00:00:10.0000538
@firejox
Copy link
Author

firejox commented Dec 26, 2016

This code has since been packaged as a library in this repository: https://github.com/firejox/crystal-async_await

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment