Skip to content

Instantly share code, notes, and snippets.

@ptsneves
Created September 23, 2020 14:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ptsneves/9801481e02db23b8b5822b8b3f2f946c to your computer and use it in GitHub Desktop.
Save ptsneves/9801481e02db23b8b5822b8b3f2f946c to your computer and use it in GitHub Desktop.
testimage with nfs boot
# Released under the MIT license (see COPYING.MIT)
import os
import bb
import oeqa.utils.commands as commands
from oeqa.core.target.ssh import OESSHTarget
class MinimalController(OESSHTarget):
supported_image_fstypes = ["ext4"]
def __init__(self, logger, target_ip, server_ip, **kwargs):
super(MinimalController, self).__init__(
logger, target_ip, server_ip, **kwargs)
d = kwargs['datastore']
self.start_cmd = d.expand(d['TEST_HOST_START_CMD'])
if not self.start_cmd:
raise RuntimeError("TEST_HOST_START_CMD not defined")
self.stop_cmd = d.expand(d['TEST_HOST_STOP_CMD'])
if not self.stop_cmd:
raise RuntimeError("TEST_HOST_STOP_CMD not defined")
def __del__(self):
self.stop()
def turnBoard(self, state):
if state == "on":
bb.note("Turning board on target: {}".format(self.start_cmd))
commands.runCmd(self.start_cmd, env = os.environ)
elif state == "off":
bb.note("Running off target: {}".format(self.stop_cmd))
commands.runCmd(self.stop_cmd, env = os.environ)
else:
raise AssertionError("Can only turn board on or off. Wrong Option passed: " + state)
def cyclePower(self):
bb.note("Power cycling")
self.turnBoard("off")
self.turnBoard("on")
def executeCommandsOnTarget(self, cmd_list):
for cmd in cmd_list:
bb.note("Running on target: {}".format(cmd))
(status, output) = self.run(cmd)
if status:
raise AssertionError("{} on target failed with exit code: {}. stderr: {}".format(cmd,
status, output))
def executeCommandsOnHost(self, cmd_list):
for cmd in cmd_list:
bb.note("Running on host: {}".format(cmd))
commands.runCmd(cmd, preexec_fn=os.setsid, env=os.environ)
def start(self, **kwargs):
self.cyclePower()
def stop(self):
try:
self.turnBoard("off")
except AssertionError as e:
bb.warn("Error while stopping process: {}".format(e))
def generate_host_action_json(execute = None, io = None):
if not execute and not io:
bbfatal("Cannot generate action with no execute and no io")
action = {}
if execute:
action["execute"] = execute
if io:
action["io"] = io
return action
def get_option(global_json, option_id):
if option_id not in global_json.keys():
global_json[option_id] = []
return global_json[option_id]
def get_action(option, action):
if "command" not in option.keys():
option["command"] = {}
if action not in option["command"].keys():
option["command"][action] = []
return option["command"][action]
def append_serial(global_json, option_id, action, serial_device, baud, io = []):
option = get_option(global_json, option_id)
cmd = {}
cmd["type"] = "serial"
cmd["device"] = serial_device
cmd["baud"] = baud
action = get_action(cmd, action)
action.append({"io" : io})
option.append(cmd)
return global_json
def create_expect(expect_dict):
return {"expect" : expect_dict}
def create_send(to_send_string):
return {"send" : to_send_string}
def convert_expect_list_to_io(expects):
io = []
for expect in expects:
io.append(create_expect(expect))
return io
def append_serial_io(global_json, option_id, action, serial_device, baud, io):
import json
if not io:
return global_json
return append_serial(global_json, option_id, action, serial_device, baud, io)
def append_host_cmd(global_json, option_id, action, execute, io = []):
option = get_option(global_json, option_id)
cmd = {}
cmd["type"] = "host"
action = get_action(cmd, action)
action.append(generate_host_action_json(execute, io))
option.append(cmd)
return global_json
def get_image_base(d):
return "{}-{}".format(d.getVar("IMAGE_BASENAME"), d.getVar("MACHINE"))
def get_abs_image_dir(d):
result = os.path.join(d.getVar("TOPDIR"), d.getVar("DEPLOY_DIR_IMAGE"))
return result
def get_ext4_file_name(d):
return "{}.ext4".format(get_image_base(d))
def is_inside_docker():
return os.path.exists('/.dockerenv')
def get_docker_container_id():
if is_inside_docker():
with open("/proc/1/cpuset", "r") as f:
res = f.read().split()
return os.path.basename(res[0])
return None
def get_volumes_from_container():
container_id = get_docker_container_id()
if container_id:
return "--volumes-from {}".format(container_id)
def append_network_setup(d, global_json):
machine = d.getVar("MACHINE")
network_up_cmd = "setup-interface {interface} up {ip}/24 {broadcast_ip}".format(
interface = d.getVar("HOST_ETHERNET_DEVICE"),
ip = d.getVar("TEST_SERVER_IP"),
broadcast_ip = d.getVar("TEST_SERVER_BROADCAST_IP")
)
network_down_cmd = "setup-interface {interface} down".format(interface = d.getVar("HOST_ETHERNET_DEVICE"))
global_json = append_host_cmd(global_json, "{}-after-power".format(machine), "on", network_up_cmd)
global_json = append_host_cmd(global_json, "{}-after-power".format(machine), "off", network_down_cmd)
return global_json
def append_tftp_docker(d, global_json):
if is_inside_docker():
docker_volume_arg = get_volumes_from_container()
else:
container_mount_point = "/var/tftpboot"
docker_volume_arg = " -v {}:{}".format(get_abs_image_dir(d), container_mount_point)
tftp_cmd_on = "docker run -d --rm --net=host --name tftp {} airborne/tftpd:latest".format(docker_volume_arg)
tftp_cmd_off = "docker stop tftp || exit 0"
machine = d.getVar("MACHINE")
global_json = append_host_cmd(global_json, "{}-before-power".format(machine), "on", tftp_cmd_on)
global_json = append_host_cmd(global_json, "{}-after-power".format(machine), "off", tftp_cmd_off)
return global_json
def append_nfs_docker(d, global_json):
machine = d.getVar("MACHINE")
container_ext4_location = ""
docker_volume_arg = ""
if is_inside_docker():
docker_volume_arg = get_volumes_from_container()
container_ext4_location = os.path.join(get_abs_image_dir(d), get_ext4_file_name(d))
else:
container_mount_point = "/yocto"
docker_volume_arg = " -v {}:{}".format(get_abs_image_dir(d), container_mount_point)
container_ext4_location = os.path.join(container_mount_point, get_ext4_file_name(d))
nfs_on_arguments = {'extra-docker-arguments' : docker_volume_arg,
'machine' : machine,
'ext4-file' : container_ext4_location}
nfsd_cmd_on_template = "docker run -d --rm --net=host --name nfs-yocto-target-{machine} --privileged {extra-docker-arguments} \
-e EXT4_IMAGE={ext4-file} airborne/nfs-server-ext4:latest"
nfsd_cmd_on = nfsd_cmd_on_template.format(**nfs_on_arguments)
nfsd_cmd_off = "docker stop nfs-yocto-target-{machine} || exit 0".format(machine=machine)
global_json = append_host_cmd(global_json, "{}-before-power".format(machine), "on", nfsd_cmd_on)
global_json = append_host_cmd(global_json, "{}-after-power".format(machine), "off", nfsd_cmd_off)
return global_json
def append_qemu(d, global_json):
if not d.getVar("MACHINE") == "virt-arm":
return global_json
qemu = """{qemu_binary} {qemu_extra_option} {qemu_machine} {ram_parameters} -serial mon:pty:{host_serial_device} -kernel {kernel_path} -append \"root=/dev/nfs ip={qemu_ip}::{gateway_ip} nfsrootdebug nfsroot={nfs_server_ip}:/,v4,tcp r0 rootwait earlyprintk debug rw highres=off mem=2048M\" """.format(
host_serial_device = d.getVar("HOST_SERIAL_DEVICE"),
qemu_ip = d.getVar("TEST_TARGET_IP"),
nfs_server_ip = d.getVar("TEST_SERVER_IP"),
gateway_ip = d.getVar("TEST_SERVER_IP"),
ram_parameters = d.getVar("QB_MEM"),
qemu_machine = d.getVar("QB_MACHINE"),
qemu_extra_option = d.getVar("QB_OPT_APPEND"),
qemu_binary = d.getVar("QB_SYSTEM_NAME"),
kernel_path = os.path.join(get_abs_image_dir(d), d.getVar("KERNEL_IMAGETYPE"))
)
io = [{"expect" : {"text" : "char device redirected to", "timeout" : "10"}}]
global_json = append_host_cmd(global_json, "{}-before-power".format("virt-arm"), "on", qemu, io)
global_json = append_host_cmd(
global_json, "{}-after-power".format("virt-arm"), "off", 'kill $(pidof {}) || true'.format(d.getVar("QB_SYSTEM_NAME")))
global_json = append_host_cmd(global_json, "{}-after-power".format("virt-arm"), "off", 'rm -f {}'.format(d.getVar("HOST_SERIAL_DEVICE")))
return global_json
def append_serial_boot_io(machine, serial_device, baud, io, global_json):
return append_serial_io(global_json, "{}-after-power".format(machine), "on",
serial_device, baud, io)
def get_serial_device(d):
from bb.fetch2 import runfetchcmd
return runfetchcmd("lab-controller -d {} --get-serial-device".format(d.getVar("MACHINE")), d)
def generate_power_options(d):
import json
global_dict = {}
machine = d.getVar("MACHINE")
host_serial = d.getVar("HOST_SERIAL_DEVICE")
host_baud = d.getVar("HOST_SERIAL_BAUD")
boot_io = json.loads(d.getVar("BOOT_IO_" + machine))
global_dict = append_tftp_docker(d, global_dict)
global_dict = append_nfs_docker(d, global_dict)
global_dict = append_network_setup(d, global_dict)
global_dict = append_qemu(d, global_dict)
global_dict = append_serial_boot_io(machine, host_serial, host_baud, boot_io, global_dict)
global_json = json.dumps(global_dict, indent = 4, sort_keys = True)
return global_json
def boot_expect_arietta(d):
import json
boot_expect = []
# boot_expect.append({"text" : "VFS: Mounted root (nfs4 filesystem) readonly on device","timeout" : "25"}),
# boot_expect.append({"text" : "INIT: Entering runlevel: 5", "timeout" : "40"})
boot_expect.append({"text" : "exFAT: Version 1.2.10", "timeout" : "45"})
#It can take quite long because of the camera
boot_expect.append({"text" : "INIT: Entering runlevel: 5", "timeout" : "40"})
#Takes a long time because of key generation
boot_expect.append({"text" : "Poky (Yocto Project Reference Distro)", "timeout" : "50"})
return json.dumps(convert_expect_list_to_io(boot_expect))
def boot_expect_virt_arm(d):
import json
boot_expect = []
boot_expect.append({"text" : "INIT: Entering runlevel: 5", "timeout" : "40"})
boot_expect.append({"text" : "Poky (Yocto Project Reference Distro)", "timeout" : "35"})
boot_expect.append({"text" : "login:", "timeout" : "5"})
return json.dumps(convert_expect_list_to_io(boot_expect))
def boot_expect_imx6_tinyrex_pro(d):
import json
io = []
io.append(create_expect({"text": "Hit any key to stop autoboot", "timeout": "5"}))
io.append(create_send("\r"))
io.append(create_send("setenv ipaddr {}\r".format(d.getVar("TEST_TARGET_IP"))))
io.append(create_send("setenv serverip {}\r".format(d.getVar("TEST_SERVER_IP"))))
io.append(create_send("setenv tftp_dir .\r"))
io.append(create_send("setenv netargs setenv bootargs console=${console},${baudrate} ${smp} ${extra} ${video} root=/dev/nfs ip=${ipaddr}:${serverip}:${serverip}:255.255.252.0 nfsroot=${serverip}:/,v4,tcp\r"))
io.append(create_send("run netboot\r"))
io.append(create_expect({"text" : "INIT: Entering runlevel: 5", "timeout" : "40"}))
io.append(create_expect({"text" : "Poky (Yocto Project Reference Distro)", "timeout" : "35"}))
io.append(create_expect({"text" : "login:", "timeout" : "5"}))
return json.dumps(io)
BOOT_IO_arietta = "${@boot_expect_arietta(d)}"
BOOT_IO_virt-arm = "${@boot_expect_virt_arm(d)}"
BOOT_IO_imx6-tinyrexpro = "${@boot_expect_imx6_tinyrex_pro(d)}"
TESTIMAGEDEPENDS_append += " \
lab-controller-native:do_populate_sysroot \
procps-native:do_populate_sysroot \
"
#POWER_OPTIONS = "${@generate_power_options(d)}"
IMAGE_FSTYPES += "ext4"
IMAGE_CLASSES += "testimage"
TEST_LAB_DEVICE ?= "${MACHINE}"
TEST_HOST_START_CMD = "lab-controller -p on -d ${TEST_LAB_DEVICE} --optional-power '${POWER_OPTIONS}'"
TEST_HOST_STOP_CMD = "lab-controller -p off -d ${TEST_LAB_DEVICE} --optional-power '${POWER_OPTIONS}'"
TEST_TARGET ?= "MinimalController"
inherit testimage
do_machine_on() {
bbnote ${TEST_HOST_START_CMD}
${TEST_HOST_START_CMD}
}
do_machine_off() {
bbnote ${TEST_HOST_STOP_CMD}
${TEST_HOST_STOP_CMD}
}
addtask do_machine_on
addtask do_machine_off
do_machine_on[vardepsexclude] += "BB_ORIGENV DATETIME"
do_machine_on[nostamp] = "1"
do_machine_on[depends] += "${TESTIMAGEDEPENDS}"
do_machine_on[lockfiles] += "${TESTIMAGELOCK}"
do_machine_off[vardepsexclude] += "BB_ORIGENV DATETIME"
do_machine_off[nostamp] = "1"
do_machine_off[depends] += "${TESTIMAGEDEPENDS}"
do_machine_off[lockfiles] += "${TESTIMAGELOCK}"
require conf/machine/include/qemu.inc
require conf/machine/include/tune-cortexa7.inc
#To build rootfs in u-boot image format
#single ext4 is required for runqemu tasks
IMAGE_FSTYPES_append_virt-arm += " ext4"
UBOOT_MACHINE = "qemu_arm_defconfig"
UBOOT_ENV = "uEnv"
PREFERRED_VERSION_u-boot = "2018.01"
PREFERRED_PROVIDER_virtual/kernel = "linux-yocto"
#This variable is necessary so that the SysV init
#adds the login prompt to this consoles.
#ttyAMA0 is integrated in the dtb and is part of
#the qemu virt emulated machine.
#ttyS0 and possibly others are required for the runqemu
#which required 2 serial consoles. An annoying requirement.
SERIAL_CONSOLES ?= "115200;ttyAMA0 115200;ttyS0"
# The following variables are required by the qemuboot
# class which generates a qemuboot.conf that contains
# information for the runqemu wrapper of bitbake.
IMAGE_CLASSES += "qemuboot"
QB_MEM = "-m 2048"
QB_MACHINE = "-machine virt,highmem=off"
# Add the 'virtio-rng-pci' device otherwise the guest may run out of entropy
QB_OPT_APPEND += "-device virtio-rng-pci"
QB_OPT_APPEND += "-device virtio-net-pci,netdev=net0,mac=52:54:00:12:34:02 -netdev tap,id=net0,ifname=tap0,script=no,downscript=no"
#As we have serial connections we need no graphics
QB_OPT_APPEND += "-daemonize -display none"
MACHINE_EXTRA_RDEPENDS += "kernel-module-gpio-mockup pwm-mockup strace gdb"
KERNEL_MODULE_AUTOLOAD += "pwm-mockup gpio-mockup"
KERNEL_MODULE_PROBECONF += "gpio-mockup"
module_conf_gpio-mockup += "options gpio-mockup gpio_mockup_ranges=-1,512"
#This is a required rootfs type. Do not ask me why
QB_ROOTFS = "ext4"
#The qemu emulator binary required
QB_SYSTEM_NAME = "qemu-system-${TUNE_ARCH}"
PACKAGECONFIG_pn-qemu-native = "kvm fdt"
HOST_SERIAL_DEVICE = "/tmp/qemu-serial"
HOST_SERIAL_BAUD = "115200"
HOST_ETHERNET_DEVICE = "tap0"
TEST_SERVER_IP = "192.168.3.124"
TEST_TARGET_IP = "192.168.3.71"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment