Skip to content

Instantly share code, notes, and snippets.

@gcsfred
Created February 3, 2020 16:18
Show Gist options
  • Save gcsfred/82093324e623ed582b603cb3d89543ad to your computer and use it in GitHub Desktop.
Save gcsfred/82093324e623ed582b603cb3d89543ad to your computer and use it in GitHub Desktop.
Sample integration between Elasticsearch and Amazon Personalize - entire file
[DEFAULT]
product_ranking_start = 10
product_ranking_steps_down = 0.2
cat_ranking_start = 10
cat_ranking_steps_down = 0.2
product_recommendations_campaignArn=arn:aws:personalize:us-east-2:11123456:campaign/es-test03-hrnn
product_rankings_campaignArn=arn:aws:personalize:us-east-2:222789:campaign/es-test03-rank
property1_recommendations_campaignArn=arn:aws:personalize:us-east-2:3333456:campaign/es-test03-cat-hrnn
property1_rankings_campaignArn=arn:aws:personalize:us-east-2:444789:campaign/es-test03-cat-rank
import boto3
from elasticsearch import Elasticsearch
import sys
import getopt
import configparser
import os
# Set the AWS profile and default region in this process's environment
# before any client is created further down in the script.
os.environ.update({
    'AWS_PROFILE': "default",
    'AWS_DEFAULT_REGION': "us-east-2",
})
# Call with arguments -u <user> -s <search_text>
def main(argv):
    """Parse the command-line options and run a personalized search.

    Args:
        argv: Argument list without the program name (e.g. ``sys.argv[1:]``).
            ``-u`` sets the user id and ``-s`` sets the search text; each
            falls back to ``"_"`` when it is not supplied.

    Raises:
        SystemExit: With status 2 when an unknown option is given or an
            option is missing its argument.
    """
    try:
        opts, _ = getopt.getopt(argv, "u:s:")
    except getopt.GetoptError as err:
        # Report bad invocations with a usage line instead of a traceback.
        print(err, file=sys.stderr)
        print('usage: -u <user> -s <search_text>', file=sys.stderr)
        sys.exit(2)
    user = "_"
    search = "_"
    for opt, arg in opts:
        if opt == '-u':
            user = arg
        elif opt == '-s':
            search = arg
    search_with_personalization(user, search)
def search_with_personalization(user, search):
    """Search for ``search`` with results weighted by ``user``'s rankings.

    Loads ``config.conf`` (relative to the current working directory), then
    fetches and ranks the category recommendations, fetches and ranks the
    product recommendations, and finally hands both rankings together with
    the search text to ``query_es``.

    Args:
        user: User id passed to the recommendation and ranking lookups.
        search: Text to search for.
    """
    settings = configparser.ConfigParser()
    settings.read('config.conf')
    # Categories are resolved before products; each ranking call receives the
    # recommendations fetched immediately before it.
    category_weights = get_category_ranking(
        settings, user, get_category_recommendations(settings, user))
    product_weights = get_product_ranking(
        settings, user, get_product_recommendations(settings, user))
    query_es(search, category_weights, product_weights)
def _log_info(msg):
    """Write ``msg`` followed by a newline to standard output."""
    print(msg, file=sys.stdout)
def get_product_recommendations(config, user):
    """Return the item ids recommended for ``user`` by the product campaign.

    Args:
        config: Parsed configuration; the campaign ARN is read from
            ``product_recommendations_campaignArn`` in the ``DEFAULT`` section.
        user: User id to fetch recommendations for.

    Returns:
        The list of item ids produced by ``_get_recommendations``.
    """
    _log_info('Retrieving product recommendations')
    campaign_arn = config['DEFAULT']['product_recommendations_campaignArn']
    return _get_recommendations(user, campaign_arn)
def get_category_recommendations(config, user):
    """Return the item ids recommended for ``user`` by the category campaign.

    Args:
        config: Parsed configuration; the campaign ARN is read from the
            ``DEFAULT`` section.
        user: User id to fetch recommendations for.

    Returns:
        The list of item ids produced by ``_get_recommendations``.

    Raises:
        KeyError: If neither the ``category_`` nor the ``property1_``
            spelling of the campaign key is configured.
    """
    _log_info('Retrieving category (category) recommendations')
    settings = config['DEFAULT']
    key = 'category_recommendations_campaignArn'
    # The sample configuration accompanying this script spells the key with a
    # "property1_" prefix; accept that spelling when the "category_" one is
    # absent so the lookup does not fail with KeyError.
    if key not in settings and 'property1_recommendations_campaignArn' in settings:
        key = 'property1_recommendations_campaignArn'
    return _get_recommendations(user, settings[key])
def _get_recommendations(user, campaign):
    """Fetch recommendations for ``user`` from a Personalize campaign.

    Args:
        user: User id sent as ``userId``.
        campaign: Campaign ARN sent as ``campaignArn``.

    Returns:
        The ``itemId`` of every entry in the response's ``itemList``, in
        response order. Each id is also logged.
    """
    client = boto3.client('personalize-runtime', 'us-east-2')
    result = client.get_recommendations(campaignArn=campaign, userId=user)
    item_ids = []
    _log_info('Recommended items:')
    for entry in result['itemList']:
        item_ids.append(entry['itemId'])
        _log_info('itemId:' + item_ids[-1])
    return item_ids
def get_product_ranking(config, user, input_list):
    """Rank ``input_list`` for ``user`` with the product ranking campaign.

    Args:
        config: Parsed configuration; the ``DEFAULT`` section supplies
            ``product_rankings_campaignArn``, ``product_ranking_start`` and
            ``product_ranking_steps_down``.
        user: User id to rank for.
        input_list: Item ids to rank.

    Returns:
        The item-id-to-score mapping produced by ``_get_ranking``.
    """
    _log_info('Retrieving product ranking')
    defaults = config['DEFAULT']
    campaign_arn = defaults['product_rankings_campaignArn']
    first_score = float(defaults['product_ranking_start'])
    decrement = float(defaults['product_ranking_steps_down'])
    return _get_ranking(campaign_arn, user, input_list, first_score, decrement)
def get_category_ranking(config, user, input_list):
    """Rank ``input_list`` for ``user`` with the category ranking campaign.

    Args:
        config: Parsed configuration; the ``DEFAULT`` section supplies the
            campaign ARN, ``cat_ranking_start`` and ``cat_ranking_steps_down``.
        user: User id to rank for.
        input_list: Item ids to rank.

    Returns:
        The item-id-to-score mapping produced by ``_get_ranking``.

    Raises:
        KeyError: If neither the ``category_`` nor the ``property1_``
            spelling of the campaign key is configured.
    """
    _log_info('Retrieving category (category) ranking')
    settings = config['DEFAULT']
    key = 'category_rankings_campaignArn'
    # The sample configuration accompanying this script spells the key with a
    # "property1_" prefix; accept that spelling when the "category_" one is
    # absent so the lookup does not fail with KeyError.
    if key not in settings and 'property1_rankings_campaignArn' in settings:
        key = 'property1_rankings_campaignArn'
    campaign_arn = settings[key]
    start = float(settings['cat_ranking_start'])
    # The step is fractional (0.2 in the sample configuration), so it must be
    # parsed as a float; int() raises ValueError on such a value.
    steps_down = float(settings['cat_ranking_steps_down'])
    return _get_ranking(campaign_arn, user, input_list, start, steps_down)
def _get_ranking(campaign, user, input_list, start, steps_down):
    """Rank items for a user and assign each a linearly decreasing score.

    Args:
        campaign: Campaign ARN sent as ``campaignArn``.
        user: User id sent as ``userId``.
        input_list: Item ids sent as ``inputList``.
        start: Score given to the first item of the returned ranking.
        steps_down: Amount subtracted from the score for each later position.

    Returns:
        A dict mapping each ranked ``itemId`` to
        ``start - position * steps_down`` (position counted from 0). An empty
        dict is returned, without any remote call, when ``input_list`` is
        empty or ``None``.
    """
    if not input_list:
        # Nothing to rank, so skip the remote call altogether.
        return {}
    personalize = boto3.client('personalize-runtime', 'us-east-2')
    response = personalize.get_personalized_ranking(
        campaignArn=campaign, userId=user, inputList=input_list)
    answer = {}
    _log_info('Ranked items:')
    for position, item in enumerate(response['personalizedRanking']):
        score = start - (position * steps_down)
        answer[item['itemId']] = score
        _log_info('Ranking ' + item['itemId'] + ' to ' + str(score))
    return answer
def transform_category_boost(category_boost_pairs):
    """Build one boosted ``match`` clause on ``category`` per mapping entry.

    Args:
        category_boost_pairs: Mapping of category value to boost.

    Returns:
        A list, in the mapping's iteration order, of dicts shaped like
        ``{"match": {"category": {"query": <key>, "boost": <value>}}}``.
    """
    return [
        {"match": {"category": {"query": category, "boost": boost}}}
        for category, boost in category_boost_pairs.items()
    ]
def transform_product_id_weights(product_id_weight_pairs):
    """Build one weighted ``ids`` filter entry per mapping entry.

    Args:
        product_id_weight_pairs: Mapping of document id to weight.

    Returns:
        A list, in the mapping's iteration order, of dicts shaped like
        ``{"filter": {"ids": {"values": [<key>]}}, "weight": <value>}``.
    """
    return [
        {
            "filter": {"ids": {"values": [product_id]}},
            "weight": weight,
        }
        for product_id, weight in product_id_weight_pairs.items()
    ]
def arrange_json_array(a, b):
    """Add ``b`` as the last element of list ``a`` in place and return ``a``."""
    a.extend((b,))
    return a
def query_es(text_search, category_boost_pairs, product_id_weight_pairs):
    """Run the personalized search against the ``bank`` index and log the hits.

    The request is a ``function_score`` query. Its inner ``bool`` query has
    one ``should`` clause per category boost plus a ``match`` on ``keywords``
    for the search text; its ``functions`` are the per-product weight filters.

    Args:
        text_search: Text matched against the ``keywords`` field.
        category_boost_pairs: Mapping of category value to boost.
        product_id_weight_pairs: Mapping of document id to weight.
    """
    client = Elasticsearch()
    should_clauses = arrange_json_array(
        transform_category_boost(category_boost_pairs),
        {"match": {"keywords": text_search}},
    )
    the_body = {
        "query": {
            "function_score": {
                "query": {"bool": {"should": should_clauses}},
                "boost": "5",
                "functions": transform_product_id_weights(product_id_weight_pairs),
                "score_mode": "max",
                "boost_mode": "multiply",
            }
        }
    }
    response = client.search(index="bank", body=the_body)

    # Pull the hit list out once; it stays None when the response or its
    # "hits" entry is None.
    hits = None
    if response is not None and response['hits'] is not None:
        hits = response['hits']['hits']
    if not hits:
        _log_info('No search results.')
        return

    _log_info('There are ' + str(len(hits)) + ' search results.')
    for position, hit in enumerate(hits, start=1):
        _log_info('#' + str(position) + ': Score:' + str(hit['_score'])
                  + ', search result:' + str(hit['_source']))
# Script entry point: pass the command-line arguments, minus the program
# name, to main().
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    main(cli_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment