An example use of the Kaltura eSearch Reader for LlamaIndex, showing how to search for a list of Kaltura video entries and query ChatGPT against them.
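# Prerequisites (assumed, not stated in the original gist):
#   pip install llama-index langchain KalturaApiClient pyvis
#   export OPENAI_API_KEY="sk-..."  # langchain's ChatOpenAI reads this environment variable
# Package names reflect PyPI as of May 2023, when this gist was written;
# later llama_index releases reorganized these modules.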
import logging
import sys
from llama_index import (
    download_loader,
    GPTVectorStoreIndex,
    LLMPredictor,
    ServiceContext,
    PromptHelper,
    load_index_from_storage,
    StorageContext
)
from llama_index.indices.knowledge_graph import GPTKnowledgeGraphIndex
from llama_index.logger import LlamaLogger
from langchain.chat_models import ChatOpenAI
from KalturaClient.Plugins.Core import KalturaMediaType
from KalturaClient.Plugins.ElasticSearch import (
    KalturaESearchSortOrder, KalturaESearchEntryOrderByFieldName,
    KalturaESearchOrderBy, KalturaESearchEntryOrderByItem, KalturaESearchCaptionItem,
    KalturaESearchEntryItem, KalturaESearchEntryFieldName, KalturaESearchCaptionFieldName,
    KalturaESearchEntryParams, KalturaESearchCategoryEntryItem, KalturaESearchEntryOperator,
    KalturaESearchOperatorType, KalturaESearchItemType, KalturaCategoryEntryStatus, KalturaESearchCategoryEntryFieldName
)
from llama_index.output_parsers import LangchainOutputParser
from llama_index.llm_predictor import StructuredLLMPredictor
from langchain.output_parsers import StructuredOutputParser, ResponseSchema
from llama_index.prompts.prompts import QuestionAnswerPrompt, RefinePrompt
from llama_index.prompts.default_prompts import DEFAULT_TEXT_QA_PROMPT_TMPL, DEFAULT_REFINE_PROMPT_TMPL
from pyvis.network import Network
pid = 000000  # <-- replace this with the Partner ID from the KMC Integration Settings view (this is your Kaltura account ID)
admin_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"  # <-- replace this with the Admin Secret from the KMC Integration Settings view (aka API key)
user_id = "llamareader-tester"  # <-- all Kaltura API actions will be associated with this user ID
category_name_filter = "agenda_category"  # <-- replace this with your category name
max_entries = 5  # maximum number of Kaltura entries to load
max_input_size = 4096  # set maximum input size
num_output = 2000  # set number of output tokens
max_chunk_overlap = 100  # set maximum chunk overlap
openai_model_name = 'gpt-3.5-turbo'  # set the OpenAI model name
chunk_size_limit = 600  # maximum chunk size for indexing
top_k = 5  # how many top results should OpenAI process?
streaming_enabled = False  # should results be streamed?
# Set up logging
logging.basicConfig(stream=sys.stdout, level=logging.WARNING)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))
# Initialize the ChatGPT model configurations
chatgptllm = ChatOpenAI(temperature=0,
                        model_name=openai_model_name,
                        max_tokens=num_output,
                        streaming=streaming_enabled)  # type: ignore
llm_predictor = LLMPredictor(llm=chatgptllm)
prompt_helper = PromptHelper(max_input_size, num_output, max_chunk_overlap, chunk_size_limit=chunk_size_limit)
llama_logger = LlamaLogger()
service_context = ServiceContext.from_defaults(
    llm_predictor=llm_predictor,
    prompt_helper=prompt_helper,
    llama_logger=llama_logger,
)
# Try to load the index from local storage. If it fails, rebuild the index from scratch.
try:
    # Check if we have a local storage index; if yes, let's load it instead of building the index
    storage_context = StorageContext.from_defaults(persist_dir='./storage')
    index = load_index_from_storage(storage_context, index_id="kaltura_virtuallylive_vector_index", service_context=service_context)
    #kg_index = load_index_from_storage(storage_context, index_id="kaltura_virtuallylive_knowgraph_index")
except Exception as e:
    # If the local storage index doesn't exist (or there is an issue with it) - let's rebuild the index
    print(f"Error loading index from storage: {str(e)}")
    print("Building index from scratch instead...")
    KalturaESearchReader = download_loader(loader_class="KalturaESearchReader")
    # To load the Loader from a different repo (instead of llamahub.ai), use:
    # KalturaESearchReader = download_loader(loader_class="KalturaESearchReader",
    #                                        custom_path="../llama-hub-fork/loader_hub",
    #                                        loader_hub_url="https://raw.githubusercontent.com/zoharbabin/llama-hub/main/loader_hub/")
    loader = KalturaESearchReader(
        partnerId=pid,
        apiSecret=admin_secret,
        userId=user_id,
    )  # type: ignore
    search_params = KalturaESearchEntryParams()
    # Sort the search results in descending order by entry last updated -
    search_params.orderBy = KalturaESearchOrderBy()
    search_params.orderBy.orderItems = []
    search_params.orderBy.orderItems.append(KalturaESearchEntryOrderByItem())
    search_params.orderBy.orderItems[0].sortField = KalturaESearchEntryOrderByFieldName.UPDATED_AT
    search_params.orderBy.orderItems[0].sortOrder = KalturaESearchSortOrder.ORDER_BY_DESC
    # Create an AND relationship between the following search queries -
    search_params.searchOperator = KalturaESearchEntryOperator()
    search_params.searchOperator.operator = KalturaESearchOperatorType.AND_OP
    search_params.searchOperator.searchItems = []
    # Find only entries that have captions -
    caption_item = KalturaESearchCaptionItem()
    caption_item.fieldName = KalturaESearchCaptionFieldName.CONTENT
    caption_item.itemType = KalturaESearchItemType.EXISTS
    search_params.searchOperator.searchItems.append(caption_item)
    # Find only entries that are inside an exact category name -
    category_item = KalturaESearchCategoryEntryItem()
    category_item.categoryEntryStatus = KalturaCategoryEntryStatus.ACTIVE
    category_item.fieldName = KalturaESearchCategoryEntryFieldName.NAME
    category_item.addHighlight = False
    category_item.itemType = KalturaESearchItemType.EXACT_MATCH
    category_item.searchTerm = category_name_filter
    search_params.searchOperator.searchItems.append(category_item)
    # Find only video entries (KalturaMediaType.VIDEO) -
    entry_item = KalturaESearchEntryItem()
    entry_item.fieldName = KalturaESearchEntryFieldName.MEDIA_TYPE
    entry_item.addHighlight = False
    entry_item.itemType = KalturaESearchItemType.EXACT_MATCH
    entry_item.searchTerm = KalturaMediaType.VIDEO
    search_params.searchOperator.searchItems.append(entry_item)
    # Get the top Kaltura entries (per max_entries) and include Captions in the result -
    entry_docs = loader.load_data(search_params, True, max_entries)
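    # entry_docs is a list of llama_index Documents, one per Kaltura entry,
    # carrying the entry metadata plus caption text (the second argument,
    # True, requests captions in the result).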
    # Build the GPT Vector Index
    index = GPTVectorStoreIndex.from_documents(
        entry_docs,
        service_context=service_context
    )
    # Build the Knowledge Graph Index
    #kg_index = GPTKnowledgeGraphIndex.from_documents(
    #    entry_docs,
    #    max_triplets_per_chunk=2,
    #    service_context=service_context
    #)
    # Store the GPT Vector Index locally
    index.set_index_id("kaltura_virtuallylive_vector_index")
    index.storage_context.persist('./storage')
    # Store the Knowledge Graph Index locally
    #kg_index.set_index_id("kaltura_virtuallylive_knowgraph_index")
    #kg_index.storage_context.persist('./storage')
    #g = kg_index.get_networkx_graph()
    #net = Network(notebook=False, cdn_resources="in_line", directed=True, height="1200px")
    #net.from_nx(g)
    #html = net.generate_html()
    #with open("videosgraph.html", mode='w', encoding='utf-8') as fp:
    #    fp.write(html)
# Set up response schemas for parsing the output
response_schemas = [
    ResponseSchema(name="Top10", description="bullet list of the top 10 key lessons to learn from these videos"),
    ResponseSchema(name="Speakers", description="who are the primary speakers in these videos?"),
    ResponseSchema(name="Keywords", description="What are the primary semantic topics that are discussed in these videos?"),
    ResponseSchema(name="References", description="Which entry_ids were most relevant to the answer?"),
    ResponseSchema(name="TimedReferences", description='In what segments of these entry_ids can I find the most relevant references? formatted as a json array of: {entryId: ID, segments: [{start: 126, end: 8768}]}'),
]
lc_output_parser = StructuredOutputParser.from_response_schemas(response_schemas)
output_parser = LangchainOutputParser(lc_output_parser)
fmt_qa_tmpl = output_parser.format(DEFAULT_TEXT_QA_PROMPT_TMPL)
fmt_refine_tmpl = output_parser.format(DEFAULT_REFINE_PROMPT_TMPL)
qa_prompt = QuestionAnswerPrompt(fmt_qa_tmpl, output_parser=output_parser)
refine_prompt = RefinePrompt(fmt_refine_tmpl, output_parser=output_parser)
structured_llm_predictor = StructuredLLMPredictor()
query_engine = index.as_query_engine(
    text_qa_template=qa_prompt,
    refine_template=refine_prompt,
    llm_predictor=structured_llm_predictor,
    similarity_top_k=top_k,
)
response = query_engine.query(
    "What are the event hacks for education?",
)
#print("\nChatGPT's response:")
print(response)
#print("\nFormatted Sources:")
#print(response.get_formatted_sources())
#print("\nLogs:")
#print(service_context.llama_logger.get_logs())
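# Optionally parse the structured answer back into a Python dict -
# a sketch, assuming the raw response text still contains the JSON block
# that StructuredOutputParser expects:
#parsed = lc_output_parser.parse(str(response))
#print(parsed["Top10"])
#print(parsed["TimedReferences"])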