Created
September 18, 2023 20:02
-
-
Save mayneyao/50a9a7b874c9d52bd8e1fd443033dbd9 to your computer and use it in GitHub Desktop.
eidos python sdk (wip)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uuid | |
import requests | |
BASE_URL = "http://localhost:3333" | |
class DataSpace: | |
def __init__(self, space_id): | |
self.space_id = space_id | |
def table(self, table_id): | |
return Table(table_id, self) | |
class Table: | |
def __init__(self, table_id, space): | |
self.space_id = space.space_id | |
self.table_id = table_id | |
self.raw_table_name = f"tb_{table_id}" | |
self.field_id_name_map, self.field_name_id_map = self.get_table_field_map() | |
def get_table_field_map(self): | |
field_resp = requests.post( | |
f"{BASE_URL}/rpc", | |
json={ | |
"space": self.space_id, | |
"method": "listUiColumns", | |
"params": [self.raw_table_name], | |
}, | |
timeout=3, | |
) | |
field_res = field_resp.json() | |
fields = field_res["data"]["result"] | |
field_id_name_map = {} | |
field_name_id_map = {} | |
for field in fields: | |
field_id_name_map[field["table_column_name"]] = field["name"] | |
field_name_id_map[field["name"]] = field["table_column_name"] | |
return field_id_name_map, field_name_id_map | |
def transform_row(self, row): | |
new_row = {} | |
for key, value in row.items(): | |
if key == "_id": | |
new_row[key] = value | |
else: | |
field_name = self.field_id_name_map.get(key, None) | |
if field_name: | |
new_row[field_name] = value | |
return new_row | |
def rows(self): | |
r = requests.post( | |
f"{BASE_URL}/rpc", | |
json={ | |
"space": self.space_id, | |
"method": "sql4mainThread", | |
"params": [f"select * from {self.raw_table_name}", [], "object"], | |
}, | |
) | |
res = r.json() | |
return [self.transform_row(row) for row in res["data"]["result"]] | |
def transform_data(self, data): | |
new_data = {} | |
for key, value in data.items(): | |
if key == "_id": | |
new_data[key] = value | |
else: | |
field_id = self.field_name_id_map.get(key, None) | |
if field_id: | |
new_data[self.field_name_id_map[key]] = value | |
return new_data | |
def bulk_add(self, datas): | |
datas = [self.transform_data(data) for data in datas] | |
datas = [{"_id": str(uuid.uuid4()), **data} for data in datas] | |
sql = f"insert into {self.raw_table_name} ({','.join(datas[0].keys())}) values ({','.join(['?' for _ in datas[0].keys()])});" | |
sql *= len(datas) | |
json = { | |
"space": self.space_id, | |
"method": "sql4mainThread", | |
"params": [ | |
sql, | |
[value for data in datas for value in data.values()], | |
"object", | |
], | |
} | |
print(json) | |
requests.post(f"{BASE_URL}/rpc", json=json) | |
def add(self, data): | |
data = self.transform_data(data) | |
data = {"_id": str(uuid.uuid4()), **data} | |
json = { | |
"space": self.space_id, | |
"method": "sql4mainThread", | |
"params": [ | |
f"insert into {self.raw_table_name} ({','.join(data.keys())}) values ({','.join(['?' for _ in data.keys()])})", | |
list(data.values()), | |
"object", | |
], | |
} | |
requests.post(f"{BASE_URL}/rpc", json=json, timeout=1) | |
def update(self, row_id, data): | |
update_data = self.transform_data(data) | |
json = { | |
"space": self.space_id, | |
"method": "sql4mainThread", | |
"params": [ | |
f"update {self.raw_table_name} set {','.join([f'{key} = ?' for key in update_data.keys()])} where _id = ?", | |
list(update_data.values()) + [row_id], | |
"object", | |
], | |
} | |
requests.post(f"{BASE_URL}/rpc", json=json) | |
if __name__ == "__main__": | |
# example beblow | |
# spaceName/tableUuid | |
table = DataSpace("yourSpaceName").table("44d6193623544426873507e10f3cfd7f") | |
rows = table.rows() | |
for row in rows: | |
url = row["profileUrl"] | |
print(url) | |
if url: | |
is_profile = url.split("/")[-2] == "profile" | |
table.update( | |
row["_id"], | |
{ | |
"username": url.split("/")[-1] if is_profile else None, | |
}, | |
) | |
# add row | |
table.add({"fieldName": "fieldValue"}) | |
# bulk add | |
table.bulk_add([{"fieldName": "fieldValue"}]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment