Skip to content

Instantly share code, notes, and snippets.

@ravenxrz
Last active March 5, 2020 04:48
Show Gist options
  • Save ravenxrz/f2d5b6dba8687b54aea540096837179a to your computer and use it in GitHub Desktop.
Save ravenxrz/f2d5b6dba8687b54aea540096837179a to your computer and use it in GitHub Desktop.
验证不同拥塞算法下的拥塞窗口变化
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
% 网络实验lab3-- 拥塞控制算法
% 数据分析脚本
% 适用文件命名规则: {algorithm}_{type}{typevalue}.txt
% 如: cublic_loss2.5%.txt
% author: raven
% date: 2019-12-13
%----------------------------------------------------------
% 数据样本每列参数
% 1 timestamp //时间戳
% 2 saddr:port // 源IP及端口,我的数据是在发送端捕捉的,所以port是固定的8089
% 3 daddr:port // 目的IP及端口
% 4 skb->len // 收到的数据包skb大小,收到的都是ACK包,所以len都比较小。
% 5 snd_nxt // 下一个待发送数据的序列号
% 6 snd_una // 待确认数据的序列号
% 7 snd_cwnd // 拥塞窗口大小
% 8 ssthresh // 慢启动阈值
% 9 snd_wnd // 接收窗口大小
% 10 srtt // smoothed RTT
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
clear;
close all;
clc;
algorithm = 'westwood'; % 算法名,cubic | reno | vegas | westwood
type = 'loss'; % 需要分析的类型 loss | delay | corrupt | duplicate
type_values = {}; % 分析的类型有哪些,如loss类下有3个文件
data = {}; % 字典类型存储数据
data_dir = 'E:\network_course\lab3\Data\'; % 数据存放目录
%% 读取所有数据 --- (不考虑数据量过大超过内存的情况)
% 读data_dir下的包含algorithm的文件名
filelist = dir(strcat(data_dir,algorithm,'*.txt'));
file_num = length(filelist);
file_names = cell(1,file_num);
for i = 1:file_num
file_names{i} = filelist(i).name;
end
% 过滤出指定type的文本
file_names = filter_file(file_names,type);
file_num = length(file_names);
% 读取数据
data= cell(1,file_num);
for i = 1: file_num
data{i} = load(strcat(data_dir,file_names{i}));
temp = data{i};
data{i} = [temp(:,1) temp(:,7) temp(:,8)]; % 只保留第1列(时间戳)第7列(拥塞窗口)和第8列(慢启动阈值)
end
clear temp;
% 读取每种type对应的参数值,如loss2.5% 则参数值为2.5%
type_values = cell(1,file_num);
for i = 1:file_num
temp = strrep(file_names{i},[algorithm '_' type],'');
type_values{i}= strrep(temp,'.txt','');
end
% 读取normal type时的data,用于对比
normal = load(strcat(data_dir,algorithm,'_normal.txt'));
normal = [normal(:,1) normal(:,7) normal(:,8)];
%% 绘制对比图
figure();
blue = [55,126,184]/255;
green = [77,175,74]/255;
plot(normal(:,1),normal(:,2),'-','linewidth',2,'Color',blue);
hold on;
grid on;
plot(normal(:,1),normal(:,3),'-','linewidth',2,'Color',green);
xlabel('timestamp');
ylabel('window size');
set(gca,'xtick',(0:1:41));
xlim([0 41]);
title([algorithm '-' 'normal'],'Interpreter', 'none');
legend('congestion windows','ssthresh');
set(gca,'FontSize',22,'looseInset',[0 0 0 0]); %修改字体大小,裁剪留白
set(gcf, 'position', get(0,'ScreenSize')); %放缩到全屏
saveas(gcf,[data_dir algorithm '-' 'normal.jpg'])
% 绘制剩下file_num个图
for i = 1:file_num
figure();
temp = data{i};
plot(temp(:,1),temp(:,2),'-','linewidth',2,'Color',blue);
hold on;grid on;
plot(temp(:,1),temp(:,3),'-','linewidth',2,'Color',green);
xlabel('timestamp');
ylabel('window size');
set(gca,'xtick',(0:1:41));
xlim([0 41]);
title([algorithm '-' type '-' type_values{i}],'Interpreter', 'none');
legend('congestion windows','ssthresh');
set(gca,'FontSize',22,'looseInset',[0 0 0 0]);
set(gcf, 'position', get(0,'ScreenSize'));
saveas(gcf,[data_dir algorithm '-' type '-' type_values{i} '.jpg'])
end
%% 函数段
function new_file_names = filter_file(file_names,rule)
% 根据rule来过滤file_name
% file_names: 元胞类型
% rule: 字符串类型
file_num = length(file_names);
new_file_names = cell(1,file_num);
counter = 1;
for i = 1: file_num
if ~isequal([],strfind(file_names{i},rule))
disp(['add ' file_names{i}]);
new_file_names{counter} = file_names{i};
counter= counter+1;
end
end
% 删除所有空元素
new_file_names(cellfun(@isempty,new_file_names))=[];
end
# -*- coding: utf-8 -*-
"""
client客户端
"""
import socket
from common import *
import signal
# 当前的共享目录
shared_path = '/media/sf_Data'
# 这里的router_ip相当于socket中的server
router_ip = '192.168.1.2'
# server 的ip
server_ip = '192.168.2.1'
# 需要哪些请求,loss,corrupt,delay等
request_type = ['normal', 'loss', 'delay', 'duplicate', 'corrupt']
#request_type = ['duplicate','corrupt']
def initialize():
"""
初始化:
1. 初始化client自身的拥塞算法
#2. 发送请修改server的拥塞算法 (已舍弃)
:return:
"""
# step 1 修改client自身的拥塞算法
# 注入内核
cmd = 'echo ' + congestion_algorithm + ' >' + 'net.ipv4.tcp_congestion_control'
run(cmd)
# 更改算法
cmd = 'sysctl -w net.ipv4.tcp_congestion_control=' + congestion_algorithm
run(cmd)
# #step2 修改server的拥塞算法
# try:
# s = socket.socket()
# s.connect((server_ip, SERVER_SOCKET_PORT))
# print 'connect server success'
# # 发送修改请求
# s.send(congestion_algorithm)
# stat = s.recv(1024)
# if stat == protocol['ACK_FROM_SERVER']:
# print 'change server congestion_algorithm success'
# else:
# raise Exception('change server congestion_algorithm failed')
# except:
# raise Exception('connect to server socket failed')
# finally:
# s.close()
#run('sysctl -w net.ipv4.tcp_congestion_control=' + congestion_algorithm)
# 卸载tcp_probe
#run('modprobe -r tcp_probe')
# re-install tcp_probe with参数full=1,如果不设置full=1的话,在有些条件下,tcp_probe是没有数据的
#run('modprobe tcp_probe port=5001 full=1')
def kill_cat():
"""
kill 掉cat tcpprobe进程
:return:
"""
out = os.popen("ps aux | grep tcpprobe").read()
try:
for line in out.splitlines():
if '/proc/net/tcpprobe' in line:
pid = int(line.split()[1])
print 'kill ' + str(pid)
os.kill(pid, signal.SIGKILL)
except:
print 'error on kill cat'
def communiate(sock, request):
"""
与server进行通信,调用完一次这个函数,完成一种类型的record(如一组loss率的测量,或一组delay的测量)
:param sock:
:param request:
:return:
"""
# step1 发出type修改请求
sock_send_msg(sock,request)
while True:
# step2 接收server的响应
stat = sock_recv_msg(sock)
if protocol['ACK_FROM_ROUTER'] in stat:
# 成功收到来自server的回复
# step2 解析当前的value,解析只是采用了简单的replace
type_value = stat.replace(protocol['ACK_FROM_ROUTER'], '').replace(' ', '_')
# step3 开始执行测试
cmd = 'cat /proc/net/tcpprobe >' + os.path.join(shared_path,
congestion_algorithm + '_' + request + type_value + '.txt') + ' &'
run(cmd)
cmd='iperf -t 40 -c ' + server_ip
run(cmd)
kill_cat()
# step4 测试完成
sock_send_msg(sock,protocol['ACK_FROM_CLIENT'])
elif stat == protocol['NO_MORE_DATA']:
print 'server no more data'
break
elif stat == protocol['PASS_THIS_TYPE']:
print 'pass this type'
break
else:
raise Exception('error on client communiate')
def socket_to_router():
"""
client连接router的socket
:return:
"""
s = socket.socket()
s.connect((router_ip, ROUTER_SOCKET_PORT))
for request in request_type:
communiate(s, request)
sock_send_msg(s,protocol['REQUEST_OVER'])
s.close()
if __name__ == '__main__':
initialize()
print 'initialize finished'
socket_to_router()
# -*- coding:utf-8 -*-
import os
protocol = {
'ACK_FROM_ROUTER': 'ack_from_router', # ROUTER发送ACK给CLIENT
'ACK_FROM_CLIENT': 'ack_from_client', # CLIENT发送ACK给ROUTER
'ACK_FROM_SERVER': 'ack_from_server', # SERVER发送ACK给CLIENT
'NO_MORE_DATA': 'no more data', # 没有更多的数据可查询标志
'REQUEST_OVER': 'request_over', # 请求结束标志
'PASS_THIS_TYPE': 'pass_this_type' # 数据为空,无法查询标志
}
ROUTER_SOCKET_PORT = 6682
#SERVER_SOCKET_PORT = 7001
# 采用哪种拥塞控制算法
congestion_algorithm = 'westwood' # reno cubic vegas westwood等
def run(cmd):
print cmd
res = os.popen(cmd)
output_str = res.read() # 获得输出字符串
print(output_str)
def sock_send_msg(sock, msg):
sock.send(msg)
print 'send ' + msg
def sock_recv_msg(sock):
msg = sock.recv(1024)
print 'recv ' + msg
return msg
#-*- coding:utf-8 -*-
"""
辅助脚本:程序因为意外崩溃,需要执行该脚本,清理多余cat进程
"""
import os
import signal
def kill_cat():
"""
:return:
"""
out = os.popen("ps aux | grep tcpprobe").read()
print out
try:
for line in out.splitlines():
if '/proc/net/tcpprobe' in line:
pid = int(line.split()[1])
print 'kill '+str(pid)
os.kill(pid, signal.SIGKILL)
except Exception as reason:
print 'error on kill cat ' + reason
if __name__ == '__main__':
kill_cat()
# -*- coding:utf-8 -*-
"""
router
"""
import socket
from common import *
config = {
'normal': [''], # normal 状态,置为空则不测量,置为空字符串为要测量
#'loss': ['2.5%', '10%','40%'],
'loss': ['0.001%','0.01%','1%'],
'delay': ['30ms 10ms','200ms 10ms','1000ms 100ms'], # foramt:'延迟 抖动'
'duplicate': ['1%', '5%', '10%'],
'corrupt': ['0.2%', '1%', '5%']
}
# config = {
# 'normal': [], # normal 状态,置为空则不测量,置为空字符串为要测量
# 'loss': [],
# 'loss': [],
# 'delay': [], # foramt:'延迟 抖动'
# 'duplicate': [],
# 'corrupt': ['1%','20%','30%','40%']
# }
# 网卡名
interface_name = 'dev eth14'
def communiate(sock):
"""
响应一次request,也就是执行完这个函数,config中的一种类型将会记录完成
:param sock:
:return:
"""
# step1 接受来client的请求
type = sock_recv_msg(sock)
print 'type ' + type
# 判定是否已经请求完成
if type == protocol['REQUEST_OVER']:
raise Exception('client exit')
# 判断type是否有效
types = [key for key, value in config.items()]
if type not in types:
raise Exception("input type error")
# 是否需要跳过本次请求
type_num = len(config[type])
if type_num == 0:
sock_send_msg(sock,protocol['PASS_THIS_TYPE'])
return
# normal请求单独处理
if type == 'normal':
run('tc qdisc delete ' + interface_name + ' root netem ')
sock_send_msg(sock,protocol['ACK_FROM_ROUTER'])
if sock_recv_msg(sock) == protocol['ACK_FROM_CLIENT']:
sock_send_msg(sock,protocol['NO_MORE_DATA']) # normal模型下,只有一次测试
else:
raise Exception('router: type normal ack from client error')
return
# type有效, 更新当前策略
for idx in range(type_num):
# step2 修改配置
run(' tc qdisc delete ' + interface_name + ' root netem ')
run(' tc qdisc add ' + interface_name + ' root netem ' + type + ' ' + config[type][idx])
# 打印当前配置
run('tc qdisc')
print 'change config'
# step3 通知client已修改完成,并将当前的配置值返回
sock_send_msg(sock,protocol['ACK_FROM_ROUTER'] + config[type][idx])
# step4 等待client通知
stat = sock_recv_msg(sock)
if stat == protocol['ACK_FROM_CLIENT']: # 表示client已经处理完数据
if idx == type_num - 1:
# 当前已经是最后一个数据,通知client,已经无data可用
sock_send_msg(sock,protocol['NO_MORE_DATA'])
else: # 如果client处理中出现一些问题
pass # 这里可做一些扩展处理,如重新设置状态等
def socket_service():
"""
开启socket服务,等待client连接
:return:
"""
try:
s = socket.socket()
s.bind(('192.168.1.2', ROUTER_SOCKET_PORT))
s.listen(5) # 最大连接数
except socket.error as msg:
print msg
print 'starrt router socket'
sock, _ = s.accept()
print 'client connect'
while True:
try:
communiate(sock)
except Exception as reason:
print reason
break
# close socket to reuse the port
s.close()
if __name__ == '__main__':
socket_service()
# -*- coding:utf-8 -*-
"""
server 端脚本
"""
import socket
from common import *
# def socket_service():
# """
# 开启socket服务,等待client连接,完成功能:
# 1. 等到client发出更改算法请求
# 2. 更改算法
# 3. 发出响应已经更改
# :return:
# """
# try:
# s = socket.socket()
# s.bind(('192.168.2.1', SERVER_SOCKET_PORT))
# s.listen(5) # 最大连接数
# except socket.error as msg:
# print msg
#
# print 'starrt server socket'
#
# # step1 接受连接
# sock, _ = s.accept()
# print 'client connect'
#
# # step2 修改
# congestion_algorithm = sock.recv(1024)
# # 注入内核
# cmd = 'echo ' + congestion_algorithm + ' >' + 'net.ipv4.tcp_congestion_control'
# print cmd
# run(cmd)
# # 更改算法
# cmd = 'sysctl -w net.ipv4.tcp_congestion_control=' + congestion_algorithm
# print cmd
# # 开启iperf进程
# start_iperf_process()
#
# # step3 回执响应
# sock.send(protocol['ACK_FROM_SERVER'])
#
# # close socket to reuse the port
# s.close()
#
# def start_iperf_process():
# """
# 开启iperf_process
# :return:
# """
# os.system('iperf -s -D')
if __name__ == '__main__':
cmd = 'echo ' + congestion_algorithm + ' >' + 'net.ipv4.tcp_congestion_control'
run(cmd)
# 更改算法
cmd = 'sysctl -w net.ipv4.tcp_congestion_control=' + congestion_algorithm
run(cmd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment