Skip to content

Instantly share code, notes, and snippets.

@mayneyao
Created September 18, 2023 20:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mayneyao/50a9a7b874c9d52bd8e1fd443033dbd9 to your computer and use it in GitHub Desktop.
Save mayneyao/50a9a7b874c9d52bd8e1fd443033dbd9 to your computer and use it in GitHub Desktop.
eidos python sdk (wip)
import uuid
import requests
BASE_URL = "http://localhost:3333"
class DataSpace:
def __init__(self, space_id):
self.space_id = space_id
def table(self, table_id):
return Table(table_id, self)
class Table:
def __init__(self, table_id, space):
self.space_id = space.space_id
self.table_id = table_id
self.raw_table_name = f"tb_{table_id}"
self.field_id_name_map, self.field_name_id_map = self.get_table_field_map()
def get_table_field_map(self):
field_resp = requests.post(
f"{BASE_URL}/rpc",
json={
"space": self.space_id,
"method": "listUiColumns",
"params": [self.raw_table_name],
},
timeout=3,
)
field_res = field_resp.json()
fields = field_res["data"]["result"]
field_id_name_map = {}
field_name_id_map = {}
for field in fields:
field_id_name_map[field["table_column_name"]] = field["name"]
field_name_id_map[field["name"]] = field["table_column_name"]
return field_id_name_map, field_name_id_map
def transform_row(self, row):
new_row = {}
for key, value in row.items():
if key == "_id":
new_row[key] = value
else:
field_name = self.field_id_name_map.get(key, None)
if field_name:
new_row[field_name] = value
return new_row
def rows(self):
r = requests.post(
f"{BASE_URL}/rpc",
json={
"space": self.space_id,
"method": "sql4mainThread",
"params": [f"select * from {self.raw_table_name}", [], "object"],
},
)
res = r.json()
return [self.transform_row(row) for row in res["data"]["result"]]
def transform_data(self, data):
new_data = {}
for key, value in data.items():
if key == "_id":
new_data[key] = value
else:
field_id = self.field_name_id_map.get(key, None)
if field_id:
new_data[self.field_name_id_map[key]] = value
return new_data
def bulk_add(self, datas):
datas = [self.transform_data(data) for data in datas]
datas = [{"_id": str(uuid.uuid4()), **data} for data in datas]
sql = f"insert into {self.raw_table_name} ({','.join(datas[0].keys())}) values ({','.join(['?' for _ in datas[0].keys()])});"
sql *= len(datas)
json = {
"space": self.space_id,
"method": "sql4mainThread",
"params": [
sql,
[value for data in datas for value in data.values()],
"object",
],
}
print(json)
requests.post(f"{BASE_URL}/rpc", json=json)
def add(self, data):
data = self.transform_data(data)
data = {"_id": str(uuid.uuid4()), **data}
json = {
"space": self.space_id,
"method": "sql4mainThread",
"params": [
f"insert into {self.raw_table_name} ({','.join(data.keys())}) values ({','.join(['?' for _ in data.keys()])})",
list(data.values()),
"object",
],
}
requests.post(f"{BASE_URL}/rpc", json=json, timeout=1)
def update(self, row_id, data):
update_data = self.transform_data(data)
json = {
"space": self.space_id,
"method": "sql4mainThread",
"params": [
f"update {self.raw_table_name} set {','.join([f'{key} = ?' for key in update_data.keys()])} where _id = ?",
list(update_data.values()) + [row_id],
"object",
],
}
requests.post(f"{BASE_URL}/rpc", json=json)
if __name__ == "__main__":
# example beblow
# spaceName/tableUuid
table = DataSpace("yourSpaceName").table("44d6193623544426873507e10f3cfd7f")
rows = table.rows()
for row in rows:
url = row["profileUrl"]
print(url)
if url:
is_profile = url.split("/")[-2] == "profile"
table.update(
row["_id"],
{
"username": url.split("/")[-1] if is_profile else None,
},
)
# add row
table.add({"fieldName": "fieldValue"})
# bulk add
table.bulk_add([{"fieldName": "fieldValue"}])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment