Skip to content

Instantly share code, notes, and snippets.

@paulghaddad
Created February 10, 2020 20:02
Show Gist options
  • Save paulghaddad/0eaa81d2799b792cdd714e48624bf1e1 to your computer and use it in GitHub Desktop.
Save paulghaddad/0eaa81d2799b792cdd714e48624bf1e1 to your computer and use it in GitHub Desktop.
VM with OS Exercise
import pytest
from vm_with_os import VirtualMachine
def test_load_two_programs_to_memory():
    """Load two programs back to back and check memory and the registry.

    After each load, the program's text bytes and data bytes must both be
    present somewhere in VM memory, and the registry must map the program's
    file name to the expected address pair.
    """
    machine = VirtualMachine()

    # One row per program, in load order:
    # (program file, text segment hex, data segment hex, registry entry)
    expectations = (
        ('add_255_3_test.vef', '01011001021203010202010eff', '0000ff000300', (0, 20)),
        ('sub_256_3_test.vef', '01011001021204010202010eff', '000000010300', (20, 40)),
    )

    for program_path, text_hex, data_hex, address_range in expectations:
        machine.load_program(program_path)

        # The memory contains this program's text
        assert bytearray.fromhex(text_hex) in machine.memory

        # The memory contains this program's data
        assert bytearray.fromhex(data_hex) in machine.memory

        # The program is registered in the VM's address registry
        assert machine.address_registry[program_path] == address_range
def test_processes_run_sequentially():
    """Run two loaded programs and check the word each one stored.

    Both programs are loaded before the VM runs; afterwards each program's
    output word is read from its fixed location in VM memory.
    """
    machine = VirtualMachine()

    # Load both programs before starting the VM
    for program_path in ('add_255_3_test.vef', 'sub_256_3_test.vef'):
        machine.load_program(program_path)

    machine.run()

    # Output word of program 1
    first_output = machine.memory[14:16]
    assert first_output == bytearray.fromhex('0201')

    # Output word of program 2
    second_output = machine.memory[34:36]
    assert second_output == bytearray.fromhex('fd00')
class VirtualMachine:
    """Small virtual machine that time-shares several loaded programs.

    Memory is a flat 1024-byte ``bytearray``. Each loaded program is placed
    after the previous one, recorded in ``address_registry`` as
    ``name -> (start, end)`` and appended to ``process_queue`` as a mutable
    ``[name, program_counter]`` entry. ``run`` walks the queue round-robin,
    executing ``TIME_SLICE`` instructions per turn.

    The class relies on the module-level names ``deque``,
    ``INSTRUCTION_SET`` (indexed by an opcode byte, yielding an instruction
    name such as ``'HALT'``) and ``WORDS_PER_INSTRUCTION`` being defined
    elsewhere in the file.
    """

    # Number of instructions a process executes before the next one is
    # scheduled.
    TIME_SLICE = 2

    def __init__(self, memory=None):
        """Create a VM with zeroed memory and no loaded programs.

        Args:
            memory: Accepted for backward compatibility and ignored; the VM
                always starts with a fresh zero-filled 1024-byte buffer.
                The default is ``None`` instead of a shared mutable list.
        """
        self.memory = bytearray(1024)
        self.address_registry = {}
        self.memory_pointer = 0
        self.process_queue = deque()
        # General-purpose register values of each queued process, kept
        # between time slices so a context switch does not lose them.
        self._saved_registers = {}

    def run(self):
        """Run every queued process to its HALT instruction, round-robin.

        Each turn restores the front process's program counter and
        registers, executes up to ``TIME_SLICE`` instructions, then either
        removes the process (on HALT) or saves its state and rotates it to
        the back of the queue.
        """
        while self.process_queue:
            # Get the next process in the front of the queue
            process = self.process_queue[0]
            name, program_counter = process
            address_offset = self.get_process_address(name)

            # Register 0 mirrors the program counter at the start of the
            # slice; registers 1 and 2 are restored from the previous slice.
            saved = self._saved_registers.setdefault(name, [0x00, 0x00])
            registers = [program_counter, *saved]

            halted = False
            for _ in range(self.TIME_SLICE):
                instruction = self.fetch_instruction_code(program_counter)
                if instruction == 'HALT':
                    halted = True
                    break

                arg1, arg2 = self.fetch_instruction_operands(program_counter)
                if instruction == 'LOAD_WORD':
                    address_start = self.translate_address(arg2, address_offset)
                    registers[arg1] = self.load_word(
                        self.memory[address_start:address_start + 2]
                    )
                elif instruction == 'STORE_WORD':
                    address_start = self.translate_address(arg2, address_offset)
                    input_bytes = self.read_word_from_register(registers[arg1])
                    self.store_word_to_memory(address_start, *input_bytes)
                elif instruction == 'ADD':
                    # NOTE(review): the result always lands in register 1,
                    # whatever arg1 is - confirm this matches the ISA.
                    registers[1] = registers[arg1] + registers[arg2]
                elif instruction == 'SUB':
                    registers[1] = registers[arg1] - registers[arg2]

                program_counter += WORDS_PER_INSTRUCTION

            if halted:
                # Drop the finished process and skip the bookkeeping below,
                # which would otherwise touch the *next* process (or an
                # empty queue).
                self.process_queue.popleft()
                self._saved_registers.pop(name, None)
                continue

            # Save the process state, then give the next process a turn
            process[1] = program_counter
            self._saved_registers[name] = registers[1:]
            self.process_queue.rotate(-1)

    def load_program(self, name):
        """Load a segmented program file into memory and queue it to run.

        File layout as read by this method: byte 0 is the number of
        segments, followed by one 3-byte header per segment
        (``type bit | 7-bit target address``, ``length``, ``payload
        location``), followed by the payload area. Payload locations are
        relative to the start of the payload area; target addresses are
        relative to where the program is placed in VM memory.

        Args:
            name: Path of the program file; also used as the key in
                ``address_registry`` and as the process name.

        Raises:
            ValueError: If the file is empty, its headers or payloads are
                truncated, or a segment would not fit in VM memory. Nothing
                is written to memory in that case.
        """
        with open(name, 'rb') as segment_file:
            # Read the whole image at once: iterating a binary file yields
            # chunks split on 0x0a bytes, which corrupts any program that
            # happens to contain that byte.
            data = segment_file.read()

        if not data:
            raise ValueError(f"program file {name!r} is empty")

        num_segments = data[0]
        payload_base = 1 + 3 * num_segments
        if len(data) < payload_base:
            raise ValueError(f"program file {name!r} has truncated segment headers")

        address_start = self.memory_pointer
        program_end = address_start
        placements = []

        # Decode and validate every header before touching memory
        for i in range(num_segments):
            type_and_target, segment_length, payload_location = data[3 * i + 1:3 * i + 4]
            # The top bit is the segment type (not needed for loading);
            # the low 7 bits are the target address.
            target_address = type_and_target & 0b01111111

            segment_offset = payload_base + payload_location
            payload = data[segment_offset:segment_offset + segment_length]
            if len(payload) != segment_length:
                raise ValueError(f"program file {name!r} has a truncated segment payload")

            abs_memory_offset = address_start + target_address
            segment_end = abs_memory_offset + segment_length
            if segment_end > len(self.memory):
                raise ValueError(f"program {name!r} does not fit in VM memory")

            placements.append((abs_memory_offset, segment_end, payload))
            # Track the highest address used, so segments listed out of
            # address order cannot make the next program overlap this one.
            program_end = max(program_end, segment_end)

        # Load to memory; lengths were checked, so the buffer is never resized
        for abs_memory_offset, segment_end, payload in placements:
            self.memory[abs_memory_offset:segment_end] = payload
        self.memory_pointer = program_end

        # Set program's memory location in the address space registry
        self.address_registry[name] = (address_start, self.memory_pointer)

        # Add program to process queue
        self.process_queue.append([name, address_start])
        self._saved_registers[name] = [0x00, 0x00]

    def get_process_address(self, process):
        """Return the start address registered for the process named *process*."""
        return self.address_registry[process][0]

    def fetch_instruction_code(self, program_counter):
        """Return the instruction name for the opcode byte at *program_counter*."""
        instruction_address = self.memory[program_counter]
        return INSTRUCTION_SET[instruction_address]

    def fetch_instruction_operands(self, program_counter):
        """Return the two operand bytes that follow the opcode byte."""
        return self.memory[program_counter + 1], self.memory[program_counter + 2]

    def load_word(self, input_bytes):
        """Combine two bytes, low byte first, into one integer."""
        return input_bytes[0] * 1 + input_bytes[1] * 256

    def read_word_from_register(self, register_value):
        """Split a register value into a ``(low byte, high byte)`` pair."""
        return register_value % 256, register_value // 256

    def store_word_to_memory(self, address_start, *input_bytes):
        """Write the first two of *input_bytes* to memory at *address_start*."""
        self.memory[address_start] = input_bytes[0]
        self.memory[address_start + 1] = input_bytes[1]

    def translate_address(self, address, address_offset):
        """Turn a program-relative address into an absolute memory address."""
        return address_offset + address
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment