@rkreddyp
Created October 27, 2023 01:26
pineconeall
import json, os, tempfile, time
import requests
import openai
import pandas as pd
import pinecone
import pypdf
from functools import wraps
from typing import Any, Callable
from pydantic import BaseModel, Field, TypeAdapter, create_model, model_validator, validate_arguments, validate_call
from bs4 import BeautifulSoup
from langchain.indexes import VectorstoreIndexCreator
from langchain.document_loaders import UnstructuredURLLoader, UnstructuredPDFLoader, OnlinePDFLoader, PyPDFLoader
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms.openai import OpenAI
from langchain.vectorstores import ElasticVectorSearch, FAISS, Pinecone, Weaviate
from langchain.chains import RetrievalQAWithSourcesChain
from langchain.chains.question_answering import load_qa_chain
from langchain.chains.summarize import load_summarize_chain

os.environ["OPENAI_API_KEY"] = 'xxx'
PINECONE_API_KEY = "xxx"
PINECONE_ENV='us-west4-gcp-free'
pinecone.init(
api_key=PINECONE_API_KEY,
environment=PINECONE_ENV
)
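# Note: everything below assumes a Pinecone index named 'oktaevents' already exists and is
# populated with embeddings of Okta event-type documentation. If it does not, one way to
# create it is sketched here (1536 is the vector size of text-embedding-ada-002):
#
#   if 'oktaevents' not in pinecone.list_indexes():
#       pinecone.create_index('oktaevents', dimension=1536, metric='cosine')
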
def get_all_above_score(items, score):
    """Concatenate the text of every Pinecone match whose score exceeds the threshold."""
    above_score_items = ' '.join([item["metadata"]['text'] for item in items if item["score"] > score])
    return above_score_items

def get_all_items(items):
    """Concatenate the text of every Pinecone match."""
    all_items = ' '.join([item["metadata"]['text'] for item in items])
    return all_items

def get_highest_score_url(items):
    """Return the text of the best-scoring match, or an empty string if it scores 0.7 or below."""
    highest_score_item = max(items, key=lambda item: item["score"])
    print(highest_score_item)
    if highest_score_item["score"] > 0.7:
        return highest_score_item["metadata"]['text']
    else:
        return ""

def chat_complete_simple(question: str):
    """Send a single-turn question to the chat model and return the raw response."""
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo-16k",
        messages=[
            {"role": "user", "content": question}
        ]
    )
    return response

def chat_complete_and_run(question: str):
    """Ask the model to call get_events_df, then execute the returned function call."""
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Only use the get_events_df function you have been provided with."},
            {"role": "user", "content": question}
        ],
        functions=[get_events_df.openai_schema],
        function_call="auto",  # or force it with {"name": "get_events_df"}
    )
    print(response)
    function_call = response['choices'][0]['message']['function_call']
    function_arguments = json.loads(function_call['arguments'])
    print(function_call['name'], function_arguments)
    func = globals()[function_call['name']]
    return func.from_response(response)

class openai_function:
    """Decorator that exposes a plain function to the OpenAI functions API."""

    def __init__(self, func: Callable) -> None:
        self.func = func
        self.validate_call = validate_call(func)
        parameters = TypeAdapter(func).json_schema()
        parameters["properties"] = {
            k: v
            for k, v in parameters["properties"].items()
            if k not in ("v__duplicate_kwargs", "args", "kwargs")
        }
        parameters["required"] = sorted(
            parameters["properties"]
        )  # bug workaround, see lc
        self.openai_schema = {
            "name": self.func.__name__,
            "description": self.func.__doc__,
            "parameters": parameters,
        }

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        @wraps(self.func)
        def wrapper(*args, **kwargs):
            return self.validate_call(*args, **kwargs)
        return wrapper(*args, **kwargs)

    def from_response(self, completion, throw_error=True, class_name=None):
        """Execute the function from the response of an openai chat completion"""
        message = completion.choices[0].message
        print(message["function_call"]["name"], self.openai_schema["name"])
        if throw_error:
            assert "function_call" in message, "No function call detected"
            assert (
                message["function_call"]["name"] == self.openai_schema["name"]
            ), "Function name does not match"
        if class_name:
            function_call = class_name + "." + message["function_call"]["name"]
        else:
            function_call = message["function_call"]
        arguments = json.loads(message["function_call"]["arguments"])
        return self.validate_call(**arguments)

@openai_function
def okta_filters(filter_expression: str, reason: str):
    """A filter expression and the reason for choosing it.
    """
    # Intended argument descriptions; note these annotated assignments in the body do not
    # feed into the generated JSON schema, which comes from the signature above.
    filter_expression: str = Field(..., description="The filter expression")
    reason: str = Field(..., description="The reason for the filter")

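# To see the JSON schema the decorator will hand to the OpenAI functions API, you can
# inspect the generated attribute directly, e.g.:
#
#   print(json.dumps(okta_filters.openai_schema, indent=2))
#
# The "parameters" block is built from the function signature via pydantic's TypeAdapter.
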
@openai_function
def get_events_df(event_filter: str):
    """ gets events from Okta that match a filter
    some filter examples
    | Use Case                   | event_filter                                |
    |----------------------------|---------------------------------------------|
    | Password resets for users  | eventType eq "user.account.reset_password"  |
    | Find Rate Limit errors     | displayMessage eq "Rate limit violation"    |
    | Application Assignment     | application.user_membership.add             |
    """
    from datetime import timedelta
    import datetime

    now = datetime.datetime.utcnow()
    startTime = now - timedelta(minutes=43200)  # look back 30 days
    logs_df_arr = []

    api_token = os.environ['okta_api_key']
    org = os.environ['okta_org_name']
    # Okta's API breaks with microsecond precision, so trim the timestamp to milliseconds.
    # Passing the filter through params lets requests URL-encode it.
    params = {'since': startTime.isoformat()[:-3] + 'Z', 'filter': event_filter}
    url = 'https://' + org + '.okta.com/api/v1/logs'
    api_request_headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': "SSWS {}".format(api_token)
    }

    events = requests.get(url=url, params=params, headers=api_request_headers)
    print(events.json())
    logs_df_arr.append(pd.DataFrame(events.json()))

    # Follow Okta's paginated Link headers: keep fetching the "next" page until it
    # disappears, loops back to the current page, or returns no events.
    while 'next' in events.links and events.links['next']['url'] != events.links.get('self', {}).get('url'):
        events = requests.get(events.links['next']['url'], headers=api_request_headers)
        page_df = pd.DataFrame(events.json())
        if page_df.empty:
            break
        logs_df_arr.append(page_df)

    logs_df = pd.concat(logs_df_arr)
    return logs_df

def get_embedding_from_pinecone(query):
    """Embed the query with OpenAI and return the top matches from the Pinecone index."""
    # e.g. query = 'give me the filter for password resets'
    index_name = 'oktaevents'
    index = pinecone.Index(index_name)
    embedding = openai.Embedding.create(
        input=query,
        model="text-embedding-ada-002"
    )
    vector = embedding["data"][0]["embedding"]
    search_response = index.query(
        top_k=2,
        vector=vector,
        include_metadata=True
    )
    items = search_response['matches']
    return items

def ask_llm_to_get_filter(items, query):
    """Ask the LLM to pick the single best Okta filter expression from the retrieved text."""
    all_text = get_all_items(items)
    #all_text = get_all_above_score(items, 0.9)
    role = 'you are an expert at going through text and finding filter expressions inside the text given for a specific task. Only use the functions you have been provided with.'
    desc = """
    # how to find the filter expression
    - the filter expressions have characters with dots inside them.
    - the task that the filter expression will help with is right around that filter expression
    - you must pick the one filter expression that best suits the task; the event filter must not have OR or AND
    for example, in the text below, system.org.captcha.deactivate is the filter expression for
    disabling captcha for all users across the company.
    'system.org.captcha.deactivate\n'
    'Disable org-wide captcha support. Indicates when '
    'org-wide captcha support is disabled. This event is '
    'fired when org admin disables org-wide captcha support '
    'for all pages.\n'
    """
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo-16k",
        messages=[
            {"role": "system", "content": role + "\n task:" + query},
            {"role": "user", "content": desc + "\ntext:" + all_text}
        ],
        functions=[okta_filters.openai_schema],
        temperature=0.5,
        max_tokens=8000
    )
    print(response)
    return response

def run_okta_events_function(response):
    """Take the LLM's okta_filters call and run get_events_df with the chosen filter."""
    function_args = response['choices'][0]['message']['function_call']['arguments']
    print(function_args)
    df = chat_complete_and_run(str(json.loads(function_args)["filter_expression"]))
    return df

def hey_okta(okta_query):
    """End to end: retrieve context from Pinecone, pick a filter with the LLM, pull matching Okta events."""
    query = okta_query
    items = get_embedding_from_pinecone(query)
    response = ask_llm_to_get_filter(items, query)
    df = run_okta_events_function(response)
    return df

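# A minimal end-to-end usage sketch. It assumes the 'oktaevents' Pinecone index holds Okta
# event-type documentation and that the okta_api_key and okta_org_name environment variables
# are set; the question below is only an example.
if __name__ == "__main__":
    events_df = hey_okta("which users had their passwords reset in the last day?")
    print(events_df.head())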