Skip to content

Instantly share code, notes, and snippets.

@exbotanical
Last active January 1, 2023 01:21
Show Gist options
  • Save exbotanical/a19afa00e5db69ec20e2428e7ba4492b to your computer and use it in GitHub Desktop.
Save exbotanical/a19afa00e5db69ec20e2428e7ba4492b to your computer and use it in GitHub Desktop.
A chat server written in C with support for subscribable topics
#include <arpa/inet.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <time.h>
#include <unistd.h>
#include <uuid/uuid.h>
#include "deps/libutil/array.h"
#include "deps/libutil/fmt.h"
// printf wrapper. NOTE: because the expansion is `printf(fmt, __VA_ARGS__)`,
// at least one argument after the format string is required; `LOG("text")`
// expands to `printf("text", )` and does not compile.
#define LOG(fmt, ...) printf(fmt, __VA_ARGS__)
// Character constants used when parsing client input
#define CARRIAGE_RETURN '\r'
#define NULL_TERMINATOR '\0'
#define NEWLINE '\n'
// Input lines starting with this character are treated as meta commands
#define META_COMMAND_PREFIX '/'
// Interval between topic cleanup passes, also the minimum time a topic must
// have had zero subscribers before it is removed
#define TOPIC_CLEANUP_INTERVAL_SECONDS 1200 // 20 minutes
#define MAX_CLIENTS 200 // Maximum concurrent clients allowed
#define BUFFER_SIZE 2048 // Default base buffer size for I/O
#define SERVER_PORT 5000 // The port on which the chat server listens
// Number of connected clients: incremented when a client thread starts,
// decremented on sign-off, and read when accepting a new connection
static _Atomic unsigned int client_count = 0;
// Option value passed to setsockopt(SO_REUSEADDR)
static int yes = 1;
// Locked around accesses to the global `topics` array by the add/remove/
// subscribe handlers and the cleanup thread
pthread_mutex_t topics_mutex = PTHREAD_MUTEX_INITIALIZER;
// Locked around accesses to the global `clients` slot table
pthread_mutex_t clients_mutex = PTHREAD_MUTEX_INITIALIZER;
// One connected chat client. Allocated when a connection is accepted and
// released when the client signs off.
typedef struct {
struct sockaddr_in addr; // Client address (copy of the address passed to accept)
int fd; // Connection file descriptor returned by accept
char uuid[UUID_STR_LEN]; // Unique identifier for client (textual random UUID)
char nick[32]; // Client name; at most 31 characters plus terminator
Array* topics; // Subscribed topics, stored as topic-name strings (char*)
} Client;
// A subscribable topic. Instances are heap-allocated and stored in the global
// `topics` array.
typedef struct {
time_t last_at_zero_subs; // Time at which this topic last reached zero
// subscribers (-1 if subs); also set to the
// creation time when the topic is created
unsigned int subs; // Number of subscribers
// NOTE(review): this declares an array of 16 `char*` pointers, not a
// 16-character buffer. The rest of the file treats the storage as a plain
// character buffer (it is cast to `char*` and compared with strcmp), so
// `char name[16]` looks like the intent — confirm before changing, because
// the copy in topic_new is not bounded by this size.
char* name[16]; // The topic name (unique)
char owner_id[UUID_STR_LEN]; // A uuid matching the id of the client that
// created this topic
} Topic;
// A meta command ("/name ...") that a client can invoke. Entries live in the
// global `activities` array and are looked up by `name`.
typedef struct {
char* name; // The activity name, also the metacommand itself
// (without the leading '/')
char* help_message; // The activity help message, as printed by the /help
// activity
// Returns true when the client's read loop should stop (the client is
// signed off), false to keep processing input.
bool (*handler)(Client* client, char* out_buffer); // The activity handler
} Activity;
/* Global State */
// Slot table of connected clients; an empty slot is NULL. Guarded by
// clients_mutex.
Client* clients[MAX_CLIENTS];
// All existing topics (Topic*). Created in main.
Array* topics;
// All registered meta commands (Activity*). Filled by register_activities.
Array* activities;
/* Declarations */
// Creates, binds and starts listening on the server socket; returns its fd.
int start_server();
// Accepts one connection on `listener` and builds a Client for it; returns
// NULL when the connection is rejected.
Client* start_client(int listener, struct sockaddr_in client_addr,
const char* nick);
// Writes `buffer` to the given client's socket.
void send_message_self(Client* client, char* buffer);
// Writes `buffer` to the client whose uuid equals `uuid`.
void send_message_client(char* uuid, char* buffer);
// Writes `buffer` to every client, or only to subscribers of `topic` when
// `topic` is non-NULL. Note the argument order: message first, topic second.
void send_message_all(char* buffer, char* topic);
// Parses and dispatches a "/command"; returns true when the client should be
// disconnected.
bool process_meta_command(Client* client, char* input_buf, char* out_buffer);
// Remove / add `topic_name` from / to the client's subscriptions, reporting
// the outcome to the client through `buffer`.
void topic_unsubscribe(Client* client, char* topic_name, char* buffer);
void topic_subscribe(Client* client, char* topic_name, char* buffer);
// Allocates a new topic owned by `client`.
Topic* topic_new(Client* client, char* topic_name);
// Array matcher: true when `topic` (a Topic*) is named `target` (a char*).
bool matcher_topic(void* topic, void* target);
// Increments the topic's subscriber count.
void topic_add_sub(Topic* topic);
/* Utilities */
/**
 * Builds an outgoing chat line from the remaining tokens of the command that
 * is currently being tokenized with strtok.
 *
 * The result has the form "<prefix with nick> word1 word2 ...\r\n": `prefix`
 * is expanded with the client's nick, then every remaining space-separated
 * token is appended, each preceded by a single space.
 *
 * @param client  The sender; its nick is substituted into `prefix`.
 * @param buffer  Destination buffer; overwritten from the start. The caller
 *                must make sure it is large enough, no bound is checked here.
 * @param prefix  printf-style format containing exactly one `%s` (the nick).
 *                It is used as a format string, so it must never come from
 *                client input.
 * @return `buffer`. (The function was declared to return `char*` but had no
 *         return statement.)
 *
 * NOTE(review): this continues a strtok() sequence started by the caller.
 * strtok keeps a single hidden parse position, while each client is served on
 * its own thread; strtok_r would be needed to make concurrent parsing safe.
 */
char* build_message(Client* client, char* buffer, char* prefix) {
  sprintf(buffer, prefix, client->nick);

  for (char* word = strtok(NULL, " "); word != NULL;
       word = strtok(NULL, " ")) {
    strcat(buffer, " ");
    strcat(buffer, word);
  }

  strcat(buffer, "\r\n");
  return buffer;
}
/**
 * Truncates `str` in place at the first carriage return or line feed.
 *
 * A string without either character is left untouched. Anything that follows
 * the first line break (for example a second line received in the same read)
 * is cut off.
 *
 * @param str  NUL-terminated, writable string.
 */
void strip_newline(char* str) {
  // strcspn returns the length of the leading run that contains neither
  // '\r' nor '\n'; when there is no line break that is the index of the
  // existing terminator, so the write is a no-op.
  str[strcspn(str, "\r\n")] = '\0';
}
/* Activities */
bool matcher_activity_name(void* activity, void* target) {
return !strcmp(((Activity*)activity)->name, (char*)target);
}
/**
 * Handler for /quit.
 *
 * Does no work of its own; returning true tells the caller to stop reading
 * from this client.
 */
bool quit_activity_handler(Client* client, char* out_buffer) {
  return true;
}
/**
 * Handler for /ping: replies "pong" to the requesting client only.
 *
 * @return false, so the client stays connected.
 */
bool ping_activity_handler(Client* client, char* out_buffer) {
  char* reply = "pong\n";
  send_message_self(client, reply);
  return false;
}
/**
 * Handler for /nick <name>: renames the client and announces the change to
 * everyone.
 *
 * The new nick is the next strtok token of the command line. It is truncated
 * to fit `client->nick`, and the announcement shows the nick as actually
 * stored.
 *
 * @param client      The client changing its nick.
 * @param out_buffer  Scratch buffer for the announcement.
 * @return false, so the client stays connected.
 */
bool nick_activity_handler(Client* client, char* out_buffer) {
  char* new_nick = strtok(NULL, " ");
  if (new_nick == NULL) {
    // "/nick" without an argument: strtok has nothing left to return, and
    // copying from NULL would crash the server.
    send_message_self(client, ":: usage: /nick <name>\r\n");
    return false;
  }

  // Keep the previous nick on the stack; no allocation that could fail.
  char old_nick[sizeof(client->nick)];
  memcpy(old_nick, client->nick, sizeof(old_nick));
  old_nick[sizeof(old_nick) - 1] = NULL_TERMINATOR;

  strncpy(client->nick, new_nick, sizeof(client->nick) - 1);
  client->nick[sizeof(client->nick) - 1] = NULL_TERMINATOR;

  sprintf(out_buffer, ":: %s is now %s\r\n", old_nick, client->nick);
  send_message_all(out_buffer, NULL);
  return false;
}
/**
 * Handler for /msg <uuid> <message>: sends a private message to the client
 * with the given uuid.
 *
 * @param client      The sender.
 * @param out_buffer  Scratch buffer the outgoing line is built in.
 * @return false, so the client stays connected.
 */
bool msg_activity_handler(Client* client, char* out_buffer) {
  char* uuid = strtok(NULL, " ");
  if (uuid == NULL) {
    // No recipient given; a NULL uuid would otherwise be passed to strcmp
    // during the recipient lookup.
    send_message_self(client, ":: usage: /msg <uuid> <message>\r\n");
    return false;
  }

  build_message(client, out_buffer, "[PM][%s]");
  send_message_client(uuid, out_buffer);
  return false;
}
/**
 * Handler for /list: sends the requesting client one line per connected
 * client ("nick: <nick> (<uuid>)"), marking the requester's own entry with
 * "(you) ".
 *
 * The whole listing is produced while clients_mutex is held.
 *
 * @return false, so the client stays connected.
 */
bool list_activity_handler(Client* client, char* out_buffer) {
  pthread_mutex_lock(&clients_mutex);

  for (unsigned int slot = 0; slot < MAX_CLIENTS; slot++) {
    // Empty slots hold NULL
    if (!clients[slot]) {
      continue;
    }

    const char* marker = "";
    if (clients[slot] == client) {
      marker = "(you) ";
    }

    sprintf(out_buffer, "%snick: %s (%s)\n", marker, clients[slot]->nick,
            clients[slot]->uuid);
    send_message_self(client, out_buffer);
  }

  pthread_mutex_unlock(&clients_mutex);
  return false;
}
/**
 * Handler for /topics: sends the requesting client a comma-separated list of
 * all topic names, or a hint on how to create one when there are none.
 *
 * The reply is appended to `out_buffer`, which is expected to hold an empty
 * string on entry.
 *
 * NOTE(review): `topics` is read here without taking topics_mutex, and the
 * strcat calls are not bounded by the size of `out_buffer` — both worth
 * revisiting.
 *
 * @return false, so the client stays connected.
 */
bool topics_activity_handler(Client* client, char* out_buffer) {
  if (topics->len == 0) {
    strcat(out_buffer,
           "no topics. use `/add_topic <topic_name>` to create one!");
  }

  for (int i = 0; i < topics->len; i++) {
    // Separator goes in front of every entry except the first
    if (i > 0) {
      strcat(out_buffer, ", ");
    }
    strcat(out_buffer, (char*)((Topic*)topics->state[i])->name);
  }

  strcat(out_buffer, "\r\n");
  send_message_self(client, out_buffer);
  return false;
}
/**
 * Handler for /my_topics: sends the requesting client a comma-separated list
 * of the topics it is subscribed to, or a hint when it has none.
 *
 * The reply is appended to `out_buffer`, which is expected to hold an empty
 * string on entry.
 *
 * @return false, so the client stays connected.
 */
bool my_topics_activity_handler(Client* client, char* out_buffer) {
  if (client->topics->len == 0) {
    strcat(out_buffer,
           "no subscribed topics. use `/subscribe <topic_name>` to add one!");
  }

  for (int i = 0; i < client->topics->len; i++) {
    // Separator goes in front of every entry except the first
    if (i > 0) {
      strcat(out_buffer, ", ");
    }
    strcat(out_buffer, (char*)client->topics->state[i]);
  }

  strcat(out_buffer, "\r\n");
  send_message_self(client, out_buffer);
  return false;
}
/**
 * Handler for /add_topic <topic_name>: creates a topic owned by the client,
 * subscribes the client to it and announces the new topic to everyone.
 *
 * The existence check and the insertion happen under a single hold of
 * topics_mutex, so two clients cannot both pass the check and create the same
 * topic, and the shared array is never pushed to unlocked.
 *
 * @param client      The client creating the topic.
 * @param out_buffer  Scratch buffer for replies.
 * @return false, so the client stays connected.
 */
bool add_topic_activity_handler(Client* client, char* out_buffer) {
  char* topic_name = strtok(NULL, " ");
  if (topic_name == NULL) {
    // "/add_topic" without a name; a NULL name would crash the matcher.
    send_message_self(client, ":: usage: /add_topic <topic_name>\r\n");
    return false;
  }

  pthread_mutex_lock(&topics_mutex);

  // Check if the topic already exists in the topics list TODO: optimize
  for (int i = 0; i < topics->len; i++) {
    if (matcher_topic((Topic*)topics->state[i], topic_name)) {
      pthread_mutex_unlock(&topics_mutex);
      sprintf(out_buffer, ":: topic %s already exists\r\n", topic_name);
      send_message_self(client, out_buffer);
      return false;
    }
  }

  Topic* topic = topic_new(client, topic_name);
  if (topic == NULL) {
    pthread_mutex_unlock(&topics_mutex);
    sprintf(out_buffer, ":: could not create topic %s\r\n", topic_name);
    send_message_self(client, out_buffer);
    return false;
  }
  array_push(topics, topic);
  topic_add_sub(topic);

  pthread_mutex_unlock(&topics_mutex);

  // The creator is subscribed automatically. The name is copied because
  // topic_name points into the caller's reusable input buffer.
  array_push(client->topics, strdup(topic_name));

  sprintf(out_buffer, "::%s created topic %s\r\n", client->nick, topic_name);
  send_message_all(out_buffer, NULL);
  return false;
}
/**
 * Handler for /remove_topic <topic_name>.
 *
 * A topic is removed only when it exists, the requesting client is its owner
 * and it has no subscribers. A successful removal is announced to everyone;
 * every other outcome is reported to the requesting client only.
 *
 * The topics list is inspected and modified under topics_mutex; the reply is
 * sent after the lock has been released.
 *
 * NOTE(review): the Topic taken out of the array is not released here.
 * Whether array_remove frees the element is not visible in this file —
 * confirm, otherwise this leaks one Topic per removal.
 *
 * @param client      The client requesting the removal.
 * @param out_buffer  Scratch buffer for the reply.
 * @return false, so the client stays connected.
 */
bool remove_topic_activity_handler(Client* client, char* out_buffer) {
  char* topic_name = strtok(NULL, " ");
  if (topic_name == NULL) {
    // "/remove_topic" without a name; a NULL name would crash the matcher.
    send_message_self(client, ":: usage: /remove_topic <topic_name>\r\n");
    return false;
  }

  bool removed = false;

  // Reply used when no topic with this name is found below
  sprintf(out_buffer, ":: topic %s does not exist\r\n", topic_name);

  pthread_mutex_lock(&topics_mutex);
  for (int i = 0; i < topics->len; i++) {
    Topic* topic = (Topic*)topics->state[i];
    if (!matcher_topic((void*)topic, topic_name)) {
      continue;
    }

    if (strcmp(topic->owner_id, client->uuid) != 0) {
      // Check if the client trying to remove this topic actually owns it
      sprintf(out_buffer,
              ":: you must be the owner of topic %s to remove it\r\n",
              topic_name);
    } else if (topic->subs) {
      // Check if the topic still has subscribers
      sprintf(out_buffer,
              ":: topic %s cannot be removed while it has subscribers\r\n",
              topic_name);
    } else {
      array_remove(topics, i);
      sprintf(out_buffer, ":: topic %s removed\r\n", topic_name);
      removed = true;
    }
    break;
  }
  pthread_mutex_unlock(&topics_mutex);

  if (removed) {
    send_message_all(out_buffer, NULL);
  } else {
    send_message_self(client, out_buffer);
  }
  return false;
}
/**
 * Handler for /unsubscribe <topic_name>: passes the next token of the command
 * line to topic_unsubscribe.
 *
 * @return false, so the client stays connected.
 */
bool unsubscribe_activity_handler(Client* client, char* out_buffer) {
  topic_unsubscribe(client, strtok(NULL, " "), out_buffer);
  return false;
}
/**
 * Handler for /subscribe <topic_name>: passes the next token of the command
 * line to topic_subscribe.
 *
 * @return false, so the client stays connected.
 */
bool subscribe_activity_handler(Client* client, char* out_buffer) {
  topic_subscribe(client, strtok(NULL, " "), out_buffer);
  return false;
}
/**
 * Handler for /msg_topic <topic_name> <message>: posts a message to every
 * client subscribed to the topic.
 *
 * @param client      The sender.
 * @param out_buffer  Scratch buffer the outgoing line is built in.
 * @return false, so the client stays connected.
 */
bool msg_topic_activity_handler(Client* client, char* out_buffer) {
  char* topic = strtok(NULL, " ");
  if (topic == NULL) {
    // No topic given. A NULL topic means "everyone" to send_message_all, so
    // the command is ignored rather than turned into a global broadcast.
    return false;
  }

  build_message(client, out_buffer, "[TM][%s]");

  // send_message_all takes the message first and the topic filter second;
  // the two were previously passed the wrong way round, which sent the topic
  // name as the text and used the message as the filter.
  send_message_all(out_buffer, topic);
  return false;
}
/**
 * Handler for /help: concatenates the help line of every registered activity,
 * in registration order, and sends the result to the requesting client.
 *
 * The text is appended to `out_buffer`, which is expected to hold an empty
 * string on entry.
 *
 * @return false, so the client stays connected.
 */
bool help_activity_handler(Client* client, char* out_buffer) {
  for (int idx = 0; idx < activities->len; idx++) {
    Activity* entry = (Activity*)activities->state[idx];
    strcat(out_buffer, entry->help_message);
  }

  send_message_self(client, out_buffer);
  return false;
}
void register_activities() {
activities = array_init();
Activity* quit_activity = malloc(sizeof(Activity*));
quit_activity->help_message = ":: /quit\tExit chat room\r\n";
quit_activity->handler = &quit_activity_handler;
quit_activity->name = "quit";
array_push(activities, quit_activity);
Activity* ping_activity = malloc(sizeof(Activity*));
ping_activity->help_message = ":: /ping\tServer test\r\n";
ping_activity->name = "ping";
ping_activity->handler = &ping_activity_handler;
array_push(activities, *(&ping_activity));
Activity* nick_activity = malloc(sizeof(Activity*));
nick_activity->help_message = ":: /nick <name>\tChange nickname\r\n";
nick_activity->name = "nick";
nick_activity->handler = &nick_activity_handler;
array_push(activities, nick_activity);
Activity* msg_activity = malloc(sizeof(Activity*));
msg_activity->help_message =
":: /msg <uuid> <message>\tSend private message\r\n";
msg_activity->name = "msg";
msg_activity->handler = &msg_activity_handler;
array_push(activities, msg_activity);
Activity* list_activity = malloc(sizeof(Activity*));
list_activity->help_message = ":: /list\tShow active nicks\r\n";
list_activity->name = "list";
list_activity->handler = &list_activity_handler;
array_push(activities, list_activity);
Activity* topics_activity = malloc(sizeof(Activity*));
topics_activity->help_message = ":: /topics\tShow active topics\r\n";
topics_activity->name = "topics";
topics_activity->handler = &topics_activity_handler;
array_push(activities, topics_activity);
Activity* my_topics_activity = malloc(sizeof(Activity*));
my_topics_activity->help_message = ":: /my_topics\tShow subscriptions\r\n";
my_topics_activity->name = "my_topics";
my_topics_activity->handler = &my_topics_activity_handler;
array_push(activities, my_topics_activity);
Activity* add_topic_activity = malloc(sizeof(Activity*));
add_topic_activity->help_message =
":: /add_topic <topic_name>\tAdd a new topic\r\n";
add_topic_activity->name = "add_topic";
add_topic_activity->handler = &add_topic_activity_handler;
array_push(activities, add_topic_activity);
Activity* remove_topic_activity = malloc(sizeof(Activity*));
remove_topic_activity->help_message =
":: /remove_topic <topic_name>\tRemove an existing topic\r\n";
remove_topic_activity->name = "remove_topic";
remove_topic_activity->handler = &remove_topic_activity_handler;
array_push(activities, remove_topic_activity);
Activity* subscribe_activity = malloc(sizeof(Activity*));
subscribe_activity->help_message =
":: /subscribe <topic_name>\tSubscribe to a topic\r\n";
subscribe_activity->name = "subscribe";
subscribe_activity->handler = &subscribe_activity_handler;
array_push(activities, subscribe_activity);
Activity* unsubscribe_activity = malloc(sizeof(Activity*));
unsubscribe_activity->help_message =
":: /unsubscribe <topic_name>\tUnsubscribe from a topic\r\n";
unsubscribe_activity->name = "unsubscribe";
unsubscribe_activity->handler = &unsubscribe_activity_handler;
array_push(activities, unsubscribe_activity);
Activity* msg_topic_activity = malloc(sizeof(Activity*));
msg_topic_activity->help_message =
":: /msg_topic <topic_name> <message>\tPost a message "
"to subscribers "
"of a topic\r\n";
msg_topic_activity->name = "msg_topic";
msg_topic_activity->handler = &msg_topic_activity_handler;
array_push(activities, msg_topic_activity);
Activity* help_activity = malloc(sizeof(Activity*));
help_activity->help_message = ":: /help\tShow help\r\n";
help_activity->name = "help";
help_activity->handler = &help_activity_handler;
array_push(activities, help_activity);
}
/* Topics */
/**
 * Array matcher for plain topic-name strings.
 *
 * @param topic_name  A NUL-terminated string (char*).
 * @param target      The name to compare against (char*).
 * @return true when both strings are equal.
 */
bool matcher_topic_name(void* topic_name, void* target) {
  const char* lhs = topic_name;
  const char* rhs = target;
  return strcmp(lhs, rhs) == 0;
}
bool matcher_topic(void* topic, void* target) {
return matcher_topic_name(((Topic*)topic)->name, (char*)target);
}
/**
 * Allocates a new topic owned by `client`.
 *
 * The topic starts with zero subscribers and its "last at zero" timestamp set
 * to the current time.
 *
 * @param client      The creating client; its uuid becomes the owner id.
 * @param topic_name  NUL-terminated topic name. It is copied into the topic's
 *                    name storage and truncated if it does not fit.
 * @return The new topic, or NULL if memory could not be allocated.
 */
Topic* topic_new(Client* client, char* topic_name) {
  // calloc so every field, including unused name bytes, starts zeroed
  Topic* topic = calloc(1, sizeof(*topic));
  if (topic == NULL) {
    return NULL;
  }

  // Bounded copy that always writes a terminator. Copying exactly
  // strlen(topic_name) bytes left the name unterminated and could run past
  // the end of the field.
  snprintf((char*)topic->name, sizeof(topic->name), "%s", topic_name);
  memcpy(topic->owner_id, client->uuid, UUID_STR_LEN);
  topic->last_at_zero_subs = time(NULL);
  topic->subs = 0;
  return topic;
}
/**
 * Decrements the topic's subscriber count and, when it reaches zero, records
 * the current time as the moment the topic became empty.
 *
 * The count is unsigned and is not checked for being zero before the
 * decrement.
 */
void topic_remove_sub(Topic* topic) {
  topic->subs -= 1;

  if (topic->subs != 0) {
    return;
  }
  topic->last_at_zero_subs = time(NULL);
}
/**
 * Increments the topic's subscriber count and resets the "last at zero"
 * timestamp to -1, the marker for "currently has subscribers".
 */
void topic_add_sub(Topic* topic) {
  topic->last_at_zero_subs = -1;
  topic->subs += 1;
}
/**
 * Subscribes `client` to the topic named `topic_name` and reports the outcome
 * to the client.
 *
 * Outcomes: already subscribed, subscribed, or topic does not exist.
 *
 * @param client      The subscribing client.
 * @param topic_name  Topic name; may be NULL when the command had no
 *                    argument, in which case a usage hint is sent.
 * @param buffer      Scratch buffer for the reply.
 */
void topic_subscribe(Client* client, char* topic_name, char* buffer) {
  if (topic_name == NULL) {
    send_message_self(client, ":: usage: /subscribe <topic_name>\r\n");
    return;
  }

  // Check if topic already in this client's subscribed topics
  if (array_includes(client->topics, matcher_topic_name, topic_name)) {
    sprintf(buffer, ":: you are already subscribed to %s\r\n", topic_name);
    send_message_self(client, buffer);
    return;
  }

  // The subscription list must own its strings: topic_name points into the
  // caller's input buffer, which is overwritten by the next read. Storing the
  // pointer itself left the list referring to whatever was typed last.
  char* owned_name = strdup(topic_name);
  if (owned_name == NULL) {
    sprintf(buffer, ":: could not subscribe to %s\r\n", topic_name);
    send_message_self(client, buffer);
    return;
  }

  bool found = false;

  pthread_mutex_lock(&topics_mutex);
  // Find the topic so we can update its subscriber count TODO: optimize
  for (int i = 0; i < topics->len; i++) {
    if (matcher_topic(topics->state[i], topic_name)) {
      topic_add_sub((Topic*)topics->state[i]);
      found = true;
      break;
    }
  }
  pthread_mutex_unlock(&topics_mutex);

  if (found) {
    array_push(client->topics, owned_name);
    sprintf(buffer, ":: subscribed to %s\r\n", topic_name);
  } else {
    // Topic not in topics list
    free(owned_name);
    sprintf(buffer, ":: topic %s does not exist\r\n", topic_name);
  }
  send_message_self(client, buffer);
}
/**
 * Removes `topic_name` from the client's subscriptions and reports the
 * outcome to the client.
 *
 * @param client      The unsubscribing client.
 * @param topic_name  Topic name; may be NULL when the command had no
 *                    argument, in which case a usage hint is sent.
 * @param buffer      Scratch buffer for the reply.
 *
 * NOTE(review): the global `topics` array is read here without holding
 * topics_mutex, unlike topic_subscribe — worth aligning.
 * NOTE(review): the name string taken out of client->topics is not released
 * here; whether array_remove frees elements is not visible in this file.
 */
void topic_unsubscribe(Client* client, char* topic_name, char* buffer) {
  if (topic_name == NULL) {
    send_message_self(client, ":: usage: /unsubscribe <topic_name>\r\n");
    return;
  }

  int idx = array_find(client->topics, matcher_topic_name, topic_name);
  if (idx == -1) {
    sprintf(buffer, ":: you are not subscribed to %s\r\n", topic_name);
    send_message_self(client, buffer);
    return;
  }

  // The topic itself may no longer be in the global list. array_find then
  // returns -1, which must not be used as an index.
  int topic_idx = array_find(topics, matcher_topic, topic_name);
  if (topic_idx != -1) {
    topic_remove_sub((Topic*)topics->state[topic_idx]);
  }

  array_remove(client->topics, idx);
  sprintf(buffer, ":: unsubscribed from %s\r\n", topic_name);
  send_message_self(client, buffer);
}
/**
 * Thread entry point that periodically removes stale topics.
 *
 * Every TOPIC_CLEANUP_INTERVAL_SECONDS it removes each topic that
 *   1) has no subscribers and
 *   2) has had none for longer than TOPIC_CLEANUP_INTERVAL_SECONDS,
 * then announces the pass to all clients, whether or not anything was
 * removed. The first pass runs immediately.
 *
 * @param arg  Unused.
 * @return Never returns.
 *
 * NOTE(review): removed Topic objects are not released here. Whether
 * array_remove frees the element is not visible in this file — confirm.
 */
void* topic_cleanup_routine(void* arg) {
  (void)arg;

  // The announcement never changes, so it is formatted once into a local
  // buffer. The previous fmt_str() call produced a new string on every pass
  // that was never released (presumably heap-allocated — confirm).
  char notice[128];
  snprintf(notice, sizeof(notice),
           ":: stale topics have been removed. next update "
           "in %d minutes\r\n",
           TOPIC_CLEANUP_INTERVAL_SECONDS / 60);

  while (true) {
    pthread_mutex_lock(&topics_mutex);

    time_t now = time(NULL);

    // Walk from the end so that removing entry i never changes the position
    // of an entry that is still to be examined. Removing during a forward
    // scan skips whatever entry moves into slot i.
    for (int i = (int)topics->len; i-- > 0;) {
      Topic* topic = ((Topic*)topics->state[i]);

      if (topic->subs == 0 && difftime(now, topic->last_at_zero_subs) >
                                  TOPIC_CLEANUP_INTERVAL_SECONDS) {
        array_remove(topics, i);
      }
    }

    pthread_mutex_unlock(&topics_mutex);

    send_message_all(notice, NULL);
    sleep(TOPIC_CLEANUP_INTERVAL_SECONDS);
  }
}
/* Clients */
/**
 * Stores `client` in the first free slot of the global `clients` table.
 *
 * If every slot is occupied the client is not stored; nothing is reported to
 * the caller in that case.
 */
void client_register(Client* client) {
  pthread_mutex_lock(&clients_mutex);

  // Advance to the first empty (NULL) slot, if there is one
  unsigned int slot = 0;
  while (slot < MAX_CLIENTS && clients[slot]) {
    slot++;
  }
  if (slot < MAX_CLIENTS) {
    clients[slot] = client;
  }

  pthread_mutex_unlock(&clients_mutex);
}
/**
 * Clears the slot of the global `clients` table that holds `client`.
 *
 * Does nothing if the client is not in the table.
 */
void client_unregister(Client* client) {
  pthread_mutex_lock(&clients_mutex);
  for (unsigned int i = 0; i < MAX_CLIENTS; i++) {
    // Compare the slot pointer itself. Empty slots are NULL, so reaching
    // through clients[i] to compare uuid fields crashed whenever an empty
    // slot came before the client being removed.
    if (clients[i] == client) {
      clients[i] = NULL;
      break;
    }
  }
  pthread_mutex_unlock(&clients_mutex);
}
/**
 * Disconnects a client and releases everything it owns.
 *
 * Steps: announce the departure to everyone, remove the client from the
 * client table, close its socket, drop its subscriptions from the topics'
 * subscriber counts, free the client and decrement the client count.
 *
 * @param client      The departing client; invalid after this call.
 * @param out_buffer  Scratch buffer for the announcement.
 *
 * NOTE(review): the global `topics` array is read here without holding
 * topics_mutex — worth aligning with the handlers that lock it.
 * NOTE(review): the name strings stored in client->topics are not freed one
 * by one; whether array_free releases elements is not visible in this file.
 */
void client_signoff(Client* client, char* out_buffer) {
  sprintf(out_buffer, ":: %s has left\r\n", client->nick);
  send_message_all(out_buffer, NULL);

  // Take the client out of the table before closing its descriptor. While it
  // is still registered, a broadcast from another thread can write to the
  // closed descriptor, or to an unrelated connection that reused the number.
  client_unregister(client);
  close(client->fd);

  // Unsubscribe client from all topics (TODO: store pointers in client)
  for (int i = 0; i < client->topics->len; i++) {
    char* topic_name = (char*)client->topics->state[i];
    int topic_idx = array_find(topics, matcher_topic, topic_name);
    if (topic_idx != -1) {
      Topic* topic = (Topic*)topics->state[topic_idx];
      topic_remove_sub(topic);
    }
  }

  array_free(client->topics);
  free(client);
  client_count--;
}
/**
 * Thread entry point that serves one connected client.
 *
 * Announces the client, then reads from its socket until the connection ends,
 * a read fails, or a meta command asks to stop. Each chunk read is cut at its
 * first line break. Input starting with META_COMMAND_PREFIX is dispatched as
 * a meta command; anything else is forwarded to all clients exactly as
 * received (after the cut). Finally the client is signed off and the thread
 * detaches itself.
 *
 * @param arg  The Client* to serve.
 * @return NULL.
 */
void* client_routine(void* arg) {
  Client* client = (Client*)arg;
  char input_buf[BUFFER_SIZE / 2];
  char out_buffer[BUFFER_SIZE];

  client_count++;

  sprintf(out_buffer, ":: %s has joined\r\n", client->nick);
  send_message_all(out_buffer, NULL);

  // Begin receiving input from client
  for (;;) {
    int read_len = read(client->fd, input_buf, sizeof(input_buf) - 1);
    if (read_len <= 0) {
      break;
    }

    input_buf[read_len] = NULL_TERMINATOR;
    // Handlers append to out_buffer, so it starts out empty for each input
    out_buffer[0] = NULL_TERMINATOR;

    if (input_buf[0] == NULL_TERMINATOR) {
      continue;
    }

    strip_newline(input_buf);

    if (input_buf[0] != META_COMMAND_PREFIX) {
      // just forward the text to the global chat
      send_message_all(input_buf, NULL);
      continue;
    }

    if (process_meta_command(client, input_buf, out_buffer)) {
      break;
    }
  }

  client_signoff(client, out_buffer);
  pthread_detach(pthread_self());
  return NULL;
}
/* Messaging */
/**
 * Writes the NUL-terminated string `buffer` to the client's socket.
 *
 * A failed write is reported with perror; a short write is not retried.
 */
void send_message_self(Client* client, char* buffer) {
  size_t length = strlen(buffer);
  ssize_t written = write(client->fd, buffer, length);

  if (written < 0) {
    perror("write to fd failed");
  }
}
/**
 * Sends `buffer` to every registered client whose uuid equals `uuid`.
 *
 * The whole table is scanned while clients_mutex is held; the scan does not
 * stop at the first match.
 */
void send_message_client(char* uuid, char* buffer) {
  pthread_mutex_lock(&clients_mutex);

  for (unsigned int slot = 0; slot < MAX_CLIENTS; slot++) {
    // Skip empty slots and clients with a different id
    if (!clients[slot] || strcmp(clients[slot]->uuid, uuid) != 0) {
      continue;
    }
    send_message_self(clients[slot], buffer);
  }

  pthread_mutex_unlock(&clients_mutex);
}
/**
 * Sends `buffer` to all registered clients, or only to the subscribers of
 * `topic` when `topic` is non-NULL.
 *
 * @param buffer  NUL-terminated message to send.
 * @param topic   Topic name used as a filter, or NULL to reach everyone.
 *
 * The table is scanned while clients_mutex is held. A failed write to one
 * client is reported with perror and does not affect delivery to the others.
 */
void send_message_all(char* buffer, char* topic) {
  // The message does not change during the scan, so measure it once
  size_t length = strlen(buffer);

  pthread_mutex_lock(&clients_mutex);
  for (unsigned int i = 0; i < MAX_CLIENTS; i++) {
    if (!clients[i]) {
      continue;
    }

    // If a topic was specified, only send to clients subscribed to that topic
    if (topic &&
        !array_includes(clients[i]->topics, matcher_topic_name, topic)) {
      continue;
    }

    if (write(clients[i]->fd, buffer, length) < 0) {
      // Carry on with the next client. Stopping the loop here meant a single
      // dead connection silenced everyone stored after it in the table.
      perror("write to fd failed");
    }
  }
  pthread_mutex_unlock(&clients_mutex);
}
/**
 * Parses and dispatches a meta command.
 *
 * The first space-separated token of `input_buf`, minus its leading prefix
 * character, is looked up among the registered activities. `input_buf` is
 * modified by the tokenizer, and handlers pick up their own arguments by
 * continuing the same strtok sequence.
 *
 * @param client      The client that sent the command.
 * @param input_buf   The raw input line, starting with the prefix character.
 * @param out_buffer  Scratch buffer handed to the handler.
 * @return The handler's result (true means the client should be
 *         disconnected); false for an unknown command, after telling the
 *         client so.
 */
bool process_meta_command(Client* client, char* input_buf, char* out_buffer) {
  // Step over the prefix character at the start of the first token
  char* command = strtok(input_buf, " ") + 1;

  int idx = array_find(activities, matcher_activity_name, command);
  if (idx == -1) {
    sprintf(out_buffer, ":: Unknown meta command: %s\r\n", command);
    send_message_self(client, out_buffer);
    // Continue loop
    return false;
  }

  Activity* activity = (Activity*)activities->state[idx];
  return activity->handler(client, out_buffer);
}
/* Networking */
/**
 * Waits for one incoming connection on `listener` and builds a Client for it.
 *
 * @param listener     Listening socket.
 * @param client_addr  Passed by value and used as the storage accept() writes
 *                     the peer address into; the caller's copy is unchanged.
 * @param nick         Initial nick for the client; truncated to fit.
 * @return A newly allocated Client with a random uuid and an empty
 *         subscription list, or NULL if accept failed, the server is full, or
 *         memory could not be allocated. The connection is closed in the
 *         latter two cases.
 */
Client* start_client(int listener, struct sockaddr_in client_addr,
                     const char* nick) {
  socklen_t client_len = sizeof(client_addr);
  int fd = accept(listener, (struct sockaddr*)&client_addr, &client_len);
  if (fd < 0) {
    // Without this check a client with fd -1 was created and served
    perror("accept");
    return NULL;
  }

  if ((client_count + 1) >= MAX_CLIENTS) {
    close(fd);
    return NULL;
  }

  Client* client = calloc(1, sizeof(*client));
  if (client == NULL) {
    perror("calloc");
    close(fd);
    return NULL;
  }

  client->addr = client_addr;
  client->fd = fd;

  uuid_t bin_uuid;
  uuid_generate_random(bin_uuid);
  uuid_unparse(bin_uuid, client->uuid);

  // Bounded copy that always terminates. Copying strlen(nick) bytes wrote no
  // terminator and could overrun the field for a long nick.
  snprintf(client->nick, sizeof(client->nick), "%s", nick);

  client->topics = array_init();
  return client;
}
/**
 * Creates the listening TCP socket for the chat server.
 *
 * The socket is bound to every local IPv4 address on SERVER_PORT, with
 * SO_REUSEADDR set, and put into listening mode with a backlog of 10.
 *
 * @return The listening socket's file descriptor. Any failure is reported
 *         with perror and terminates the process.
 */
int start_server() {
  int listener = socket(AF_INET, SOCK_STREAM, 0);
  if (listener < 0) {
    // Report the real cause instead of a later setsockopt failure
    perror("socket");
    exit(1);
  }

  // lose the pesky "Address already in use" error message (thanks, Beej)
  if (setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1) {
    perror("setsockopt");
    exit(1);
  }

  // Zero the whole structure so the padding bytes are defined before bind
  struct sockaddr_in server_addr;
  memset(&server_addr, 0, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
  server_addr.sin_port = htons(SERVER_PORT);

  if (bind(listener, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
    perror("bind");
    exit(1);
  }

  if (listen(listener, 10) < 0) {
    perror("listen");
    exit(1);
  }

  return listener;
}
/* Main */
/**
 * Program entry point.
 *
 * Usage: <program> <nick>. The nick given on the command line becomes the
 * initial nick of every client that connects.
 *
 * Ignores SIGPIPE, starts the listening socket, creates the topic list and
 * command table, launches the topic cleanup thread and then accepts clients
 * forever, starting one thread per accepted client.
 */
int main(int argc, char* argv[]) {
  if (argc < 2) {
    printf("must supply nick\n");
    exit(1);
  }

  char* nick = argv[1];

  // With SIGPIPE ignored, a write to a closed connection returns an error
  // instead of raising the signal
  signal(SIGPIPE, SIG_IGN);

  int listener = start_server();
  topics = array_init();
  register_activities();

  // Start cleanup of topics
  pthread_t cleanup_thread_id;
  pthread_create(&cleanup_thread_id, NULL, &topic_cleanup_routine, NULL);

  struct sockaddr_in client_addr;
  pthread_t thread_id;

  for (;;) {
    Client* client = start_client(listener, client_addr, nick);
    if (client != NULL) {
      client_register(client);
      pthread_create(&thread_id, NULL, &client_routine, (void*)client);
      // NOTE(review): presumably gives the new thread time to increment
      // client_count before the next accept — confirm.
      sleep(1);
    }
  }

  return EXIT_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment