Skip to content

Instantly share code, notes, and snippets.

@songtianlun
Last active January 12, 2022 09:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save songtianlun/3d844cbb83a10e3a664dd2303c9948f7 to your computer and use it in GitHub Desktop.
Save songtianlun/3d844cbb83a10e3a664dd2303c9948f7 to your computer and use it in GitHub Desktop.
一个 python 实现的 QMP 客户端,用于向 QEMU 发送 QMP 指令,(借鉴了 PVE 的 QMP Client 实现)。
#!/usr/bin/python
# coding=utf8
"""
# Author: songtianlun
# mail: songtianlun@frytea.com
# Created Time : 2022-01-07 15:06:17
# License: GPL-2.0-only
# Description:
使用 selector 实现 I/O 多路复用,可并行向不同虚拟机发送 qmp,qga 命令;
注: QEMU 只能处理一条连接,因此我们应当尽快关闭连接。
使用方法:
方法一(同步):使用 cmd() 方法快速执行一条命令,并获取返回结果;
方法二(异步):使用 queue_cmd() 方法将命令加入队列,指定回调函数,最后使用 queue_execute() 方法一次性执行完毕。
"""
import json
import os
import selectors
import re
import socket
import time
import ast
from flask import current_app
from mimic_daemon_server.qemu_servers.helpers import Helpers
from mimic_daemon_server.cluster.common import IPCC
class QMPClient:
    """Client for QEMU's QMP monitor and guest-agent (QGA) sockets.

    Commands are queued per socket (``push_cmd_to_queue`` / ``queue_cmd``) and
    executed together by ``queue_execute``, which multiplexes all sockets with
    ``selectors``; ``cmd`` runs a single command synchronously. Commands whose
    name starts with ``guest-`` are sent to the QGA socket, all others to the
    QMP socket. QEMU serves a single client per socket, so each connection is
    closed as soon as its queue is drained or fails.

    Each queue_info dict (one per socket path) holds: ``qga``, ``vmid``,
    ``sname``, ``cmds`` (pending commands), and while running ``fh`` (socket),
    ``buf`` (unparsed input bytes), ``deadline`` (time.monotonic() value),
    ``current`` (command in flight) and ``error``.
    """

    # Commands that get a 10 minute default timeout in ``cmd``.
    _TEN_MINUTE_CMDS = frozenset((
        'savevm-start',
        'savevm-end',
        'query-backup',
        'query-block-jobs',
        'block-job-cancel',
        'block-job-complete',
        'backup-cancel',
        'query-savevm',
        'delete-drive-snapshot',
        'guest-shutdown',
        'blockdev-snapshot-internal-sync',
        'blockdev-snapshot-delete-internal-sync',
        'snapshot-drive',
    ))

    def __init__(self, eventcb=None):
        """:param eventcb: optional callable invoked with every QMP event dict."""
        self.sel = selectors.DefaultSelector()
        self.queue_lookup = {}  # open socket -> queue_info of that socket
        self.queue_info = {}  # socket path -> queue_info
        self.cmdid_seq = 0
        self.cmdid_seq_qga = 0
        # Always defined: QMP input processing reads it for every event.
        self.eventcb = eventcb
        # QGA commands after which the agent may close the socket instead of
        # replying; such an EOF counts as success (see _handle_eof).
        self.qga_allow_close_cmds = {
            'guest-shutdown': 1,
            'guest-suspend-ram': 1,
            'guest-suspend-disk': 1,
            'guest-suspend-hybrid': 1,
        }

    def __del__(self):
        # __init__ may have failed before the selector was created.
        sel = getattr(self, 'sel', None)
        if sel is not None:
            sel.close()

    def sel_input(self, conn, mask):
        """Selector callback: read from ``conn`` and process complete replies.

        Received bytes are appended to the connection's buffer, so replies
        split over several reads, or several replies in one read, are handled.
        Failures are stored in ``queue_info['error']`` instead of being raised,
        so ``check_queue`` closes the connection and the event loop can finish.
        """
        queue_info = self.lookup_queue_info(conn)
        if not queue_info:
            return
        try:
            data = conn.recv(4096)
        except OSError as err:
            current_app.logger.error("qmp recv err: {}".format(err))
            queue_info['error'] = err
            self.check_queue()
            return
        try:
            if not data:
                # recv() returning b'' means the peer closed the socket.
                self._handle_eof(queue_info)
            else:
                buf = queue_info.get('buf')
                if not isinstance(buf, bytearray):
                    buf = queue_info['buf'] = bytearray(buf or b'')
                buf.extend(data)
                # A reply can only become complete when a newline arrives.
                if b'\n' in data:
                    if queue_info.get('qga'):
                        self._process_qga_input(queue_info)
                    else:
                        self._process_qmp_input(queue_info)
        except Exception as err:  # protocol errors and callback failures
            queue_info['error'] = err
        self.check_queue()

    def _current_cmd(self, queue_info):
        """Return the command in flight for ``queue_info``, or raise if none."""
        curcmd = queue_info.get('current')
        if not curcmd:
            raise Exception('unable to lookup current command with VM {} ({})'.format(
                queue_info.get('vmid'), queue_info.get('sname')))
        return curcmd

    def _process_qmp_input(self, queue_info):
        """Consume all complete QMP lines from the buffer, keeping a partial tail."""
        *lines, rest = queue_info['buf'].split(b'\n')
        queue_info['buf'] = bytearray(rest)
        for line in lines:
            text = line.decode('utf-8', errors='replace').strip()
            if not text:
                continue
            try:
                resp = json.loads(text)
            except ValueError as err:
                current_app.logger.error("failed loads str({}) to obj with error:{}".format(text, err))
                continue
            if 'QMP' in resp:  # greeting sent by QEMU right after connecting
                continue
            error = resp.get('error')
            if isinstance(error, dict) and error.get('desc'):
                desc = error['desc'].rstrip('\n')
                # Same exception as PVE's client: only this message is skipped;
                # any other error fails the current command.
                if not desc.startswith('Connection can not be completed immediately'):
                    raise Exception(desc)
                current_app.logger.warning('QMP - skip an qmp error mes: {}'.format(desc))
                continue
            if resp.get('event'):
                if self.eventcb:
                    self.eventcb(resp)
                continue
            curcmd = self._current_cmd(queue_info)
            cmdid = resp.get('id')
            if not cmdid:
                raise Exception("received responsed without command id")
            if cmdid != curcmd.get('id'):
                raise Exception("got wrong command id '{}' (expected {})".format(cmdid, curcmd.get('id')))
            queue_info.pop('current')
            callback = curcmd.get('callback')
            if callback:
                callback(queue_info.get('vmid'), resp)

    def _process_qga_input(self, queue_info):
        """Parse the sync reply and command reply following the last 0xFF sentinel."""
        buf = queue_info['buf']
        start = buf.rfind(b'\xff')
        if start < 0:
            return  # sync reply has not arrived yet
        parts = buf[start + 1:].split(b'\n', 2)
        if len(parts) < 3:
            return  # wait until both reply lines are complete
        queue_info['buf'] = bytearray(parts[2])
        curcmd = self._current_cmd(queue_info)
        sync = json.loads(parts[0].decode('utf-8'))
        cmdid = sync.get('return')
        if not cmdid:
            raise Exception("received responsed without command id")
        expected = int(curcmd.get('id'))
        if cmdid < expected:
            return  # reply to an older sync (e.g. after a timeout): ignore it
        if cmdid != expected:
            raise Exception("got wrong command id '{}' (expected {})".format(cmdid, curcmd.get('id')))
        queue_info.pop('current')
        resp = json.loads(parts[1].decode('utf-8'))
        callback = curcmd.get('callback')
        if callback:
            callback(queue_info.get('vmid'), resp)

    def _handle_eof(self, queue_info):
        """Handle the peer closing the socket.

        For QGA commands in ``qga_allow_close_cmds`` the agent may close the
        connection without a command reply: if the sync reply for the current
        command was received, the command is completed and its callback gets
        ``None``. Every other EOF raises.
        """
        curcmd = queue_info.get('current')
        if (queue_info.get('qga') and curcmd
                and curcmd.get('execute') in self.qga_allow_close_cmds):
            buf = queue_info.get('buf') or b''
            start = buf.rfind(b'\xff')
            if start >= 0:
                line, sep, _ = buf[start + 1:].partition(b'\n')
                if sep and json.loads(line.decode('utf-8')).get('return') == int(curcmd.get('id')):
                    queue_info.pop('current')
                    self.close_connection(queue_info)
                    callback = curcmd.get('callback')
                    if callback:
                        callback(queue_info.get('vmid'), None)
                    if queue_info.get('cmds'):
                        raise Exception('Got EOF but command queue is not empty.')
                    return
        raise Exception('connection closed unexpectedly')

    @staticmethod
    def _normalize_arguments(arguments):
        """Return QMP ``arguments`` as a dict (QMP requires an object).

        None -> {}; a dict is returned as-is; a list/tuple (e.g. the positional
        params of ``queue_cmd``) has its dict items merged, skipping None.
        """
        if arguments is None:
            return {}
        if isinstance(arguments, dict):
            return arguments
        merged = {}
        for item in arguments:
            if item is None:
                continue
            if not isinstance(item, dict):
                raise Exception("QMP arguments must be an object, got {!r}".format(item))
            merged.update(item)
        return merged

    def push_cmd_to_queue(self, vmid, cmd):
        """Append ``cmd`` to the queue of the VM's QMP or QGA socket.

        :param vmid: VM id, resolved to a socket path by ``Helpers().qmp_socket``.
        :param cmd: dict with 'execute' and optional 'arguments' / 'callback';
            its 'arguments' entry is normalized to a dict in place.
        :return: the socket's queue_info dict (it later carries ``error``).
        :raises Exception: if 'execute' is missing.
        """
        execute = cmd.get('execute')
        if not execute:
            raise Exception("no command name specified")
        cmd['arguments'] = self._normalize_arguments(cmd.get('arguments'))
        qga = 1 if execute.startswith('guest-') else 0
        sname = Helpers().qmp_socket(vmid, qga)
        queue_info = self.queue_info.get(sname)
        if not queue_info:
            queue_info = self.queue_info[sname] = {'qga': qga, 'vmid': vmid, 'sname': sname, 'cmds': []}
        queue_info['cmds'].append(cmd)
        return queue_info

    def lookup_queue_info(self, fh):
        """Return the queue_info registered for socket ``fh``, or None (logged)."""
        queue_info = self.queue_lookup.get(fh)
        if not queue_info:
            current_app.logger.warning("internal error - unable to lookup queue info")
            return None
        return queue_info

    def check_queue(self):
        """Advance every open connection by at most one command.

        Closes connections that failed or have nothing left to do, sends the
        next queued command on idle connections, and returns the number of
        connections that have a command in flight.
        """
        running = 0
        for queue_info in list(self.queue_info.values()):
            fh = queue_info.get('fh')
            if not fh:
                continue
            if queue_info.get('error'):
                self.close_connection(queue_info)
                continue
            if queue_info.get('current'):
                running += 1
                continue
            if not queue_info.get('cmds'):
                self.close_connection(queue_info)
                continue
            cmd = queue_info['current'] = queue_info['cmds'].pop(0)
            try:
                cmd['id'] = self.next_cmdid(queue_info.get('qga'))
                self._send_cmd(fh, cmd, queue_info.get('qga'))
            except Exception as err:
                queue_info['error'] = err
                self.close_connection(queue_info)
                continue
            running += 1
        return running

    def _send_cmd(self, fh, cmd, qga):
        """Serialize ``cmd`` as QMP or QGA JSON and write it to socket ``fh``."""
        # Copy so the caller's arguments dict is not modified.
        arguments = dict(cmd.get('arguments') or {})
        fd = None
        if cmd.get('execute') in ('add-fd', 'getfd'):
            # The descriptor travels out of band; 'fd' is not a QMP parameter.
            fd = arguments.pop('fd', None)
        if qga:
            # guest-sync-delimited makes the agent emit 0xFF before its sync
            # reply, letting _process_qga_input skip stale output.
            payload = (json.dumps({'execute': 'guest-sync-delimited',
                                   'arguments': {'id': int(cmd.get('id'))}}) + "\n"
                       + json.dumps({'execute': cmd.get('execute'),
                                     'arguments': arguments}) + "\n")
        else:
            payload = json.dumps({'execute': cmd.get('execute'),
                                  'arguments': arguments,
                                  'id': cmd.get('id')})
        if fd is not None and fd >= 0:
            # TODO: PVE passes fileno(fh) here; IPCC.sendfd(fh, ...) is untested.
            ret = IPCC.sendfd(fh, fd, payload)
            if ret < 0:
                raise Exception('sendfd failed')
        else:
            fh.sendall(payload.encode('utf-8'))

    def loop(self, timeout):
        """Wait once for socket activity, dispatch it, then expire overdue sockets.

        :param timeout: maximum seconds to wait.

        The wait never exceeds the earliest connection deadline (set in
        ``open_connection``), so an unresponsive VM fails with "got timeout"
        instead of stalling ``queue_execute`` forever.
        """
        deadlines = [qi['deadline'] for qi in self.queue_lookup.values()
                     if qi.get('deadline') is not None]
        wait = timeout
        if deadlines:
            wait = max(0.0, min(deadlines) - time.monotonic())
            if timeout is not None:
                wait = min(wait, timeout)
        for key, mask in self.sel.select(timeout=wait):
            # An earlier callback in this batch may have closed the socket.
            if key.fileobj in self.queue_lookup:
                key.data(key.fileobj, mask)
        self._expire_timeouts()

    def _expire_timeouts(self):
        """Fail every open connection whose deadline has passed with "got timeout"."""
        now = time.monotonic()
        expired = False
        for queue_info in list(self.queue_lookup.values()):
            deadline = queue_info.get('deadline')
            if deadline is not None and deadline <= now and not queue_info.get('error'):
                queue_info['error'] = Exception('got timeout')
                expired = True
        if expired:
            self.check_queue()

    def queue_execute(self, timeout=None, noerr=0):
        """Execute every queued command, then reset all queues.

        :param timeout: seconds each connection may take, counted from connect
            (default 3).
        :param noerr: 0 raises the collected errors, 1 logs them as warnings,
            2 ignores them (callers such as ``cmd`` then inspect the returned
            queue_info's ``error`` themselves).
        :raises Exception: with all error messages joined when ``noerr`` is 0.
        """
        if not timeout:
            timeout = 3
        for queue_info in self.queue_info.values():
            if not queue_info.get('cmds'):
                continue
            queue_info['error'] = None
            queue_info['current'] = None
            try:
                self.open_connection(queue_info, timeout)
                if not queue_info.get('qga'):
                    # QMP requires capability negotiation before any other command.
                    queue_info['cmds'].insert(0, {'execute': 'qmp_capabilities', 'arguments': {}})
            except Exception as err:
                queue_info['error'] = err
        errors = []
        try:
            # select() blocks until data or a deadline, so no extra sleep is needed.
            while self.check_queue():
                self.loop(timeout)
        finally:
            for queue_info in self.queue_info.values():
                self.close_connection(queue_info)
                err = queue_info.get('error')
                if not err:
                    continue
                if not noerr:
                    errors.append(str(err))
                elif noerr < 2:
                    current_app.logger.warning(err)
            self.queue_info.clear()
            self.queue_lookup.clear()
        if errors:
            raise Exception('\n'.join(errors))

    def open_connection(self, cmd_info, timeout=None):
        """Connect to the VM's QMP/QGA unix socket and register it for reading.

        :param cmd_info: queue_info dict; gains 'fh', 'buf' and 'deadline'.
        :param timeout: socket timeout and total deadline in seconds (default 1).
        :raises Exception: if already open or the connect fails.
        """
        if cmd_info.get('fh'):
            raise Exception('duplicate call to open')
        vmid = cmd_info.get('vmid')
        qga = cmd_info.get('qga')
        sname = Helpers().qmp_socket(vmid, qga)
        if not timeout:
            timeout = 1
        sotype = 'qga' if qga else 'qmp'
        fh = None
        # Blocks until connected or until the timeout expires.
        try:
            fh = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            fh.settimeout(timeout)
            fh.connect(sname)
        except Exception as err:
            if fh is not None:
                fh.close()
            raise Exception("unable to connect to VM {} {} socket - {}"
                            .format(vmid, sotype, err)) from err
        cmd_info['fh'] = fh
        cmd_info['buf'] = bytearray()
        cmd_info['deadline'] = time.monotonic() + timeout
        self.queue_lookup[fh] = cmd_info
        self.sel.register(fh, selectors.EVENT_READ, self.sel_input)

    def close_connection(self, cmd_info):
        """Unregister and close the socket of ``cmd_info``, if it has one."""
        fh = cmd_info.pop('fh', None)
        if fh is None:
            return
        self.queue_lookup.pop(fh, None)
        try:
            self.sel.unregister(fh)
        except (KeyError, ValueError):
            pass  # was never registered
        fh.close()

    def queue_cmd(self, vmid, callback, execute, *params, **kwargs):
        """Queue ``execute`` for ``vmid``; run it later with ``queue_execute``.

        QMP arguments may be passed as dict positional params and/or keyword
        arguments (use a dict for names containing '-'); they are merged into
        one arguments object. ``callback(vmid, resp)`` receives the reply.
        """
        arguments = params + (kwargs,) if kwargs else params
        self.push_cmd_to_queue(vmid, {
            'execute': execute,
            'arguments': arguments,
            'callback': callback,
        })

    def cmd(self, vmid, cmd, timeout=None):
        """Run one command synchronously and return its result.

        :param vmid: VM id.
        :param cmd: dict with 'execute' and optional 'arguments'; 'callback',
            'arguments' and 'id' are set on it.
        :param timeout: seconds; when falsy a per-command default is used.
        :return: the reply's 'return' value; ``{'error': ...}`` when the reply
            carries an error instead (QGA replies); None when a QGA command in
            ``qga_allow_close_cmds`` ended with the agent closing the socket.
        :raises Exception: if no command is given, or the command fails or times out.
        """
        result = {}

        def callback(cvmid, resp):
            nonlocal result
            if resp is None:
                result = None
                return
            result = resp.get('return')
            if result is None and 'error' in resp:
                result = {'error': resp.get('error')}

        if not (cmd and cmd.get('execute')):
            raise Exception('no command specified')
        cmd['callback'] = callback
        if cmd.get('arguments') is None:
            cmd['arguments'] = {}
        queue_info = self.push_cmd_to_queue(vmid, cmd)
        if not timeout:
            timeout = self._default_timeout(cmd.get('execute'))
        self.queue_execute(timeout, 2)
        if queue_info.get('error'):
            raise Exception("VM {} qmp command '{}' failed - {}"
                            .format(vmid, cmd.get('execute'), queue_info.get('error')))
        return result

    @classmethod
    def _default_timeout(cls, execute):
        """Return the default timeout in seconds for command ``execute``."""
        if execute in ('query-migrate', 'guest-fsfreeze-freeze'):
            # A killed fsfreeze-freeze may leave the guest FS frozen: be generous.
            return 60 * 60
        if execute.startswith(('eject', 'change')):
            return 60  # CD-ROM media changes are slow
        if execute == 'guest-fsfreeze-thaw':
            return 10  # thaw either returns at once or never (deadlock)
        if execute in cls._TEN_MINUTE_CMDS:
            return 10 * 60
        return 3

    def next_cmdid(self, qga):
        """Return a new command id string "<pid>0<seq>".

        QMP and QGA use separate counters. The id consists only of digits, so
        it can also be sent as an integer in guest-sync-delimited.
        """
        if qga:
            self.cmdid_seq_qga += 1
            seq = self.cmdid_seq_qga
        else:
            self.cmdid_seq += 1
            seq = self.cmdid_seq
        return "{}0{}".format(os.getpid(), seq)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment