Created
July 29, 2012 17:00
-
-
Save Orpheon/3200243 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
import math, struct | |
import time as time_module | |
import node, player, constants, function | |
import client as cl | |
class Nodemap(object): | |
# Minimum distance nodes have to have from one-another to exist - aka Optimization | |
MIN_NODE_DISTANCE = 8 | |
# Extra height above ground the nodes are given, to ensure collision even when jumping. | |
EXTRA_NODE_HEIGHT = 25 | |
# The "fps" of the simulation used by walkers to discover paths afterwards | |
SIMULATION_FPS = 120 | |
# The maximum time a walker can stay at a place before declared stuck | |
MAX_TIME_STUCK = 4# in seconds | |
# Average character height | |
APPROX_CHARACTER_HEIGHT = 35 | |
# Distance from edges of platforms | |
MIN_DISTANCE_FROM_EDGE = 0 | |
def __init__(self): | |
self.nodelist = [] | |
def create(self, game, state, client): | |
nodemap = self.load_nodemap(game, state, client) | |
def destroy(self, game): | |
self.save_nodemap(game) | |
for nav_node in self.nodelist: | |
nav_node.to_destroy = True | |
def save_nodemap(self, game): | |
nodemap = self.serialize_nodemap() | |
mapname = game.map.mapname | |
text = open(mapname+".nodemap", "w") | |
endstring = struct.pack(">B", constants.TEMPESTBOT_VERSION) + nodemap | |
text.write(endstring) | |
text.close() | |
print "Saved nodemap." | |
def load_nodemap(self, game, state, client): | |
self.generate_nodes(game, state, client) | |
return | |
try: | |
text = open(game.map.mapname+".nodemap", "r") | |
nodemap = text.read() | |
version = struct.unpack_from(">B", nodemap) | |
nodemap = nodemap[1:] | |
print "Nodemap of {} found, Tempest Bot version {}".format(game.map.mapname, version) | |
self.deserialize_nodemap(game, state, nodemap) | |
text.close() | |
except IOError: | |
print "No valid nodemap found, will generate one. Please wait..." | |
self.generate_nodes(game, state) | |
def serialize_nodemap(self): | |
nodemap = "" | |
# Number of nodes in total | |
nodemap += struct.pack(">I", len(self.nodelist)) | |
# Add all the details important to creation | |
for nav_node in self.nodelist: | |
nodemap += struct.pack(">II", nav_node.x, nav_node.y) | |
# Add the connections | |
for nav_node in self.nodelist: | |
nodemap += struct.pack(">B", len(nav_node.connection_nodes)) | |
for othernode in nav_node.connection_nodes: | |
index = nav_node.connection_nodes.index(othernode) | |
# Pack the connecion details, using the index of the other node in self.nodelist | |
nodemap += struct.pack(">IIb", self.nodelist.index(othernode), nav_node.connection_distances[index], nav_node.connection_commands[index]) | |
return nodemap | |
def deserialize_nodemap(self, game, state, nodemap): | |
numof_nodes = struct.unpack_from(">I", nodemap)[0] | |
nodemap = nodemap[4:] | |
print "Will load nodemap, {} nodes to load.".format(numof_nodes) | |
# Create all the nodes, they'll put themselves in self.nodelist | |
for a in range(numof_nodes): | |
x, y = struct.unpack_from(">II", nodemap) | |
nodemap = nodemap[8:] | |
node.Node(game, state, self, (x, y)) | |
print "Loading nodes, ~{}% done.".format(str(int(a*35/numof_nodes))) | |
# Now go through them again, and add connections | |
for nav_node in self.nodelist: | |
numconnections = struct.unpack_from(">B", nodemap)[0] | |
nodemap = nodemap[1:] | |
if numconnections > 4: | |
print("ATTENTION NUMCONNECTIONS TOO LARGE") | |
for a in range(numconnections): | |
nodeindex, distance, command = struct.unpack_from(">IIb", nodemap) | |
nodemap = nodemap[9:] | |
nav_node.add_connection(self.nodelist[nodeindex], distance, command) | |
print "Loading nodes, ~{}% done.".format(str(int(self.nodelist.index(nav_node)*65/len(self.nodelist) + 35))) | |
print "Loading nodes done." | |
def generate_nodes(self, game, state, client): | |
mask = game.map.collision_mask | |
onPlane = False | |
### SET THE NODES ### | |
for y in range(self.EXTRA_NODE_HEIGHT+self.APPROX_CHARACTER_HEIGHT, game.map.height-1): | |
for x in range(game.map.width-1): | |
if onPlane: | |
if x == 0:# We changed map row | |
nav_node = node.Node(game, state, self, (map_width-self.MIN_DISTANCE_FROM_EDGE, y-1-self.EXTRA_NODE_HEIGHT)) | |
nav_node.edge_offset = -1 | |
onPlane = False | |
if (not mask.get_at((x, y))) and mask.get_at((x, y+1)):# If this is a floor pixel | |
if not onPlane: # If it's the first pixel of a certain plane | |
nav_node = node.Node(game, state, self, (x+self.MIN_DISTANCE_FROM_EDGE, y-self.EXTRA_NODE_HEIGHT)) | |
nav_node.edge_offset = 1 | |
onPlane = True | |
else:# We're off the ground, but we were a pixel ago. Also, we're not at the beginning of the screen. | |
if onPlane: | |
nav_node = node.Node(game, state, self, (x-1-self.MIN_DISTANCE_FROM_EDGE, y-self.EXTRA_NODE_HEIGHT)) | |
nav_node.edge_offset = -1 | |
onPlane = False | |
index = 0 | |
while index < len(self.nodelist)-1: | |
nav_node = self.nodelist[index] | |
for y in range(self.APPROX_CHARACTER_HEIGHT): | |
if mask.get_at((nav_node.x, nav_node.y-y)): | |
nav_node.destroy(state) | |
index -= 1 | |
break | |
index += 1 | |
for nav_node in self.nodelist: | |
numof_neighbours = 0 | |
for othernode in self.nodelist: | |
if othernode == nav_node: continue # We're dealing with the same node. | |
if math.hypot(nav_node.x-othernode.x, nav_node.y-othernode.y) <= self.MIN_NODE_DISTANCE: | |
numof_neighbours += 1 | |
if numof_neighbours > 1: | |
index = self.nodelist.index(nav_node) | |
self.nodelist[index].to_destroy = True | |
# TODO: Remove nodes in spawn as soon as spawn is implemented. | |
# Actually destroy all the nodes who were marked to die | |
index = 0 | |
# I hate doing this, but we have to compensate for removing items in the list | |
while index < len(self.nodelist)-1: | |
nav_node = self.nodelist[index] | |
for y in range(self.APPROX_CHARACTER_HEIGHT): | |
if mask.get_at((nav_node.x, nav_node.y-y)): | |
nav_node.to_destroy = True | |
break | |
if nav_node.to_destroy: | |
nav_node.destroy(state) | |
index -= 1 | |
index += 1 | |
### CONNECT THE NODES ### | |
# For each node, create a character object that moves in a certain direction, and see to what it connects. | |
walkerplayer = player.Player(game, state, len(state.players)) | |
walkerplayer.spawn(game, state) | |
walkerchar = state.entities[walkerplayer.character_id] | |
client.spectator = cl.spectator.Spectator(walkerplayer.id) | |
client.our_player_id = walkerplayer.id | |
for origin_node in self.nodelist: | |
for command in [1, -1, 2, -2]:# walk right, walk left, walk right and jump, walk left and jump. All 4 possible paths from a given node. | |
has_left_origin_node = False# Used to detect when we are stuck | |
time = 0 | |
distance = 0 | |
please_break = False | |
tried_jumping = False | |
# oldx and oldy are the original positions of the nodes at generation | |
oldx = origin_node.x + origin_node.edge_offset*self.MIN_DISTANCE_FROM_EDGE | |
oldy = origin_node.y + self.EXTRA_NODE_HEIGHT | |
if mask.get_at((int(oldx+walkerchar.collision_mask.get_size()[0]/2), oldy)): | |
walkerchar.x = oldx - walkerchar.collision_mask.get_size()[0] | |
elif mask.get_at((int(oldx-walkerchar.collision_mask.get_size()[0]/2), oldy)): | |
walkerchar.x = oldx | |
else: | |
walkerchar.x = oldx - walkerchar.collision_mask.get_size()[0]/2 | |
walkerchar.y = oldy - walkerchar.collision_mask.get_size()[1] | |
while not mask.overlap(walkerchar.collision_mask, (int(walkerchar.x), int(walkerchar.y+1))): | |
walkerchar.y += 1 | |
# Set the input of the character | |
walkerplayer.left = command>0 | |
walkerplayer.right = command<0 | |
walkerplayer.up = (abs(command)-1)>0 | |
walkerchar.hspeed = 0 | |
walkerchar.vspeed = 0 | |
while True: | |
oldx = walkerchar.x | |
game.previous_state = game.current_state.copy() | |
game.current_state.update(game, 1/self.SIMULATION_FPS) | |
client.renderer.render(client, game, 1/self.SIMULATION_FPS, walkerchar.id) | |
collides_with_origin = False | |
# Check collision with any nodes | |
for othernode in self.nodelist: | |
if othernode.collide_with_character(walkerchar): | |
# Is this the original node? | |
if othernode != origin_node: | |
if othernode not in origin_node.connection_nodes: | |
# Yay, new path found. Create a connection | |
origin_node.add_connection(othernode, distance, command) | |
please_break = True# Exit the while | |
else: | |
collides_with_origin = True | |
break | |
if not collides_with_origin: | |
walkerplayer.up = False | |
if please_break: | |
break | |
if time > self.MAX_TIME_STUCK: | |
if origin_node.collide_with_character(walkerchar) or round(oldx) == round(walkerchar.x): | |
if not tried_jumping: | |
walkerplayer.up = True | |
tried_jumping = True | |
time = 0 | |
# if origin_node.collide_with_character(walkerchar): | |
# if abs(command) == 1: | |
# command *= 2 | |
else: | |
# We're stuck. Abort | |
break | |
else: | |
tried_jumping = False | |
time += 1/self.SIMULATION_FPS | |
distance += math.hypot(walkerchar.hspeed, walkerchar.vspeed) | |
print "Generating nodemap, ~"+str(int((self.nodelist.index(origin_node)+1)/len(self.nodelist)*100))+"% done" | |
walkerplayer.destroy(game, state) | |
self.save_nodemap(game) | |
print "Done." |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment