All the code snippets for the Medium post "Approximate Nearest Neighbors on Elastic Search with Docker" (link in the gist description).
Last active
February 3, 2022 07:19
-
-
Save stephenleo/e3f985954dde430f9595dd57fb9b201e to your computer and use it in GitHub Desktop.
[Medium] Approximate Nearest Neighbors on Elastic Search with Docker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd

# Column of df that holds the vector embeddings.
embedding_col = "emb"

# Load the pre-computed embeddings dataframe from disk.
df = pd.read_pickle("df.pkl")

# Quick sanity check on size and content.
print(df.shape)
df.head()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Imports
from elasticsearch import Elasticsearch

# ES constants
index_name = "amazon-500k"
# NOTE: <...> are placeholders — substitute a real node address and
# HTTP basic-auth credentials before running.
es = Elasticsearch(["<ip_address_of_any_node>:9200"],
                   http_auth=(<username>, <password>),
                   verify_certs=False)  # cluster uses self-signed certs; skip TLS verification

# ES settings
body = {
    "settings": {
        "index": {
            "knn": True,  # enable the k-NN (approximate nearest neighbor) plugin on this index
            "knn.space_type": "l2",  # Euclidean distance
            "knn.algo_param.ef_construction": 100,  # HNSW build-time size of the dynamic candidate list
            "knn.algo_param.m": 16  # HNSW max edges per node
        },
        "number_of_shards": 10,
        "number_of_replicas": 0,  # no replicas during the bulk load; halves indexing work
        "refresh_interval": -1,  # disable auto refresh during ingest (a later snippet refreshes explicitly)
        "translog.flush_threshold_size": "10gb"  # fewer translog flushes while bulk-loading
    }
}
mapping = {
    "properties": {
        embedding_col: {
            "type": "knn_vector",
            # Vector dimension inferred from the first row's embedding
            "dimension": len(df.loc[0,embedding_col])
        }
    }
}
# Create the Index, then attach the knn_vector mapping to it
es.indices.create(index_name, body=body)
es.indices.put_mapping(mapping, index_name)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Imports | |
from elasticsearch.helpers import bulk | |
from tqdm import tqdm | |
# Data Generator | |
def gen(): | |
for row in tqdm(df.itertuples(), total=len(df)): | |
output_yield = { | |
"_op_type": "index", | |
"_index": index_name | |
} | |
output_yield.update(row._asdict()) | |
output_yield.update({ | |
embedding_col: output_yield[embedding_col].tolist() | |
}) | |
yield output_yield | |
# Upload data to ES in bulk | |
_, errors = bulk(es, gen(), chunk_size=500, max_retries=2) | |
assert len(errors) == 0, errors | |
# Refresh the data | |
es.indices.refresh(index_name, request_timeout=1000) | |
# Warmup API | |
res = requests.get("<ip_address>:9200/_opendistro/_knn/warmup/"+index_name+"?pretty", auth=(<username>, <password>), verify=False) | |
print(res.text) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Query parameters
# Placeholder: supply a dataframe of query items with the same schema as df.
query_df = <dataframe of items to query, same schema as df>
K = 5 # Number of neighbors
step = 200 # Number of items to query at a time
# Fields to return for each neighbor ("Index" is the itertuples row index
# that was uploaded alongside each document)
cols_to_query = ["Index", "title"]

# Update the search settings
# ef_search: HNSW query-time size of the candidate list (accuracy/speed trade-off)
body = {
    "settings": {
        "index": {"knn.algo_param.ef_search": 100}
    }
}
es.indices.put_settings(body=body, index=index_name)

# Run the Query in batches of `step` items via the multi-search API
responses = []
for n in tqdm(range(0, len(query_df), step)):
    subset_df = query_df.iloc[n:n+step,:]
    request = []
    for row in subset_df.itertuples():
        # msearch bodies alternate header / query pairs, one pair per item
        req_head = {"index": index_name}
        req_body = {
            "query": {
                "knn": {
                    embedding_col: {
                        "vector": getattr(row, embedding_col).tolist(),
                        "k": K
                    }
                }
            },
            "size": K,
            "_source": cols_to_query
        }
        request.extend([req_head, req_body])
    r = es.msearch(body=request)
    responses.extend(r['responses'])

# Convert the responses to dataframe columns
nn_data = {f'es_neighbors_{key}': [] for key in cols_to_query}
for item in tqdm(responses):
    # Stack the K hits side by side, then transpose the "_source" fields
    # into one small dataframe with a row per neighbor
    nn = pd.concat(map(pd.DataFrame.from_dict, item['hits']['hits']), axis=1)['_source'].T.reset_index(drop=True)
    for key in cols_to_query:
        nn_data[f'es_neighbors_{key}'].append(nn[key].values.tolist())
# One list-valued column per queried field, aligned with query_df's rows
query_df = query_df.assign(**nn_data)
query_df.head()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Inspect the nearest-neighbor titles returned for the first query item
query_df["es_neighbors_title"].iloc[0]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Clean up: delete the index (and all uploaded vectors) when done
es.indices.delete(index=index_name)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# VM startup script: installs Docker + Docker Compose, pulls the Open Distro
# for Elasticsearch image, and launches the service defined in
# /root/docker-compose.yml inside a detached tmux session.
# To check status of startup script: grep Step /var/log/syslog
echo 'Step 1, Install Docker'
# Install Docker
# Instructions from here: https://docs.docker.com/engine/install/debian/
sudo apt-get update
sudo apt-get install -y \
    apt-transport-https \
    ca-certificates \
    curl \
    gnupg-agent \
    software-properties-common
# NOTE(review): apt-key is deprecated on newer Debian releases — confirm it
# still works on the target image before reusing this script.
curl -fsSL https://download.docker.com/linux/debian/gpg | sudo apt-key add -
sudo add-apt-repository \
    "deb [arch=amd64] https://download.docker.com/linux/debian \
    $(lsb_release -cs) \
    stable"
sudo apt-get update
sudo apt-get install -y docker-ce docker-ce-cli containerd.io
echo 'Step 2, Install Docker Compose'
# Install Docker Compose
# Instructions from here: https://docs.docker.com/compose/install/
sudo curl -L "https://github.com/docker/compose/releases/download/1.27.4/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
echo 'Step 3 Fix missing packages'
sudo dpkg --configure -a && sudo apt update --fix-missing -y && sudo apt install -f -y && sudo apt full-upgrade -y
echo 'Step 4, Pull OpenDistro'
# Pull the OpenDistro Docker container
# Instructions from here: https://hub.docker.com/r/amazon/opendistro-for-elasticsearch
sudo docker pull amazon/opendistro-for-elasticsearch
echo 'Step 5, Small Updates'
# Update the Virtual Memory as per https://github.com/opendistro-for-elasticsearch/opendistro-build/issues/329
sudo sysctl -w vm.max_map_count=262144
# Install tmux if needed
sudo apt-get install tmux -y
echo 'Step 6, Start the ES service in a tmux window'
# Detached tmux session keeps docker-compose running after this script exits
tmux new-session -d -s es 'sudo docker-compose -f /root/docker-compose.yml up'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Provision a GCP Open Distro for Elasticsearch cluster:
# 1) create the VMs and collect their internal IPs,
# 2) generate a per-node docker-compose.yml and elasticsearch.yml,
# 3) copy the files over and run es_startup.sh on each node.

# Update these fields based on your requirements
name_prefix=leo-od-test
num_nodes=3
Xms=35g   # JVM heap: set Xms/Xmx to ~50% of the machine's RAM
Xmx=35g

# Fixed
cluster_name=odfe-cluster01
initial_master=odfe-node01
zone=us-west1-b

# remove IP Address file
# FIX: -f so the script does not error on the very first run when the file
# does not exist yet.
rm -f ip_list.txt

# Create Cluster
for ((i=1; i<=$num_nodes; i++))
do
    suffix=$(printf "%02d" $i)
    instance_name=$name_prefix-es$suffix
    # Create an Instance
    gcloud compute instances create $instance_name \
        --image-family debian-10 \
        --image-project debian-cloud \
        --network-interface no-address \
        --zone $zone \
        --machine-type n1-highmem-8 \
        --boot-disk-size 200GB \
        --metadata proxy-mode=service_account
    # Find the IP Address of the instance
    internal_ip=$(gcloud compute instances describe $instance_name --format='get(networkInterfaces[0].networkIP)' --zone=$zone)
    # Write IP Addresses to file (ip_list.txt becomes the discovery.seed_hosts list below)
    echo " - $internal_ip" >> ip_list.txt
done

# Wait for the security protocols to startup
sleep 60

# Create Docker related files and start ES
seed_hosts=$(cat ip_list.txt)
for ((i=1; i<=$num_nodes; i++))
do
    suffix=$(printf "%02d" $i)
    instance_name=$name_prefix-es$suffix
    node_name=odfe-node$suffix
    internal_ip=$(gcloud compute instances describe $instance_name --format='get(networkInterfaces[0].networkIP)' --zone=$zone)
    # Create docker-compose.yml
    # (single-quoted heredoc-style echo; the '$var' breaks interpolate the
    # shell variables into the generated YAML)
    echo '# Instructions from
# https://opendistro.github.io/for-elasticsearch-docs/docs/install/docker/
# https://www.elastic.co/guide/en/elastic-stack-get-started/current/get-started-docker.html
version: "3"
services:
  '$node_name':
    image: amazon/opendistro-for-elasticsearch:1.11.0
    container_name: odfe-node
    environment:
      - cluster.name='$cluster_name'
      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
      - "ES_JAVA_OPTS=-Xms'$Xms' -Xmx'$Xmx'" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536 # maximum number of open files for the Elasticsearch user, set to at least 65536 on modern systems
        hard: 65536
    volumes:
      - odfe-data:/usr/share/elasticsearch/data
      - ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
    ports:
      - 9200:9200
      - 9300:9300 # required for adding more nodes
      - 9600:9600 # required for Performance Analyzer
    networks:
      - odfe-net
volumes:
  odfe-data:
    driver: local
networks:
  odfe-net:
    driver: bridge' > docker-compose.yml
    # Copy the docker-compose to the VM
    sudo gcloud compute scp docker-compose.yml $instance_name:/root/ --zone=$zone --tunnel-through-iap
    # Create elasticsearch.yml ("$seed_hosts" is quoted to preserve the
    # newline-separated " - ip" list collected above)
    echo 'cluster.name: '$cluster_name'
node.name: '$node_name'
node.roles: [master, data]
opendistro_security.disabled: true
http.host: 0.0.0.0
transport.host: 0.0.0.0
transport.publish_host: '$internal_ip'
http.publish_host: '$internal_ip'
http.port: 9200
transport.tcp.port: 9300
network.host: [127.0.0.1, '$internal_ip']
cluster.initial_master_nodes:
  - '$initial_master'
discovery.seed_hosts:
'"$seed_hosts"'
path:
  data: /usr/share/elasticsearch/data
' > elasticsearch.yml
    # Copy the elasticsearch.yml to the VM
    sudo gcloud compute scp elasticsearch.yml $instance_name:/root/ --zone=$zone --tunnel-through-iap
    # Copy the startup script to the VM
    sudo gcloud compute scp es_startup.sh $instance_name:/root/ --zone=$zone --tunnel-through-iap
    # Run the startup script
    sudo gcloud compute ssh $instance_name --zone=$zone --tunnel-through-iap -- 'bash /root/es_startup.sh'
done
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment