Skip to content

Instantly share code, notes, and snippets.

@jalbright015
Created July 21, 2022 07:38
Show Gist options
  • Save jalbright015/e82e7fb551fafe7fd4fb284f6a01500c to your computer and use it in GitHub Desktop.
Save jalbright015/e82e7fb551fafe7fd4fb284f6a01500c to your computer and use it in GitHub Desktop.
#include <config.h>
#include <daemons.h>
#include <net/socket.h> // from the driver include dir
#include <net/socket_errors.h> // from the driver include dir
#include <net/config.h>
#include <net/daemons.h>
#include "/u/j/jezu/std/debug.c"
inherit DAEMON;
inherit SAVE_D;
private nosave mapping fds = ([ ]);
#define DRIVER_UNIT 1
#define UNIT_CONFIG "../testsuite/etc/config.test"
#define LOG_UNIT "unit"
#define LOG_UNIT_ERR "unit_errors"
#define PORT_UNIT (11996)
#define MAXIMUM_RETRIES 3
#define BAD_CMD \
([ "error" : "Bad Command" ])
object *_sessions = ({});
object *_pending_session_requests = ({});
nosave private mapping _bad_cmd_ = BAD_CMD;
int _accesses;
nosave private int _unit_sock;
nosave private mapping _sockets;
nosave private mapping _resolve_pending;
protected void setup();
void close_connection(int fd);
void read_callback(int fd, mixed mess);
void write_callback(int fd);
private nosave string *base_args = ({ UNIT_CONFIG });
void log_info(string file, string message)
{
log_file(file, ctime(time()) + "\n");
log_file(file, message);
}
int query_accesses()
{
DEBUG("query_accesses");
return _accesses;
}
protected void create()
{
_accesses = 0;
set_persistent(1);
save::create();
seteuid(ROOT_UID);
//_sessions = clones(UNIT_SESSION);
_sockets = ([]);
_resolve_pending = ([]);
log_info(LOG_UNIT_ERR, "Created when uptime = " + uptime() + "\n");
call_out("setup", 2);
}
void remove()
{
save::remove();
log_info(LOG_UNIT_ERR, "Destructed when uptime = " + uptime() + "\n");
destruct(this_object());
}
int query_prevent_shadow(object ob)
{
return 1;
}
protected void clean_up() {}
protected void setup()
{
int ret;
_unit_sock = socket_create(STREAM, "read_callback", "close_callback");
if (_unit_sock < 0)
return log_info(LOG_UNIT_ERR, "setup: Failed to create socket.\n");
ret = socket_bind(_unit_sock, PORT_UNIT);
if (ret != EESUCCESS)
{
DEBUG(socket_error(ret));
socket_close(_unit_sock);
return log_info(LOG_UNIT_ERR, sprintf("socket_bind: %s\n", socket_error(ret)));
}
ret = socket_listen(_unit_sock, "listen_callback");
if (ret != EESUCCESS)
{
DEBUG(socket_error(ret));
socket_close(_unit_sock);
return log_info(LOG_UNIT_ERR, sprintf("socket_listen: %s\n", socket_error(ret)));
}
fetch(base_args);
DEBUG("setup() done\n");
}
protected void write_data_retry(int fd, string data, int counter)
{
int rc;
DEBUG("write_data_retry");
if (counter == MAXIMUM_RETRIES)
{
close_connection(fd);
return;
}
DEBUG(data);
data = replace_string(data, "\n", "\r");
rc = socket_write(fd, data);
_sockets[fd]["write_status"] = rc;
DEBUG(data);
switch (rc)
{
case EESUCCESS:
// we're finished with this fd.
DEBUG("EESUCCESS");
close_connection(fd);
break;
case EEALREADY:
DEBUG("EEALREADY");
// driver must have set the socket marked as BLOCKED when
// it was created by socket_accept(). Just wait for
// write_callback to be called so that we can write out the
// pending data.
_sockets[fd]["pending"] = data;
break;
case EECALLBACK:
DEBUG("EECALLBACK");
// wait for write_callback before accessing socket fd again.
break;
case EEWOULDBLOCK:
case EESEND:
DEBUG("EEWOULDBLOCK || EESEND");
// try again in two seconds
if (counter < MAXIMUM_RETRIES)
{
call_out("retry_write", 2, ({fd, data, counter + 1}));
return;
}
// fall through to the default case and write an error.
default:
DEBUG("DEFAULT");
log_info(LOG_UNIT_ERR, "write_data_retry: " + socket_error(rc) + "\n");
close_connection(fd);
break;
}
}
void retry_write(mixed* args)
{
DEBUG("retry_write");
write_data_retry(args...);
}
protected void write_data(int fd, mixed data)
{
DEBUG("write_data");
write_data_retry(fd, data, 0);
}
protected void store_client_info(int fd)
{
string addr;
mixed result;
int port;
DEBUG(fd);
DEBUG(_sockets);
sscanf(socket_address(fd), "%s %d", addr, port);
_sockets[fd] = ([
"address" : addr,
"name" : addr,
"port" : port,
"time" : time(),
"write_status" : EESUCCESS
]);
result = OB_RESOLVER->query_cache(addr, "resolve_callback");
DEBUG(result);
if ( intp(result) )
_resolve_pending[result] = fd;
else
_sockets[fd]["name"] = result;
}
protected void listen_callback(int fd)
{
int nfd;
DEBUG("listen_callback");
nfd = socket_accept(fd, "read_callback", "write_callback");
if (nfd < 0)
{
DEBUG("listen_callback: socket_accept failed.\n");
return log_info(LOG_UNIT_ERR, "listen_callback: socket_accept failed.\n");
}
store_client_info(nfd);
DEBUG(nfd);
}
//
// The driver calls write_callback to indicate that the data sent
// by the last call to socket_write() is finally all written to the
// network (or to indicate that a socket created in the blocked state
// is now ready for writing).
//
void write_callback(int fd)
{
DEBUG(fd);
// The status will be EEALREADY only in the event that the socket
// was created in a blocked state (this object is smart enough not
// to write to a socket it knows is blocked).
//
if (_sockets[fd]["write_status"] == EEALREADY)
{
DEBUG("_sockets[fd][\"write_status\"] == EEALREADY");
DEBUG(_sockets);
write_data(fd, _sockets[fd]["pending"]);
//
// its safe to delete the pending data now since its already been sent
// and since we won't ever have any more pending data for this
// socket (we might have an EECALLBACK but the driver is
// responsible for holding the pending data in that case).
//
map_delete(_sockets[fd], "pending");
}
else
{
//
// We can close the socket at this point since we only ever send one
// thing on a given socket before we are through with it.
//
_sockets[fd]["write_status"] = EESUCCESS;
DEBUG("_sockets[fd][\"write_status\"] = EESUCCESS");
close_connection(fd);
}
}
protected void read_callback(int fd, string str)
{
string cmd, args, file, url;
string *stdout, tmp, line0;
DEBUG("read_callback");
if (!sizeof(str))
{
DEBUG(_bad_cmd_);
return unit_error(fd, _bad_cmd_, "Bad stdout");
}
if (tmp = _sockets[fd]["read"])
str = tmp + str;
DEBUG(str);
if (str[<1] != '\n' && str[<1] != '\r')
{
_sockets[fd]["read"] = str;
return;
}
else
{
DEBUG("map_delete(_sockets[fd], \"read\")");
map_delete(_sockets[fd], "read");
}
_accesses++;
stdout = explode(replace_string(str, "\r", ""), "\n");
if ( !sizeof(stdout) )
return unit_error(fd, _bad_cmd_, "Bad stdout");
DEBUG(stdout);
}
// close_callback is called when any socket is closed unexpectedly
// (by the driver instead of as a result of socket_close()).
protected void close_callback(int fd)
{
DEBUG(call_stack());
DEBUG(call_stack(1));
DEBUG(call_stack(2));
DEBUG(call_stack(3));
DEBUG(call_stack(4));
if (fd == _unit_sock)
{
log_info(LOG_UNIT_ERR,
"UNIT socket closed unexpectedly; restarting.\n");
call_out("setup", 5);
}
else
{
if (undefinedp(_sockets[fd]))
{
log_info(LOG_UNIT_ERR,
sprintf("Client socket %d closed unexpectedly\n",
fd));
}
else
{
log_info(LOG_UNIT_ERR,
sprintf("Client socket %s %d closed unexpectedly.\n",
_sockets[fd]["name"],
_sockets[fd]["port"]));
}
map_delete(_sockets, fd);
}
}
// resolve_callback is called by the resolver object in response to our
// queries to resolve dotted decimal internet addresses into domain name
// style addresses.
void resolve_callback(string theName, string theAddr, int slot)
{
int *fds, fd;
DEBUG("resolve_callback");
fd = _resolve_pending[slot];
map_delete(_resolve_pending, slot);
if (!undefinedp(_sockets[fd]) && (_sockets[fd]["address"] == theAddr))
{
_sockets[fd]["name"] = theName;
}
else
{
log_info(LOG_UNIT_ERR,
sprintf("Resolved %s to %s after connection closed.\n",
theAddr,
(sizeof(theName) ? theName : "NOT RESOLVED")));
}
}
protected void unit_error(int fd, mapping err, string code)
{
DEBUG("unit_error");
}
protected void close_connection(int fd)
{
DEBUG(_sockets);
if (_sockets[fd]["write_status"] == EECALLBACK)
// write_callback() will call close_connection() when socket fd
// is drained.
return;
map_delete(_sockets, fd);
map_delete(_resolve_pending, fd);
socket_close(fd);
}
void fetch(mixed args...)
{
int fd;
if ( sizeof(args) )
args = ({ base_args..., args..., });
else
args = base_args;
fd = external_start(DRIVER_UNIT,
UNIT_CONFIG,
(: read_callback :),
(: write_callback :),
(: close_connection :)
);
if(fd < 0) return;
store_client_info(fd);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment