Skip to content

Instantly share code, notes, and snippets.

@jakedouglas
Created October 2, 2009 07:57
Show Gist options
  • Save jakedouglas/199541 to your computer and use it in GitHub Desktop.
Save jakedouglas/199541 to your computer and use it in GitHub Desktop.
diff --git a/ext/cmain.cpp b/ext/cmain.cpp
index a75417c..2922338 100644
--- a/ext/cmain.cpp
+++ b/ext/cmain.cpp
@@ -783,12 +783,12 @@ extern "C" int evma_send_file_data_to_connection (const unsigned long binding, c
evma_start_proxy
*****************/
-extern "C" void evma_start_proxy (const unsigned long from, const unsigned long to)
+extern "C" void evma_start_proxy (const unsigned long from, const unsigned long to, const unsigned long bufsize)
{
ensure_eventmachine("evma_start_proxy");
EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (from));
if (ed)
- ed->StartProxy(to);
+ ed->StartProxy(to, bufsize); // bufsize is passed through untouched; nothing happens if 'from' does not resolve to an EventableDescriptor
}
diff --git a/ext/ed.cpp b/ext/ed.cpp
index 2117f5c..c4fce96 100644
--- a/ext/ed.cpp
+++ b/ext/ed.cpp
@@ -58,6 +58,8 @@ EventableDescriptor::EventableDescriptor (int sd, EventMachine_t *em):
bCallbackUnbind (true),
UnbindReasonCode (0),
ProxyTarget(NULL),
+ ProxiedFrom(NULL),
+ MaxOutboundBufSize(0),
MyEventMachine (em),
PendingConnectTimeout(20000000)
{
@@ -103,6 +105,10 @@ EventableDescriptor::~EventableDescriptor()
{
if (EventCallback && bCallbackUnbind)
(*EventCallback)(GetBinding(), EM_CONNECTION_UNBOUND, NULL, UnbindReasonCode);
+ if (ProxiedFrom) { // another descriptor is still proxying its inbound data into this one
+ if (EventCallback) (*EventCallback)(ProxiedFrom->GetBinding(), EM_PROXY_TARGET_UNBOUND, NULL, 0); // same NULL guard as the unbind notification above
+ ProxiedFrom->StopProxy(); // unlink the source so it keeps no pointer to this dying object
+ }
StopProxy();
Close();
}
@@ -181,12 +187,13 @@ bool EventableDescriptor::IsCloseScheduled()
EventableDescriptor::StartProxy
*******************************/
-void EventableDescriptor::StartProxy(unsigned long to)
+void EventableDescriptor::StartProxy(const unsigned long to, const unsigned long bufsize)
{
EventableDescriptor *ed = dynamic_cast <EventableDescriptor*> (Bindable_t::GetObject (to));
if (ed) {
StopProxy();
- ProxyTarget = to;
+ ProxyTarget = ed;
+ ed->SetProxiedFrom(this, bufsize);
return;
}
throw std::runtime_error ("Tried to proxy to an invalid descriptor");
@@ -200,11 +207,37 @@ EventableDescriptor::StopProxy
void EventableDescriptor::StopProxy()
{
if (ProxyTarget) {
+ ProxyTarget->SetProxiedFrom(NULL, 0); // drop the target's back-pointer and buffer threshold
ProxyTarget = NULL;
}
}
+/***********************************
+EventableDescriptor::SetProxiedFrom
+***********************************/
+
+// Records (or, with from == NULL, clears) the descriptor that proxies its
+// inbound data into this one, together with the outbound-buffer threshold
+// used to pause/resume that source. A bufsize of 0 disables the threshold.
+// Throws std::runtime_error if a different descriptor is already proxying
+// into this one.
+void EventableDescriptor::SetProxiedFrom(EventableDescriptor *from, const unsigned long bufsize)
+{
+ if (from && ProxiedFrom && ProxiedFrom != from) {
+ // Only one back-pointer is kept. Overwriting it would leave the previous
+ // source with a ProxyTarget that nobody clears when this object is
+ // destroyed, so refuse instead. StartProxy() assigns from->ProxyTarget
+ // before calling us; undo that so 'from' is not left half-linked.
+ if (from->ProxyTarget == this)
+ from->ProxyTarget = NULL;
+ throw std::runtime_error ("Tried to proxy to a busy target");
+ }
+ ProxiedFrom = from;
+ MaxOutboundBufSize = bufsize;
+}
+
+
/********************************************
EventableDescriptor::_GenericInboundDispatch
********************************************/
@@ -213,12 +232,10 @@ void EventableDescriptor::_GenericInboundDispatch(const char *buf, int size)
{
assert(EventCallback);
- if (!ProxyTarget)
+ if (ProxyTarget)
+ ProxyTarget->SendOutboundData(buf, size); // NOTE(review): result is discarded; the base-class default returns -1, and the removed branch reacted to -1 by reporting EM_PROXY_TARGET_UNBOUND - confirm dropping that path is intended
+ else
(*EventCallback)(GetBinding(), EM_CONNECTION_READ, buf, size);
- else if (ConnectionDescriptor::SendDataToConnection(ProxyTarget, buf, size) == -1) {
- (*EventCallback)(GetBinding(), EM_PROXY_TARGET_UNBOUND, NULL, 0);
- StopProxy();
- }
}
@@ -483,6 +500,9 @@ int ConnectionDescriptor::SendOutboundData (const char *data, int length)
if (bWatchOnly)
throw std::runtime_error ("cannot send data on a 'watch only' connection");
+ if (ProxiedFrom && MaxOutboundBufSize && GetOutboundDataSize() + length > MaxOutboundBufSize)
+ ProxiedFrom->Pause();
+
#ifdef WITH_SSL
if (SslBox) {
if (length > 0) {
@@ -931,6 +951,9 @@ void ConnectionDescriptor::_WriteOutboundData()
assert (bytes_written >= 0);
OutboundDataSize -= bytes_written;
+ if (ProxiedFrom && MaxOutboundBufSize && GetOutboundDataSize() < MaxOutboundBufSize && ProxiedFrom->IsPaused())
+ ProxiedFrom->Resume();
+
#ifdef HAVE_WRITEV
if (!err) {
unsigned int sent = bytes_written;
diff --git a/ext/ed.h b/ext/ed.h
index 0f284ed..d60afff 100644
--- a/ext/ed.h
+++ b/ext/ed.h
@@ -84,8 +84,13 @@ class EventableDescriptor: public Bindable_t
struct epoll_event *GetEpollEvent() { return &EpollEvent; }
#endif
- virtual void StartProxy(const unsigned long);
+ virtual void StartProxy(const unsigned long, const unsigned long); // (binding of the target descriptor, outbound-buffer threshold for that target)
virtual void StopProxy();
+ virtual void SetProxiedFrom(EventableDescriptor*, const unsigned long); // (source descriptor or NULL to clear, outbound-buffer threshold)
+ virtual int SendOutboundData(const char*,int){ return -1; } // default implementation sends nothing and returns -1
+ virtual bool IsPaused(){ return false; } // the three flow-control defaults do nothing and return false
+ virtual bool Pause(){ return false; }
+ virtual bool Resume(){ return false; }
private:
bool bCloseNow;
@@ -100,7 +105,10 @@ class EventableDescriptor: public Bindable_t
Int64 CreatedAt;
bool bCallbackUnbind;
int UnbindReasonCode;
- unsigned long ProxyTarget;
+ EventableDescriptor *ProxyTarget;
+ EventableDescriptor *ProxiedFrom;
+
+ unsigned long MaxOutboundBufSize;
#ifdef HAVE_EPOLL
struct epoll_event EpollEvent;
diff --git a/ext/eventmachine.h b/ext/eventmachine.h
index 071d15b..f7baa21 100644
--- a/ext/eventmachine.h
+++ b/ext/eventmachine.h
@@ -105,7 +105,7 @@ extern "C" {
const unsigned long evma_watch_pid (int);
void evma_unwatch_pid (const unsigned long);
- void evma_start_proxy(const unsigned long, const unsigned long);
+ void evma_start_proxy(const unsigned long, const unsigned long, const unsigned long);
void evma_stop_proxy(const unsigned long);
int evma_set_rlimit_nofile (int n_files);
diff --git a/ext/rubymain.cpp b/ext/rubymain.cpp
index 3244e8b..2ac9363 100644
--- a/ext/rubymain.cpp
+++ b/ext/rubymain.cpp
@@ -974,9 +974,9 @@ static VALUE t_get_loop_time (VALUE self)
t_start_proxy
**************/
-static VALUE t_start_proxy (VALUE self, VALUE from, VALUE to)
+static VALUE t_start_proxy (VALUE self, VALUE from, VALUE to, VALUE bufsize) // registered with arity 3 in Init_rubyeventmachine, so bufsize is mandatory at this level
{
- evma_start_proxy(NUM2ULONG (from), NUM2ULONG (to));
+ evma_start_proxy(NUM2ULONG (from), NUM2ULONG (to), NUM2ULONG(bufsize)); // all three values are converted with NUM2ULONG before the C API call
return Qnil;
}
@@ -1083,7 +1083,7 @@ extern "C" void Init_rubyeventmachine()
rb_define_module_function (EmModule, "resume_connection", (VALUE (*)(...))t_resume, 1);
rb_define_module_function (EmModule, "connection_paused?", (VALUE (*)(...))t_paused_p, 1);
- rb_define_module_function (EmModule, "start_proxy", (VALUE (*)(...))t_start_proxy, 2);
+ rb_define_module_function (EmModule, "start_proxy", (VALUE (*)(...))t_start_proxy, 3);
rb_define_module_function (EmModule, "stop_proxy", (VALUE (*)(...))t_stop_proxy, 1);
rb_define_module_function (EmModule, "watch_filename", (VALUE (*)(...))t_watch_filename, 1);
diff --git a/lib/em/connection.rb b/lib/em/connection.rb
index 47a602c..6acb69b 100644
--- a/lib/em/connection.rb
+++ b/lib/em/connection.rb
@@ -138,8 +138,8 @@ module EventMachine
# a low-level proxy relay for all data inbound for this connection, to the connection given
# as the argument. This is essentially just a helper method for enable_proxy.
# See EventMachine::enable_proxy documentation for details.
- def proxy_incoming_to(conn)
- EventMachine::enable_proxy(self, conn)
+ def proxy_incoming_to(conn,bufsize=0) # bufsize is handed to enable_proxy unchanged; it defaults to 0
+ EventMachine::enable_proxy(self, conn, bufsize)
end
# Helper method for EventMachine::disable_proxy(self)
diff --git a/lib/eventmachine.rb b/lib/eventmachine.rb
index f63d8a0..a5c0c1c 100644
--- a/lib/eventmachine.rb
+++ b/lib/eventmachine.rb
@@ -1472,8 +1472,8 @@ module EventMachine
# EM.run {
# EM.start_server("127.0.0.1", 8080, ProxyServer)
# }
- def self.enable_proxy(from, to, bufsize=0)
- EM::start_proxy(from.signature, to.signature)
+ def self.enable_proxy(from, to, bufsize=0) # bufsize: outbound-buffer threshold on 'to' above which 'from' is paused; 0 (the default) disables it
+ EM::start_proxy(from.signature, to.signature, bufsize)
end
# disable_proxy takes just one argument, a Connection that has proxying enabled via enable_proxy.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment