Skip to content

Instantly share code, notes, and snippets.

@mcchae
Last active September 9, 2015 05:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mcchae/84849230f243657e0ba3 to your computer and use it in GitHub Desktop.
Save mcchae/84849230f243657e0ba3 to your computer and use it in GitHub Desktop.
udp queue test
/*
* fuq.c
*
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>
#include <syslog.h>
#include "fuq.h"
/*
 * Smaller of two values. Both arguments and the whole expansion are
 * parenthesized so the macro composes safely inside larger expressions.
 * Note: each argument may be evaluated twice; do not pass expressions with
 * side effects.
 */
#define min(x,y) (((x) < (y)) ? (x) : (y))
//#######################################################################################
pthread_mutex_t lock;
//#######################################################################################
/*
 * Allocate and initialise a queue with len_q slots, each able to hold one
 * datagram of at most mtu_size payload bytes plus the sender address and the
 * payload length.
 *
 * Returns the queue handle, or NULL when the handle itself cannot be
 * allocated. For a non-NULL handle, a failure to allocate the slot storage,
 * a zero len_q, or a mutex initialisation failure is recorded in _errno
 * (first failure wins); callers are expected to check fuq_errno() and to
 * release the handle with fuq_quit().
 */
void * fuq_init(uint len_q, uint mtu_size)
{
    FUQ *fuq = calloc(1, sizeof *fuq);
    if (fuq == NULL) {
        return NULL;
    }

    fuq->_len_q = len_q;
    fuq->_mtu_size = mtu_size;
    fuq->_qitem_size = sizeof(SADDR) + sizeof(uint) + mtu_size;

    if (len_q == 0) {
        /* A queue without slots cannot store anything; pushing into it
         * would write outside the allocation. */
        fuq->_errno = EINVAL;
    } else {
        errno = 0;
        fuq->_q = calloc(fuq->_len_q, fuq->_qitem_size);
        if (fuq->_q == NULL) {
            /* calloc is not guaranteed to set errno on every platform. */
            fuq->_errno = (errno != 0) ? errno : ENOMEM;
        }
    }

    /* pthread_mutex_init reports failure through its return value and does
     * not set errno. */
    int rc = pthread_mutex_init(&fuq->_lock, NULL);
    if (rc != 0 && fuq->_errno == 0) {
        fuq->_errno = rc;
    }
    return fuq;
}
//#######################################################################################
void fuq_quit(void *_fuq)
{
FUQ *fuq = (FUQ*)_fuq;
pthread_mutex_destroy(&fuq->_lock);
free(fuq);
}
//#######################################################################################
/*
 * Return the error code recorded in the queue (0 when none was recorded).
 * The value is read under the queue lock. A NULL handle has no state to
 * query and is reported as EINVAL instead of being dereferenced.
 */
int fuq_errno(void *_fuq)
{
    FUQ *fuq = (FUQ*)_fuq;
    if (fuq == NULL) {
        return EINVAL;
    }
    pthread_mutex_lock(&fuq->_lock);
    int rc = fuq->_errno;
    pthread_mutex_unlock(&fuq->_lock);
    return rc;
}
//#######################################################################################
/*
 * Report whether the queue currently holds no items.
 * Returns non-zero when the head and tail indices coincide, 0 otherwise.
 * The comparison is made while holding the queue lock.
 */
int fuq_empty(void *_fuq)
{
    FUQ *q = _fuq;
    int is_empty;

    pthread_mutex_lock(&q->_lock);
    is_empty = (q->_tail == q->_head);
    pthread_mutex_unlock(&q->_lock);

    return is_empty;
}
//#######################################################################################
/*
 * Append one datagram to the queue.
 *
 *   other  sender address stored alongside the payload
 *   buf    payload bytes
 *   n      payload length; at most _mtu_size bytes are stored, and the length
 *          recorded in the slot is the number of bytes actually stored
 *
 * Returns the head index after the push, or -1 when the datagram was dropped
 * because the queue is full or has no slot storage. One slot is always left
 * unused: letting the head catch up with the tail would make a full queue
 * indistinguishable from an empty one and silently discard every queued item.
 */
int fuq_push(void *_fuq, SADDR *other, char *buf, uint16 n)
{
    FUQ *fuq = (FUQ*)_fuq;
    int rc = -1;

    pthread_mutex_lock(&fuq->_lock);
    if (fuq->_q != NULL && fuq->_len_q != 0) {
        uint next = fuq->_head + 1;
        if (next >= fuq->_len_q) {
            next = 0;
        }

        if (next == fuq->_tail) {
            fprintf(stderr,"FutureUdpQueue Full with QLen(%u)\n", fuq->_len_q);
        } else {
            uint16 stored = n;
            if (stored > fuq->_mtu_size) {
                stored = (uint16)fuq->_mtu_size;
            }

            /* size_t arithmetic: slot size times index can exceed 32 bits
             * for large queues. */
            char *item = fuq->_q + (size_t)fuq->_qitem_size * fuq->_head;
            memcpy(item, other, sizeof(SADDR));
            item += sizeof(SADDR);
            memcpy(item, &stored, sizeof(stored));
            item += sizeof(stored);
            memcpy(item, buf, stored);

            fuq->_head = next;
            rc = (int)fuq->_head;
        }
    }
    pthread_mutex_unlock(&fuq->_lock);
    return rc;
}
//#######################################################################################
/*
 * Remove the oldest datagram from the queue.
 *
 *   other     receives the stored sender address
 *   buf       receives the payload
 *   size_buf  capacity of buf in bytes; the copy never exceeds it
 *   buf_len   receives the number of payload bytes copied into buf
 *             (set to 0 when the queue is empty)
 *
 * Returns 0 when the queue is empty, otherwise the tail index after the pop.
 * Note that the tail index is itself 0 right after wrapping around, so the
 * return value alone does not distinguish the two cases.
 */
int fuq_pop(void *_fuq, SADDR *other, char *buf, uint size_buf, uint16 *buf_len)
{
    FUQ *fuq = (FUQ*)_fuq;
    int rc = 0;

    pthread_mutex_lock(&fuq->_lock);
    if (fuq->_head == fuq->_tail) {
        /* Empty: fall through so the lock is released before returning. */
        *buf_len = 0;
    } else {
        uint16 stored;
        uint ncopy;
        /* size_t arithmetic: slot size times index can exceed 32 bits for
         * large queues. */
        const char *item = fuq->_q + (size_t)fuq->_qitem_size * fuq->_tail;

        memcpy(other, item, sizeof(SADDR));
        item += sizeof(SADDR);
        memcpy(&stored, item, sizeof(stored));
        item += sizeof(stored);

        ncopy = stored;
        if (ncopy > fuq->_mtu_size) {
            ncopy = fuq->_mtu_size;
        }
        if (ncopy > size_buf) {
            ncopy = size_buf;   /* never write past the caller's buffer */
        }
        memcpy(buf, item, ncopy);
        *buf_len = (uint16)ncopy;

        ++fuq->_tail;
        if (fuq->_tail >= fuq->_len_q) {
            fuq->_tail = 0;
        }
        rc = (int)fuq->_tail;
    }
    pthread_mutex_unlock(&fuq->_lock);
    return rc;
}
/*
* fuq.h
*
*/
#ifndef __FUQ_H__
#define __FUQ_H__
#include <pthread.h>
#include <netinet/in.h>
//#######################################################################################
// Payload size, in bytes, that the sample programs use for each datagram buffer.
#define MTU_SIZE 1500
//#######################################################################################
// Short aliases used by the queue API.
typedef unsigned int uint;
typedef unsigned short uint16;
typedef struct sockaddr_in SADDR;
/*
 * Queue state: a ring of fixed-size slots guarded by _lock.
 * fuq_init() sizes each slot as sizeof(SADDR) + sizeof(uint) + _mtu_size.
 */
typedef struct _FUQ {
// Slot storage: _len_q slots of _qitem_size bytes each.
char * _q;
// Number of slots.
uint _len_q;
// Size of one slot in bytes.
uint _qitem_size;
// Maximum payload bytes copied per slot.
uint _mtu_size;
// Index of the slot the next fuq_push() writes.
uint _head;
// Index of the slot the next fuq_pop() reads.
uint _tail;
// Error code recorded by fuq_init(); read through fuq_errno().
int _errno;
// Mutex taken by the accessor functions around queue state.
pthread_mutex_t _lock;
} FUQ;
//#######################################################################################
// Create a queue with len_q slots for payloads of up to mtu_size bytes; returns an opaque handle.
extern void * fuq_init(uint len_q, uint mtu_size);
// Destroy a queue created by fuq_init().
extern void fuq_quit(void *_fuq);
// Return the error code recorded in the queue (0 when none).
extern int fuq_errno(void *_fuq);
// Return non-zero when the queue holds no items.
extern int fuq_empty(void *_fuq);
// Store sender address `other` and n bytes of `buf`; returns the head index after a successful push.
extern int fuq_push(void *_fuq, SADDR *other, char *buf, uint16 n);
// Fetch the oldest item into other/buf/buf_len; size_buf is the capacity of buf.
// Returns 0 when the queue is empty, otherwise the tail index after the pop.
extern int fuq_pop(void *_fuq, SADDR *other, char *buf, uint size_buf, uint16 *buf_len);
#endif
# Build the fuq static library and the UDP sample programs into out/.
INC=-Iinc/
CC=gcc
DEBUG=-g

# 'all' and 'clean' are commands, not files.
.PHONY: all clean

all: out/libfuq.a out/udp_send out/udp_recv_file out/udp_recv

out/udp_send.o: src/udp_send.c
	$(CC) -o $@ $(DEBUG) -c $(INC) $<

out/udp_recv_file.o: src/udp_recv_file.c
	$(CC) -o $@ $(DEBUG) -c $(INC) $<

out/udp_recv.o: src/udp_recv.c
	$(CC) -o $@ $(DEBUG) -c $(INC) $<

out/fuq.o: src/fuq.c
	$(CC) -o $@ $(DEBUG) -c $(INC) $<

out/libfuq.a: out/fuq.o
	ar -rcs $@ $<

out/udp_send: src/udp_send.c
	$(CC) $(DEBUG) $(INC) -o $@ $<

out/udp_recv_file: src/udp_recv_file.c
	$(CC) $(DEBUG) $(INC) -o $@ $<

# The archive is a prerequisite so that it is built before the link step
# (also under 'make -j') and so that library changes trigger a relink.
# $< is still the first prerequisite, i.e. the C source.
out/udp_recv: src/udp_recv.c out/libfuq.a
	$(CC) $(DEBUG) $(INC) -o $@ $< out/libfuq.a -lpthread

clean:
	rm -f out/*
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include "fuq.h"
//#######################################################################################
/*
 * Consumer thread body: polls the queue passed as the thread argument and
 * prints every datagram it pops to stdout, prefixed with the sender address.
 * Every 10000 datagrams a progress line with the first bytes of the latest
 * payload is written to stderr. The loop never terminates on its own.
 */
void *fuq_pop_main(void *fuq)
{
    static uint popped;          /* datagrams printed so far */
    struct sockaddr_in other;
    char buf[MTU_SIZE+100];
    unsigned short n;

    while (1) {
        if (fuq_empty(fuq)) {
            usleep(10);
            continue;
        }

        n = 0;   /* defined length even if the pop stores nothing */
        fuq_pop(fuq, &other, buf, sizeof(buf), &n);

        /* Keep prefix and payload on the same stdio stream: mixing buffered
         * printf with raw write(1, ...) emits the payload before the prefix. */
        printf("Received from %s:%d: ",
            inet_ntoa(other.sin_addr),
            ntohs(other.sin_port));
        fwrite(buf, 1, n, stdout);
        putchar('\n');
        fflush(stdout);

        ++popped;
        if (popped % 10000 == 0) {
            /* Show at most the first 18 payload bytes, and never bytes
             * beyond the received length. */
            int shown = (n < 18) ? (int)n : 18;
            fprintf(stderr, "[%u] <%.*s>\n", popped, shown, buf);
        }
    }
    return NULL;
}
//#######################################################################################
int main(int argc, char* argv[])
{
char buf[MTU_SIZE];
struct sockaddr_in self, other;
int len = sizeof(struct sockaddr_in);
int n, s, port;
unsigned int len_q = 100000;
if (argc != 3) {
fprintf(stderr, "Usage: %s <port> <#len_queue_item>\n", argv[0]);
return 1; }
/* initialize socket */
if ((s=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
perror("socket");
return 1;
}
/* bind to server port */
port = atoi(argv[1]);
memset((char *) &self, 0, sizeof(struct sockaddr_in));
self.sin_family = AF_INET;
self.sin_port = htons(port);
self.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(s, (struct sockaddr *) &self, sizeof(self)) == -1) {
perror("bind");
return 1;
}
len_q = (unsigned int)atoi(argv[2]);
void * fuq;
while(1) {
fuq = fuq_init(len_q, MTU_SIZE);
if (fuq_errno(fuq) == 0) {
fprintf(stderr, "Info: FutureUdpQueue (size=%u) initailized!\n", len_q);
break;
}
fuq_quit(fuq);
len_q /= 2;
if (len_q < 10000) {
fprintf(stderr,"Error: have not enough memory for the FutureUdpQueue\n");
return 1;
}
}
pthread_t tid;
int err = pthread_create(&tid, NULL, &fuq_pop_main, fuq);
if (err != 0) {
fprintf(stderr,"Can't create thread!");
return 2;
}
while ((n = recvfrom(s, buf, MTU_SIZE, 0,
(struct sockaddr *) &other, (socklen_t*)&len)) != -1) {
fuq_push(fuq, &other, buf, n);
}
pthread_join(tid, NULL);
fuq_quit(fuq);
close(s);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#define BUF_SIZE 1024
/*
 * Minimal UDP receiver: binds to the given port and prints every datagram
 * to stdout, prefixed with the sender's address and port.
 *
 * Usage: <prog> <port>
 *
 * Returns 0 after the receive loop ends, 1 on argument or socket errors.
 */
int main(int argc, char* argv[])
{
    char buf[BUF_SIZE];
    struct sockaddr_in self, other;
    socklen_t len;
    ssize_t n;
    int s;
    long port;
    char *end = NULL;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s <port>\n", argv[0]);
        return 1;
    }
    /* Reject non-numeric or out-of-range ports instead of silently binding
     * to whatever atoi() would have produced. */
    port = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || port < 0 || port > 65535) {
        fprintf(stderr, "Error: invalid port '%s'\n", argv[1]);
        return 1;
    }

    /* initialize socket */
    if ((s=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
        perror("socket");
        return 1;
    }

    /* bind to server port */
    memset(&self, 0, sizeof(self));
    self.sin_family = AF_INET;
    self.sin_port = htons((unsigned short)port);
    self.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(s, (struct sockaddr *) &self, sizeof(self)) == -1) {
        perror("bind");
        close(s);
        return 1;
    }

    for (;;) {
        /* The address length is a value-result argument: reset it for
         * every call. */
        len = sizeof(other);
        n = recvfrom(s, buf, BUF_SIZE, 0, (struct sockaddr *) &other, &len);
        if (n == -1) {
            perror("recvfrom");
            break;
        }
        /* Keep prefix and payload on the same stdio stream: mixing buffered
         * printf with raw write(1, ...) emits the payload before the prefix. */
        printf("Received from %s:%d: ",
            inet_ntoa(other.sin_addr),
            ntohs(other.sin_port));
        fwrite(buf, 1, (size_t)n, stdout);
        putchar('\n');
        fflush(stdout);
    }

    close(s);
    return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <sys/stat.h>
#include <unistd.h>
#include "fuq.h"
//########################################################################################
/*
 * Send `limit` datagrams of MTU_SIZE bytes to `server` over socket `s`.
 * Each payload starts with a zero-padded sequence number in brackets and is
 * otherwise filled with '#'. After every successful send the function sleeps
 * `usecond` microseconds when that value is non-zero; a failed send is
 * reported with perror() and the loop moves on to the next sequence number.
 * Returns the final sequence counter converted to int.
 */
int gen_logs(int s, struct sockaddr_in * server, unsigned long long limit, unsigned int usecond)
{
    char payload[MTU_SIZE+1];
    int addr_len = sizeof(struct sockaddr_in);
    unsigned long long seq;

    memset(payload, '#', MTU_SIZE);
    payload[MTU_SIZE] = '\0';

    for (seq = 0; seq < limit; ++seq) {
        sprintf(payload, "[%016llu]", seq);
        /* sprintf terminated the 18-character header with a NUL at index 18;
         * put the filler character back. */
        payload[18] = '#';

        if (sendto(s, payload, MTU_SIZE, 0, (struct sockaddr *) server, addr_len) != -1) {
            if (usecond > 0) {
                usleep(usecond);
            }
        } else {
            perror("sendto()");
        }
    }
    return seq;
}
//########################################################################################
/*
 * Print an optional error message followed by the command-line synopsis to
 * stderr, then terminate the process with exit status 1.
 * `msg` may be NULL or empty, in which case only the synopsis is printed.
 */
void usage(const char *prog, const char * msg)
{
    if (msg != NULL && msg[0] != '\0') {
        fprintf(stderr, "ERROR:%s\n", msg);
    }
    fprintf(stderr, "Usage: %s <host> <port> <num-udp> <usecond-sleep>\n", prog);
    fputs(" udp gen program.\n", stderr);
    exit(1);
}
//########################################################################################
/*
 * UDP load generator.
 *
 * Usage: <prog> <host> <port> <num-udp> <usecond-sleep>
 *
 * Resolves <host>, then sends <num-udp> datagrams to <host>:<port> through
 * gen_logs(), sleeping <usecond-sleep> microseconds between datagrams.
 * Invalid arguments are reported through usage(), which exits with status 1.
 * Returns 0 on completion, 1 when the host cannot be resolved or the socket
 * cannot be created.
 */
int main(int argc, char *argv[])
{
    struct sockaddr_in server;
    struct hostent *host;
    char *end = NULL;
    long port;
    unsigned long usec;
    unsigned long long limit;
    int s;

    if (argc != 5) {
        usage(argv[0], "Invalid parameters.");
    }

    port = strtol(argv[2], &end, 10);
    if (end == argv[2] || *end != '\0' || port < 1 || port > 65535) {
        usage(argv[0], "Invalid port.");
    }

    /* strtoull/strtoul accept a leading '-' and wrap the value around, which
     * would turn e.g. "-1" into a practically endless run. */
    if (strchr(argv[3], '-') != NULL) {
        usage(argv[0], "Invalid packet count.");
    }
    limit = strtoull(argv[3], &end, 10);
    if (end == argv[3] || *end != '\0') {
        usage(argv[0], "Invalid packet count.");
    }

    if (strchr(argv[4], '-') != NULL) {
        usage(argv[0], "Invalid sleep interval.");
    }
    usec = strtoul(argv[4], &end, 10);
    if (end == argv[4] || *end != '\0' || (unsigned int)usec != usec) {
        usage(argv[0], "Invalid sleep interval.");
    }

    host = gethostbyname(argv[1]);
    if (host == NULL) {
        /* gethostbyname reports failures through h_errno, not errno, so
         * perror() would print an unrelated message. */
        fprintf(stderr, "gethostbyname: cannot resolve '%s' (h_errno=%d)\n",
            argv[1], h_errno);
        return 1;
    }
    if (host->h_addrtype != AF_INET
        || host->h_length != (int)sizeof(server.sin_addr)
        || host->h_addr_list[0] == NULL) {
        fprintf(stderr, "gethostbyname: no IPv4 address for '%s'\n", argv[1]);
        return 1;
    }

    /* initialize socket */
    if ((s=socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
        perror("socket");
        return 1;
    }

    /* initialize server addr */
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons((unsigned short)port);
    memcpy(&server.sin_addr, host->h_addr_list[0], sizeof(server.sin_addr));

    gen_logs(s, &server, limit, (unsigned int)usec);
    close(s);
    return 0;
}
#!/bin/bash
# Every 60 seconds, sample the UDP counters printed by `netstat -su` and
# report how many packets were received (and how many receive errors were
# counted) since the previous sample. The first pass only prints a start
# notice because there is no previous sample yet.
PREV_RX=-1
PREV_ERR=-1
while true; do
    STAMP=$(date +%Y%m%d-%H%M%S)
    # First column of the matching counter lines.
    CUR_RX=$(netstat -su | awk '/packets received/ {print $1}')
    CUR_ERR=$(netstat -su | awk '/packet receive errors/ {print $1}')
    if [ $PREV_RX -gt -1 ]; then
        DELTA_RX=$(($CUR_RX - $PREV_RX))
        DELTA_ERR=$(($CUR_ERR - $PREV_ERR))
        echo "[$STAMP] $DELTA_RX UDP packets received ($DELTA_ERR errors) during last 1 minutes."
    else
        echo "[$STAMP] starting get UDP packet counts every 1 minutes."
    fi
    PREV_RX=$CUR_RX
    PREV_ERR=$CUR_ERR
    sleep 60
done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment