public
Last active

An agent that learns to chemotax using pybrain for neural networks and learning, pygame for graphics, and pymunk for physics.

  • Download Gist
ChemotaxisEnv.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
from Food import Food
from Seeker import Seeker
from globals import SCREEN_SIZE, toPygame
from pybrain.rl.environments.environment import Environment
import pygame
 
class ChemotaxisEnv(Environment):
    """
    Experiment adapted from "Evolving Dynamical Neural Networks for Adaptive Behavior" (Beer & Gallagher, 1992)
    A seeker is enclosed in a square box with a food item inside.
    This food item emits a chemical signal whose intensity falls off
    as the inverse square of the distance from the food.
    The intensity of the chemical signal within the environment varies
    five orders of magnitude from the item to the far corners of the box, i.e. at a distance of root(2).
    We wish the seeker to find and remain in the vicinity of the food item,
    starting from arbitrary locations and orientations within the environment.

    To accomplish this task, the seeker is endowed with a circular body with a diameter of .01.
    The seeker possesses chemical sensors that can directly sense
    the intensity of the chemical signal at their location.
    These sensors are symmetrically placed about the center line of the body.
    In addition, the seeker has two effectors located on opposite sides of its body.
    These effectors can apply forces that move the body forward and rotate it.
    In the simplified physics of this environment,
    the velocity of movement is proportional to the force applied.

    The paragraphs above describe the original experiment. In this
    implementation the box is SCREEN_SIZE x SCREEN_SIZE pixels and the
    signal computed by calcSignal() falls off as the inverse of the
    distance (not its square), capped at 1 close to the food.
    """

    def __init__(self):
        """ Open the pygame window, create the food and the seeker, draw the
        first frame and put the environment into its initial state. """
        # pygame initialization
        self.screen = pygame.display.set_mode((SCREEN_SIZE, SCREEN_SIZE))
        pygame.mouse.set_visible(0)

        # plain white backdrop that is blitted over the screen every frame
        self.background = pygame.Surface(self.screen.get_size())
        self.background = self.background.convert()
        self.background.fill((255, 255, 255))

        self.food = Food()
        self.food_sprite = pygame.sprite.RenderPlain(self.food)  # sprite group for the food

        self.seeker = Seeker()
        self.seeker_sprite = pygame.sprite.RenderPlain(self.seeker)  # sprite group for the seeker

        self._draw()

        # pybrain initialization
        self.action = [0.0, 0.0]
        self.reset()

    def _draw(self):
        """ Render one frame. Note that Seeker.update() also advances the
        physics simulation by one step before drawing. """
        self.screen.blit(self.background, (0, 0))
        self.food.update()
        self.food_sprite.draw(self.screen)
        # the seeker sprite group is not drawn here: Seeker.update() draws
        # the seeker's pymunk space directly onto the screen
        self.seeker.update(self.screen)
        pygame.display.flip()

    def _calcDistance(self, loc1, loc2):
        """ Calculates the Euclidean distance between two coordinate pairs. """
        from math import hypot
        return hypot(loc2[0] - loc1[0], loc2[1] - loc1[1])

    def calcSignal(self, loc):
        """ Calculates the chemical signal at a specific location.

        The signal is the inverse of the distance between the given location
        and the food, capped at 1.0 for any distance of one unit or less.
        The cap keeps the result inside (0, 1] and avoids both the division
        by zero at the food itself and the unbounded spike that 1/dist
        produces for distances between 0 and 1.

        :key loc: an (x, y) pair in the same coordinate system as self.food.loc
        :rtype: float
        """
        dist = self._calcDistance(self.food.loc, loc)
        if dist <= 1:
            return 1.0
        # open question from the original author:
        # why does changing the reward magnitude change the sensor-delta magnitude?
        return 1.0 / dist

    def getSensors(self):
        """ the currently visible state of the world (the observation may be
        stochastic - repeated calls returning different values)

        Returns a two-element list: the signal strength at the seeker's left
        sensor followed by the signal strength at its right sensor.

        :rtype: by default, this is assumed to be a numpy array of doubles
        """
        # get sensor locations
        lx, ly, rx, ry = self.seeker.calcAbsoluteSensorPositions()

        # the seeker reports pymunk coordinates; convert before measuring
        left_signal = self.calcSignal(toPygame((lx, ly)))
        right_signal = self.calcSignal(toPygame((rx, ry)))
        return [left_signal, right_signal]

    def performAction(self, action):
        """ perform an action on the world that changes its internal state (maybe
        stochastically).
        :key action: an action that should be executed in the Environment.
        :type action: by default, this is assumed to be a numpy array of doubles
        action[0] is the left motor/effector neuron output, action[1] is the right
        """
        self.seeker.move_body(action[0], action[1])

        # record where the seeker's body is (in pygame coordinates) for this step
        self.movement_tracker.append(toPygame(self.seeker.body.position))

        # redraw (this also steps the physics, see _draw)
        self._draw()

    def reset(self):
        """ Reinitializes the environment: clears the movement history, moves
        the food to a random location on the screen and resets the seeker
        through Seeker.reset().
        """
        from random import random
        self.movement_tracker = []
        self.food.setLocation((random() * SCREEN_SIZE, random() * SCREEN_SIZE))
        self.seeker.reset()
EpisodicChemotaxisTask.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
__author__ = 'Daniel Horowitz, dhorowitz@oxy.edu'
 
from ChemotaxisEnv import ChemotaxisEnv
from globals import toPygame#, SCREEN_SIZE
from pybrain.rl.environments import EpisodicTask
 
class ChemotaxisTask(EpisodicTask):
    """ Episodic task wrapping a ChemotaxisEnv: an episode lasts a fixed
    number of steps and the reward at each step is the chemical signal at
    the seeker's body position. """

    def __init__(self, env=None, maxsteps=1000):
        """
        :key env: (optional) an instance of a ChemotaxisEnv (or a subclass thereof)
        :key maxsteps: maximal number of steps (default: 1000)
        """
        # identity check: only a missing environment triggers the default
        if env is None:
            env = ChemotaxisEnv()
        self.env = env
        EpisodicTask.__init__(self, env)
        self.N = maxsteps  # episode length in steps
        self.t = 0         # steps taken in the current episode
        # disabled: self.actor_limits = [(0,1), (0,1)] # scale (-1,1) to motor neurons
        self.sensor_limits = [(0, 1), (0, 1)]  # scale sensor neurons to (-1,1)

    def reset(self):
        """ Reset the underlying task and restart the step counter. """
        EpisodicTask.reset(self)
        self.t = 0

    def performAction(self, action):
        """ Count the step, then forward the action to EpisodicTask. """
        self.t += 1
        EpisodicTask.performAction(self, action)

    def isFinished(self):
        """ The episode ends once the maximal number of steps is reached. """
        return self.t >= self.N

    def getReward(self):
        """ The reward is the chemical signal of the food at the seeker's
        body position, as computed by the environment's calcSignal(). """
        seeker_pos = toPygame(self.env.seeker.body.position)
        return self.env.calcSignal(seeker_pos)

    def setMaxLength(self, n):
        """ Change the maximal number of steps per episode. """
        self.N = n
Food.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
import pygame
 
class Food(pygame.sprite.Sprite):
    """ Sprite for the food item: a small filled black disc centred on loc. """

    loc = (0, 0)   # default location until setLocation() is called
    RADIUS = 3     # radius of the disc in pixels

    def __init__(self):
        pygame.sprite.Sprite.__init__(self)
        diameter = self.RADIUS * 2
        # per-pixel-alpha surface: everything outside the disc stays
        # transparent instead of showing up as a black square
        self.image = pygame.Surface([diameter, diameter], pygame.SRCALPHA)
        # draw the disc in the middle of the image, not at its corner
        pygame.draw.circle(self.image, (0, 0, 0), (self.RADIUS, self.RADIUS), self.RADIUS)
        # the rect must cover the whole image so that setting its center
        # really centres the drawn disc on loc
        self.rect = self.image.get_rect()
        self.update()

    def setLocation(self, loc):
        """ Store a new (x, y) location; the rect follows on the next update(). """
        self.loc = loc

    def update(self):
        """ Move the sprite's rect so that it is centred on loc. """
        # loc may hold floats; rect coordinates are integer pixels
        self.rect.center = (int(self.loc[0]), int(self.loc[1]))
Seeker.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
from pymunk import pygame_util
import pygame
import pymunk
from globals import SCREEN_SIZE
from math import pi
 
class Seeker(pygame.sprite.Sprite):
    """ The seeker: a circular pymunk body with a left and a right sensor,
    living in its own pymunk space that is enclosed by four wall segments.
    A second pymunk space holds two line segments that visualise the
    forces most recently applied by move_body(). """

    STEP_TIME = 0.1                   # time step passed to space.step() on every update
    RADIUS = 5                        # radius of the body circle
    SENSOR_RADIUS = 2                 # radius of each sensor circle
    VELOCITY_LIMIT = 5                # assigned to body.velocity_limit
    ANGULAR_VELOCITY_LIMIT = pi/16    # assigned to body.angular_velocity_limit
    WALL_WIDTH = 2.0                  # radius of the wall segments
    amin = 0                          # not used anywhere in this class
    amax = 0                          # not used anywhere in this class

    def __init__(self):
        # pygame init
        pygame.sprite.Sprite.__init__(self)
        self.image = pygame.Surface([10, 10])

        # pymunk init
        self.space = pymunk.Space()
        self.walls = [
            pymunk.Segment(self.space.static_body, (0, 0+self.WALL_WIDTH), (SCREEN_SIZE, 0+self.WALL_WIDTH), self.WALL_WIDTH),  # bottom
            pymunk.Segment(self.space.static_body, (0, 0), (0, SCREEN_SIZE), self.WALL_WIDTH),  # left
            pymunk.Segment(self.space.static_body, (0, SCREEN_SIZE), (SCREEN_SIZE, SCREEN_SIZE), self.WALL_WIDTH),  # top
            pymunk.Segment(self.space.static_body, (SCREEN_SIZE-self.WALL_WIDTH, 0), (SCREEN_SIZE-self.WALL_WIDTH, SCREEN_SIZE), self.WALL_WIDTH),  # right
        ]

        def collision(space, arbiter, *args, **kwargs):
            """ make seeker bounce off walls """
            self.body.velocity.x = -self.body.velocity.x
            self.body.velocity.y = -self.body.velocity.y
            self.body.angle += pi
            # a begin callback is expected to return a bool;
            # True asks pymunk to process the collision normally
            return True

        # the body uses collision type 0; each wall gets its own type (1..4)
        # and a handler for the (body, wall) pair
        for wall_type, wall in enumerate(self.walls, 1):
            self.space.add_collision_handler(0, wall_type, begin=collision)
            wall.collision_type = wall_type
            wall.friction = 1.0
            wall.group = 1
            wall.color = pygame.color.THECOLORS["black"]
        self.space.add(self.walls)

        # bodies and segments used only to draw the applied forces
        self.force_l = pymunk.Body(1, 1)
        self.force_r = pymunk.Body(1, 1)
        self.force_line_l = pymunk.Segment(self.force_l, (0, 0), (0, 0), 1)
        self.force_line_r = pymunk.Segment(self.force_r, (0, 0), (0, 0), 1)
        self.force_space = pymunk.Space()
        self.force_space.add(self.force_line_l, self.force_line_r)

        self.body = pymunk.Body(mass=5, moment=1000)  # tweak these to decrease spinning out of control
        self.body.velocity_limit = self.VELOCITY_LIMIT
        self.body.angular_velocity_limit = self.ANGULAR_VELOCITY_LIMIT

        self.body_shape = pymunk.Circle(self.body, self.RADIUS, (0, 0))
        self.body_shape.collision_type = 0
        self.body_shape.color = pygame.color.THECOLORS["black"]
        self.sensor_left = self._make_sensor((-self.RADIUS, 0))
        self.sensor_right = self._make_sensor((self.RADIUS, 0))
        self.space.add(self.body, self.body_shape, self.sensor_left, self.sensor_right)
        self.reset()

    def _make_sensor(self, offset):
        """ Build a red sensor circle attached to the body at the given offset. """
        sensor = pymunk.Circle(self.body, self.SENSOR_RADIUS, offset)
        sensor.color = pygame.color.THECOLORS["red"]
        return sensor

    def orientate_sensors(self, left_sensor_relative_pos, right_sensor_relative_pos):
        """ Replace both sensor shapes with new ones at the given offsets. """
        self.space.remove(self.sensor_left, self.sensor_right)
        # the replacements are coloured red again so the sensors keep
        # their colour after the first move
        self.sensor_left = self._make_sensor(left_sensor_relative_pos)
        self.sensor_right = self._make_sensor(right_sensor_relative_pos)
        self.space.add(self.sensor_left, self.sensor_right)

    def update_force_lines(self, force_l_start, force_l_end, force_r_start, force_r_end):
        """ Replace the two force-visualisation segments with new ones
        spanning the given start and end points. """
        self.force_space.remove(self.force_line_l, self.force_line_r)
        self.force_line_l = pymunk.Segment(self.force_l, force_l_start, force_l_end, 1)
        self.force_line_r = pymunk.Segment(self.force_r, force_r_start, force_r_end, 1)
        self.force_space.add(self.force_line_l, self.force_line_r)

    def position_body(self, pos=None):
        """ Move the body to pos, or to a random spot inside the walls.

        :key pos: (optional) an (x, y) position; when omitted, x and y are
                  drawn independently and kept far enough from every wall
                  that the body does not start out overlapping one
        """
        if pos is None:
            from random import uniform
            # each wall segment reaches at most 2*WALL_WIDTH into the screen;
            # add the body radius so the circle itself stays clear of it
            margin = 2 * self.WALL_WIDTH + self.RADIUS
            pos = (uniform(margin, SCREEN_SIZE - margin),
                   uniform(margin, SCREEN_SIZE - margin))
        self.body.position = pos

    def reset_velocity(self):
        """ Stop all linear and angular movement of the body. """
        self.body.velocity = (0, 0)
        self.body.angular_velocity = 0

    def update(self, surface):
        """ Advance the physics by STEP_TIME and draw both pymunk spaces
        (seeker + walls, then the force lines) onto surface. """
        self.space.step(self.STEP_TIME)

        pygame_util.draw_space(surface, self.space)
        pygame_util.draw_space(surface, self.force_space)

    def reset(self):
        """ Put the body at a random position and bring it to rest. """
        self.position_body()
        self.reset_velocity()

    def calcRelativeSensorPositions(self):
        """ Calculate locations of sensors using the magic of trigonometry.

        Returns (lx, ly, rx, ry): the left and right sensor offsets from the
        body position, at distance RADIUS on opposite sides, derived from
        the body's current angle.
        """
        from math import sin, cos
        angle = self.body.angle
        lx = self.RADIUS * sin(angle)
        ly = self.RADIUS * -cos(angle)
        rx = self.RADIUS * -sin(angle)
        ry = self.RADIUS * cos(angle)

        return lx, ly, rx, ry

    def calcAbsoluteSensorPositions(self):
        """ Return (lx, ly, rx, ry): the relative sensor offsets shifted by
        the body's current position. """
        lx, ly, rx, ry = self.calcRelativeSensorPositions()
        body_x = self.body.position[0]
        body_y = self.body.position[1]

        return lx + body_x, ly + body_y, rx + body_x, ry + body_y

    def constrain_angle(self, angle):
        """ keep angle between 0 and 2*pi

        The angle is wrapped into [0, 2*pi); negative angles and angles of
        more than one full turn are handled as well.
        """
        return angle % (2*pi)

    def move_body(self, la, ra):
        """ apply forces to the seeker's body

        :key la: magnitude of the force applied at the left sensor
        :key ra: magnitude of the force applied at the right sensor
        """
        from math import sin, cos
        # debug output; prints the same text under Python 2 and Python 3
        print("%s %s" % (la, ra))
        self.body.angle = self.constrain_angle(self.body.angle)
        lx, ly, rx, ry = self.calcRelativeSensorPositions()
        self.orientate_sensors((lx, ly), (rx, ry))
        # forces from the previous call must not accumulate
        self.body.reset_forces()
        a = self.body.angle
        # break down forces into their x and y components
        force_lx = la * cos(a)
        force_ly = la * sin(a)
        force_rx = ra * cos(a)
        force_ry = ra * sin(a)
        # apply forces at the sensor locations in the direction of the seeker
        self.body.apply_force((force_lx, force_ly), (lx, ly))
        self.body.apply_force((force_rx, force_ry), (rx, ry))
        # show each force as a line starting at its sensor's absolute position
        lx, ly, rx, ry = self.calcAbsoluteSensorPositions()
        self.update_force_lines((lx, ly), (lx+force_lx, ly+force_ly), (rx, ry), (rx+force_rx, ry+force_ry))
chemotaxis.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
from ChemotaxisEnv import ChemotaxisEnv
from EpisodicChemotaxisTask import ChemotaxisTask
from globals import SCREEN_SIZE
from pybrain.rl.agents import OptimizationAgent
from pybrain.rl.experiments import EpisodicExperiment
from pybrain.optimization import HillClimber
 
from pybrain.tools.shortcuts import buildNetwork
import matplotlib.pyplot as pyplot
 
MAX_TRIALS = 10    # number of evaluations granted to the optimizer
MAX_STEPS = 1200   # number of steps per episode

# pybrain initialization
task = ChemotaxisTask(ChemotaxisEnv(), MAX_STEPS)
# feed-forward neural network with 3 layers:
# 2 input neurons, 2 hidden neurons, and 2 output neurons
module = buildNetwork(2, 2, 2)

learner = HillClimber(
    task,
    module,
    maxEvaluations=MAX_TRIALS,
    mustMinimize=True,
    storeAllEvaluations=True,
    storeAllEvaluated=True,
    verbose=False
)
learner.learn()

# every stored evaluation divided by the episode length
# (presumably the evaluation is the reward summed over one episode - confirm)
reward_avgs = [evaluation/MAX_STEPS for evaluation in learner._allEvaluations]

# show average reward over trials, with trials numbered from 1
trial_numbers = range(1, len(reward_avgs)+1)

pyplot.figure(1)
pyplot.plot(trial_numbers, reward_avgs)
pyplot.xlabel("Trial #")
pyplot.ylabel("Average Reward")

pyplot.show()
globals.py
Python
1 2 3 4 5
SCREEN_SIZE = 400  # side length of the square window; also used to flip the y axis


def toPygame(xy):
    """ Convert pymunk coordinates to pygame coordinates.

    The x value is passed through unchanged and the y value is mirrored
    as SCREEN_SIZE - y. Returns an (x, y) tuple.
    """
    same_x = xy[0]
    flipped_y = SCREEN_SIZE - xy[1]
    return same_x, flipped_y

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.