Created
February 10, 2020 20:02
-
-
Save paulghaddad/0eaa81d2799b792cdd714e48624bf1e1 to your computer and use it in GitHub Desktop.
VM with OS Exercise
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pytest | |
from vm_with_os import VirtualMachine | |
def test_load_two_programs_to_memory():
    """Each loaded program appears in memory and in the address registry."""
    machine = VirtualMachine()

    # (program file, expected text segment, expected data segment,
    #  expected (start, end) entry in the address registry)
    expectations = (
        (
            'add_255_3_test.vef',
            '01011001021203010202010eff',
            '0000ff000300',
            (0, 20),
        ),
        (
            'sub_256_3_test.vef',
            '01011001021204010202010eff',
            '000000010300',
            (20, 40),
        ),
    )

    # Programs are loaded one after the other; each is verified right after
    # its own load, before the next program is brought in.
    for program_path, text_hex, data_hex, address_range in expectations:
        machine.load_program(program_path)

        # The program's text segment is somewhere in memory
        assert bytearray.fromhex(text_hex) in machine.memory

        # ... and so is its data segment
        assert bytearray.fromhex(data_hex) in machine.memory

        # The program is registered under its file path
        assert machine.address_registry[program_path] == address_range
def test_processes_run_sequentially():
    """Running the VM with two loaded programs leaves both results in memory."""
    machine = VirtualMachine()

    # Queue both programs, in this order
    for program_path in ('add_255_3_test.vef', 'sub_256_3_test.vef'):
        machine.load_program(program_path)

    # Let the VM execute everything that was queued
    machine.run()

    # Program 1 wrote its result to its output word
    first_output = machine.memory[14:16]
    assert first_output == bytearray.fromhex('0201')

    # Program 2 wrote its result to its output word
    second_output = machine.memory[34:36]
    assert second_output == bytearray.fromhex('fd00')
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class VirtualMachine:
    """A small virtual machine that loads several programs and time-slices them.

    Programs are loaded back to back into one flat ``bytearray``. Every
    loaded program becomes a process in a round-robin queue, and ``run``
    gives each process a short time slice in turn until all have halted.

    The class relies on two module-level names defined outside of it:
    ``INSTRUCTION_SET`` (indexed by opcode byte, yielding an instruction name
    such as ``'LOAD_WORD'``) and ``WORDS_PER_INSTRUCTION`` (the amount the
    program counter advances after each instruction).
    """

    # Registers are stored to memory as two bytes, so arithmetic wraps at 16 bits.
    _WORD_MASK = 0xFFFF

    def __init__(self, memory=None):
        """Create a VM with 1024 zeroed bytes of memory and no processes.

        Args:
            memory: Ignored. Kept only so existing callers that pass it keep
                working; the VM always allocates its own memory.
        """
        self.memory = bytearray(1024)
        # program name -> (first address, one past the last address)
        self.address_registry = {}
        # Next free address; the next program is loaded starting here.
        self.memory_pointer = 0
        # Round-robin queue of [program name, absolute program counter].
        self.process_queue = deque()

    def run(self, time_slice=2):
        """Run all queued processes round-robin until every one has halted.

        A process executes at most ``time_slice`` instructions, then its
        program counter and registers are saved and the next process gets its
        turn. A process that reaches ``HALT`` is removed from the queue.

        Args:
            time_slice: Maximum number of instructions per turn (default 2).

        Raises:
            ValueError: If ``time_slice`` is less than 1, which would never
                let any process make progress.
        """
        if time_slice < 1:
            raise ValueError('time_slice must be at least 1')

        # Registers of processes that are currently switched out, keyed by
        # process name. Without this, values loaded in one time slice would
        # be lost before the next slice uses them.
        saved_registers = {}

        while self.process_queue:
            # The process at the front of the queue gets the next time slice
            process = self.process_queue[0]
            name, pc = process
            address_offset = self.get_process_address(name)

            registers = saved_registers.pop(name, None)
            if registers is None:
                registers = [pc, 0x00, 0x00]
            else:
                # Register 0 mirrors the program counter at the start of a slice
                registers[0] = pc

            halted = False
            for _ in range(time_slice):
                instruction = self.fetch_instruction_code(pc)
                if instruction == 'HALT':
                    halted = True
                    break
                arg1, arg2 = self.fetch_instruction_operands(pc)
                self._execute(instruction, arg1, arg2, registers, address_offset)
                pc += WORDS_PER_INSTRUCTION

            if halted:
                # Drop the finished process; the next one is now at the front,
                # so no rotation is needed (rotating would skip its turn).
                self.process_queue.popleft()
                continue

            # Context switch: remember where this process stopped, then rotate
            process[1] = pc
            saved_registers[name] = registers
            self.process_queue.rotate(-1)

    def _execute(self, instruction, arg1, arg2, registers, address_offset):
        """Apply one non-HALT instruction to ``registers`` and memory.

        Instructions other than the four handled here are ignored.
        """
        if instruction == 'LOAD_WORD':
            # arg1 = destination register, arg2 = program-relative address
            address_start = self.translate_address(arg2, address_offset)
            registers[arg1] = self.load_word(
                self.memory[address_start:address_start + 2])
        elif instruction == 'STORE_WORD':
            # arg1 = source register, arg2 = program-relative address
            address_start = self.translate_address(arg2, address_offset)
            word_bytes = self.read_word_from_register(registers[arg1])
            self.store_word_to_memory(address_start, *word_bytes)
        elif instruction == 'ADD':
            # NOTE(review): the result always goes to register 1, not to
            # the register named by arg1 -- confirm this is intended.
            registers[1] = (registers[arg1] + registers[arg2]) & self._WORD_MASK
        elif instruction == 'SUB':
            registers[1] = (registers[arg1] - registers[arg2]) & self._WORD_MASK

    def load_program(self, name):
        """Load the program image in file ``name`` and queue it as a process.

        The program is placed at the current ``memory_pointer``; its address
        range is recorded in ``address_registry`` under ``name`` and a
        ``[name, start address]`` entry is appended to ``process_queue``.

        Args:
            name: Path of the program image file; also used as process name.

        Raises:
            ValueError: If a segment's payload is shorter than its header
                claims (nothing is written to memory in that case).
            OSError: If the file cannot be opened or read.
        """
        # Read the whole image at once: iterating a binary file yields
        # "lines", which would split the image at every 0x0a byte.
        with open(name, 'rb') as segment_file:
            data = segment_file.read()

        segments = self._parse_segments(data)

        address_start = self.memory_pointer
        program_end = address_start
        for target_address, payload in segments:
            abs_memory_offset = address_start + target_address
            segment_end = abs_memory_offset + len(payload)
            self.memory[abs_memory_offset:segment_end] = payload
            # Track the highest address used, whatever order the headers are in
            program_end = max(program_end, segment_end)
        self.memory_pointer = program_end

        # Set program's memory location in the address space registry
        self.address_registry[name] = (address_start, self.memory_pointer)
        # Add program to process queue; its program counter starts at its base
        self.process_queue.append([name, address_start])

    @staticmethod
    def _parse_segments(data):
        """Decode a program image into ``(target_address, payload)`` pairs.

        Image layout as read by this loader: byte 0 is the number of segment
        headers; each header is 3 bytes (low 7 bits of byte 0 = target
        address relative to the program start, byte 1 = payload length,
        byte 2 = payload location relative to the end of the header table).
        The high bit of a header's first byte carries the segment type,
        which the loader does not need.
        """
        if not data:
            return []

        num_segments = data[0]
        payload_base = 1 + 3 * num_segments
        segments = []
        for i in range(num_segments):
            header = data[3 * i + 1:3 * i + 4]
            target_address = header[0] & 0b01111111
            segment_length = header[1]
            payload_location = header[2]

            segment_offset = payload_base + payload_location
            payload = data[segment_offset:segment_offset + segment_length]
            # A short payload would make the slice assignment in
            # load_program resize the VM's memory, so reject it up front.
            if len(payload) != segment_length:
                raise ValueError(
                    'segment {} is truncated: expected {} bytes, found {}'
                    .format(i, segment_length, len(payload)))
            segments.append((target_address, payload))
        return segments

    def get_process_address(self, process):
        """Return the first memory address of the program named ``process``."""
        return self.address_registry[process][0]

    def fetch_instruction_code(self, program_counter):
        """Return the instruction name for the opcode at ``program_counter``."""
        instruction_address = self.memory[program_counter]
        return INSTRUCTION_SET[instruction_address]

    def fetch_instruction_operands(self, program_counter):
        """Return the two operand bytes that follow the opcode."""
        return self.memory[program_counter + 1], self.memory[program_counter + 2]

    def load_word(self, input_bytes):
        """Combine two bytes (low byte first) into one integer."""
        return input_bytes[0] * 1 + input_bytes[1] * 256

    def read_word_from_register(self, register_value):
        """Split a register value into ``(low byte, high byte)``.

        The value is wrapped to 16 bits first so both results always fit in
        a byte.
        """
        word = register_value & self._WORD_MASK
        return word % 256, word // 256

    def store_word_to_memory(self, address_start, *input_bytes):
        """Write two bytes to memory at ``address_start`` and the next address."""
        self.memory[address_start] = input_bytes[0]
        self.memory[address_start + 1] = input_bytes[1]

    def translate_address(self, address, address_offset):
        """Turn a program-relative address into an absolute memory address."""
        return address_offset + address
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment