Last active
January 12, 2022 09:38
-
-
Save songtianlun/3d844cbb83a10e3a664dd2303c9948f7 to your computer and use it in GitHub Desktop.
一个 python 实现的 QMP 客户端,用于向 QEMU 发送 QMP 指令,(借鉴了 PVE 的 QMP Client 实现)。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
# coding=utf8
"""
# Author: songtianlun
# mail: songtianlun@frytea.com
# Created Time : 2022-01-07 15:06:17
# License: GPL-2.0-only
# Description:
使用 selector 实现 I/O 多路复用,可并行向不同虚拟机发送 qmp,qga 命令;
注: QEMU 只能处理一条连接,因此我们应当尽快关闭连接。
使用方法:
方法一(同步):使用 cmd() 方法快速执行一条命令,并获取返回结果;
方法二(异步):使用 queue_cmd() 方法将命令加入队列,指定回调函数,最后使用 queue_execute() 方法一次性执行完毕。
"""
import json | |
import os | |
import selectors | |
import re | |
import socket | |
import time | |
import ast | |
from flask import current_app | |
from mimic_daemon_server.qemu_servers.helpers import Helpers | |
from mimic_daemon_server.cluster.common import IPCC | |
class QMPClient:
    """Send QMP / QGA commands to QEMU instances over unix sockets.

    Commands are queued per socket and executed with selector based I/O
    multiplexing, so several VMs can be talked to in parallel.  QEMU only
    serves one monitor connection at a time, therefore connections are
    closed as soon as their queue is drained.

    Usage:
        * synchronous: ``cmd()`` runs one command and returns its result;
        * batched: ``queue_cmd()`` enqueues commands with a callback and
          ``queue_execute()`` runs everything that was queued.

    Per-socket state is kept in plain dicts (``self.queue_info[sname]``)
    with the keys ``qga``, ``vmid``, ``sname``, ``cmds`` (pending commands)
    and, while a connection is open, ``fh`` (socket), ``current`` (command
    in flight), ``inbuf`` (unparsed bytes), ``deadline`` and ``error``.
    """

    # Maximum number of bytes fetched per recv(); longer replies are
    # reassembled through the per-connection input buffer.
    _RECV_SIZE = 4096
    # Fallback timeout (seconds) for a command when none is configured.
    _DEFAULT_TIMEOUT = 3
    # A guest-sync-delimited reply starts with a 0xFF byte, followed by the
    # sync reply line and the reply line of the actual command.
    _QGA_RESPONSE = re.compile(
        rb'^.*\xff([^\n]+}\r?\n[^\n]+})\r?\n(.*)$', re.S)
    # Commands that get a 10 minute default timeout in cmd().
    _SLOW_COMMANDS = frozenset((
        'savevm-start',
        'savevm-end',
        'query-backup',
        'query-block-jobs',
        'block-job-cancel',
        'block-job-complete',
        'backup-cancel',
        'query-savevm',
        'delete-drive-snapshot',
        'guest-shutdown',
        'blockdev-snapshot-internal-sync',
        'blockdev-snapshot-delete-internal-sync',
        'snapshot-drive',
    ))

    def __init__(self, eventcb=None):
        """Create a client.

        :param eventcb: optional callable invoked with every asynchronous
            QMP event dict that is received.
        """
        self.sel = selectors.DefaultSelector()
        self.queue_lookup = {}  # open socket -> its queue_info dict
        self.queue_info = {}  # socket path -> queue_info dict
        self.cmdid_seq = 0
        self.cmdid_seq_qga = 0
        # Always define the attribute: sel_input() reads it unconditionally.
        self.eventcb = eventcb
        # Guest agent commands after which the agent may drop the
        # connection without answering.
        self.qga_allow_close_cmds = {
            'guest-shutdown': 1,
            'guest-suspend-ram': 1,
            'guest-suspend-disk': 1,
            'guest-suspend-hybrid': 1,
        }

    def __del__(self):
        # __init__ may have failed before the selector was created.
        sel = getattr(self, 'sel', None)
        if sel is not None:
            sel.close()

    def sel_input(self, conn, mask):
        """Selector callback: read and dispatch data available on ``conn``.

        Read errors, a closed peer and protocol errors are recorded in the
        connection's ``error`` slot; ``check_queue()`` is then run so the
        connection is closed or the next command is sent.

        :param conn: the readable socket.
        :param mask: selector event mask (unused).
        """
        queue_info = self.lookup_queue_info(conn)
        if not queue_info:
            return
        try:
            chunk = conn.recv(self._RECV_SIZE)
        except Exception as err:
            current_app.logger.error("qmp recv err: {}".format(err))
            queue_info['error'] = err
            self.check_queue()
            return
        if not chunk:
            # Peer closed the socket; without this the selector would
            # report the socket as readable forever.
            self._handle_eof(queue_info)
            self.check_queue()
            return
        # Keep raw bytes: replies may arrive split over several reads and
        # the QGA delimiter (0xFF) is not valid UTF-8.
        queue_info['inbuf'] = queue_info.get('inbuf', b'') + chunk
        try:
            if queue_info.get('qga'):
                self._process_qga_input(queue_info)
            else:
                self._process_qmp_input(queue_info)
        except Exception as err:
            queue_info['error'] = err
        self.check_queue()

    def _handle_eof(self, queue_info):
        """Record that the peer closed the connection of ``queue_info``."""
        curcmd = queue_info.get('current')
        if (queue_info.get('qga') and curcmd
                and curcmd.get('execute') in self.qga_allow_close_cmds):
            # Expected for shutdown/suspend: the agent goes away silently.
            queue_info.pop('current', None)
            self.close_connection(queue_info)
        elif not queue_info.get('error'):
            queue_info['error'] = Exception("client closed connection")

    def _process_qga_input(self, queue_info):
        """Parse a complete guest agent reply from the input buffer, if any."""
        match = self._QGA_RESPONSE.match(queue_info.get('inbuf', b''))
        if not match:
            return  # reply not complete yet, wait for more data
        queue_info['inbuf'] = match.group(2)
        vmid = queue_info.get('vmid')
        curcmd = queue_info.get('current')
        if not curcmd:
            raise Exception('unable to lookup current command with VM {} ({})'
                            .format(vmid, queue_info.get('sname')))
        jsons = match.group(1).decode('utf-8').split('\n')
        if len(jsons) != 2:
            raise Exception("response is not complete")
        cmdid = json.loads(jsons[0]).get('return')
        if not cmdid:
            raise Exception("received responsed without command id")
        # guest-sync-delimited echoes the integer that check_queue() sent.
        expected = int(curcmd.get('id'))
        if cmdid < expected:
            return  # stale reply of a previous command
        if cmdid != expected:
            raise Exception("got wrong command id '{}' (expected {})"
                            .format(cmdid, curcmd.get('id')))
        queue_info.pop('current', None)
        response = json.loads(jsons[1])
        callback = curcmd.get('callback')
        if callback:
            callback(vmid, response)

    def _process_qmp_input(self, queue_info):
        """Parse every complete QMP message line in the input buffer."""
        complete, newline, rest = queue_info.get('inbuf', b'').rpartition(b'\n')
        if not newline:
            return  # no complete line yet, wait for more data
        queue_info['inbuf'] = rest
        vmid = queue_info.get('vmid')
        for line in complete.split(b'\n'):
            line = line.strip()
            if not line:
                continue
            try:
                msg = json.loads(line.decode('utf-8'))
            except Exception as err:
                current_app.logger.error(
                    "failed loads str({}) to obj with error:{}".format(line, err))
                continue
            if not isinstance(msg, dict):
                current_app.logger.error(
                    "failed loads str({}) to obj with error:{}".format(
                        line, 'not a JSON object'))
                continue
            if msg.get('QMP'):
                continue  # greeting banner
            error = msg.get('error')
            failed = False
            if isinstance(error, dict) and error.get('desc'):
                desc = error['desc'].rstrip('\n')
                # NOTE(review): the PVE client this was ported from raises
                # for every error except this one; the split below is kept
                # as found in this file - confirm which is intended.
                if desc.startswith('Connection can not be completed immediately'):
                    raise Exception(desc)
                current_app.logger.warning(
                    'QMP - skip an qmp error mes: {}'.format(desc))
                failed = True
            if msg.get('event'):
                if self.eventcb:
                    self.eventcb(msg)
                continue
            curcmd = queue_info.get('current')
            if failed and not curcmd:
                continue  # nothing in flight the error could belong to
            if not curcmd:
                raise Exception('unable to lookup current command with VM {} ({})'
                                .format(vmid, queue_info.get('sname')))
            cmdid = msg.get('id')
            if not cmdid and not failed:
                raise Exception("received responsed without command id")
            if cmdid and curcmd.get('id') != cmdid:
                raise Exception("got wrong command id '{}' (expected {})"
                                .format(cmdid, curcmd.get('id')))
            # Only a reply that belongs to the command completes it;
            # greetings and events must not consume the in-flight command.
            queue_info.pop('current', None)
            callback = curcmd.get('callback')
            if callback:
                callback(vmid, msg)

    @staticmethod
    def _normalize_arguments(arguments):
        """Return command arguments as the JSON object QMP requires.

        ``None`` (or a container holding ``None``) becomes ``{}``; a
        tuple/list consisting only of dicts is merged into one dict.  Any
        other value is returned unchanged.
        """
        if arguments is None:
            return {}
        if isinstance(arguments, dict):
            return {} if None in arguments else arguments
        if isinstance(arguments, (tuple, list)):
            if None in arguments:
                return {}
            if all(isinstance(part, dict) for part in arguments):
                merged = {}
                for part in arguments:
                    merged.update(part)
                return merged
        return arguments

    def push_cmd_to_queue(self, vmid, cmd):
        """Append ``cmd`` to the queue of the socket it has to be sent to.

        :param vmid: id of the target VM.
        :param cmd: dict with ``execute`` and optionally ``arguments`` and
            ``callback``.
        :return: the queue_info dict the command was appended to.
        :raises Exception: if ``cmd`` has no command name.
        """
        execute = cmd.get('execute')
        if not execute:
            raise Exception("no command name specified")
        cmd['arguments'] = self._normalize_arguments(cmd.get('arguments'))
        # Commands named "guest-..." go to the guest agent socket.
        qga = 1 if execute.startswith('guest-') else 0
        sname = Helpers().qmp_socket(vmid, qga)
        queue_info = self.queue_info.get(sname)
        if not queue_info:
            queue_info = self.queue_info[sname] = {
                'qga': qga, 'vmid': vmid, 'sname': sname, 'cmds': []}
        queue_info['cmds'].append(cmd)
        return queue_info

    def lookup_queue_info(self, fh):
        """Return the queue_info dict registered for socket ``fh`` or None."""
        queue_info = self.queue_lookup.get(fh)
        if not queue_info:
            current_app.logger.warning(
                "internal error - unable to lookup queue info")
            return None
        return queue_info

    def check_queue(self):
        """Advance every open connection by one step.

        Connections with an error or without pending work are closed; idle
        connections get their next command sent.

        :return: number of connections that have a command in flight.
        """
        running = 0
        for queue_info in self.queue_info.values():
            fh = queue_info.get('fh')
            if not fh:
                continue
            if queue_info.get('error'):
                self.close_connection(queue_info)
                continue
            if queue_info.get('current'):
                running += 1
                continue
            if not queue_info.get('cmds'):
                self.close_connection(queue_info)
                continue
            qga = queue_info.get('qga')
            try:
                cmd = queue_info['current'] = queue_info['cmds'].pop(0)
                cmd['id'] = self.next_cmdid(qga)
                arguments = cmd.get('arguments')
                fd = None
                if cmd.get('execute') in ('add-fd', 'getfd'):
                    # NOTE(review): the PVE original removes 'fd' from the
                    # arguments before sending; here it stays in - confirm.
                    fd = (arguments or {}).get('fd')
                if qga:
                    # The sync request lets sel_input() pair the reply
                    # with this command.
                    qmpcmd = json.dumps({
                        'execute': 'guest-sync-delimited',
                        'arguments': {'id': int(cmd['id'])},
                    }) + "\n" + json.dumps({
                        'execute': cmd.get('execute'),
                        'arguments': arguments,
                    }) + "\n"
                else:
                    qmpcmd = json.dumps({
                        'execute': cmd.get('execute'),
                        'arguments': arguments,
                        'id': cmd.get('id'),
                    })
                # The reply has to arrive before this point in time.
                limit = queue_info.get('timeout') or self._DEFAULT_TIMEOUT
                queue_info['deadline'] = time.monotonic() + limit
                if fd is not None and fd >= 0:
                    # TODO: passing fileno(fh) is untested
                    ret = IPCC.sendfd(fh, fd, qmpcmd)
                    if ret < 0:
                        raise Exception('sendfd failed')
                else:
                    fh.sendall(qmpcmd.encode('utf8'))
                running += 1
            except Exception as err:
                queue_info['error'] = err
        return running

    def loop(self, timeout):
        """Wait once for I/O and dispatch every ready connection.

        Connections whose in-flight command passed its deadline are marked
        with a ``got timeout`` error afterwards.

        :param timeout: maximum number of seconds to wait for events.
        """
        pending = [info['deadline'] for info in self.queue_lookup.values()
                   if info.get('current') and info.get('deadline') is not None]
        wait = timeout
        if pending:
            remaining = max(0.0, min(pending) - time.monotonic())
            wait = remaining if wait is None else min(wait, remaining)
        for key, mask in self.sel.select(timeout=wait):
            # An earlier callback of this round may have closed it.
            if key.fileobj not in self.queue_lookup:
                continue
            key.data(key.fileobj, mask)
        now = time.monotonic()
        for info in list(self.queue_lookup.values()):
            deadline = info.get('deadline')
            if (info.get('current') and not info.get('error')
                    and deadline is not None and now >= deadline):
                info['error'] = Exception("got timeout")

    def queue_execute(self, timeout, noerr):
        """Execute all queued commands and close every connection.

        :param timeout: seconds allowed for connecting and for each
            command; a falsy value selects 3 seconds.
        :param noerr: falsy - raise the collected errors; 1 - only log
            them; 2 or more - ignore them silently (they stay readable in
            the queue_info dicts handed out by ``push_cmd_to_queue``).
        :raises Exception: with all error messages when ``noerr`` is falsy
            and at least one connection failed.
        """
        if not timeout:
            timeout = 3
        errors = []
        try:
            for queue_info in self.queue_info.values():
                if not queue_info.get('cmds'):
                    continue
                queue_info['error'] = None
                queue_info['current'] = None
                queue_info['timeout'] = timeout
                queue_info['inbuf'] = b''
                try:
                    self.open_connection(queue_info, timeout)
                    if not queue_info.get('qga'):
                        # Capability negotiation has to be the first
                        # command on every QMP connection.
                        queue_info['cmds'].insert(
                            0, {'execute': 'qmp_capabilities', 'arguments': {}})
                except Exception as err:
                    queue_info['error'] = err
            while self.check_queue():
                self.loop(timeout)
            for queue_info in self.queue_info.values():
                err = queue_info.get('error')
                if not err:
                    continue
                if noerr:
                    if noerr < 2:
                        current_app.logger.warning(err)
                else:
                    errors.append(str(err))
        finally:
            # Release the sockets even if a handler raised unexpectedly.
            for queue_info in self.queue_info.values():
                self.close_connection(queue_info)
            self.queue_info.clear()
            self.queue_lookup.clear()
        if errors:
            raise Exception("\n".join(errors))

    def open_connection(self, cmd_info, timeout=None):
        """Connect to the socket of ``cmd_info`` and register it for reads.

        :param cmd_info: queue_info dict of the connection.
        :param timeout: socket timeout in seconds; falsy selects 1 second.
        :raises Exception: if already connected or the connect fails.
        """
        if cmd_info.get('fh'):
            raise Exception('duplicate call to open')
        vmid = cmd_info.get('vmid')
        qga = cmd_info.get('qga')
        # push_cmd_to_queue() already resolved the path; only resolve it
        # again for dicts that do not carry it.
        sname = cmd_info.get('sname') or Helpers().qmp_socket(vmid, qga)
        if not timeout:
            timeout = 1
        sotype = 'qga' if qga else 'qmp'
        fh = None
        # Blocks until connected or timed out.
        try:
            fh = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            fh.settimeout(timeout)
            fh.connect(sname)
        except Exception as err:
            if fh is not None:
                fh.close()  # do not leak the descriptor on failure
            raise Exception("unable to connect to VM {} {} socket - {}"
                            .format(vmid, sotype, err)) from err
        cmd_info['fh'] = fh
        self.queue_lookup[fh] = cmd_info
        self.sel.register(fh, selectors.EVENT_READ, self.sel_input)

    def close_connection(self, cmd_info):
        """Unregister and close the socket of ``cmd_info`` if it is open."""
        fh = cmd_info.pop('fh', None)
        if not fh:
            return
        self.queue_lookup.pop(fh, None)
        try:
            self.sel.unregister(fh)
        except (KeyError, ValueError):
            pass  # was never registered or is already closed
        try:
            # QEMU serves a single client, so free the socket right away.
            fh.close()
        except OSError:
            pass

    def queue_cmd(self, vmid, callback, execute, *params, **kwargs):
        """Queue a command for a later ``queue_execute()``.

        :param vmid: id of the target VM.
        :param callback: callable ``(vmid, response)`` run with the reply.
        :param execute: command name.
        :param params: argument dict(s); several dicts are merged.
        :param kwargs: further arguments given as keywords.
        """
        arguments = self._normalize_arguments(params)
        if kwargs:
            if isinstance(arguments, dict):
                arguments = dict(arguments, **kwargs)
            else:
                arguments = dict(kwargs)
        cmd = {
            'execute': execute,
            'arguments': arguments,
            'callback': callback,
        }
        self.push_cmd_to_queue(vmid, cmd)
        return

    @staticmethod
    def _default_timeout(execute):
        """Return the default timeout in seconds for command ``execute``."""
        if execute == 'query-migrate':
            return 60 * 60  # 1 hour
        if execute.startswith(('eject', 'change')):
            return 60  # cd rom operations are slow
        if execute == 'guest-fsfreeze-freeze':
            # Freeze syncs all guest file systems; killing it early would
            # likely leave them locked, so allow a long timeout.
            return 60 * 60  # 1 hour
        if execute == 'guest-fsfreeze-thaw':
            # Thaw returns immediately or never (dead locked).
            return 10
        if execute in QMPClient._SLOW_COMMANDS:
            return 10 * 60
        return 3

    def cmd(self, vmid, cmd, timeout):
        """Execute a single command and return its result.

        :param vmid: id of the target VM.
        :param cmd: dict with ``execute`` and optional ``arguments``; it
            is modified in place (``callback`` and ``id`` are set).
        :param timeout: seconds to wait; a falsy value selects a default
            that depends on the command.
        :return: the ``return`` member of the reply, ``{'error': ...}`` if
            the reply only carries an error, ``{}`` if there was no reply.
        :raises Exception: if no command is given or the command failed.
        """
        if not (cmd and cmd.get('execute')):
            raise Exception('no command specified')
        result = {}

        def callback(cvmid, resp):
            nonlocal result
            result = resp.get('return')
            # Report the error only when there is no regular return value.
            if result is None and resp.get('error') is not None:
                result = {'error': resp.get('error')}

        cmd['callback'] = callback
        if cmd.get('arguments') is None:
            cmd['arguments'] = {}
        queue_info = self.push_cmd_to_queue(vmid, cmd)
        if not timeout:
            timeout = self._default_timeout(cmd.get('execute'))
        self.queue_execute(timeout, 2)
        if queue_info.get('error'):
            raise Exception("VM {} qmp command '{}' failed - {}"
                            .format(vmid, cmd.get('execute'),
                                    queue_info.get('error')))
        return result

    def next_cmdid(self, qga):
        """Return the next command id: the pid, a ``0`` and a counter.

        QMP and QGA commands use separate counters.
        """
        if qga:
            self.cmdid_seq_qga += 1
            seq = self.cmdid_seq_qga
        else:
            self.cmdid_seq += 1
            seq = self.cmdid_seq
        return "{}0{}".format(os.getpid(), seq)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment