Created
October 2, 2009 07:57
-
-
Save jakedouglas/199541 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/ext/cmain.cpp b/ext/cmain.cpp | |
index a75417c..2922338 100644 | |
--- a/ext/cmain.cpp | |
+++ b/ext/cmain.cpp | |
@@ -783,12 +783,12 @@ extern "C" int evma_send_file_data_to_connection (const unsigned long binding, c | |
evma_start_proxy | |
*****************/ | |
-extern "C" void evma_start_proxy (const unsigned long from, const unsigned long to) | |
+extern "C" void evma_start_proxy (const unsigned long from, const unsigned long to, const unsigned long bufsize) | |
{ | |
ensure_eventmachine("evma_start_proxy"); | |
EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (from)); | |
if (ed) | |
- ed->StartProxy(to); | |
+ ed->StartProxy(to, bufsize); | |
} | |
diff --git a/ext/ed.cpp b/ext/ed.cpp | |
index 2117f5c..c4fce96 100644 | |
--- a/ext/ed.cpp | |
+++ b/ext/ed.cpp | |
@@ -58,6 +58,8 @@ EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em): | |
bCallbackUnbind (true), | |
UnbindReasonCode (0), | |
ProxyTarget(NULL), | |
+ ProxiedFrom(NULL), | |
+ MaxOutboundBufSize(0), | |
MyEventMachine (em), | |
PendingConnectTimeout(20000000) | |
{ | |
@@ -103,6 +105,10 @@ EventableDescriptor::~EventableDescriptor() | |
{ | |
if (EventCallback && bCallbackUnbind) | |
(*EventCallback)(GetBinding(), EM_CONNECTION_UNBOUND, NULL, UnbindReasonCode); | |
+ if (ProxiedFrom) { | |
+ (*EventCallback)(ProxiedFrom->GetBinding(), EM_PROXY_TARGET_UNBOUND, NULL, 0); | |
+ ProxiedFrom->StopProxy(); | |
+ } | |
StopProxy(); | |
Close(); | |
} | |
@@ -181,12 +187,13 @@ bool EventableDescriptor::IsCloseScheduled() | |
EventableDescriptor::StartProxy | |
*******************************/ | |
-void EventableDescriptor::StartProxy(unsigned long to) | |
+void EventableDescriptor::StartProxy(const unsigned long to, const unsigned long bufsize) | |
{ | |
EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (to)); | |
if (ed) { | |
StopProxy(); | |
- ProxyTarget = to; | |
+ ProxyTarget = ed; | |
+ ed->SetProxiedFrom(this, bufsize); | |
return; | |
} | |
throw std::runtime_error ("Tried to proxy to an invalid descriptor"); | |
@@ -200,11 +207,23 @@ EventableDescriptor::StopProxy | |
void EventableDescriptor::StopProxy() | |
{ | |
if (ProxyTarget) { | |
+ ProxyTarget->SetProxiedFrom(NULL, 0); | |
ProxyTarget = NULL; | |
} | |
} | |
+/*********************************** | |
+EventableDescriptor::SetProxiedFrom | |
+***********************************/ | |
+ | |
+void EventableDescriptor::SetProxiedFrom(EventableDescriptor *from, const unsigned long bufsize) | |
+{ | |
+ ProxiedFrom = from; | |
+ MaxOutboundBufSize = bufsize; | |
+} | |
+ | |
+ | |
/******************************************** | |
EventableDescriptor::_GenericInboundDispatch | |
********************************************/ | |
@@ -213,12 +232,10 @@ void EventableDescriptor::_GenericInboundDispatch(const char *buf, int size) | |
{ | |
assert(EventCallback); | |
- if (!ProxyTarget) | |
+ if (ProxyTarget) | |
+ ProxyTarget->SendOutboundData(buf, size); | |
+ else | |
(*EventCallback)(GetBinding(), EM_CONNECTION_READ, buf, size); | |
- else if (ConnectionDescriptor::SendDataToConnection(ProxyTarget, buf, size) == -1) { | |
- (*EventCallback)(GetBinding(), EM_PROXY_TARGET_UNBOUND, NULL, 0); | |
- StopProxy(); | |
- } | |
} | |
@@ -483,6 +500,9 @@ int ConnectionDescriptor::SendOutboundData (const char *data, int length) | |
if (bWatchOnly) | |
throw std::runtime_error ("cannot send data on a 'watch only' connection"); | |
+ if (ProxiedFrom && MaxOutboundBufSize && GetOutboundDataSize() + length > MaxOutboundBufSize) | |
+ ProxiedFrom->Pause(); | |
+ | |
#ifdef WITH_SSL | |
if (SslBox) { | |
if (length > 0) { | |
@@ -931,6 +951,9 @@ void ConnectionDescriptor::_WriteOutboundData() | |
assert (bytes_written >= 0); | |
OutboundDataSize -= bytes_written; | |
+ if (ProxiedFrom && MaxOutboundBufSize && GetOutboundDataSize() < MaxOutboundBufSize && ProxiedFrom->IsPaused()) | |
+ ProxiedFrom->Resume(); | |
+ | |
#ifdef HAVE_WRITEV | |
if (!err) { | |
unsigned int sent = bytes_written; | |
diff --git a/ext/ed.h b/ext/ed.h | |
index 0f284ed..d60afff 100644 | |
--- a/ext/ed.h | |
+++ b/ext/ed.h | |
@@ -84,8 +84,13 @@ class EventableDescriptor: public Bindable_t | |
struct epoll_event *GetEpollEvent() { return &EpollEvent; } | |
#endif | |
- virtual void StartProxy(const unsigned long); | |
+ virtual void StartProxy(const unsigned long, const unsigned long); | |
virtual void StopProxy(); | |
+ virtual void SetProxiedFrom(EventableDescriptor*, const unsigned long); | |
+ virtual int SendOutboundData(const char*,int){ return -1; } | |
+ virtual bool IsPaused(){ return false; } | |
+ virtual bool Pause(){ return false; } | |
+ virtual bool Resume(){ return false; } | |
private: | |
bool bCloseNow; | |
@@ -100,7 +105,10 @@ class EventableDescriptor: public Bindable_t | |
Int64 CreatedAt; | |
bool bCallbackUnbind; | |
int UnbindReasonCode; | |
- unsigned long ProxyTarget; | |
+ EventableDescriptor *ProxyTarget; | |
+ EventableDescriptor *ProxiedFrom; | |
+ | |
+ unsigned long MaxOutboundBufSize; | |
#ifdef HAVE_EPOLL | |
struct epoll_event EpollEvent; | |
diff --git a/ext/eventmachine.h b/ext/eventmachine.h | |
index 071d15b..f7baa21 100644 | |
--- a/ext/eventmachine.h | |
+++ b/ext/eventmachine.h | |
@@ -105,7 +105,7 @@ extern "C" { | |
const unsigned long evma_watch_pid (int); | |
void evma_unwatch_pid (const unsigned long); | |
- void evma_start_proxy(const unsigned long, const unsigned long); | |
+ void evma_start_proxy(const unsigned long, const unsigned long, const unsigned long); | |
void evma_stop_proxy(const unsigned long); | |
int evma_set_rlimit_nofile (int n_files); | |
diff --git a/ext/rubymain.cpp b/ext/rubymain.cpp | |
index 3244e8b..2ac9363 100644 | |
--- a/ext/rubymain.cpp | |
+++ b/ext/rubymain.cpp | |
@@ -974,9 +974,9 @@ static VALUE t_get_loop_time (VALUE self) | |
t_start_proxy | |
**************/ | |
-static VALUE t_start_proxy (VALUE self, VALUE from, VALUE to) | |
+static VALUE t_start_proxy (VALUE self, VALUE from, VALUE to, VALUE bufsize) | |
{ | |
- evma_start_proxy(NUM2ULONG (from), NUM2ULONG (to)); | |
+ evma_start_proxy(NUM2ULONG (from), NUM2ULONG (to), NUM2ULONG(bufsize)); | |
return Qnil; | |
} | |
@@ -1083,7 +1083,7 @@ extern "C" void Init_rubyeventmachine() | |
rb_define_module_function (EmModule, "resume_connection", (VALUE (*)(...))t_resume, 1); | |
rb_define_module_function (EmModule, "connection_paused?", (VALUE (*)(...))t_paused_p, 1); | |
- rb_define_module_function (EmModule, "start_proxy", (VALUE (*)(...))t_start_proxy, 2); | |
+ rb_define_module_function (EmModule, "start_proxy", (VALUE (*)(...))t_start_proxy, 3); | |
rb_define_module_function (EmModule, "stop_proxy", (VALUE (*)(...))t_stop_proxy, 1); | |
rb_define_module_function (EmModule, "watch_filename", (VALUE (*)(...))t_watch_filename, 1); | |
diff --git a/lib/em/connection.rb b/lib/em/connection.rb | |
index 47a602c..6acb69b 100644 | |
--- a/lib/em/connection.rb | |
+++ b/lib/em/connection.rb | |
@@ -138,8 +138,8 @@ module EventMachine | |
# a low-level proxy relay for all data inbound for this connection, to the connection given | |
# as the argument. This is essentially just a helper method for enable_proxy. | |
# See EventMachine::enable_proxy documentation for details. | |
- def proxy_incoming_to(conn) | |
- EventMachine::enable_proxy(self, conn) | |
+ def proxy_incoming_to(conn,bufsize=0) | |
+ EventMachine::enable_proxy(self, conn, bufsize) | |
end | |
# Helper method for EventMachine::disable_proxy(self) | |
diff --git a/lib/eventmachine.rb b/lib/eventmachine.rb | |
index f63d8a0..a5c0c1c 100644 | |
--- a/lib/eventmachine.rb | |
+++ b/lib/eventmachine.rb | |
@@ -1472,8 +1472,8 @@ module EventMachine | |
# EM.run { | |
# EM.start_server("127.0.0.1", 8080, ProxyServer) | |
# } | |
- def self.enable_proxy(from, to) | |
- EM::start_proxy(from.signature, to.signature) | |
+ def self.enable_proxy(from, to, bufsize=0) | |
+ EM::start_proxy(from.signature, to.signature, bufsize) | |
end | |
# disable_proxy takes just one argument, a Connection that has proxying enabled via enable_proxy. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment