Last active
March 19, 2022 03:05
-
-
Save desophos/4477624 to your computer and use it in GitHub Desktop.
An agent that learns to chemotax using pybrain for neural networks and learning, pygame for graphics, and pymunk for physics.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from ChemotaxisEnv import ChemotaxisEnv | |
from EpisodicChemotaxisTask import ChemotaxisTask | |
from globals import SCREEN_SIZE | |
from pybrain.rl.agents import OptimizationAgent | |
from pybrain.rl.experiments import EpisodicExperiment | |
from pybrain.optimization import HillClimber | |
from pybrain.tools.shortcuts import buildNetwork | |
import matplotlib.pyplot as pyplot | |
MAX_TRIALS = 10
MAX_STEPS = 1200

# Wire up the PyBrain pieces: an episodic chemotaxis task wrapped around a
# fresh environment, and a small feed-forward controller network
# (2 input neurons -> 2 hidden neurons -> 2 output neurons).
task = ChemotaxisTask(ChemotaxisEnv(), MAX_STEPS)
module = buildNetwork(2, 2, 2)

# NOTE(review): the task's reward increases as the seeker nears the food, yet
# the optimizer is created with mustMinimize=True -- confirm against the
# PyBrain BlackBoxOptimizer documentation that this is the intended direction.
learner = HillClimber(
    task,
    module,
    maxEvaluations=MAX_TRIALS,
    mustMinimize=True,
    storeAllEvaluations=True,
    storeAllEvaluated=True,
    verbose=False,
)
learner.learn()

# Turn every stored per-trial evaluation into a per-step average.
reward_avgs = [evaluation / MAX_STEPS for evaluation in learner._allEvaluations]

# Plot the average reward against the 1-based trial number.
trial_numbers = range(1, len(reward_avgs) + 1)
pyplot.figure(1)
pyplot.plot(trial_numbers, reward_avgs)
pyplot.xlabel("Trial #")
pyplot.ylabel("Average Reward")
pyplot.show()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from Food import Food | |
from Seeker import Seeker | |
from globals import SCREEN_SIZE, toPygame | |
from pybrain.rl.environments.environment import Environment | |
import pygame | |
class ChemotaxisEnv(Environment):
    """
    Experiment adapted from "Evolving Dynamical Neural Networks for Adaptive Behavior" (Beer & Gallagher, 1992)

    A seeker is enclosed in a square box with a food item inside.
    This food item emits a chemical signal whose intensity falls off
    as the inverse square of the distance from the food.
    The intensity of the chemical signal within the environment varies
    five orders of magnitude from the item to the far corners of the box, i.e. at a distance of root(2).
    We wish the seeker to find and remain in the vicinity of the food item,
    starting from arbitrary locations and orientations within the environment.
    To accomplish this task, the seeker is endowed with a circular body with a diameter of .01.
    The seeker possesses chemical sensors that can directly sense
    the intensity of the chemical signal at their location.
    These sensors are symmetrically placed about the center line of the body.
    In addition, the seeker has two effectors located on opposite sides of its body.
    These effectors can apply forces that move the body forward and rotate it.
    In the simplified physics of this environment,
    the velocity of movement is proportional to the force applied.

    Implementation note: the paragraph above describes the original paper.
    In this implementation `calcSignal` uses the plain inverse of the pixel
    distance (not its square), capped at 1.
    """

    def __init__(self):
        """ Create the pygame window, the food and seeker sprites, and reset the world. """
        # pygame initialization
        self.screen = pygame.display.set_mode((SCREEN_SIZE, SCREEN_SIZE))
        pygame.mouse.set_visible(0)
        self.background = pygame.Surface(self.screen.get_size())
        self.background = self.background.convert()
        self.background.fill((255, 255, 255))

        self.food = Food()
        self.food_sprite = pygame.sprite.RenderPlain(self.food)  # create sprite group for the food
        self.seeker = Seeker()
        self.seeker_sprite = pygame.sprite.RenderPlain(self.seeker)  # create sprite group for the seeker
        self._draw()

        # pybrain initialization
        self.action = [0.0, 0.0]
        self.reset()

    def _draw(self):
        """ Redraw one frame: background, food sprite, then the seeker. """
        self.screen.blit(self.background, (0, 0))
        self.food.update()
        self.food_sprite.draw(self.screen)
        # Seeker.update steps the seeker's physics space and draws it, so the
        # seeker sprite group is not drawn here.
        self.seeker.update(self.screen)
        pygame.display.flip()

    def _calcDistance(self, loc1, loc2):
        """ Calculates the Euclidean distance between two coordinate pairs. """
        from math import hypot
        return hypot(loc2[0] - loc1[0], loc2[1] - loc1[1])

    def calcSignal(self, loc):
        """ Calculates the chemical signal at a specific location.

        The signal is the inverse of the distance between the given location
        and the food, capped at 1 for distances of one unit or less.

        The original code returned 1 at distance 0 but values greater than 1
        for distances between 0 and 1; the cap makes the signal continuous and
        keeps it within (0, 1].

        :key loc: an (x, y) pair in the same coordinate system as the food's `loc`
        :rtype: float
        """
        # Open question left by the original author:
        # why does changing the reward magnitude change the sensor-delta magnitude?
        dist = self._calcDistance(self.food.loc, loc)
        if dist <= 1:
            return 1.0
        return 1.0 / dist

    def getSensors(self):
        """ the currently visible state of the world (the observation may be
            stochastic - repeated calls returning different values)

            :rtype: by default, this is assumed to be a numpy array of doubles
        """
        # get sensor locations
        lx, ly, rx, ry = self.seeker.calcAbsoluteSensorPositions()
        # return the strength of the chemical signals at the seeker's left and right sensors
        return [self.calcSignal(toPygame((lx, ly))), self.calcSignal(toPygame((rx, ry)))]

    def performAction(self, action):
        """ perform an action on the world that changes its internal state (maybe
            stochastically).

            :key action: an action that should be executed in the Environment.
            :type action: by default, this is assumed to be a numpy array of doubles

            action[0] is the left motor/effector neuron output, action[1] is the right
        """
        self.seeker.move_body(action[0], action[1])
        # record the path taken during the current episode (cleared by reset)
        self.movement_tracker.append(toPygame(self.seeker.body.position))
        # redraw
        self._draw()

    def reset(self):
        """ Reinitializes the environment: clears the movement history, moves the
            food to a random location and resets the seeker.
        """
        from random import random
        self.movement_tracker = []
        self.food.setLocation((random() * SCREEN_SIZE, random() * SCREEN_SIZE))
        self.seeker.reset()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__author__ = 'Daniel Horowitz, dhorowitz@oxy.edu' | |
from ChemotaxisEnv import ChemotaxisEnv | |
from globals import toPygame#, SCREEN_SIZE | |
from pybrain.rl.environments import EpisodicTask | |
class ChemotaxisTask(EpisodicTask):
    """ Episodic task around a ChemotaxisEnv: an episode lasts a fixed number of
        steps and each step is rewarded with the chemical signal at the seeker's
        position.
    """

    def __init__(self, env=None, maxsteps=1000):
        """
        :key env: (optional) an instance of a ChemotaxisEnv (or a subclass thereof)
        :key maxsteps: maximal number of steps (default: 1000)
        """
        # identity check: a custom environment must never be mistaken for None
        if env is None:
            env = ChemotaxisEnv()
        self.env = env
        EpisodicTask.__init__(self, env)
        self.N = maxsteps
        self.t = 0
        #self.actor_limits = [(0,1), (0,1)] # scale (-1,1) to motor neurons
        self.sensor_limits = [(0, 1), (0, 1)]  # scale sensor neurons to (-1,1)

    def reset(self):
        """ Reset the underlying task state and restart the step counter. """
        EpisodicTask.reset(self)
        self.t = 0

    def performAction(self, action):
        """ Count the step, then forward the action to the base task. """
        self.t += 1
        EpisodicTask.performAction(self, action)

    def isFinished(self):
        """ The episode ends once the maximal number of timesteps is reached. """
        return self.t >= self.N

    def getReward(self):
        """ The reward is the environment's chemical signal (`calcSignal`) evaluated
            at the seeker body's current position, converted to pygame coordinates. """
        return self.env.calcSignal(toPygame(self.env.seeker.body.position))

    def setMaxLength(self, n):
        """ Set the maximal number of steps per episode. """
        self.N = n
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pygame | |
class Food(pygame.sprite.Sprite):
    """ Sprite for the food item: a filled black disc of radius RADIUS whose
        centre follows `loc`.
    """

    loc = (0, 0)  # current centre of the food; changed through setLocation
    RADIUS = 3

    def __init__(self):
        pygame.sprite.Sprite.__init__(self)
        diameter = self.RADIUS * 2
        # Per-pixel alpha keeps the surface around the disc transparent, so the
        # sprite shows up as a circle instead of an opaque square.
        self.image = pygame.Surface([diameter, diameter], pygame.SRCALPHA)
        # Draw the disc in the middle of its own surface. The original drew it
        # at `loc` (0, 0), i.e. the surface corner, which clipped the circle and
        # produced a rect smaller than the image.
        pygame.draw.circle(self.image, (0, 0, 0), (self.RADIUS, self.RADIUS), self.RADIUS)
        self.rect = self.image.get_rect(center=self.loc)

    def setLocation(self, loc):
        """ Store the new centre; the rect is moved on the next update(). """
        self.loc = loc

    def update(self):
        """ Move the sprite's rect so that it is centred on `loc`. """
        self.rect.center = self.loc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SCREEN_SIZE = 400


def toPygame(xy):
    """ Translate a pymunk coordinate pair into pygame coordinates.

    The x value is passed through unchanged; the y value is mirrored
    across the screen height (SCREEN_SIZE - y).
    """
    x_coord = xy[0]
    flipped_y = SCREEN_SIZE - xy[1]
    return (x_coord, flipped_y)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pymunk import pygame_util | |
import pygame | |
import pymunk | |
from globals import SCREEN_SIZE | |
from math import pi | |
class Seeker(pygame.sprite.Sprite):
    """ The chemotaxing agent: a circular pymunk body with a left and a right
        sensor, living in its own pymunk space that is bounded by four walls.

        A second space (`force_space`) only holds two line segments that
        visualise the forces most recently applied by `move_body`.
    """

    STEP_TIME = 0.1                      # time step handed to space.step() on every update
    RADIUS = 5                           # body radius
    SENSOR_RADIUS = 2                    # radius of each sensor circle
    VELOCITY_LIMIT = 5
    ANGULAR_VELOCITY_LIMIT = pi/16
    WALL_WIDTH = 2.0                     # used as the radius of the wall segments
    amin = 0
    amax = 0

    def __init__(self):
        # pygame init
        pygame.sprite.Sprite.__init__(self)
        self.image = pygame.Surface([10, 10])

        # pymunk init
        self.space = pymunk.Space()
        self.walls = [
            pymunk.Segment(self.space.static_body, (0, 0+self.WALL_WIDTH), (SCREEN_SIZE, 0+self.WALL_WIDTH), self.WALL_WIDTH),  # bottom
            pymunk.Segment(self.space.static_body, (0, 0), (0, SCREEN_SIZE), self.WALL_WIDTH),  # left
            pymunk.Segment(self.space.static_body, (0, SCREEN_SIZE), (SCREEN_SIZE, SCREEN_SIZE), self.WALL_WIDTH),  # top
            pymunk.Segment(self.space.static_body, (SCREEN_SIZE-self.WALL_WIDTH, 0), (SCREEN_SIZE-self.WALL_WIDTH, SCREEN_SIZE), self.WALL_WIDTH)  # right
        ]

        def collision(space, arbiter, *args, **kwargs):
            """ make seeker bounce off walls """
            # Assign a whole new velocity instead of mutating the components of
            # the vector returned by the getter, which may be a copy.
            velocity = self.body.velocity
            self.body.velocity = (-velocity.x, -velocity.y)
            self.body.angle += pi
            # A `begin` handler is expected to return a bool; True asks pymunk
            # to process the collision normally.
            return True

        # every wall gets its own collision type (1..4) paired with the body's type 0
        for c, wall in enumerate(self.walls, 1):
            self.space.add_collision_handler(0, c, begin=collision)
            wall.collision_type = c
            wall.friction = 1.0
            wall.group = 1
            wall.color = pygame.color.THECOLORS["black"]
        self.space.add(self.walls)

        # bodies and segments used purely to display the applied forces
        self.force_l = pymunk.Body(1, 1)
        self.force_r = pymunk.Body(1, 1)
        self.force_line_l = pymunk.Segment(self.force_l, (0, 0), (0, 0), 1)
        self.force_line_r = pymunk.Segment(self.force_r, (0, 0), (0, 0), 1)
        self.force_space = pymunk.Space()
        self.force_space.add(self.force_line_l, self.force_line_r)

        self.body = pymunk.Body(mass=5, moment=1000)  # tweak these to decrease spinning out of control
        self.body.velocity_limit = self.VELOCITY_LIMIT
        self.body.angular_velocity_limit = self.ANGULAR_VELOCITY_LIMIT

        self.body_shape = pymunk.Circle(self.body, self.RADIUS, (0, 0))
        self.sensor_left = self._make_sensor((-self.RADIUS, 0))
        self.sensor_right = self._make_sensor((self.RADIUS, 0))

        self.body_shape.collision_type = 0
        self.body_shape.color = pygame.color.THECOLORS["black"]

        self.space.add(self.body, self.body_shape, self.sensor_left, self.sensor_right)
        self.reset()

    def _make_sensor(self, offset):
        """ Build a red sensor circle attached to the body at the given offset. """
        sensor = pymunk.Circle(self.body, self.SENSOR_RADIUS, offset)
        sensor.color = pygame.color.THECOLORS["red"]
        return sensor

    def orientate_sensors(self, left_sensor_relative_pos, right_sensor_relative_pos):
        """ Replace both sensor shapes with new ones at the given offsets.

        The replacements keep the red colour of the initial sensors; the
        original code created them without a colour.
        """
        # NOTE(review): move_body passes offsets that are already rotated by
        # body.angle -- confirm whether pymunk rotates shape offsets with the
        # body as well, which would apply the rotation twice.
        self.space.remove(self.sensor_left, self.sensor_right)
        self.sensor_left = self._make_sensor(left_sensor_relative_pos)
        self.sensor_right = self._make_sensor(right_sensor_relative_pos)
        self.space.add(self.sensor_left, self.sensor_right)

    def update_force_lines(self, force_l_start, force_l_end, force_r_start, force_r_end):
        """ Replace the two force-display segments with new start/end points. """
        self.force_space.remove(self.force_line_l, self.force_line_r)
        self.force_line_l = pymunk.Segment(self.force_l, force_l_start, force_l_end, 1)
        self.force_line_r = pymunk.Segment(self.force_r, force_r_start, force_r_end, 1)
        self.force_space.add(self.force_line_l, self.force_line_r)

    def position_body(self, pos=None):
        """ Move the body to `pos`, or to a random spot inside the walls.

        :key pos: (optional) an (x, y) position; when None a random one is drawn
        """
        from random import random
        if pos is None:
            # Keep the whole seeker (body plus sensors) clear of the walls. The
            # wall segments sit up to WALL_WIDTH inside the screen edge and are
            # WALL_WIDTH thick on each side of their line.
            margin = 2 * self.WALL_WIDTH + self.RADIUS + self.SENSOR_RADIUS
            span = SCREEN_SIZE - 2 * margin
            # x and y are drawn independently; the original reused one random
            # value for both, so the seeker always started on the diagonal.
            pos = (random() * span + margin, random() * span + margin)
        self.body.position = pos

    def reset_velocity(self):
        """ Stop all linear and angular motion of the body. """
        self.body.velocity = (0, 0)
        self.body.angular_velocity = 0

    def update(self, surface):
        """ Advance the physics by STEP_TIME and draw both spaces onto `surface`. """
        self.space.step(self.STEP_TIME)
        # An earlier version wrapped the body around the screen edges here.
        pygame_util.draw_space(surface, self.space)
        pygame_util.draw_space(surface, self.force_space)

    def reset(self):
        """ Put the body at a random position and bring it to rest. """
        # NOTE(review): body.angle is left untouched here -- confirm whether a
        # random starting orientation is wanted as well.
        self.position_body()
        self.reset_velocity()

    def calcRelativeSensorPositions(self):
        """ Calculate locations of sensors using the magic of trigonometry.

        :rtype: (lx, ly, rx, ry), offsets from the body centre at distance RADIUS;
                the right sensor is the mirror image of the left one
        """
        from math import sin, cos
        lx = self.RADIUS * sin(self.body.angle)
        ly = self.RADIUS * -cos(self.body.angle)
        rx = self.RADIUS * -sin(self.body.angle)
        ry = self.RADIUS * cos(self.body.angle)
        return lx, ly, rx, ry

    def calcAbsoluteSensorPositions(self):
        """ Sensor positions with the body position added to the relative offsets. """
        lx, ly, rx, ry = self.calcRelativeSensorPositions()
        lx += self.body.position[0]
        ly += self.body.position[1]
        rx += self.body.position[0]
        ry += self.body.position[1]
        return lx, ly, rx, ry

    def constrain_angle(self, angle):
        """ keep angle between 0 and 2*pi

        The modulo handles negative angles and any number of full turns. The
        original mapped a negative angle to 2*pi - angle, which is larger
        than 2*pi and points the wrong way.
        """
        return angle % (2*pi)

    def move_body(self, la, ra):
        """ apply forces to the seeker's body

        :key la: magnitude of the force applied at the left sensor
        :key ra: magnitude of the force applied at the right sensor
        """
        from math import sin, cos
        # debug trace of the two motor outputs; prints the same text on Python 2 and 3
        print("%s %s" % (la, ra))
        self.body.angle = self.constrain_angle(self.body.angle)
        lx, ly, rx, ry = self.calcRelativeSensorPositions()
        self.orientate_sensors((lx, ly), (rx, ry))
        self.body.reset_forces()
        a = self.body.angle
        # break down forces into their x and y components
        force_lx = la * cos(a)
        force_ly = la * sin(a)
        force_rx = ra * cos(a)
        force_ry = ra * sin(a)
        # apply forces at the sensor locations in the direction of the seeker
        self.body.apply_force((force_lx, force_ly), (lx, ly))
        self.body.apply_force((force_rx, force_ry), (rx, ry))
        # show each force as a line from its sensor to sensor + force
        lx, ly, rx, ry = self.calcAbsoluteSensorPositions()
        self.update_force_lines((lx, ly), (lx+force_lx, ly+force_ly), (rx, ry), (rx+force_rx, ry+force_ry))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment