Created
September 8, 2017 00:46
-
-
Save earthnuker/91fbb6e1ea02ea9d6899f47a7ba25ab6 to your computer and use it in GitHub Desktop.
http://paradise.xxiivv.com/ clone in python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests as RQ | |
import re | |
from dateutil.parser import parse | |
from vessel import Vessel,engine,session | |
from tqdm import tqdm | |
import jinja2 | |
import jinja2.sandbox | |
import jinja2.meta | |
data=str(RQ.get("https://raw.githubusercontent.com/XXIIVV/vessel.paradise/master/memory/paradise.ma").content,"utf-8").splitlines() | |
Vessel.metadata.drop_all(engine) | |
Vessel.metadata.create_all(engine) | |
value_slices=[] | |
id_val=0 | |
print("Importing Snapshot...") | |
for line in tqdm(data,ascii=True): | |
if not line.strip(): | |
continue | |
if line.startswith("~"): | |
continue | |
offset=0 | |
if line.startswith("@ "): | |
line=line.strip("@ ").replace("CODE","CODE ") | |
for match in re.finditer("(\w+)\s*",line): | |
value_slices.append((match.groups()[0].lower(),list(match.span()))) | |
value_slices[-1][1][1]=None | |
value_slices=dict(value_slices) | |
for k,v in value_slices.items(): | |
value_slices[k]=slice(*v) | |
else: | |
data={} | |
for k,v in value_slices.items(): | |
data[k]=line[v].strip() | |
data['id']=id_val | |
state,parent,owner,created=data['code'].split("-") | |
data['parent_id']=int(parent.lstrip("0") or "0") | |
data['owner_id']=int(owner.lstrip("0") or "0") | |
data['created']=created.lstrip("0") or None | |
if data['created']: | |
data['created']=parse(data['created']) | |
state={attr:val=="1" for attr,val in zip(("locked","hidden","silent","tunnel"),state)} | |
data.update(state) | |
del data['code'] | |
Vessel(**data) | |
id_val+=1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import jinja2 | |
import jinja2.sandbox | |
import jinja2.meta | |
import cmd,shlex | |
from vessel import Vessel | |
from datetime import datetime | |
class Cmd_Visitor(jinja2.visitor.NodeTransformer): | |
def visit_Getattr(self,node): | |
orig_lineno=node.lineno | |
node=jinja2.nodes.Getitem(self.visit(node.node),jinja2.nodes.Const(node.attr),'load') | |
node=node.set_lineno(orig_lineno) | |
return node | |
jinja = jinja2.sandbox.SandboxedEnvironment() | |
jinja.undefined = jinja2.StrictUndefined | |
def parse_cmd(cmd): | |
cmd=cmd.replace("{{","").replace("}}","") | |
cmd=cmd.replace("{%","").replace("%}","") | |
cmd=cmd.replace("##","").replace("##","") | |
if "((" in cmd and "))" in cmd: | |
cmd=cmd.replace("((","{{ ").replace("))"," }}") | |
host=Vessel.find(Vessel.id==1).one() | |
now=datetime.today() | |
args={ | |
'vessel':host.parent, | |
'universe':Vessel.universe, | |
'host':host, | |
'atlas':Vessel.atlas, | |
'now':datetime.today(), | |
} | |
template=jinja.parse(cmd) | |
template=Cmd_Visitor().visit(template) | |
template=template.set_environment(jinja) | |
for var in jinja2.meta.find_undeclared_variables(template): | |
if var not in args: | |
raise jinja2.exceptions.UndefinedError("'{}' is undefined".format(var)) | |
return jinja.from_string(template).render(args) | |
def process_cmd(cmd): | |
while 1: | |
old_cmd=cmd | |
cmd=parse_cmd(cmd) | |
if cmd==old_cmd: | |
return cmd | |
class Cmd_Parser(cmd.Cmd): | |
intro="Universe v0.1" | |
prompt=">" | |
def __init__(self,vessel=None): | |
self.vessel=vessel | |
return super(type(self),self).__init__() | |
def cmdloop(self): | |
return super(type(self),self).cmdloop(self.intro) | |
def precmd(self,line): | |
Vessel.update() | |
if not line: | |
return line | |
cmd=line.split()[0] | |
if cmd=="program": | |
return line | |
try: | |
return process_cmd(line) | |
except Exception as e: | |
print("Template Error:",e) | |
return '' | |
def emptyline(self): | |
return | |
def default(self,cmd): | |
print("CMD:",cmd) | |
if not cmd: | |
return '' | |
super(type(self),self).default(cmd) | |
parser=Cmd_Parser() | |
parser.cmdloop() | |
""" | |
while 1: | |
try: | |
print(parse_cmd(input(">"))) | |
except Exception as e: | |
print(e) | |
""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sqlalchemy import create_engine | |
from sqlalchemy.orm import relationship, backref, validates | |
from sqlalchemy.orm.session import sessionmaker | |
from sqlalchemy.schema import ForeignKey | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy import Column, Integer, String, DateTime, Boolean | |
from datetime import datetime | |
import random | |
import inspect | |
import jinja2 | |
Base = declarative_base() | |
engine = create_engine('sqlite:///paradise.db', echo=False) | |
Session = sessionmaker(bind=engine) | |
session = Session() | |
class ClassProperty(property): | |
def __get__(self, cls, owner): | |
return self.fget.__get__(None, owner)() | |
class Vessel(Base): | |
__tablename__ = "vessels" | |
id = Column(Integer,primary_key=True) | |
name = Column(String) | |
attr = Column(String,default="") | |
note = Column(String,default="") | |
program = Column(String,default="") | |
parent_id = Column(Integer,ForeignKey('vessels.id'),onupdate=lambda context:context.current_parameters['id']) | |
_children = relationship("Vessel",primaryjoin =('Vessel.id == Vessel.parent_id'),backref=backref('parent', remote_side=[id])) | |
owner_id = Column(Integer,ForeignKey('vessels.id')) | |
owned = relationship("Vessel",primaryjoin =('Vessel.id == Vessel.owner_id'),backref=backref('owner', remote_side=[id])) | |
created = Column(DateTime,nullable=True,default=datetime.now) | |
locked = Column(Boolean,default=False) | |
hidden = Column(Boolean,default=False) | |
silent = Column(Boolean,default=False) | |
tunnel = Column(Boolean,default=False) | |
def __init__(self,*args,**kwargs): | |
ret=super().__init__(*args,**kwargs) | |
session.add(self) | |
session.commit() | |
@classmethod | |
def update(self): | |
return session.commit() | |
@property | |
def children(self): | |
query=( | |
(Vessel.parent_id==self.id) &\ | |
(Vessel.id != self.id) & \ | |
(((Vessel.owner_id == self.owner_id) | (Vessel.owner_id == self.id)) | (not self.silent)) | |
) | |
return Vessel.find(query) | |
@property | |
def siblings(self): | |
res=[] | |
query=( | |
(Vessel.parent_id==self.id) &\ | |
(Vessel.id != self.parent_id) & \ | |
(Vessel.id != self.id) & \ | |
(((Vessel.owner_id == self.owner_id) | (Vessel.owner_id == self.id)) | (not self.parent.silent)) | |
) | |
return Vessel.find(query) | |
def encode(self): | |
state="".join(str(int(v)) for v in (self.locked,self.hidden,self.silent,self.tunnel)) | |
parent_id=str(self.parent_id).zfill(5) | |
owner_id=str(self.owner_id).zfill(5) | |
TS=self.created.strftime("%Y%m%d%H%M%S") | |
return "{}-{}-{}-{}".format(state,parent_id,owner_id,TS) | |
def random(*query_t,num=1): | |
if not query_t: | |
query_t=(Vessel,) | |
cnt=session.query(Vessel).count() | |
ids=random.sample(range(cnt),num) | |
res=session.query(*query_t).filter(Vessel.id.in_(ids)).all() | |
if num==1: | |
return res[0] | |
return res | |
def random_child(self,num=1): | |
assert isinstance(self,Vessel) | |
if not self.children: | |
return None | |
res=random.sample(self.children,num) | |
if num==1: | |
return res[0] | |
return res | |
def __repr__(self): | |
return "<Vessel '{}' (ID {})>".format("{} {}".format(self.attr,self.name).strip(),self.id) | |
def dict(self,name): | |
cols={c.name: getattr(self, c.name) for c in self.__table__.columns} | |
cols['parent']=self.parent | |
cols['children']=self.children.all() | |
cols['num_children']=len(cols['children']) | |
cols['siblings']=self.siblings.all() | |
cols['num_siblings']=len(cols['siblings']) | |
cols['stem']=self.stem | |
cols['paradox']=self.paradox | |
cols['depth']=self.depth | |
cols['full_name']="{} {}".format(self.attr,self.name).strip() | |
return cols[name] | |
def __getitem__(self,name): | |
try: | |
return self.dict(name) | |
except KeyError as e: | |
raise jinja2.exceptions.UndefinedError("'{}' is undefined".format(name)) | |
@classmethod | |
def find(cls,*args,**kwargs): | |
return session.query(cls).filter(*args,**kwargs) | |
@classmethod | |
def get(cls,id): | |
return session.query(cls).get(id) | |
@ClassProperty | |
@classmethod | |
def atlas(cls): | |
return session.query(cls).filter(cls.id==cls.parent_id).all() | |
@ClassProperty | |
@classmethod | |
def tunnels(cls): | |
return session.query(cls).filter(cls.tunnel==True).all() | |
@ClassProperty | |
@classmethod | |
def universe(cls): | |
return session.query(cls).all() | |
@property | |
def paradox(self): | |
return self.id==self.parent_id | |
@property | |
def stem(self): | |
V=self | |
L=[] | |
while 1: | |
L.append(V.id) | |
if V.parent_id==V.id: | |
return V | |
if V.id in L: | |
return None | |
V=V.parent | |
return V | |
@property | |
def depth(self): | |
V=self | |
L=[] | |
d=0 | |
while 1: | |
L.append(V.id) | |
if V.parent_id==V.id: | |
return d | |
d+=1 | |
if V.id in L: | |
return float("inf") | |
V=V.parent | |
return V | |
@classmethod | |
def exists(cls,*args,**kwargs): | |
return session.query(session.query(cls).exists().where(*args,**kwargs)).scalar() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment