Skip to content

Instantly share code, notes, and snippets.

@mhugo
Created June 6, 2022 08:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mhugo/b65a887613cd1238308673810b6dbe92 to your computer and use it in GitHub Desktop.
Save mhugo/b65a887613cd1238308673810b6dbe92 to your computer and use it in GitHub Desktop.
import json
import requests
# Base URL of the Elasticsearch node this script talks to.
ES: str = "http://localhost:9200"
# Scratch index; the script deletes and re-creates it on every run.
INDEX: str = "test_shard_doc"
class ESClient:
    """Thin ``requests`` wrapper bound to a single Elasticsearch index.

    The generic verbs (``delete``/``get``/``post``/``put``) send the request to
    ``<host>/<index_name><path>``, forward any extra positional/keyword
    arguments unchanged to the matching ``requests`` function, and return its
    response. The higher-level helpers check HTTP status codes with ``assert``
    (these checks are skipped when Python runs with ``-O``).
    """

    def __init__(self, host: str, index_name: str):
        """Store the node base URL (e.g. ``http://localhost:9200``) and index name."""
        self._host = host
        self._index_name = index_name

    def _url(self, path: str) -> str:
        """Return the index-scoped URL ``<host>/<index_name><path>``.

        ``path`` is appended verbatim, so it should start with ``/`` (or be
        ``""`` to address the index itself).
        """
        return f"{self._host}/{self._index_name}{path}"

    @staticmethod
    def _assert_2xx(r: "requests.Response") -> None:
        """Fail unless ``r`` carries a 2xx status code.

        A bare ``status_code >= 200`` check accepts every real HTTP response,
        including 4xx/5xx errors, so the upper bound is what makes it useful.
        """
        assert 200 <= r.status_code < 300, f"HTTP {r.status_code}: {r.text}"

    def delete(self, path: str, *args, **kwargs):
        """Send ``DELETE <host>/<index_name><path>`` and return the response."""
        return requests.delete(self._url(path), *args, **kwargs)

    def get(self, path: str, *args, **kwargs):
        """Send ``GET <host>/<index_name><path>`` and return the response."""
        return requests.get(self._url(path), *args, **kwargs)

    def post(self, path: str, *args, **kwargs):
        """Send ``POST <host>/<index_name><path>`` and return the response."""
        return requests.post(self._url(path), *args, **kwargs)

    def put(self, path: str, *args, **kwargs):
        """Send ``PUT <host>/<index_name><path>`` and return the response."""
        return requests.put(self._url(path), *args, **kwargs)

    def delete_index(self):
        """Delete the index without checking the response.

        Not checking the status lets a run start even when the index does not
        exist yet (presumably the intent, since it precedes ``create_index``).
        """
        self.delete("")

    def create_index(self):
        """Create the index with 0 replicas and soft deletes disabled.

        Asserts the creation request returned HTTP 200.
        NOTE(review): newer Elasticsearch versions may reject disabling soft
        deletes for new indices — confirm against the target version.
        """
        r = self.put(
            "",
            json={
                "settings": {
                    "index": {
                        "number_of_replicas": 0,
                        "soft_deletes": {"enabled": False},
                    }
                }
            },
        )
        assert r.status_code == 200, r.text

    def create_doc(self, d: dict):
        """Index document ``d`` under an auto-generated id; assert a 2xx reply."""
        r = self.post("/_doc", json=d)
        self._assert_2xx(r)

    def refresh(self):
        """Force-merge the index (despite the name, no ``_refresh`` is sent).

        Prints a marker line, then asserts ``_forcemerge`` returned HTTP 200.
        ``search_segments()`` can be called before/after this to inspect the
        segment layout around the merge.
        """
        print("**force merge**")
        r = self.post("/_forcemerge")
        assert r.status_code == 200, r.text

    def delete_by_match(self, match: dict):
        """Delete all documents matched by ``{"match": match}``; assert HTTP 200."""
        r = self.post("/_delete_by_query", json={"query": {"match": match}})
        assert r.status_code == 200, r.text

    def open_pit(self, keep_alive: str = "1m") -> str:
        """Open a point in time (PIT) on the index and return its id.

        ``keep_alive`` is the Elasticsearch time value for how long the PIT
        stays open; it defaults to ``"1m"``.
        """
        r = self.post(
            "/_pit",
            params={"keep_alive": keep_alive},
            headers={"Content-Type": "application/json"},
        )
        self._assert_2xx(r)
        return r.json()["id"]

    def close_pit(self, pit_id: str):
        """Close the point in time ``pit_id``; assert a 2xx reply.

        Closing a PIT is not index-scoped: the endpoint is the host-level
        ``DELETE <host>/_pit``, so this bypasses ``self.delete`` (which would
        prefix the index name).
        """
        r = requests.delete(f"{self._host}/_pit", json={"id": pit_id})
        self._assert_2xx(r)

    def search_segments(self):
        """Return the doc counts of searchable segments, largest first.

        Only the first listed copy of shard ``"0"`` of this index is inspected.
        That is enough for a single-shard index; any other shards are ignored.
        """
        r = self.get("/_segments")
        self._assert_2xx(r)
        shard0 = r.json()["indices"][self._index_name]["shards"]["0"][0]
        return sorted(
            (seg["num_docs"] for seg in shard0["segments"].values() if seg["search"]),
            reverse=True,
        )
if __name__ == "__main__":
    # Print the `_shard_doc` sort values for dataset 2 before and after deleting
    # dataset 1 and force-merging, so the two runs can be compared.
    client = ESClient(ES, INDEX)
    client.delete_index()
    client.create_index()

    N = 100
    for dataset_id in (1, 2):
        print(f"add docs with dataset={dataset_id}")
        for a in range(N):
            client.create_doc({"dataset_id": dataset_id, "a": a})
    client.refresh()

    def search_after_keys(dataset_id: int):
        """Return the ``_shard_doc`` sort values of the first page of hits.

        Opens a PIT and runs a single search for ``dataset_id`` sorted by
        ``_shard_doc`` ascending, using Elasticsearch's default page size. The
        PIT is closed in a ``finally`` so it is released even if the search fails.
        """
        pit_id = client.open_pit()
        try:
            # A search carrying a PIT must not name an index, hence the host URL.
            r = requests.post(
                f"{ES}/_search",
                json={
                    "pit": {"keep_alive": "1m", "id": pit_id},
                    "query": {"match": {"dataset_id": dataset_id}},
                    "sort": [{"_shard_doc": "asc"}],
                },
            )
            assert r.status_code == 200, r.text
            body = r.json()
            # The search may hand back an updated PIT id; close the latest one.
            pit_id = body.get("pit_id", pit_id)
            return [hit["sort"][0] for hit in body["hits"]["hits"]]
        finally:
            client.close_pit(pit_id)

    print("search_after keys for dataset=2", search_after_keys(2))
    print("delete all docs with dataset_id=1")
    client.delete_by_match({"dataset_id": 1})
    client.refresh()
    print("search_after keys for dataset=2", search_after_keys(2))
@mhugo
Copy link
Author

mhugo commented Jun 6, 2022

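Example output (note that the `_shard_doc` sort values reported for dataset=2 change from 100–109 to 15–24 once the dataset=1 documents are deleted and the index is force-merged):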

add docs with dataset=1
add docs with dataset=2
**force merge**
search_after keys for dataset=2 [100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
delete all docs with dataset_id=1
**force merge**
search_after keys for dataset=2 [15, 16, 17, 18, 19, 20, 21, 22, 23, 24]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment