Create a gist now

Instantly share code, notes, and snippets.

An agent that learns to chemotax using pybrain for neural networks and learning, pygame for graphics, and pymunk for physics.
from ChemotaxisEnv import ChemotaxisEnv
from EpisodicChemotaxisTask import ChemotaxisTask
from globals import SCREEN_SIZE
from pybrain.rl.agents import OptimizationAgent
from pybrain.rl.experiments import EpisodicExperiment
from pybrain.optimization import HillClimber
from import buildNetwork
import matplotlib.pyplot as pyplot
MAX_STEPS = 1200
# pybrain initialization
task = ChemotaxisTask(ChemotaxisEnv(), MAX_STEPS)
module = buildNetwork(2,2,2) # create a feed-forward neural network with 3 layers: 2 input neurons, 2 hidden neurons, and 2 output neurons
learner = HillClimber(task, module, maxEvaluations=MAX_TRIALS, mustMinimize=True, storeAllEvaluations=True, storeAllEvaluated=True, verbose=False)
reward_avgs = [e/MAX_STEPS for e in learner._allEvaluations]
# show average reward over trials
pyplot.plot(range(1,len(reward_avgs)+1), reward_avgs)
pyplot.ylabel("Average Reward")
pyplot.xlabel("Trial #")
from Food import Food
from Seeker import Seeker
from globals import SCREEN_SIZE, toPygame
from pybrain.rl.environments.environment import Environment
import pygame
class ChemotaxisEnv(Environment):
Experiment adapted from "Evolving Dynamical Neural Networks for Adaptive Behavior" (Beer & Gallagher, 1992)
An seeker is enclosed in a square box with a food item inside.
This food item emits a chemical signal whose intensity falls off
as the inverse square of the distance from the food.
The intensity of the chemical signal within the environment varies
five orders of magnitude from the item to the far corners of the box, i.e. at a distance of root(2).
We wish the seeker to find and remain in the vicinity of the food item,
starting from arbitrary locations and orientations within the environment.
To accomplish this task, the seeker is endowed with a circular body with a diameter of .01.
The seeker possesses chemical sensors that can directly sense
the intensity of the chemical signal at their location.
These sensors are symmetrically placed about the center line of the body.
In addition, the seeker has two effectors located on opposite sides of its body.
These effectors can apply forces that move the body forward and rotate it.
In the simplified physics of this environment,
the velocity of movement is proportional to the force applied.
def __init__(self):
# pygame initialization
self.screen = pygame.display.set_mode((SCREEN_SIZE, SCREEN_SIZE))
self.background = pygame.Surface(self.screen.get_size())
self.background = self.background.convert()
self.background.fill((255, 255, 255)) = Food()
self.food_sprite = pygame.sprite.RenderPlain( # create sprite group for the food
self.seeker = Seeker()
self.seeker_sprite = pygame.sprite.RenderPlain(self.seeker) # create sprite group for the seeker
# pybrain initialization
self.action = [0.0, 0.0]
def _draw(self):
self.screen.blit(self.background, (0, 0))
self.seeker.update(self.screen) # we don't need to draw this here because we draw it with pymunk
def _calcDistance(self, loc1, loc2):
""" Calculates the Euclidean distance between two coordinate pairs. """
from math import sqrt
return sqrt((loc2[0] - loc1[0]) ** 2 + (loc2[1] - loc1[1]) ** 2)
def calcSignal(self, loc):
""" Calculates the chemical signal at a specific location, which is
the inverse square of the distance between the given location and the food. """
dist = self._calcDistance(, loc)
if dist == 0:
return 1
return 1/dist # why does changing the reward magnitude change the sensor-delta magnitude?
def getSensors(self):
""" the currently visible state of the world (the observation may be
stochastic - repeated calls returning different values)
:rtype: by default, this is assumed to be a numpy array of doubles
# get sensor locations
lx, ly, rx, ry = self.seeker.calcAbsoluteSensorPositions()
# return the strength of the chemical signals at the seeker's left and right sensors
return [ self.calcSignal(toPygame((lx, ly))), self.calcSignal(toPygame((rx, ry))) ]
def performAction(self, action):
""" perform an action on the world that changes its internal state (maybe
:key action: an action that should be executed in the Environment.
:type action: by default, this is assumed to be a numpy array of doubles
action[0] is the left motor/effector neuron output, action[1] is the right
self.seeker.move_body(action[0], action[1])
# redraw
def reset(self):
""" Reinitializes the environment with the food in a random location
and the seeker with a random direction in a random location.
from random import random
self.movement_tracker = []*SCREEN_SIZE, random()*SCREEN_SIZE))
__author__ = 'Daniel Horowitz,'
from ChemotaxisEnv import ChemotaxisEnv
from globals import toPygame#, SCREEN_SIZE
from pybrain.rl.environments import EpisodicTask
class ChemotaxisTask(EpisodicTask):
def __init__(self, env=None, maxsteps=1000):
:key env: (optional) an instance of a ChemotaxisEnv (or a subclass thereof)
:key maxsteps: maximal number of steps (default: 1000)
if env == None:
env = ChemotaxisEnv()
self.env = env
EpisodicTask.__init__(self, env)
self.N = maxsteps
self.t = 0
#self.actor_limits = [(0,1), (0,1)] # scale (-1,1) to motor neurons
self.sensor_limits = [(0,1), (0,1)] # scale sensor neurons to (-1,1)
def reset(self):
self.t = 0
def performAction(self, action):
self.t += 1
EpisodicTask.performAction(self, action)
def isFinished(self):
if self.t >= self.N:
# maximal timesteps
return True
return False
def getReward(self):
""" The reward is equal to the chemical signal of the food at the seeker's location,
which is the inverse square of the distance between those locations. """
return self.env.calcSignal(toPygame(self.env.seeker.body.position))
def setMaxLength(self, n):
self.N = n
import pygame
class Food(pygame.sprite.Sprite):
loc = (0,0)
def __init__(self):
self.image = pygame.Surface([self.RADIUS*2,self.RADIUS*2])
self.rect =, (0,0,0), self.loc, self.RADIUS)
def setLocation(self, loc):
self.loc = loc
def update(self): = self.loc
def toPygame(xy):
""" convert pymunk coordinates to pygame coordinates """
return xy[0], SCREEN_SIZE-xy[1]
from pymunk import pygame_util
import pygame
import pymunk
from globals import SCREEN_SIZE
from math import pi
class Seeker(pygame.sprite.Sprite):
amin = 0
amax = 0
def __init__(self):
# pygame init
self.image = pygame.Surface([10, 10])
# pymunk init = pymunk.Space()
self.walls = [
pymunk.Segment(, (0,0+self.WALL_WIDTH), (SCREEN_SIZE, 0+self.WALL_WIDTH), self.WALL_WIDTH), # bottom
pymunk.Segment(, (0,0), (0, SCREEN_SIZE), self.WALL_WIDTH), # left
pymunk.Segment(, (0, SCREEN_SIZE), (SCREEN_SIZE, SCREEN_SIZE), self.WALL_WIDTH), # top
pymunk.Segment(, (SCREEN_SIZE-self.WALL_WIDTH, 0), (SCREEN_SIZE-self.WALL_WIDTH, SCREEN_SIZE), self.WALL_WIDTH) # right
def collision(space, arbiter, *args, **kwargs):
""" make seeker bounce off walls """
self.body.velocity.x = -self.body.velocity.x
self.body.velocity.y = -self.body.velocity.y
self.body.angle += pi
c = 1
for wall in self.walls:, c, begin=collision)
wall.collision_type = c
c += 1
wall.friction = 1.0 = 1
wall.color = pygame.color.THECOLORS["black"]
self.force_l = pymunk.Body(1,1)
self.force_r = pymunk.Body(1,1)
self.force_line_l = pymunk.Segment(self.force_l, (0,0), (0,0), 1)
self.force_line_r = pymunk.Segment(self.force_r, (0,0), (0,0), 1)
self.force_space = pymunk.Space()
self.force_space.add(self.force_line_l, self.force_line_r)
self.body = pymunk.Body(mass=5, moment=1000) # tweak these to decrease spinning out of control
self.body.velocity_limit = self.VELOCITY_LIMIT
self.body.angular_velocity_limit = self.ANGULAR_VELOCITY_LIMIT
self.body_shape = pymunk.Circle(self.body, self.RADIUS, (0, 0))
self.sensor_left = pymunk.Circle(self.body, self.SENSOR_RADIUS, (-self.RADIUS, 0))
self.sensor_right = pymunk.Circle(self.body, self.SENSOR_RADIUS, (self.RADIUS, 0))
self.body_shape.collision_type = 0
self.body_shape.color = pygame.color.THECOLORS["black"]
self.sensor_left.color = pygame.color.THECOLORS["red"]
self.sensor_right.color = pygame.color.THECOLORS["red"], self.body_shape, self.sensor_left, self.sensor_right)
def orientate_sensors(self, left_sensor_relative_pos, right_sensor_relative_pos):, self.sensor_right)
self.sensor_left = pymunk.Circle(self.body, self.SENSOR_RADIUS, left_sensor_relative_pos)
self.sensor_right = pymunk.Circle(self.body, self.SENSOR_RADIUS, right_sensor_relative_pos), self.sensor_right)
def update_force_lines(self, force_l_start, force_l_end, force_r_start, force_r_end):
self.force_space.remove(self.force_line_l, self.force_line_r)
self.force_line_l = pymunk.Segment(self.force_l, force_l_start, force_l_end, 1)
self.force_line_r = pymunk.Segment(self.force_r, force_r_start, force_r_end, 1)
self.force_space.add(self.force_line_l, self.force_line_r)
def position_body(self, pos=None):
from random import random
if not pos:
pos_range = SCREEN_SIZE-self.WALL_WIDTH # lower maximum
pos = random()*pos_range + self.WALL_WIDTH # raise minimum
self.body.position = (pos, pos)
self.body.position = pos
def reset_velocity(self):
self.body.velocity = (0,0)
self.body.angular_velocity = 0
def update(self, surface):
#lx,ly,rx,ry = self.calcSensorPositions()
# wrap sloppily around edges
#if (self.body.position.x < 0):
# self.position_body((SCREEN_SIZE, self.body.position.y))
#if (self.body.position.x > SCREEN_SIZE):
# self.position_body((0, self.body.position.y))
#if (self.body.position.y < 0):
# self.position_body((self.body.position.x, SCREEN_SIZE))
#if (self.body.position.y > SCREEN_SIZE):
# self.position_body((self.body.position.x, 0))
#print self.body.position
pygame_util.draw_space(surface, self.force_space)
def reset(self):
def calcRelativeSensorPositions(self):
""" Calculate locations of sensors using the magic of trigonometry. """
from math import sin, cos
lx = self.RADIUS * sin(self.body.angle)
ly = self.RADIUS * -cos(self.body.angle)
rx = self.RADIUS * -sin(self.body.angle)
ry = self.RADIUS * cos(self.body.angle)
return lx, ly, rx, ry
def calcAbsoluteSensorPositions(self):
lx, ly, rx, ry = self.calcRelativeSensorPositions()
lx += self.body.position[0]
ly += self.body.position[1]
rx += self.body.position[0]
ry += self.body.position[1]
return lx, ly, rx, ry
def constrain_angle(self, angle):
""" keep angle between 0 and 2*pi """
if angle > 2*pi:
angle = angle - 2*pi
elif angle < 0:
angle = 2*pi - angle
return angle
def move_body(self, la, ra):
""" apply forces to the seeker's body """
from math import sin, cos
print la, ra
self.body.angle = self.constrain_angle(self.body.angle)
lx, ly, rx, ry = self.calcRelativeSensorPositions()
self.orientate_sensors((lx,ly), (rx,ry))
a = self.body.angle
# break down forces into their x and y components
force_lx = la * cos(a)
force_ly = la * sin(a)
force_rx = ra * cos(a)
force_ry = ra * sin(a)
# apply forces at the sensor locations in the direction of the seeker
self.body.apply_force((force_lx, force_ly), (lx, ly))
self.body.apply_force((force_rx, force_ry), (rx, ry))
lx, ly, rx, ry = self.calcAbsoluteSensorPositions()
self.update_force_lines((lx, ly), (lx+force_lx, ly+force_ly), (rx, ry), (rx+force_rx, ry+force_ry))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment