Created June 6, 2022 08:40
-
-
Save mhugo/b65a887613cd1238308673810b6dbe92 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
# Base URL of the Elasticsearch node and the scratch index this experiment
# (re)creates on every run.
ES: str = "http://localhost:9200"
INDEX: str = "test_shard_doc"
class ESClient:
    """Minimal HTTP helper bound to a single Elasticsearch index.

    The ``delete``/``get``/``post``/``put`` helpers prefix ``path`` with
    ``<host>/<index_name>`` and forward any extra positional/keyword
    arguments unchanged to the matching ``requests`` function.
    """

    def __init__(self, host: str, index_name: str):
        self._host = host
        self._index_name = index_name

    def _url(self, path: str) -> str:
        """Return the absolute URL of ``path`` relative to the bound index."""
        return f"{self._host}/{self._index_name}{path}"

    @staticmethod
    def _check_ok(r) -> None:
        """Raise ``AssertionError`` unless ``r`` is a 2xx HTTP response.

        An explicit ``raise`` (rather than ``assert``) keeps the check active
        under ``python -O``; the body is included to make failures readable.
        """
        if not 200 <= r.status_code < 300:
            raise AssertionError(f"HTTP {r.status_code}: {r.text}")

    def delete(self, path: str, *args, **kwargs):
        return requests.delete(self._url(path), *args, **kwargs)

    def get(self, path: str, *args, **kwargs):
        return requests.get(self._url(path), *args, **kwargs)

    def post(self, path: str, *args, **kwargs):
        return requests.post(self._url(path), *args, **kwargs)

    def put(self, path: str, *args, **kwargs):
        return requests.put(self._url(path), *args, **kwargs)

    def delete_index(self):
        """Delete the bound index.

        The response is deliberately not checked: the index may not exist yet
        (e.g. on the first run), and that is not an error for this script.
        """
        self.delete("")

    def create_index(self):
        """Create the bound index with no replicas and soft deletes disabled."""
        # NOTE(review): disabling soft deletes is deprecated in newer
        # Elasticsearch releases -- confirm against the target cluster version.
        r = self.put(
            "",
            json={
                "settings": {
                    "index": {
                        "number_of_replicas": 0,
                        "soft_deletes": {"enabled": False},
                    }
                }
            },
        )
        self._check_ok(r)

    def create_doc(self, d: dict):
        """Index ``d`` as a new document with an auto-generated id."""
        r = self.post("/_doc", json=d)
        self._check_ok(r)

    def refresh(self):
        """Force-merge the bound index.

        Despite its name, this issues ``_forcemerge`` (not ``_refresh``); the
        script relies on the merge to reshuffle segments between searches.
        """
        print("**force merge**")
        r = self.post("/_forcemerge")
        self._check_ok(r)

    def delete_by_match(self, match: dict):
        """Delete every document matched by a ``match`` query on ``match``."""
        r = self.post("/_delete_by_query", json={"query": {"match": match}})
        self._check_ok(r)

    def open_pit(self, keep_alive: str = "1m") -> str:
        """Open a point-in-time on the bound index and return its id.

        ``keep_alive`` defaults to ``"1m"``, the value previously hard-coded.
        """
        r = self.post(
            "/_pit",
            params={"keep_alive": keep_alive},
            headers={"Content-Type": "application/json"},
        )
        self._check_ok(r)
        return r.json()["id"]

    def close_pit(self, pit_id: str):
        """Close the point-in-time ``pit_id``."""
        # The close-PIT endpoint is cluster-level (``DELETE /_pit``), not
        # index-scoped, so bypass the index prefix that ``delete`` would add.
        r = requests.delete(f"{self._host}/_pit", json={"id": pit_id})
        self._check_ok(r)

    def search_segments(self):
        """Return doc counts of searchable segments, largest first.

        Only the first copy (``[0]``) of shard ``"0"`` is inspected; this
        presumably suffices because the index is created with zero replicas
        and (assumed) a single primary shard -- TODO confirm for other setups.
        """
        r = self.get("/_segments")
        shard = r.json()["indices"][self._index_name]["shards"]["0"][0]
        return sorted(
            (seg["num_docs"] for seg in shard["segments"].values() if seg["search"]),
            reverse=True,
        )
if __name__ == "__main__":
    # Demonstrates how `_shard_doc` sort keys of the dataset_id=2 documents
    # change after the dataset_id=1 documents are deleted and the index is
    # force-merged.
    client = ESClient(ES, INDEX)
    client.delete_index()
    client.create_index()

    N = 100
    for dataset_id in (1, 2):
        print(f"add docs with dataset={dataset_id}")
        for a in range(N):
            client.create_doc({"dataset_id": dataset_id, "a": a})
    client.refresh()

    def search_after_keys(dataset_id: int):
        """Return the `_shard_doc` sort keys of hits for ``dataset_id``.

        The search runs inside a freshly opened point-in-time. The PIT is
        always closed, even if the search fails. No ``size`` is given, so only
        Elasticsearch's default first page of hits is returned.
        """
        pit_id = client.open_pit()
        try:
            r = requests.post(
                f"{ES}/_search",
                json={
                    "pit": {"keep_alive": "1m", "id": pit_id},
                    "query": {"match": {"dataset_id": dataset_id}},
                    "sort": [{"_shard_doc": "asc"}],
                },
            )
            # Surface HTTP errors directly instead of a KeyError on "hits".
            r.raise_for_status()
            return [h["sort"][0] for h in r.json()["hits"]["hits"]]
        finally:
            client.close_pit(pit_id)

    print("search_after keys for dataset=2", search_after_keys(2))
    print("delete all docs with dataset_id=1")
    client.delete_by_match({"dataset_id": 1})
    client.refresh()
    print("search_after keys for dataset=2", search_after_keys(2))
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.
Example output: