Skip to content

Instantly share code, notes, and snippets.

@adamcunnington
Created November 10, 2013 03:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adamcunnington/b24bba22f157de1ed9c0 to your computer and use it in GitHub Desktop.
Save adamcunnington/b24bba22f157de1ed9c0 to your computer and use it in GitHub Desktop.
# <path to game directory>/addons/eventscripts/admin/mods/rpg/
# rpg.py
# by Adam Cunnington
from __future__ import with_statement
import os
import psyco
psyco.full()
from sqlalchemy import (Column, create_engine, event, ForeignKey, Integer,
String, Unicode)
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
import es
from esutils import menus, players, tools
# Names exported by ``from rpg import *``.  Every entry must be bound at
# module level, otherwise the star-import itself raises AttributeError.  The
# perk model class is private (``_Perk``), so the public perk-facing class
# exported here is ``PerkManager``.
__all__ = (
    "RPGError",
    "Player",
    "PlayerPerk",
    "PerkManager",
)

# Declarative base class shared by every ORM model defined in this module.
_Base = declarative_base()
class RPGError(Exception):
    """Raised for failures that are specific to admin.mods.rpg."""
class Player(_Base):
    """ORM model for a persistent RPG player, plus a cache of tracked players.

    ``data`` maps an in-game user ID to the ``Player`` row that was loaded
    (or created) for that user's Steam ID.
    """

    __tablename__ = "Players"

    # user_ID -> Player for every player currently tracked by the mod.
    data = {}

    ID = Column(Integer, primary_key=True, autoincrement=True)
    steam_ID = Column(String, nullable=False)
    name = Column(Unicode, nullable=False)
    experience_points = Column(Integer, default=0, nullable=False)
    level = Column(Integer, default=0, nullable=False)
    credits = Column(Integer, default=10000, nullable=False)

    def __init__(self, steam_ID, name):
        """Create an unsaved player identified by *steam_ID* called *name*."""
        self.steam_ID = steam_ID
        self.name = name

    @classmethod
    def _add_player(cls, user_ID, steam_ID, name):
        """Start tracking a player, creating the database row if needed.

        The row is looked up by *steam_ID*.  A missing row is created and
        committed immediately; an existing row only has its name refreshed
        in memory.  Either way the player is cached under *user_ID* and the
        player's perks are loaded.
        """
        with _QuerySessionWrapper() as query_session:
            record = query_session.query(Player).filter(
                Player.steam_ID == steam_ID).first()
        if record is not None:
            # Fix: decode the reported name as UTF-8, re-encode it as
            # Latin-1 and decode again as UTF-8.
            # NOTE(review): presumably repairs double-encoded names --
            # confirm against the names the engine actually reports.
            record.name = (
                name.decode("UTF-8").encode("latin-1").decode("UTF-8"))
        else:
            record = cls(steam_ID, name)
            with _CommitSessionWrapper() as commit_session:
                commit_session.add(record)
        cls.data[user_ID] = record
        PlayerPerk._load_player_perks(user_ID, record.ID)

    @classmethod
    def _get_description(cls, user_ID):
        """Return the buy-menu heading showing the player's credit balance."""
        balance = cls.data[user_ID].credits
        return ":: Perk Buy Menu (%s Credits)" % balance

    @classmethod
    def _get_items(cls, user_ID):
        """Yield one ``menus.MenuOption`` per known perk for *user_ID*.

        Perks without an enabled ``PerkManager`` are listed as disabled and
        perks already at the manager's ``max_level`` are listed as maxed;
        both are unselectable.  Every other perk carries the tuple
        ``(player, perk_manager, player_perk, next_level, cost)`` as its
        option data and is selectable only when the player can afford it.
        """
        buyer = cls.data[user_ID]
        owned_perks = PlayerPerk.data[user_ID]
        for perk_key in _Perk.data:
            perk = _Perk.data[perk_key]
            manager = PerkManager.data.get(perk.basename)
            if manager is None or not manager.enabled:
                yield menus.MenuOption(
                    "%s -> [DISABLED]" % perk.verbose_name,
                    selectable=False)
                continue

            owned = owned_perks.get(perk.basename)
            if owned is None:
                # Perk not bought yet: the first purchase is level 1.
                next_level = 1
            elif owned.level >= manager.max_level:
                yield menus.MenuOption(
                    "%s -> %s [MAXED]" % (perk.verbose_name, owned.level),
                    selectable=False)
                continue
            else:
                next_level = owned.level + 1

            cost = manager.cost_calculator(next_level)
            affordable = cost <= buyer.credits
            label = "%s -> %s [%s Credits]" % (
                perk.verbose_name, next_level, cost)
            yield menus.MenuOption(
                label, (buyer, manager, owned, next_level, cost), affordable)
class _Perk(_Base):
    """ORM model for a perk definition, plus an in-memory cache of all perks.

    ``data`` maps each perk's database ID to its ``_Perk`` instance.  Code
    elsewhere in this module reads ``basename`` / ``verbose_name`` from the
    cached values, so the cache must hold the model objects themselves.
    """

    __tablename__ = "Perks"

    # perk ID -> _Perk.  Defined on the class so that add_perk() and the
    # module-level unload() work even before load_perks() has been called.
    data = {}

    ID = Column(Integer, primary_key=True, autoincrement=True)
    basename = Column(String, nullable=False)
    verbose_name = Column(String, nullable=False)

    def __init__(self, basename, verbose_name):
        """Create an unsaved perk with the given identifiers."""
        self.basename = basename
        self.verbose_name = verbose_name

    @classmethod
    def add_perk(cls, basename, verbose_name):
        """Return the perk called *basename*, creating its row if missing.

        The cache is searched first, then the database.  A new row is
        committed immediately.  The returned perk is always present in
        ``data`` under its ID.
        """
        # The cache is keyed by ID, so a basename lookup has to scan values.
        for cached in cls.data.values():
            if cached.basename == basename:
                return cached
        with _QuerySessionWrapper() as session:
            perk = session.query(_Perk).filter(
                _Perk.basename == basename).first()
        if perk is None:
            perk = cls(basename, verbose_name)
            with _CommitSessionWrapper() as session:
                session.add(perk)
        cls.data[perk.ID] = perk
        return perk

    @classmethod
    def load_perks(cls):
        """Replace the cache with every perk currently in the database."""
        with _QuerySessionWrapper() as session:
            # Store the model objects (not just their basenames): readers
            # of the cache access attributes on the cached values.
            cls.data = dict(
                (perk.ID, perk) for perk in session.query(_Perk).all())
class PlayerPerk(_Base):
    """ORM model for the level a player has reached in one perk.

    ``data`` maps an in-game user ID to a dict of that player's perks,
    keyed by perk basename.
    """

    __tablename__ = "Players Perks"

    # user_ID -> {perk basename -> PlayerPerk}
    data = {}

    player_ID = Column(Integer, ForeignKey(Player.ID), primary_key=True)
    perk_ID = Column(Integer, ForeignKey(_Perk.ID), primary_key=True)
    level = Column(Integer, nullable=False)
    players = relationship(Player, backref=Player.__tablename__)
    perks = relationship(_Perk, backref=_Perk.__tablename__)

    def __init__(self, user_ID, player_ID, perk_ID, perk_basename):
        """Create a level-1 perk for a player and register it in the cache.

        ``data[user_ID]`` must already exist; the new object is stored in it
        under *perk_basename*.
        """
        self.player_ID = player_ID
        self.perk_ID = perk_ID
        self.level = 1
        self.data[user_ID][perk_basename] = self

    @classmethod
    def _load_player_perks(cls, user_ID, player_ID):
        """Load all perks owned by *player_ID* into ``data[user_ID]``.

        The resulting dict is keyed by perk basename, the same key used by
        ``__init__`` and by the lookups in ``Player._get_items``.
        """
        with _QuerySessionWrapper() as session:
            # Fetch each perk's basename as a plain column alongside the
            # PlayerPerk row.  This keeps the key independent of which perk
            # modules are registered and of the contents of _Perk.data.
            rows = session.query(PlayerPerk, _Perk.basename).filter(
                PlayerPerk.player_ID == player_ID).filter(
                PlayerPerk.perk_ID == _Perk.ID).all()
        cls.data[user_ID] = dict(
            (basename, player_perk) for player_perk, basename in rows)
class PerkManager(object):
    """Runtime registration of a perk: its limits, pricing and callbacks.

    ``data`` maps a perk basename to the manager registered for it.
    """

    # perk basename -> PerkManager
    data = {}

    def __init__(self, basename, max_level, perk_calculator,
                 cost_calculator, level_change_callable=None,
                 verbose_name=None):
        """Register a perk under *basename*.

        Parameters:
            basename: unique identifier of the perk.
            max_level: level at which the buy menu reports the perk as maxed.
            perk_calculator: stored on the manager; not called in this module.
            cost_calculator: called with the next level, returns its cost.
            level_change_callable: optional; called after a purchase with
                ``(user_ID, player, player_perk, old_level, new_level)``.
            verbose_name: display name; derived from *basename* with
                ``tools.format_verbose_name`` when omitted.

        Raises:
            RPGError: a manager with this basename is already registered.
        """
        if basename in self.data:
            raise RPGError("a perk with basename, %s, is already registered"
                           % basename)
        # unload() builds the es_xunload path from this attribute, so it has
        # to be stored on the instance.
        self.basename = basename
        self.max_level = max_level
        self.perk_calculator = perk_calculator
        self.cost_calculator = cost_calculator
        self.level_change_callable = level_change_callable
        if verbose_name is None:
            verbose_name = tools.format_verbose_name(basename)
        self.perk = _Perk.add_perk(basename, verbose_name)
        with _CommitSessionWrapper() as session:
            session.add(self.perk)
        self.data[basename] = self
        self.enabled = True

    def unload(self):
        """Queue unloading of the perk's script and mark the perk disabled."""
        es.server.queuecmd("es_xunload rpg/perks/%s" % self.basename)
        self.enabled = False
def load():
    """Load the perk cache, then start tracking every current player."""
    _Perk.load_perks()
    track = Player._add_player
    for connected in players.all_players():
        track(connected.user_ID, connected.steam_ID, connected.name)
def player_connect(event_var):
    """Start tracking the player described by a player_connect event."""
    # TO DO: Update Bot Perks with Median Values
    user_ID = int(event_var["userid"])
    steam_ID = event_var["networkid"]
    Player._add_player(user_ID, steam_ID, event_var["name"])
def player_changename(event_var):
    """Store the new name on the cached player; nothing is committed here."""
    new_name = event_var["newname"]
    tracked = Player.data[int(event_var["userid"])]
    tracked.name = new_name
def player_death(event_var):
    """Commit the player named by the event's userid, and that player's perks."""
    user_ID = int(event_var["userid"])
    pending = [Player.data[user_ID]]
    pending.extend(PlayerPerk.data[user_ID].values())
    with _CommitSessionWrapper() as session:
        session.add_all(pending)
def player_disconnect(event_var):
    """Commit the leaving player's data and drop it from the caches.

    The cache entries are removed before the commit is attempted, so a
    failing commit cannot leave stale entries behind.  A user ID that is
    missing from either cache is tolerated instead of raising KeyError.
    """
    user_ID = int(event_var["userid"])
    player = Player.data.pop(user_ID, None)
    player_perks = PlayerPerk.data.pop(user_ID, {})

    pending = []
    if player is not None:
        pending.append(player)
    pending.extend(player_perks.values())
    if not pending:
        # Nothing was tracked for this user, so there is nothing to save.
        return
    with _CommitSessionWrapper() as session:
        session.add_all(pending)
def player_say(event_var):
    """Send the RPG menu to a player whose chat text is exactly "rpgmenu"."""
    if event_var["text"] != "rpgmenu":
        return
    _rpg_menu.send(int(event_var["userid"]))
def round_end(event_var):
    """Commit every current player and their perks in one session."""
    pending = []
    for connected in players.all_players():
        pending.append(Player.data[connected.user_ID])
        pending += PlayerPerk.data[connected.user_ID].values()
    with _CommitSessionWrapper() as session:
        session.add_all(pending)
def _rpg_menu_callback(user_ID, (player, perk_manager, player_perk, level,
cost)):
player.credits -= cost
if player_perk is None:
old_level = 0
player_perk = PlayerPerk(user_ID, player.ID, perk_manager.perk.ID,
perk_manager.perk.basename)
else:
old_level = player_perk.level
player_perk.level += 1
if perk_manager.level_change_callable is not None:
perk_manager.level_change_callable(user_ID, player, player_perk,
old_level, player_perk.level)
# Perk buy menu sent by player_say(); its heading and options are produced
# per user by the Player class methods, and selections are handled by
# _rpg_menu_callback.
_rpg_menu = menus.Menu(
    _rpg_menu_callback,
    get_description=Player._get_description,
    get_items=Player._get_items,
    resend=True,
)
_rpg_menu.title = "OLYMPIAN# RPG"
def unload():
    """Unload every registered perk, then empty all module caches."""
    managers = PerkManager.data
    while managers:
        manager = managers.popitem()[1]
        manager.unload()
    for cache in (_Perk.data, Player.data, PlayerPerk.data):
        cache.clear()
# SQLite database stored in "players.db" next to this file.
_engine = create_engine("sqlite:///%s" %
                        os.path.join(os.path.dirname(__file__), "players.db"))


@event.listens_for(_engine, "connect")
def receive_connect(connection, connection_record):
    """Apply the per-connection SQLite PRAGMAs to each new connection."""
    connection.execute("PRAGMA JOURNAL_MODE = WAL")
    connection.execute("PRAGMA SYNCHRONOUS = OFF")
    connection.execute("PRAGMA FOREIGN_KEYS = ON")


# Create any missing tables only after the listener above is attached, so
# that the connection opened for this call receives the PRAGMAs as well.
_Base.metadata.create_all(_engine)
class _QuerySessionWrapper(object):
    """Context manager yielding a new session that is closed on exit.

    Nothing is committed; exceptions raised inside the block propagate.
    """

    # Session factory bound to the module engine.  expire_on_commit=False
    # keeps attribute values on objects after a commit.
    Session = sessionmaker(bind=_engine, expire_on_commit=False)

    def __enter__(self):
        session = self.session = self.Session()
        return session

    def __exit__(self, exc_type, exc_value, traceback):
        self.session.close()
class _CommitSessionWrapper(_QuerySessionWrapper):
    """Session context manager that commits when the block exits cleanly.

    The commit is skipped when the block raised; the session is closed in
    every case, including when the commit itself fails.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        session = self.session
        block_succeeded = exc_value is None
        try:
            if block_succeeded:
                session.commit()
        finally:
            session.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment