Implementations of Regret Minimization and Counterfactual Regret Minimization in Python.

These are implemented as outlined in An Introduction to Counterfactual Regret Minimization (Neller and Lanctot) (paper here).

rps.py: finds a correlated equilibrium in Rock-Paper-Scissors, which in this case happens to be the Nash equilibrium. It is very similar to the code found in this YouTube video.
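
As a quick illustration of the regret-matching step (a minimal sketch that reuses the utility helper defined in rps.py below, assuming the file is saved under that name): suppose we play ROCK and the opponent plays PAPER. Each action's regret is its utility against PAPER minus the utility we actually received, and the next strategy is proportional to the positive regrets.

from rps import Choice, utility

my_action, opp_action = Choice.ROCK, Choice.PAPER
# utility() returns +1 / 0 / -1 for a win / tie / loss, so here we lost: -1.
regrets = {c: utility(c, opp_action) - utility(my_action, opp_action) for c in Choice}
# regrets: ROCK -> 0, PAPER -> 1, SCIS -> 2
positive = {c: max(r, 0) for c, r in regrets.items()}
total = sum(positive.values())
next_strategy = {c: r / total for c, r in positive.items()}
# next_strategy: ROCK -> 0, PAPER -> 1/3, SCIS -> 2/3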

kuhn.py: finds a correlated equilibrium in Kuhn Poker, a toy poker game described in the paper (each player antes 1, is dealt one of three cards, and may pass or bet a single chip; the higher card wins at showdown).
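
For reading the output of kuhn.py below: each information set is keyed by the acting player's card concatenated with the betting history ('p' for pass, 'b' for bet). For example, with the deal cards = [3, 1] and history 'pb', player 0 is back to act and its information set is:

infoset = str(cards[player]) + history   # '3' + 'pb' -> '3pb'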

kuhn_3.py: finds a correlated equilibrium in 3-player Kuhn Poker, as described in this paper.

All examples run on Python 3 (3.6 to be exact).
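
Each file can be run directly (for example python3 kuhn.py), or driven from a Python session; a minimal sketch, assuming the files are saved under the names above:

from rps import train as rps_train
from kuhn import KuhnTrainer

rps_train()                  # prints each player's average Rock-Paper-Scissors strategy
KuhnTrainer().train(10000)   # prints the average game value and the strategy at each infoset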

# kuhn.py
import enum
import random


class Actions(enum.Enum):
    PASS = 'p'
    BET = 'b'

    def __repr__(self):
        return self.value


class Node:
    def __init__(self, infoset):
        self.infoset = infoset
        # Initialize strategy to randomness
        self.strategy = {}
        for a in Actions:
            self.strategy[a] = random.random()
        normalizing_sum = sum(self.strategy.values())
        for a in Actions:
            self.strategy[a] /= normalizing_sum
        self.strategy_sum = dict((a, 0) for a in Actions)
        self.regret_sum = dict((a, 0) for a in Actions)

    def __str__(self):
        return '<%4s: %r>' % (self.infoset, self.get_average_strategy())

    def update_strategy(self, realization_weight):
        normalizing_sum = 0
        for action, value in self.strategy.items():
            self.strategy[action] = 0
            if self.regret_sum[action] > 0:
                self.strategy[action] = self.regret_sum[action]
            normalizing_sum += self.strategy[action]
        for action, value in self.strategy.items():
            if normalizing_sum > 0:
                self.strategy[action] /= normalizing_sum
            else:
                self.strategy[action] = 1.0 / len(self.strategy)
            self.strategy_sum[action] += realization_weight * self.strategy[action]

    def action(self):
        rnd = random.random()
        cumulative_prob = 0
        for action, prob in self.strategy.items():
            cumulative_prob += prob
            if rnd < cumulative_prob:
                return action

    def get_average_strategy(self):
        avg_strategy = dict((a, 0) for a in Actions)
        normalizing_sum = sum(self.strategy_sum.values())
        for a, sum_ in self.strategy_sum.items():
            if normalizing_sum > 0:
                avg_strategy[a] = sum_ / normalizing_sum
            else:
                avg_strategy[a] = -1
        return avg_strategy


class KuhnTrainer:
    def __init__(self):
        self.nodemap = {}
    def cfr(self, cards, history, p0, p1):
        plays = len(history)
        player = plays % 2
        opponent = 1 - player
        # Check for terminal states and return payoff
        if plays > 1:
            terminal_pass = history[plays - 1] == 'p'
            double_bet = history[plays - 2:plays] == 'bb'
            is_player_card_higher = cards[player] > cards[opponent]
            if terminal_pass:
                if history == 'pp':
                    return 1 if is_player_card_higher else -1
                else:
                    return 1
            elif double_bet:
                return 2 if is_player_card_higher else -2
        infoset = str(cards[player]) + history
        # Get information set node or create if non-existent
        node = self.nodemap.get(infoset, Node(infoset))
        self.nodemap[infoset] = node
        # For each action, recursively call cfr with additional history and
        # probability
        node.update_strategy(p0 if player == 0 else p1)
        util = {}
        node_util = 0
        for a in Actions:
            next_history = history + a.value
            # The recursive call returns utility from the next player's point
            # of view, so negate it for the current player.
            if player == 0:
                util[a] = -self.cfr(cards, next_history, p0 * node.strategy[a], p1)
            else:
                util[a] = -self.cfr(cards, next_history, p0, p1 * node.strategy[a])
            node_util += node.strategy[a] * util[a]
        for a in Actions:
            regret = util[a] - node_util
            # Counterfactual regret is weighted by the opponent's reach probability.
            node.regret_sum[a] += (p1 if player == 0 else p0) * regret
        return node_util
    def train(self, iterations):
        cards = [1, 2, 3]
        util = 0
        for _ in range(iterations):
            random.shuffle(cards)
            util += self.cfr(cards, '', 1, 1)
        print('Average game value: %s' % (util / iterations))
        for node in self.nodemap.values():
            print(node)


if __name__ == '__main__':
    KuhnTrainer().train(10000)

# kuhn_3.py
import enum
import random


class Actions(enum.Enum):
    CHECK = 'c'
    BET = 'b'
    FOLD = 'f'

    def __repr__(self):
        return self.value


class Node:
    def __init__(self, infoset):
        self.infoset = infoset
        self.reset_strategy()
        self.strategy_sum = dict((a, 0) for a in Actions)
        self.regret_sum = dict((a, 0) for a in Actions)

    def __str__(self):
        return '<%4s: %r>' % (self.infoset, self.get_average_strategy())

    def reset_strategy(self):
        # Initialize strategy to randomness
        self.strategy = {}
        for a in Actions:
            self.strategy[a] = random.random()
        normalizing_sum = sum(self.strategy.values())
        for a in Actions:
            self.strategy[a] /= normalizing_sum

    def update_strategy(self, realization_weight):
        normalizing_sum = 0
        for action, value in self.strategy.items():
            self.strategy[action] = 0
            if self.regret_sum[action] > 0:
                self.strategy[action] = self.regret_sum[action]
            normalizing_sum += self.strategy[action]
        for action, value in self.strategy.items():
            if normalizing_sum > 0:
                self.strategy[action] /= normalizing_sum
            else:
                self.strategy[action] = 1.0 / len(self.strategy)
            self.strategy_sum[action] += realization_weight * self.strategy[action]

    def action(self):
        rnd = random.random()
        cumulative_prob = 0
        for action, prob in self.strategy.items():
            cumulative_prob += prob
            if rnd < cumulative_prob:
                return action

    def get_average_strategy(self):
        avg_strategy = dict((a, 0) for a in Actions)
        normalizing_sum = sum(self.strategy_sum.values())
        for a, sum_ in self.strategy_sum.items():
            if normalizing_sum > 0:
                avg_strategy[a] = sum_ / normalizing_sum
            else:
                avg_strategy[a] = -1
        for a in Actions:
            if avg_strategy[a] < 0.001:
                avg_strategy[a] = 0
        normalizing_sum = sum(avg_strategy.values())
        for a in Actions:
            avg_strategy[a] /= normalizing_sum
        return avg_strategy


class KuhnTrainer:
    def __init__(self):
        self.nodemap = {}

    # @memoize
    def cfr(self, cards, history, p0, p1, p2):
        player = len(history) % 3
        # Check for terminal states and return payoff
        if len(history) > 1:
            last_three = history[-3:]
            # Play is terminal if no one has checked in the last three moves,
            # or if the last three moves were all checks.
            if 'c' not in last_three or last_three == 'ccc':
                # Check who's folded
                opponents = [0, 1, 2]
                for i, h in enumerate(history):
                    if h == 'f':
                        opponents.remove(i % 3)
                is_player_card_highest = True
                for opp in opponents:
                    if cards[opp] > cards[player]:
                        is_player_card_highest = False
                        break
                if last_three == 'ccc':
                    return 3 if is_player_card_highest else -1
                elif last_three.count('f') == 2:
                    # Everyone else folded
                    return history.count('b') + 3
                else:
                    return history.count('b') + 3 if is_player_card_highest else -2
        infoset = str(cards[player]) + history
        # Get information set node or create if non-existent
        node = self.nodemap.get(infoset, Node(infoset))
        self.nodemap[infoset] = node
        # For each action, recursively call cfr with additional history and
        # probability
        probs = (p0, p1, p2)
        node.update_strategy(probs[player])
        util = {}
        node_util = 0
        available_actions = list(Actions)
        if 'b' in history:
            available_actions.remove(Actions.CHECK)
        else:
            available_actions.remove(Actions.FOLD)
        for a in available_actions:
            next_history = history + a.value
            if player == 0:
                util[a] = self.cfr(cards, next_history, p0 * node.strategy[a], p1, p2)
            elif player == 1:
                util[a] = self.cfr(cards, next_history, p0, p1 * node.strategy[a], p2)
            elif player == 2:
                util[a] = self.cfr(cards, next_history, p0, p1, p2 * node.strategy[a])
            node_util += node.strategy[a] * util[a]
        for a in available_actions:
            regret = util[a] - node_util
            node.regret_sum[a] += probs[player] * regret
        return node_util
    def train(self, iterations):
        cards = [1, 2, 3, 4]
        util = 0
        has_reset = False
        for i in range(iterations):
            random.shuffle(cards)
            util += self.cfr(tuple(cards), '', 1, 1, 1)
            if not has_reset and i > iterations / 2:
                # Reset the current strategies once, halfway through training.
                for node in self.nodemap.values():
                    node.reset_strategy()
                has_reset = True
        print('Average game value: %s' % (util / iterations))
        for node in self.nodemap.values():
            print(node)


if __name__ == '__main__':
    KuhnTrainer().train(50000)

# rps.py
import enum
import random


class Choice(enum.Enum):
    ROCK = 1
    PAPER = 2
    SCIS = 3


def utility(choice_a, choice_b):
    if choice_a == choice_b:
        return 0
    elif choice_a == Choice.ROCK:
        if choice_b == Choice.PAPER:
            return -1
        else:
            return 1
    elif choice_a == Choice.PAPER:
        if choice_b == Choice.SCIS:
            return -1
        else:
            return 1
    else:
        if choice_b == Choice.ROCK:
            return -1
        else:
            return 1


class RegretMatcher:
    def __init__(self):
        r, p, s = (random.random(), random.random(), random.random())
        normalizing_sum = r + p + s
        r /= normalizing_sum
        p /= normalizing_sum
        s /= normalizing_sum
        self.strategy = dict(zip(Choice, (r, p, s)))
        self.strategy_sum = dict((choice, 0) for choice in Choice)
        self.regret_sum = dict((choice, 0) for choice in Choice)

    def update_strategy(self):
        normalizing_sum = 0
        for choice, value in self.strategy.items():
            self.strategy[choice] = 0
            if self.regret_sum[choice] > 0:
                self.strategy[choice] = self.regret_sum[choice]
            normalizing_sum += self.strategy[choice]
        for choice, value in self.strategy.items():
            if normalizing_sum > 0:
                self.strategy[choice] /= normalizing_sum
            else:
                self.strategy[choice] = 1.0 / len(self.strategy)
            self.strategy_sum[choice] += self.strategy[choice]

    def action(self):
        rnd = random.random()
        cumulative_prob = 0
        for choice, prob in self.strategy.items():
            cumulative_prob += prob
            if rnd < cumulative_prob:
                return choice
    def update_regret_sum(self, my_action, opp_action):
        # Accumulate, for each action, how much better it would have done than
        # the action actually played against the opponent's last move.
        action_utility = dict((choice, utility(choice, opp_action))
                              for choice in Choice)
        for choice in Choice:
            self.regret_sum[choice] += (
                action_utility[choice] - action_utility[my_action])
    def get_average_strategy(self):
        avg_strategy = dict((choice, 0) for choice in Choice)
        normalizing_sum = sum(self.strategy_sum.values())
        for choice, sum_ in self.strategy_sum.items():
            if normalizing_sum > 0:
                avg_strategy[choice] = sum_ / normalizing_sum
            else:
                avg_strategy[choice] = -1
        return avg_strategy


def train():
    my_matcher = RegretMatcher()
    opp_matcher = RegretMatcher()
    for _ in range(10000):
        my_action = my_matcher.action()
        opp_action = opp_matcher.action()
        my_matcher.update_regret_sum(my_action, opp_action)
        my_matcher.update_strategy()
        opp_matcher.update_regret_sum(opp_action, my_action)
        opp_matcher.update_strategy()
    print(my_matcher.get_average_strategy())
    print(opp_matcher.get_average_strategy())


if __name__ == '__main__':
    train()