
Approximate Nearest Neighbors on Elastic Search with Docker

All the code snippets for the accompanying Medium post.

import pandas as pd
df = pd.read_pickle("df.pkl")
embedding_col = "emb"
print(df.shape)
df.head()
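For reference, the snippets assume df has one row per item, with the embedding column ("emb") holding a fixed-length numpy vector per row plus any metadata columns (e.g. "title") you want returned; the mapping below reads the vector dimension from the first row. A purely illustrative stand-in (not from the original post) if you want to try the snippets without the real data:

# Illustrative stand-in for df.pkl (assumption, not the original dataset):
# each row carries a fixed-length numpy array in the embedding column.
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "title": ["item a", "item b", "item c"],
    "emb": [np.random.rand(256).astype("float32") for _ in range(3)],
})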
# Imports
from elasticsearch import Elasticsearch
# ES constants
index_name = "amazon-500k"
es = Elasticsearch(["<ip_address_of_any_node>:9200"],
                   http_auth=("<username>", "<password>"),
                   verify_certs=False)
# ES settings
body = {
    "settings": {
        "index": {
            "knn": True,
            "knn.space_type": "l2",
            "knn.algo_param.ef_construction": 100,
            "knn.algo_param.m": 16
        },
        "number_of_shards": 10,
        "number_of_replicas": 0,
        "refresh_interval": -1,
        "translog.flush_threshold_size": "10gb"
    }
}
mapping = {
    "properties": {
        embedding_col: {
            "type": "knn_vector",
            "dimension": len(df.loc[0, embedding_col])
        }
    }
}
# Create the Index
es.indices.create(index_name, body=body)
es.indices.put_mapping(mapping, index_name)
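Not in the original post, but a quick way to confirm the kNN settings and mapping took effect is to read them back with the standard elasticsearch-py calls:

# Optional sanity check: read back the index settings and mapping just applied
print(es.indices.get_settings(index=index_name))
print(es.indices.get_mapping(index=index_name))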
# Imports
from elasticsearch.helpers import bulk
from tqdm import tqdm
# Data Generator
def gen():
    for row in tqdm(df.itertuples(), total=len(df)):
        output_yield = {
            "_op_type": "index",
            "_index": index_name
        }
        output_yield.update(row._asdict())
        output_yield.update({
            embedding_col: output_yield[embedding_col].tolist()
        })
        yield output_yield
# Upload data to ES in bulk
_, errors = bulk(es, gen(), chunk_size=500, max_retries=2)
assert len(errors) == 0, errors
# Refresh the data
es.indices.refresh(index_name, request_timeout=1000)
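As an extra check (my addition, not from the post), the indexed document count should match the dataframe length once the refresh has completed:

# Optional sanity check: every row of df should now be searchable
doc_count = es.count(index=index_name)["count"]
assert doc_count == len(df), f"indexed {doc_count} of {len(df)} documents"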
# Warmup API
import requests
res = requests.get("http://<ip_address>:9200/_opendistro/_knn/warmup/" + index_name + "?pretty",
                   auth=("<username>", "<password>"), verify=False)
print(res.text)
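If the warmup call succeeds, the Open Distro kNN plugin also exposes a stats endpoint (/_opendistro/_knn/stats) reporting, among other things, graph memory usage and cache hits. A hedged sketch using the same placeholders as above:

# Optional: inspect kNN plugin stats (graph memory usage, cache hits, etc.)
res = requests.get("http://<ip_address>:9200/_opendistro/_knn/stats?pretty",
                   auth=("<username>", "<password>"), verify=False)
print(res.text)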
# Query parameters
query_df = <dataframe of items to query, same schema as df>
K = 5 # Number of neighbors
step = 200 # Number of items to query at a time
cols_to_query = ["Index", "title"]
# Update the search settings
body = {
    "settings": {
        "index": {"knn.algo_param.ef_search": 100}
    }
}
es.indices.put_settings(body=body, index=index_name)
# Run the Query
responses = []
for n in tqdm(range(0, len(query_df), step)):
    subset_df = query_df.iloc[n:n+step, :]
    request = []
    for row in subset_df.itertuples():
        req_head = {"index": index_name}
        req_body = {
            "query": {
                "knn": {
                    embedding_col: {
                        "vector": getattr(row, embedding_col).tolist(),
                        "k": K
                    }
                }
            },
            "size": K,
            "_source": cols_to_query
        }
        request.extend([req_head, req_body])
    r = es.msearch(body=request)
    responses.extend(r['responses'])
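The loop above leaves one msearch response per query row in responses. For reference (my annotation, not from the post), each response carries up to K hits whose _source holds only the fields listed in cols_to_query, which is exactly what the conversion below unpacks:

# Illustrative peek at the response structure used by the conversion below
first_hits = responses[0]["hits"]["hits"]  # list of up to K nearest neighbors
for hit in first_hits:
    print(hit["_score"], hit["_source"]["title"])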
# Convert the responses to dataframe columns
nn_data = {f'es_neighbors_{key}': [] for key in cols_to_query}
for item in tqdm(responses):
    nn = pd.concat(map(pd.DataFrame.from_dict, item['hits']['hits']), axis=1)['_source'].T.reset_index(drop=True)
    for key in cols_to_query:
        nn_data[f'es_neighbors_{key}'].append(nn[key].values.tolist())
query_df = query_df.assign(**nn_data)
query_df.head()
query_df["es_neighbors_title"].iloc[0]
es.indices.delete(index=index_name)
#!/bin/bash
# To check status of startup script: grep Step /var/log/syslog
echo 'Step 1, Install Docker'
# Install Docker
# Instructions from here: https://docs.docker.com/engine/install/debian/
sudo apt-get update
sudo apt-get install -y \
    apt-transport-https \
    ca-certificates \
    curl \
    gnupg-agent \
    software-properties-common
curl -fsSL https://download.docker.com/linux/debian/gpg | sudo apt-key add -
sudo add-apt-repository \
    "deb [arch=amd64] https://download.docker.com/linux/debian \
    $(lsb_release -cs) \
    stable"
sudo apt-get update
sudo apt-get install -y docker-ce docker-ce-cli containerd.io
echo 'Step 2, Install Docker Compose'
# Install Docker Compose
# Instructions from here: https://docs.docker.com/compose/install/
sudo curl -L "https://github.com/docker/compose/releases/download/1.27.4/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
echo 'Step 3, Fix missing packages'
sudo dpkg --configure -a && sudo apt update --fix-missing -y && sudo apt install -f -y && sudo apt full-upgrade -y
echo 'Step 4, Pull OpenDistro'
# Pull the OpenDistro Docker container
# Instructions from here: https://hub.docker.com/r/amazon/opendistro-for-elasticsearch
sudo docker pull amazon/opendistro-for-elasticsearch
echo 'Step 5, Small Updates'
# Update the Virtual Memory as per https://github.com/opendistro-for-elasticsearch/opendistro-build/issues/329
sudo sysctl -w vm.max_map_count=262144
# Install tmux if needed
sudo apt-get install tmux -y
echo 'Step 6, Start the ES service in a tmux window'
tmux new-session -d -s es 'sudo docker-compose -f /root/docker-compose.yml up'
#!/bin/bash
# Update these fields based on your requirements
name_prefix=leo-od-test
num_nodes=3
Xms=35g
Xmx=35g
# Fixed
cluster_name=odfe-cluster01
initial_master=odfe-node01
zone=us-west1-b
# Remove any stale IP address file
rm -f ip_list.txt
# Create Cluster
for ((i=1; i<=$num_nodes; i++))
do
    suffix=$(printf "%02d" $i)
    instance_name=$name_prefix-es$suffix

    # Create an instance
    gcloud compute instances create $instance_name \
        --image-family debian-10 \
        --image-project debian-cloud \
        --network-interface no-address \
        --zone $zone \
        --machine-type n1-highmem-8 \
        --boot-disk-size 200GB \
        --metadata proxy-mode=service_account

    # Find the internal IP address of the instance
    internal_ip=$(gcloud compute instances describe $instance_name --format='get(networkInterfaces[0].networkIP)' --zone=$zone)

    # Write the IP address to file
    echo " - $internal_ip" >> ip_list.txt
done
# Wait for the security protocols to start up
sleep 60
# Create Docker related files and start ES
seed_hosts=$(cat ip_list.txt)
for ((i=1; i<=$num_nodes; i++))
do
    suffix=$(printf "%02d" $i)
    instance_name=$name_prefix-es$suffix
    node_name=odfe-node$suffix
    internal_ip=$(gcloud compute instances describe $instance_name --format='get(networkInterfaces[0].networkIP)' --zone=$zone)

    # Create docker-compose.yml
    echo '# Instructions from
# https://opendistro.github.io/for-elasticsearch-docs/docs/install/docker/
# https://www.elastic.co/guide/en/elastic-stack-get-started/current/get-started-docker.html
version: "3"
services:
  '$node_name':
    image: amazon/opendistro-for-elasticsearch:1.11.0
    container_name: odfe-node
    environment:
      - cluster.name='$cluster_name'
      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
      - "ES_JAVA_OPTS=-Xms'$Xms' -Xmx'$Xmx'" # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536 # maximum number of open files for the Elasticsearch user, set to at least 65536 on modern systems
        hard: 65536
    volumes:
      - odfe-data:/usr/share/elasticsearch/data
      - ./elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml
    ports:
      - 9200:9200
      - 9300:9300 # required for adding more nodes
      - 9600:9600 # required for Performance Analyzer
    networks:
      - odfe-net
volumes:
  odfe-data:
    driver: local
networks:
  odfe-net:
    driver: bridge' > docker-compose.yml

    # Copy the docker-compose.yml to the VM
    sudo gcloud compute scp docker-compose.yml $instance_name:/root/ --zone=$zone --tunnel-through-iap

    # Create elasticsearch.yml
    echo 'cluster.name: '$cluster_name'
node.name: '$node_name'
node.roles: [master, data]
opendistro_security.disabled: true
http.host: 0.0.0.0
transport.host: 0.0.0.0
transport.publish_host: '$internal_ip'
http.publish_host: '$internal_ip'
http.port: 9200
transport.tcp.port: 9300
network.host: [127.0.0.1, '$internal_ip']
cluster.initial_master_nodes:
  - '$initial_master'
discovery.seed_hosts:
'"$seed_hosts"'
path:
  data: /usr/share/elasticsearch/data
' > elasticsearch.yml

    # Copy the elasticsearch.yml to the VM
    sudo gcloud compute scp elasticsearch.yml $instance_name:/root/ --zone=$zone --tunnel-through-iap

    # Copy the startup script to the VM
    sudo gcloud compute scp es_startup.sh $instance_name:/root/ --zone=$zone --tunnel-through-iap

    # Run the startup script
    sudo gcloud compute ssh $instance_name --zone=$zone --tunnel-through-iap -- 'bash /root/es_startup.sh'
done
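Once the loop above finishes, a quick way to confirm the cluster formed correctly (not part of the original scripts) is to hit the cluster health API from Python with the same client used earlier; you should see your node count and a green or yellow status:

# Hypothetical post-setup check: <ip_address_of_any_node>, <username> and
# <password> are the same placeholders used in the indexing snippets above.
from elasticsearch import Elasticsearch

es = Elasticsearch(["<ip_address_of_any_node>:9200"],
                   http_auth=("<username>", "<password>"),
                   verify_certs=False)
print(es.cluster.health())  # expect "number_of_nodes" to equal num_nodes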