Skip to content

Instantly share code, notes, and snippets.

@rafiq
Last active December 3, 2021 16:33
Show Gist options
  • Save rafiq/e9f108ed9ad22c38c6f9f0c828c44e95 to your computer and use it in GitHub Desktop.
Save rafiq/e9f108ed9ad22c38c6f9f0c828c44e95 to your computer and use it in GitHub Desktop.
Common Patterns in Real Data
import pandas as pd
import numpy as np
import KC
import matplotlib.pyplot as plt
# --- Configuration ---------------------------------------------------------
# Length of the "good data" window that is extracted from each row.
windowSize = 5
# Maximum number of "." symbols tolerated inside a "good data" window.
num_of_dots_allowed = 0
# Offset between the two values compared when building the binary strings
# (how many data points we skip, i.e. how coarsely the data is granulated).
step = 1
# Path of the Excel workbook to analyse; insert your own file name here.
fileName = "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/Energy and Mining 50 series 200 plus entities World Bank.xlsx"
originalData = pd.read_excel(fileName)
# Work on a copy so the frame as read from disk stays available unchanged.
df = originalData.copy()
# CLEAN DATA
# Drop the rows labelled 10850-10854; the original note says they hold NaN cells.
# TODO: detect these rows automatically instead of hard-coding the labels.
df = df.drop(range(10850,10855))
# Keep every other column, starting with the first one.
df = df.iloc[:,::2]
# print(df)
# Independent NumPy copy of the cleaned frame, one array row per frame row.
arrayOfArrays = df.to_numpy(copy=True)
# print(arrayOfArrays)
# test = np.array(range(25)).reshape(5,5)
# print(test)
def convertToBinary(arr, step_size=None):
    """Encode each row of *arr* as a list of up/down symbols.

    Every value is compared with the value ``step_size`` positions later in
    the same row:

    * ``"."`` is emitted when the current value is the string ``'..'``;
    * ``0`` is emitted when the current value is strictly greater than the
      later one;
    * ``1`` is emitted otherwise (the later value is larger or equal);
    * nothing is emitted when the current value is numeric but the later
      one is a string, so output rows may be shorter than input rows.

    Args:
        arr: Iterable of indexable rows (e.g. a 2-D object array).
        step_size: Offset between the compared values. Defaults to the
            module-level ``step`` setting when omitted.

    Returns:
        A list with one list of symbols per input row.
    """
    if step_size is None:
        step_size = step
    result = []
    for row in arr:
        symbols = []
        # Positions at or beyond this index have no partner step_size ahead.
        limit = len(row) - step_size
        for i, x in enumerate(row):
            # Stop at the first position without a partner; skipping only
            # i == limit would index past the row end whenever step_size > 1.
            if i >= limit:
                break
            if x == '..':
                symbols.append(".")
            partner = row[i + step_size]
            if not isinstance(x, str) and not isinstance(partner, str):
                symbols.append(0 if x - partner > 0 else 1)
        result.append(symbols)
    return result
# newDF = pd.DataFrame(test)
# print("converted list to an data frame\n",newDF)
# # newDF.drop(newDF.columns[list(range(0,len(newDF[0]),2))],axis=1,inplace=True)
# newDF = newDF.iloc[:,::2]
# print("delete every other row\n",newDF)
# newDF = newDF.to_numpy()
# print("convert the data frame to a numpy array\n",newDF)
# testResult = convertToBinary(newDF)
# print("getting all the binary strings from the np array\n",testResult)
# print(arrayOfArrays)
# Encode every spreadsheet row as a list of 0 / 1 / "." symbols.
binaryArrays = convertToBinary(arrayOfArrays)
# print(binaryArrays)
def is_good_data(list):
    """Tell whether *list* holds no more "." entries than the configured limit.

    The limit is read from the module-level ``num_of_dots_allowed`` setting.
    Returns True when the number of "." entries is within the limit and
    False when it exceeds it.
    """
    dot_total = list.count(".")
    return dot_total <= num_of_dots_allowed
def sliding_window(listArray, window_size):
    """Find the last window of *window_size* items that counts as good data.

    Windows are tried starting from the end of *listArray* and moving
    towards its beginning; the first one accepted by ``is_good_data`` wins.

    Returns:
        A reverse iterator that yields the winning window in its original
        left-to-right order, or ``False`` when no window qualifies (which
        includes the case where the input is shorter than the window).
    """
    newest_first = list(reversed(listArray))
    for stop in range(window_size, len(newest_first) + 1):
        candidate = list(newest_first[stop - window_size:stop])
        # candidate is stored newest-first; reversing it restores input order.
        if is_good_data(candidate):
            return reversed(candidate)
    return False
def get_good_string(arr):
    """Collect, for each row, the most recent window of good data as a string.

    Every element of a row is converted with ``str`` and the row is searched
    with ``sliding_window`` using the module-level ``windowSize``. Rows for
    which no window is found are skipped.

    Args:
        arr: Iterable of rows of symbols.

    Returns:
        A list of strings, one per row that contained a good window, each
        being the concatenation of that window's symbols.
    """
    result = []
    for row in arr:
        symbols = [str(symbol) for symbol in row]
        # Search once: the return value doubles as the found / not-found flag.
        window = sliding_window(symbols, windowSize)
        if not window:
            continue
        result.append("".join(window))
    return result
# One string per row that contains a window of good data; other rows are dropped.
stringOfNums = get_good_string(binaryArrays)
# print(stringOfNums)
def get_frequencies(twoD_array):
    """Count how often each distinct item occurs in *twoD_array*.

    Args:
        twoD_array: Iterable of hashable items (e.g. the window strings).

    Returns:
        A plain ``dict`` mapping each distinct item to its number of
        occurrences. Keys appear in order of first occurrence, so the
        result is the same on every run.
    """
    # Local import keeps this function usable on its own.
    from collections import Counter

    # A single pass over the data instead of one list.count() scan per key.
    return dict(Counter(twoD_array))
def sort_dictionary(dict):
    """Return the (key, value) pairs of *dict* ordered by ascending value.

    Pairs with equal values keep the order in which the mapping yields them.
    """
    pairs = list(dict.items())
    pairs.sort(key=lambda pair: pair[1])
    return pairs
# Map each distinct window string to its number of occurrences.
dictionary = get_frequencies(stringOfNums)
# (string, count) pairs ordered by ascending count, e.g. [...("01010",52),("0101",55),...]
sortedDict = sort_dictionary(dictionary)
# print(sortedDict)
def get_frequencies_list(twoDList):
    """Return the second element of every entry in *twoDList*, in order."""
    return [entry[1] for entry in twoDList]
# Counts only, in the same order as sortedDict, e.g. [...1,...59, 62, 288, 1370]
frequency_list = get_frequencies_list(sortedDict)
# print(frequency_list)
def get_complexities(list):
    """Compute a rounded complexity per entry and attach it to the entry.

    For every tuple in *list*, the first element is passed to
    ``KC.calc_KC`` and the result is rounded to one decimal place.
    NOTE(review): ``KC`` is defined outside this file; presumably it
    estimates the complexity of the string -- confirm against that module.

    Side effect: *list* is modified in place; each tuple is replaced by a
    copy with the rounded complexity appended as its last element.

    Returns:
        The rounded complexities, in the same order as *list*.
    """
    rounded = []
    for position, entry in enumerate(list):
        complexity = np.round(KC.calc_KC(entry[0]), 1)
        # Tuples are immutable, so store an extended copy back into the list.
        list[position] = entry + (complexity,)
        rounded.append(complexity)
    return rounded
# Tuple layout after the call below: (string, freq, complexity).
# get_complexities() extends every tuple of sortedDict in place, e.g.
# sortedDict mutated=[... ('101111111111111', 48, 13.7), ('111111111111111', 895, 3.9)]
# The returned list holds the complexities alone, e.g. [53.1,41.1,35.1,...]
complexities = get_complexities(sortedDict)
# print(complexities)
# print(sortedDict)
def get_log10(list):
    """Return a Python list holding the base-10 logarithm of each item.

    The input is first converted to a NumPy array and then processed item
    by item.
    """
    return [np.log10(value) for value in np.array(list)]
def get_probabilities(list):
    """Divide every count in *list* by the sum of all counts.

    Returns a new list of the resulting fractions, in input order.
    """
    total = sum(list)
    return [count / total for count in list]
def get_Up_Bound(list):
    """Return ``2 ** -k`` for every value ``k`` in *list*.

    Args:
        list: Sequence (or array) of real numbers.

    Returns:
        A float NumPy array with one bound per input value.
    """
    # Cast to float first: NumPy refuses to raise an integer to a negative
    # integer power, so integer input would otherwise raise ValueError.
    exponents = np.asarray(list, dtype=float)
    return 2.0 ** -exponents
def get_kscaled(list, window_size=None):
    """Rescale complexities linearly onto ``[0, log2(2 ** window_size)]``.

    The smallest input maps to 0 and the largest to
    ``log2(2 ** window_size)``.

    Args:
        list: Sequence (or array) of complexity values.
        window_size: Exponent that sets the upper end of the scale.
            Defaults to the module-level ``windowSize`` when omitted.

    Returns:
        A float NumPy array of scaled values. An empty input yields an
        empty array. When all inputs are equal there is no spread to
        scale, so zeros are returned instead of dividing by zero.
    """
    if window_size is None:
        window_size = windowSize
    K = np.asarray(list, dtype=float)
    if K.size == 0:
        return K
    lowest = np.min(K)
    spread = np.max(K) - lowest
    if spread == 0:
        return np.zeros_like(K)
    # N is the number of possible binary strings of the window length.
    # (Using the number of distinct complexities as N was tried and looked worse.)
    N = 2 ** window_size
    return np.log2(N) * (K - lowest) / spread
def make_plot(list1, list2):
    """Plot log10 probability against scaled complexity, with an upper bound.

    Args:
        list1: Occurrence counts; converted to probabilities by
            ``get_probabilities``.
        list2: Complexities matching *list1*; rescaled by ``get_kscaled``.

    The figure shows the log10 probabilities as blue dots and log10 of the
    ``get_Up_Bound`` values as a dashed black line, then opens the plot
    window with ``plt.show()``.
    """
    probability = get_probabilities(list1)
    scaledComplexities = get_kscaled(list2)
    # The bound is derived from the scaled complexities, not the raw ones.
    upperBound = get_Up_Bound(scaledComplexities)
    log10List = get_log10(probability)
    plt.plot(scaledComplexities, np.log10(upperBound), "--", color="black", label="Upper Bound")
    plt.plot(scaledComplexities, log10List, linestyle="", marker="o", color="blue")
    # Without a legend the label given to the dashed line is never displayed.
    plt.legend()
    plt.title("{0}Chars{1}Dots".format(windowSize, num_of_dots_allowed))
    plt.xlabel("Complexities")
    plt.ylabel("log10 P(x)")
    plt.show()
# log10_of_frequencies = get_log10(sortedDict)#[1.1231,2.23,3.13,...]
# print(get_Up_Bound(complexities))
# Show the plot of log10 probability versus scaled complexity.
make_plot(frequency_list,complexities)
def add_complexities_to_string(listOfFrequencies, listOfComplexities):
    """Return copies of the tuples with the matching complexity appended.

    The tuple at position ``i`` of *listOfFrequencies* is extended with
    element ``i`` of *listOfComplexities*. The inputs are left unchanged.
    """
    return [
        entry + (listOfComplexities[position],)
        for position, entry in enumerate(listOfFrequencies)
    ]
def get_strings_of_tuples(twoDArray):
    """Render each entry as one space-separated line and join the lines.

    Every element of an entry is converted with ``str``; entries are
    separated by newlines, with no trailing newline.
    """
    lines = (" ".join(str(field) for field in entry) for entry in twoDArray)
    return "\n".join(lines)
# get_complexities() has already appended the complexity to every tuple in
# sortedDict, so the tuples are written as they are. Appending `complexities`
# a second time would duplicate the last column of every line.
# Output format, one entry per line: "<string> <frequency> <complexity>",
# e.g. 000000000000000 1632 3.9
final_string_log10freq_complexity = get_strings_of_tuples(sortedDict)
with open("{0}Chars{1}Dots.txt".format(windowSize,num_of_dots_allowed), 'w') as f:
    f.write(final_string_log10freq_complexity)
# np.save("list of lists",arrayOfArrays)
# np.savetxt('listData2', arrayOfArrays,fmt='%s')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment