Created
October 27, 2023 01:26
-
-
Save rkreddyp/f92701e5fc3ab3624087c9593ab9f377 to your computer and use it in GitHub Desktop.
pineconeall
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import pandas as pd | |
from functools import wraps | |
from typing import Any, Callable | |
from pydantic import validate_arguments, BaseModel, validate_call, model_validator, create_model, TypeAdapter | |
import requests, time, pinecone | |
from bs4 import BeautifulSoup | |
from langchain.indexes import VectorstoreIndexCreator | |
from langchain.document_loaders import UnstructuredURLLoader | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.vectorstores import Pinecone | |
from langchain import OpenAI | |
from langchain.chains import RetrievalQAWithSourcesChain | |
import os, tempfile, openai | |
from langchain.llms.openai import OpenAI | |
from langchain.vectorstores.pinecone import Pinecone | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.chains.summarize import load_summarize_chain | |
from langchain.document_loaders import PyPDFLoader | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain.vectorstores import ElasticVectorSearch, Pinecone, Weaviate, FAISS | |
from langchain.chains.question_answering import load_qa_chain | |
import pinecone | |
import os,pypdf | |
from langchain.document_loaders import UnstructuredPDFLoader, OnlinePDFLoader, PyPDFLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
# --- Service configuration -------------------------------------------------
# NOTE(review): 'xxx' values are placeholder credentials — replace with real
# secrets, ideally read from the environment instead of hard-coded here.
os.environ["OPENAI_API_KEY"] = 'xxx'
PINECONE_API_KEY = "xxx"
PINECONE_ENV='us-west4-gcp-free'
# Initialise the global Pinecone client used by pinecone.Index(...) below.
pinecone.init(
    api_key=PINECONE_API_KEY,
    environment=PINECONE_ENV
)
def get_all_above_score (items, score):
    """Join the metadata text of every match whose score exceeds *score*.

    Each item is expected to look like a Pinecone match:
    ``{"score": float, "metadata": {"text": str}}``.
    Returns a single space-separated string (empty if nothing qualifies).
    """
    selected_texts = []
    for match in items:
        if match["score"] > score:
            selected_texts.append(match["metadata"]["text"])
    return ' '.join(selected_texts)
def get_all_items (items):
    """Concatenate the metadata text of every match into one string.

    Items follow the Pinecone match shape ``{"metadata": {"text": str}}``;
    texts are joined with single spaces.
    """
    return ' '.join(match["metadata"]['text'] for match in items)
def get_highest_score_url(items, threshold=0.7):
    """Return the metadata text of the best-scoring match, or "" if none qualifies.

    Fixes:
    - The original called ``max()`` on *items* unconditionally, raising
      ValueError on an empty list; now returns "" instead.
    - The 0.7 cutoff is now a keyword parameter (default unchanged).
    - Stray debug ``print`` removed.

    Parameters:
        items: Pinecone-style matches ``{"score": float, "metadata": {"text": str}}``.
        threshold: minimum score (exclusive) for the best match to be returned.
    """
    if not items:
        return ""
    best = max(items, key=lambda item: item["score"])
    if best["score"] > threshold:
        return best["metadata"]["text"]
    return ""
def chat_complete_simple (question: str) :
    """Send *question* as a single user message to gpt-3.5-turbo-16k and
    return the raw ChatCompletion response."""
    chat_messages = [{"role": "user", "content": question}]
    completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo-16k",
        messages=chat_messages,
    )
    return completion
def chat_complete_and_run (question: str) :
    """Ask GPT-4 to answer *question* via OpenAI function calling, then
    execute the function the model selected with the model-supplied
    arguments.

    Only ``get_events_df`` is exposed to the model (see the system prompt
    and the ``functions`` list), so in practice this returns the result of
    ``get_events_df`` — a pandas DataFrame of Okta events.
    """
    response = openai.ChatCompletion.create(
        #model="gpt-3.5-turbo-0613",
        model = "gpt-4",
        messages=[
            {"role": "system", "content": "Only use get_events_df function you have been provided with."},
            {"role": "user", "content": question}
        ],
        #functions=[weather_function_description, okta_function_description ]
        functions = [get_events_df.openai_schema],  # schema built by the openai_function decorator
        function_call="auto", # = "get_events_df",
    )
    print ('here')  # debug
    print (response)  # debug: full raw API response
    # The model's chosen function plus its JSON-encoded arguments.
    function_call = response['choices'][0]['message']['function_call']
    function_arguments = json.loads(function_call['arguments'])
    print (function_call.name, function_arguments)
    # Resolve the openai_function wrapper by name from module globals and
    # let it validate and execute from the raw response.
    func = globals()[function_call.name]
    return func.from_response(response)
class openai_function:
    """Decorator that exposes a plain function to the OpenAI function-calling
    API.

    On construction it derives a JSON schema from the function's signature
    (pydantic ``TypeAdapter``) and stores it as ``self.openai_schema`` in the
    shape OpenAI expects (``{"name", "description", "parameters"}``).
    Calling the wrapped object validates arguments via pydantic's
    ``validate_call``.
    """
    def __init__(self, func: Callable) -> None:
        self.func = func
        # pydantic-validated callable: raises if arguments don't match the
        # function's annotations.
        self.validate_call = validate_call(func)
        parameters = TypeAdapter(func).json_schema()
        # Strip pydantic's synthetic entries from the schema.
        parameters["properties"] = {
            k: v
            for k, v in parameters["properties"].items()
            if k not in ("v__duplicate_kwargs", "args", "kwargs")
        }
        parameters["required"] = sorted(
            parameters["properties"]
        )  # bug workaround see lc
        self.openai_schema = {
            "name": self.func.__name__,
            "description": self.func.__doc__,
            "parameters": parameters,
        }
    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Invoke the wrapped function with pydantic argument validation."""
        @wraps(self.func)
        def wrapper(*args, **kwargs):
            return self.validate_call(*args, **kwargs)
        return wrapper(*args, **kwargs)
    def from_response(self, completion, throw_error=True, class_name=None):
        """Execute the function from the response of an openai chat completion.

        BUG FIX: the original ``class_name`` branch read
        ``function_call["arguments"]`` before ``function_call`` was assigned,
        raising NameError. The arguments always live on the message's
        ``function_call``, so read them from there in both cases.
        ``class_name`` is kept for interface compatibility; the qualified
        name it built was never used afterwards.
        """
        message = completion.choices[0].message
        print(message["function_call"]["name"], self.openai_schema["name"])
        if throw_error:
            assert "function_call" in message, "No function call detected"
            assert (
                message["function_call"]["name"] == self.openai_schema["name"]
            ), "Function name does not match"
        arguments = json.loads(message["function_call"]["arguments"])
        return self.validate_call(**arguments)
@openai_function
def okta_filters(filter_expression: str, reason: str):
    """ filter expression and a reason

    Parameters:
        filter_expression: the Okta filter expression chosen from the text
        reason: why this filter expression suits the task
    """
    # BUG FIX: the original body assigned pydantic ``Field(...)`` to locals,
    # but ``Field`` is never imported at the top of this file, so every call
    # raised NameError.  The assignments were dead locals anyway; return the
    # validated arguments instead.
    return {"filter_expression": filter_expression, "reason": reason}
@openai_function
def get_events_df(event_filter: str):
    """ gets events from Okta that match to a filter
    some filter examples
    | Use Case | event_filter |
    |-----------------------|------------------------------------------------|
    | Password resets for users | eventType eq "user.account.reset_password" |
    | Find Rate Limit errors | displayMessage eq "Rate limit violation" |
    | Application Assignment | application.user_membership.add |
    """
    from datetime import timedelta
    import datetime
    # Look back 30 days (43200 minutes).  The original computed a 1-day
    # window first and immediately overwrote it; the dead assignment is gone.
    now = datetime.datetime.utcnow()
    startTime = now - timedelta(minutes=43200)
    api_token = os.environ['okta_api_key']
    org = os.environ['okta_org_name']
    # Okta API breaks with microseconds, hence the [:-3] trim.
    params = {'since': startTime.isoformat()[:-3] + 'Z'}
    url = 'https://' + org + '.okta.com/api/v1/logs?filter={event_filter}'.format(event_filter=event_filter)
    api_request_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': "SSWS {}".format(api_token)
    }
    logs_df_arr = []
    # First page.  The original issued this identical request twice; once is
    # enough.
    events_response = requests.get(
        url=url,
        params=params,
        headers=api_request_headers
    )
    print (events_response.json())
    logs_df_arr.append(pd.DataFrame(events_response.json()))
    # Follow Okta's Link-header pagination: stop when there is no 'next'
    # link, or when 'next' equals 'self' (Okta's end-of-results convention).
    # The original fetched each subsequent page twice and silently dropped
    # one copy; each page is now fetched and appended exactly once.
    while ('next' in events_response.links
           and events_response.links.get('self', {}).get('url') != events_response.links['next']['url']):
        events_response = requests.get(
            events_response.links['next']['url'],
            headers=api_request_headers
        )
        logs_df_arr.append(pd.DataFrame(events_response.json()))
    logs_df = pd.concat(logs_df_arr)
    return logs_df
def get_embedding_from_pinecone(query, index_name='oktaevents', top_k=2):
    """Embed *query* with OpenAI and return the top Pinecone matches.

    BUG FIX: the original immediately overwrote the ``query`` argument with a
    hard-coded "password resets" string, so every caller searched for the
    same text regardless of input.  The caller's query is now used.
    Also removed a duplicate ``pinecone.Index`` construction and an unused
    ``OpenAIEmbeddings()`` instance.

    Parameters:
        query: natural-language search text.
        index_name: Pinecone index to query (default unchanged: 'oktaevents').
        top_k: number of matches to return (default unchanged: 2).

    Returns the list of match dicts from the Pinecone query response.
    """
    index = pinecone.Index(index_name)
    embedding = openai.Embedding.create(
        input=query,
        model="text-embedding-ada-002"
    )
    vector = embedding["data"][0]["embedding"]
    search_response = index.query(
        top_k=top_k,
        vector=vector,
        include_metadata=True
    )
    return search_response['matches']
def ask_llm_to_get_filter (items, query):
    """Ask the LLM to pick the single best Okta filter expression for
    *query* out of the Pinecone match texts in *items*.

    Returns the raw ChatCompletion response; the chosen expression is
    delivered via the ``okta_filters`` function-call arguments.
    """
    # Concatenate all retrieved match texts into one context blob.
    all_text = get_all_items (items)
    #all_text = get_all_above_score (items, 0.9)
    role = 'you are an expert at going through text and finding filter expressions inside text given for specific task. Only use the functions you have been provided with.'
    desc = """
    # how to find the filter expresison
    - the filter expressions have characters with dots inside them.
    - the task that the filter expression will help with is right around the that filter expression
    - you must pick one filter expression that best suits the task , the event filter must not have OR or AND
    for example , in the below text, system.org.captcha.deactivate is the filter expression for
    disabling captcha for all users across the company.
    'system.org.captcha.deactivate\n'
    'Disable org-wide captcha support. Indicates when '
    'org-wide captcha support is disabled. This event is '
    'fired when org admin disables org-wide captcha support '
    'for all pages.\n'
    """
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo-16k",
        messages=[
            { "role": "system", "content": role + "\n task:" + query },
            { "role": "user", "content": desc + "\ntext:" + all_text}
        ],
        # Expose only the okta_filters schema so the answer comes back as
        # structured function-call arguments.
        functions = [okta_filters.openai_schema],
        temperature=0.5,
        max_tokens=8000
    )
    print (response)  # debug: full raw API response
    return response
def run_okta_events_function (response) :
    """Extract the filter expression the LLM chose and run it against Okta.

    *response* is the ChatCompletion returned by ``ask_llm_to_get_filter``;
    its function-call arguments carry a ``filter_expression``.  The
    expression is handed to ``chat_complete_and_run``, which dispatches
    ``get_events_df`` and returns a pandas DataFrame of matching events.

    Fix: removed the dead ``datetime``/``timedelta`` imports and the unused
    ``now``/``startTime`` locals from the original.
    """
    function_args = response['choices'][0]['message']['function_call']['arguments']
    print(function_args)  # debug: raw JSON arguments from the model
    filter_expression = str(json.loads(function_args)["filter_expression"])
    df = chat_complete_and_run(filter_expression)
    return df
def hey_okta(okta_query) :
    """End-to-end entry point: retrieve context from Pinecone, ask the LLM
    for the matching Okta filter, then fetch and return the events as a
    DataFrame."""
    matches = get_embedding_from_pinecone(okta_query)
    llm_response = ask_llm_to_get_filter(matches, okta_query)
    return run_okta_events_function(llm_response)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment