Skip to content

Instantly share code, notes, and snippets.

@Ivlyth
Created April 2, 2020 11:15
Show Gist options
  • Save Ivlyth/70802e768589cfecee165054cd17ba83 to your computer and use it in GitHub Desktop.
Save Ivlyth/70802e768589cfecee165054cd17ba83 to your computer and use it in GitHub Desktop.
self-defined simple BPF syntax parser
# -*- coding:utf8 -*-
"""
Author : Myth
Date : 2020/3/16
Email : email4myth at gmail.com
"""
from __future__ import unicode_literals
import sys
import os
import subprocess
import base64
import ipaddress
import re
# Base64 of a tiny capture file (24 bytes once decoded) that tcpdump reads
# while a filter expression is being checked.
TEST_PCAP_DATA = b'1MOyoQIABAAAAAAAAAAAAAAABABxAAAA'
TEST_PCAP_FILE = os.path.join(os.path.dirname(__file__), 'test-bpf.pcap')
if not os.path.exists(TEST_PCAP_FILE):
    # Materialise the fixture next to this module on first use.  The context
    # manager makes sure the data is flushed and the handle is closed.
    with open(TEST_PCAP_FILE, 'wb') as pcap_file:
        pcap_file.write(base64.b64decode(TEST_PCAP_DATA))
class CommandRet(object):
    """Result of a finished command: exit status plus captured output.

    The object is truthy exactly when the exit status is 0.
    """

    def __init__(self, retcode, stdout, stderr):
        self.retcode = retcode
        # Surrounding whitespace is trimmed once, when the result is built.
        self._stdout, self._stderr = stdout.strip(), stderr.strip()

    @staticmethod
    def _as_text(raw):
        """Decode bytes as UTF-8 (undecodable bytes dropped); pass text through."""
        if isinstance(raw, bytes):
            return raw.decode('utf-8', errors='ignore')
        return raw

    @property
    def success(self):
        """True when the command exited with status 0."""
        return self.retcode == 0

    def __bool__(self):
        return self.success

    # Python 2 spelling of the truthiness hook.
    __nonzero__ = __bool__

    @property
    def stdout(self):
        """Captured standard output as text."""
        return self._as_text(self._stdout)

    @property
    def stderr(self):
        """Captured standard error as text."""
        return self._as_text(self._stderr)
def run_command(cmd, keep_stdout=True):
    """Run *cmd* through the shell and return its outcome as a CommandRet.

    :param cmd: command line, passed to the shell as a single string
    :param keep_stdout: when false, both output streams of the command are
        redirected to /dev/null, so the captured output is empty
    :return: CommandRet holding the exit status, stdout and stderr
    """
    if not keep_stdout:
        # POSIX spelling of "discard stdout and stderr".  The bash-only '&>'
        # is read by a plain sh as "run in background, then redirect".
        cmd += ' > /dev/null 2>&1'
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() drains both pipes while waiting.  Calling wait() first can
    # deadlock once the child has filled a pipe buffer.
    stdout, stderr = proc.communicate()
    return CommandRet(proc.returncode, stdout, stderr)
def test_bpf(bpf):
    """Check a BPF filter by having tcpdump apply it to the test capture.

    :param bpf: filter expression text
    :return: None when tcpdump exits with status 0, otherwise a one-line
        description of the failure (the last line tcpdump wrote to stderr)

    NOTE(review): *bpf* is placed between single quotes on a shell command
    line, so it must not contain a single quote or come from an untrusted
    source without validation.
    """
    ret = run_command("tcpdump -r %s -w /tmp/test-bpf.pcap -W 1 -G 0.1 '%s'" % (TEST_PCAP_FILE, bpf))
    if ret.success:
        return None
    lines = ret.stderr.splitlines()
    if not lines:
        # A failure with nothing on stderr: still report it instead of
        # crashing on an empty list.
        return 'tcpdump exited with status %s' % ret.retcode
    return lines[-1]
def is_valid_ipv4(host):
    """Return True when ``ipaddress.IPv4Address`` accepts *host*."""
    try:
        ipaddress.IPv4Address(host)
    except (ValueError, TypeError):
        # Only a rejected value means "invalid"; KeyboardInterrupt and
        # friends are no longer swallowed.
        return False
    return True
def is_valid_ipv4_net(net):
    """Return True when ``ipaddress.IPv4Network`` accepts *net*.

    The network is built with the library's default strictness, so an
    address with host bits set (e.g. ``10.0.81.1/24``) is rejected.
    """
    try:
        ipaddress.IPv4Network(net)
    except (ValueError, TypeError):
        # Only a rejected value means "invalid"; KeyboardInterrupt and
        # friends are no longer swallowed.
        return False
    return True
def is_valid_ipv6(host):
    """Return True when ``ipaddress.IPv6Address`` accepts *host*."""
    # The debugging print that used to sit here wrote to stdout on every
    # validation; it has been removed.
    try:
        ipaddress.IPv6Address(host)
    except (ValueError, TypeError):
        # Only a rejected value means "invalid"; KeyboardInterrupt and
        # friends are no longer swallowed.
        return False
    return True
def is_valid_ipv6_net(net):
    """Return True when ``ipaddress.IPv6Network`` accepts *net*, else False."""
    try:
        ipaddress.IPv6Network(net)
        return True
    except Exception:
        return False
class Protocol(object):
    """Pairs a protocol keyword with the BPF text it is rendered as."""

    def __init__(self, name, bpf):
        self.name, self.bpf = name, bpf
class ProtocolExpr(object):
    """Expression node for a protocol keyword."""

    def __init__(self, token, protocol, negative=False):
        self.token, self.protocol, self.negative = token, protocol, negative

    def to_bpf(self):
        """Return the protocol's BPF text, prefixed with ``not`` if negated."""
        base = self.protocol.bpf
        return 'not %s' % base if self.negative else base
class AddressInfo(object):
    """Parsed pieces of an address operand: host or network, mask and ports.

    ``ports`` holds ints for single ports and other values (range strings
    such as ``'80-100'``) for port ranges.
    """

    def __init__(self, host_or_net, net_mask='', is_ipv6=False, ports=()):
        self.host_or_net = host_or_net
        self.net_mask = net_mask
        self.is_ipv6 = is_ipv6
        self.ports = ports

    def to_bpf(self):
        """Render the address as BPF text; empty string when nothing is set."""
        # FIXME (marker kept from the original author; no detail was given)
        if self.net_mask:
            where = 'net %s/%s' % (self.host_or_net, self.net_mask)
        elif self.host_or_net:
            where = 'host %s' % self.host_or_net
        else:
            where = ''

        clauses = [
            'port %s' % item if isinstance(item, int) else 'portrange %s' % item
            for item in self.ports
        ]
        port_clause = ' or '.join(clauses)
        if len(clauses) > 1:
            # Several alternatives are grouped so they bind together.
            port_clause = '(%s)' % port_clause

        if not where:
            return port_clause
        if not port_clause:
            return where
        return '(%s and %s)' % (where, port_clause)
class AddressExpr(object):
    """Address operand: a host or network alone, with ports, or ports alone.

    Token shapes handled by ``parse``: ``host``, ``host/mask``, ``[ipv6]``
    and ``[ipv6]/mask``, each optionally followed by ``:ports``, or just
    ``:ports``.  ``ports`` is a comma-separated list whose items are either a
    port number or a ``start-end`` range.
    """

    def __init__(self, token, negative=False):
        self.token = token
        self.negative = negative
        self.addr_info = self.parse()

    def _error(self, message):
        """Build a BPFTokenError that spans the whole token."""
        return BPFTokenError(message, self.token.start, self.token.end)

    def _parse_port_item(self, port):
        """Validate one item of the port list.

        :return: an int for a single port (also for a range whose two ends
            are equal), otherwise the original ``start-end`` text
        :raises BPFTokenError: when the item is malformed or out of 0..65535
        """
        if '-' not in port:
            if not port.isdigit():
                raise self._error('端口必须为数字')
            number = int(port)
            if number < 0 or number > 65535:
                raise self._error('端口超出合法范围')
            return number

        # TODO (from the original author): stricter range containment checks
        start, _, end = port.partition('-')
        if not start:
            raise self._error('错误的端口范围: 缺少开始端口')
        if not start.isdigit():
            raise self._error('错误的端口范围: 端口必须为数字')
        if not end:
            raise self._error('错误的端口范围: 缺少结束端口')
        if not end.isdigit():
            raise self._error('错误的端口范围: 端口必须为数字')
        start = int(start)
        end = int(end)
        if start < 0 or start > 65535:
            raise self._error('错误的端口范围: 开始端口超出合法范围')
        if end < 0 or end > 65535:
            raise self._error('错误的端口范围: 结束端口超出合法范围')
        if end < start:
            raise self._error('错误的端口范围: 结束端口不得小于开始端口')
        if end == start:
            return start  # a one-port range collapses to a plain port
        return port

    def parse(self):
        """Split the token into host/network, mask and ports and validate them.

        :return: AddressInfo describing the operand
        :raises BPFTokenError: on any malformed part; the error carries the
            token's position in the expression
        """
        expr = self.token.value
        last_middle_bracket = expr.rfind(']')
        last_colon = expr.rfind(':')
        if last_colon == len(expr) - 1:
            raise BPFTokenError('冒号后缺少端口信息', self.token.start + last_colon, self.token.end)

        # A colon after the last ']' separates the port list; colons inside
        # the brackets belong to the IPv6 address itself.
        # FIXME (from the original author): mark error positions more finely
        ports = []
        host_or_net = expr
        if last_colon > last_middle_bracket:
            ports = [self._parse_port_item(item) for item in expr[last_colon + 1:].split(',')]
            host_or_net = expr[:last_colon]

        net_mask = ''
        is_ipv6 = False
        if host_or_net:
            host_or_net, sep, net_mask = host_or_net.partition('/')
            if sep:  # a slash was given, so a mask is mandatory
                if not host_or_net:
                    raise self._error('错误的 IP 地址')
                if not net_mask:
                    raise self._error('缺少网络掩码')
                if not net_mask.isdigit():
                    raise self._error('网络掩码应为数字')
            if host_or_net[0] == '[':  # treat as IPv6
                inner = host_or_net[1: -1]
                if not is_valid_ipv6(inner):
                    raise self._error('错误的 IPv6 地址')
                if net_mask and not is_valid_ipv6_net('%s/%s' % (inner, net_mask)):
                    raise self._error('错误的网络段: %s/%s, 主机位不得被设置' % (host_or_net, net_mask))
                host_or_net = inner
                is_ipv6 = True
            else:  # treat as IPv4
                if not is_valid_ipv4(host_or_net):
                    # An IPv6 address written without brackets gets its own
                    # message; it now carries the token position like every
                    # other error raised here.
                    if is_valid_ipv6(host_or_net):
                        raise self._error('IPv6 应该被包裹在中括号内')
                    raise self._error('错误的 IPv4 地址')
                if net_mask and not is_valid_ipv4_net('%s/%s' % (host_or_net, net_mask)):
                    raise self._error('错误的网络段: %s/%s, 主机位不得被设置' % (host_or_net, net_mask))
        # An empty host part is fine: the operand is then ports only.
        return AddressInfo(host_or_net, net_mask, is_ipv6, ports)

    def to_bpf(self):
        """Render the parsed address as BPF text."""
        return self.addr_info.to_bpf()
class OPExpr(object):
    """Keyword node rendered as the token's lower-cased text.

    NOTE(review): the previous docstring said "AND/OR", but
    ``create_expr_from_token`` builds this class for the ``not`` keyword.
    """

    def __init__(self, token):
        self.token = token

    def to_bpf(self):
        """Return the keyword in lower case."""
        keyword = self.token.ivalue
        return keyword
class LogicExpr(object):
    """Keyword node rendered as the token's lower-cased text.

    NOTE(review): the previous docstring said "NOT", but
    ``create_expr_from_token`` builds this class for ``and`` / ``or``.
    """

    def __init__(self, token):
        self.token = token

    def to_bpf(self):
        """Return the keyword in lower case."""
        keyword = self.token.ivalue
        return keyword
class GroupExpr(object):
    """Node for a bracket token; an opening bracket collects member nodes."""

    def __init__(self, token):
        self.token = token
        self.exprs = []

    def add(self, expr):
        """Append *expr* to the members of this group."""
        self.exprs.append(expr)

    def to_bpf(self):
        """Join the members with spaces; parenthesise when there are several."""
        rendered = ' '.join([member.to_bpf() for member in self.exprs])
        return '(%s)' % rendered if len(self.exprs) > 1 else rendered

    def is_open_bracket(self):
        """True when this node was created from a ``(`` token."""
        return self.token.value == '('
# Protocol keywords accepted in an expression and the BPF text each expands to.
PROTOCOLS = [
    # (protocol_name, bpf)
    # A packet is never IPv4 and IPv6 at once, so "either family" must be
    # spelled with "or"; "ip and ip6" rejects every packet.
    ('ip', '(ip or ip6)'),
    ('ip4', 'ip'),
    ('ip6', 'ip6'),
    ('tcp', 'tcp'),
    ('tcp4', '(ip and tcp)'),
    ('tcp6', '(ip6 and tcp)'),
    ('udp', 'udp'),
    ('udp4', '(ip and udp)'),
    ('udp6', '(ip6 and udp)'),
    ('icmp', 'icmp'),
    ('icmp4', '(ip and icmp)'),
    # ICMPv6 has its own filter keyword; "ip6 and icmp" asks for IPv4 ICMP
    # inside IPv6 and therefore matches nothing.
    ('icmp6', 'icmp6'),
]
# Keyword -> Protocol lookup built from the PROTOCOLS table.
PROTOCOL_MAP = {name: Protocol(name, bpf) for name, bpf in PROTOCOLS}
# Raw strings keep the regex escapes (\-, \s, \d) away from Python's own
# string-escape processing, which warns about them in non-raw literals.
# NOTE(review): VALID_EXPR is not referenced anywhere else in the visible source.
VALID_EXPR = re.compile(r'[()a-z\-,\s\d!]+', re.IGNORECASE)
# One or more whitespace characters; the tokenizer matches it per character.
BLANK = re.compile(r'\s+')
class BPFTokenError(Exception):
    """Error found while tokenizing, parsing or checking an expression.

    :param message: human-readable reason
    :param start: offset in the expression where the faulty span begins
    :param end: offset just past the faulty span; the default of -1 is used
        when no position was supplied
    """

    def __init__(self, message, start=0, end=-1):
        super(BPFTokenError, self).__init__(message)
        # Stored explicitly: the implicit ``message`` attribute of exceptions
        # is deprecated on Python 2 and does not exist on Python 3.
        self.message = message
        self.start = start
        self.end = end

    def __str__(self):
        return '%s (from %s to %s)' % (self.message, self.start, self.end)
class Token(object):
    """One lexical unit: its text, a lower-cased copy and its offsets."""

    def __init__(self, start, end, value):
        self.start, self.end = start, end
        self.value = value
        # Lower-cased form used for case-insensitive keyword matching.
        self.ivalue = value.lower()
class Tokenizer(object):
    """Split an expression into Token objects.

    Whitespace separates tokens, each bracket is a token of its own, and a
    ``!`` is emitted as the keyword ``not``.  Every token carries half-open
    offsets so that ``expr[token.start:token.end]`` is the text it came from.
    """

    def __init__(self, expr):
        self.expr = expr

    def __iter__(self):
        pending = ''  # text of the token being collected
        start = current = 0
        for current, char in enumerate(self.expr):
            if BLANK.match(char):
                if pending:
                    # The token ends before the blank, so ``current`` (not
                    # ``current + 1``) is its exclusive end.
                    yield Token(start, current, pending)
                    pending = ''
            elif char in '()':
                if pending:
                    # Same here: the bracket is not part of the token.
                    yield Token(start, current, pending)
                    pending = ''
                yield Token(current, current + 1, char)
            elif char == '!':
                # Accepting "abc!" would be possible, but being strict is
                # preferred (note from the original author).
                if pending:
                    raise BPFTokenError('期待空格, 但是遇到: "!"', current, current + 1)
                # Emit '!' as the keyword so later stages see one spelling.
                yield Token(current, current + 1, 'not')
            else:
                if not pending:  # first character of a new token
                    start = current
                pending += char
        if pending:
            # The input ended inside a token; its last character is at
            # ``current``.
            yield Token(start, current + 1, pending)
class Stack(object):
    """Minimal LIFO container; reading from an empty stack yields None."""

    def __init__(self):
        self._stack = []

    def push(self, item):
        """Put *item* on top."""
        self._stack.append(item)

    def pop(self):
        """Remove and return the top item, or None when empty."""
        return self._stack.pop() if self._stack else None

    def is_empty(self):
        """True when nothing is stored."""
        return not self._stack

    def at_the_top(self):
        """Return the top item without removing it, or None when empty."""
        return self._stack[-1] if self._stack else None
def _check_order(pre_expr, expr):
    """Raise BPFTokenError when *expr* may not directly follow *pre_expr*.

    Bracket nodes are accepted wherever listed without telling opening from
    closing ones; group pairing is checked separately by the caller.
    """
    starters = (OPExpr, ProtocolExpr, AddressExpr, GroupExpr)
    if not pre_expr:  # first node of the expression
        allowed = starters
        message = '期待得到 "not", "协议名称" 或者 "地址信息", 但是得到了: "%s"'
    elif isinstance(pre_expr, LogicExpr):
        allowed = starters
        message = '期待得到 "not", "协议名称" 或者 "地址信息" 或者 "组", 但是得到了: "%s"'
    elif isinstance(pre_expr, OPExpr):
        allowed = (ProtocolExpr, AddressExpr)
        message = '期待得到 "协议名称" 或者 "地址信息", 但是得到了: "%s"'
    elif isinstance(pre_expr, (ProtocolExpr, AddressExpr)):
        allowed = (LogicExpr, GroupExpr)
        message = '期待得到 "and" 或者 "or" 或者 "组", 但是得到了: "%s"'
    elif isinstance(pre_expr, GroupExpr):
        if pre_expr.is_open_bracket():
            allowed = starters
            message = '期待得到 "not" 或者 "协议名称" 或者 "地址信息" 或者 "组", 但是得到了: "%s"'
        else:
            allowed = (LogicExpr, GroupExpr)
            message = '期待得到 "and" 或者 "or" 或者 "结束组", 但是得到了: "%s"'
    else:
        return
    if not isinstance(expr, allowed):
        raise BPFTokenError(message % expr.token.value, expr.token.start, expr.token.end)


def parse(expr):
    """Translate an expression in the simplified syntax into a BPF filter.

    :param expr: expression text; surrounding whitespace is ignored
    :return: the BPF filter string, or '' for a blank expression
    :raises BPFTokenError: when the expression is malformed, or when the
        resulting filter is rejected by ``test_bpf``
    """
    expr = expr.strip()
    if not expr:
        return ''  # empty expr

    exprs = []  # top-level nodes, in order
    pre_expr = None
    current = None
    stack = Stack()  # groups that are currently open, innermost on top
    for token in Tokenizer(expr):
        current = create_expr_from_token(token)
        _check_order(pre_expr, current)
        pre_expr = current

        if isinstance(current, GroupExpr):
            if current.token.value == '(':  # start a new group
                # A group opened inside another group belongs to that group;
                # only outermost groups go into the top-level list.
                if stack.is_empty():
                    exprs.append(current)
                else:
                    stack.at_the_top().add(current)
                stack.push(current)
            elif current.token.value == ')':  # end a group
                if stack.is_empty():
                    raise BPFTokenError("单独的关闭组", token.start, token.end)
                group_expr = stack.pop()
                if group_expr.token.value != '(':
                    raise BPFTokenError("未成对的组", group_expr.token.start, current.token.end)
                if not group_expr.exprs:
                    raise BPFTokenError("无内容的组", group_expr.token.start, current.token.end)
        elif not stack.is_empty():
            stack.at_the_top().add(current)
        else:
            exprs.append(current)

    if not stack.is_empty():
        # At least one '(' was never closed.
        group_expr = stack.pop()
        raise BPFTokenError("未成对的组", group_expr.token.start, current.token.end)

    last_expr = exprs[-1]
    if not isinstance(last_expr, (GroupExpr, ProtocolExpr, AddressExpr)):
        raise BPFTokenError('期待得到 "组", "协议名称" 或者 "地址信息", 但是得到了: "%s"' % last_expr.token.value,
                            last_expr.token.start, last_expr.token.end)

    bpf = ' '.join([node.to_bpf() for node in exprs])
    # Final check: let tcpdump judge the generated filter.
    err = test_bpf(bpf)
    if err:
        if err.endswith('expression rejects all packets'):
            raise BPFTokenError('错误的表达式, 过滤掉了所有数据')
        raise BPFTokenError('错误的表达式: %s' % err)
    return bpf
def create_expr_from_token(token):
    """Build the expression node that corresponds to *token*.

    Brackets give a GroupExpr, ``and`` / ``or`` a LogicExpr, ``not`` an
    OPExpr, and a known protocol keyword a ProtocolExpr.  Anything else is
    treated as an address.

    :param token: Token to classify (matched on its lower-cased text)
    :return: the new expression node
    :raises BPFTokenError: from AddressExpr when the text is not a valid
        address operand
    """
    keyword = token.ivalue
    # Tuple membership: the former substring test ``in '()'`` was also true
    # for '()' and for the empty string.
    if keyword in ('(', ')'):
        return GroupExpr(token)
    if keyword in ('and', 'or'):
        return LogicExpr(token)
    if keyword == 'not':
        return OPExpr(token)
    if keyword in PROTOCOL_MAP:
        return ProtocolExpr(token, PROTOCOL_MAP[keyword])
    # should be addresses
    return AddressExpr(token)
def main():
    """Parse the expression given on the command line and print the result.

    The command-line arguments are joined with spaces; a built-in sample is
    used when there are none.  On a parse error the message (and, when the
    error carries a position, the offending part of the expression) is
    printed and the error is re-raised.
    """
    expr = ' '.join(sys.argv[1:])
    if not expr:
        expr = 'tcp4 or ( tcp or udp6 and icmp4)and udp6 and !10.0.81.0/24:80-10000,20000-30000,48888,58888 and 10.0.81.48:11111'
    # print(...) with one argument behaves the same on Python 2 and 3.
    print('expr is "%s"' % expr)
    try:
        bpf = parse(expr)
        print('bpf is "%s"' % bpf)
    except BPFTokenError as e:
        print('============== ERROR ==============')
        print(e)
        if e.end != -1:
            # end == -1 means "no position": slicing with it would print the
            # expression minus its last character.
            print(expr[e.start: e.end])
        print('============== ERROR ==============')
        raise
# Sample expressions fed to parse() by test().  The list mixes well-formed
# rules with ones the validators in this file are written to reject.
test_rules = [
    'TcP or udp or (10.0.81.0/24:80-10000,20000-30000,48888,58888)',
    'ip4 or tcp6',
    'tcp and udp',
    'tcp4',
    '!tcp6',
    '!tcp4 and !udp',
    '10.0.81.1/24:80',  # host bits set under the /24 mask
    '10.0.81.0/24:80-99999',  # range end above 65535
    '10.0.81.0/24:80-78',  # range end below range start
    '10.0.81.0/24:80 and 192.168.0.0/16:80',
    '10.0.81.0/24:80 and 192.168.0.0/16:80,90',
    '10.0.81.0/24:80 and 192.168.0.0/16:80,90,100',
    '10.0.81.9 and 10.0.81.10 or 10.0.81.11:80-80',  # range with equal ends
    '10.0.81.9 and 10.0.81.10 and 10.0.81.11',
    ':80,443,10000-11000',  # ports only
    ':80,8080,3306',
    '!:80,8080,3306',
    '!10.0.81.0/24:443',
    'udp and :8080',
    'udp or [240e:e1:f300:1:3::]/120:80-99,101',  # bracketed IPv6 network
    'tcp and (10.0.81.0/24 or :8080 or 192.168.1.234)'
]
def test():
    """Run parse() over every entry of ``test_rules`` and print the outcome.

    Parse errors are reported, not raised, so all rules are always tried.
    """
    for rule in test_rules:
        # print(...) with one argument behaves the same on Python 2 and 3.
        print('================================================================')
        print("rule: %s" % rule)
        try:
            bpf = parse(rule)
            print("parsed bpf: %s" % bpf)
        except BPFTokenError as e:
            # args[0] is the message handed to the exception; unlike
            # ``e.message`` it is available on every Python version.
            message = e.args[0]
            if e.end == -1:
                # No position attached: there is no span to quote.
                print("Error: %s" % message)
            else:
                print("Error: %s (%s - %s: '%s')" % (message, e.start, e.end, rule[e.start: e.end]))
        print('')
# Script entry point: runs the built-in sample rules.
if __name__ == '__main__':
    # main()  # alternative: parse the expression given on the command line
    test()
'''
使用表达式字符串调用 parse 方法, 如果解析成功, 则返回解析后的 bpf 字符串, 如果解析失败, 则抛出 BPFTokenError,
取其 message 作为错误原因, start 与 end 属性标记了错误出现的位置
错误位置处理:
如果 end=-1, 则仅需要展示错误信息, 不需要高亮某一部分, 否则可以高亮展示给定的区域
'''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment