Skip to content

Instantly share code, notes, and snippets.

@rafiq
Created December 10, 2021 17:53
Show Gist options
  • Save rafiq/38e6d232cd3ab7acc267d57691457e57 to your computer and use it in GitHub Desktop.
Save rafiq/38e6d232cd3ab7acc267d57691457e57 to your computer and use it in GitHub Desktop.
from numpy.core import numeric
import pandas as pd
import random
# from random import randrange
import numpy as np
import KC
import matplotlib.pyplot as plt
# --- Analysis parameters: module-level globals read by the functions below ---
# get_good_string() searches each row for windowSize + 1 consecutive usable
# values; with step == 1 convertToBinary() turns those into windowSize symbols.
windowSize = 10 #This is the length of a "good data" string
# Largest number of "." (missing-value) markers is_good_data() tolerates in a window.
num_of_dots_allowed = 0 #How many dots are allowed in the "good data"
# Column stride used further down as df.iloc[:, ::years_skipped]; it also appears in the plot title.
years_skipped = 2
# Offset between the two values that convertToBinary() compares.
step = 1 #How many data points we skip (granulate the data)
# NOTE(review): absolute, machine-specific path; the script fails at read_excel on any other machine.
fileName = "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/Energy and Mining 50 series 200 plus entities World Bank.xlsx"#insert file name here
# Load the spreadsheet once and work on a copy so originalData stays untouched.
originalData = pd.read_excel(fileName)
df = originalData.copy()
# print(df)
def get_clean_data(arr):
    """Reduce every row to its numeric values and missing-value markers.

    Numbers are kept unchanged, the spreadsheet placeholder ``'..'`` is
    replaced by a single ``"."`` and every other cell (labels, codes, ...)
    is dropped.

    Args:
        arr: Iterable of rows, each row an iterable of cells.

    Returns:
        A new list with one list per input row; the input is not modified.
    """
    result = []
    for row in arr:
        series = []
        for item in row:
            # bool is a subclass of int; the original exact-type check never
            # treated booleans as data, so keep excluding them.
            if isinstance(item, bool):
                continue
            # isinstance (instead of type(...) ==) also accepts NumPy scalars,
            # which an exact comparison against int/float silently dropped.
            if isinstance(item, (int, float, np.integer, np.floating)):
                series.append(item)
            elif isinstance(item, str) and item == "..":
                series.append(".")
        result.append(series)
    return result
# print(np.array(get_clean_data(arrayOfArrays)).shape)
def insert_to_random_index(array, characters, no_of_reps):  # Just for test data
    """Return a list copy of ``array`` with ``characters`` inserted at random spots.

    Args:
        array: Any iterable; it is copied, never modified.
        characters: The object to insert (one insertion per repetition).
        no_of_reps: How many insertions to perform.

    Returns:
        A new list. Copying once up front means a list is returned even when
        ``no_of_reps`` is 0 (previously the caller's own object came back).
    """
    items = list(array)
    for _ in range(no_of_reps):
        # randint is inclusive on both ends, so appending at the end is possible.
        items.insert(random.randint(0, len(items)), characters)
    return items
def traverseTwoDArray(twoDArr, characters=".", no_of_reps=2):  # Just for test data
    """Insert marker values at random positions in every row.

    Args:
        twoDArr: Iterable of rows.
        characters: Marker to insert; defaults to ``"."``.
        no_of_reps: Number of markers inserted per row; defaults to 2.

    Returns:
        A new list of rows as produced by insert_to_random_index(); this
        function itself never assigns into ``twoDArr``.
    """
    return [
        insert_to_random_index(row, characters, no_of_reps) for row in twoDArr
    ]
def convert_to_int_and_leave_dots(arr):
    """Convert every value to ``int`` while passing ``"."`` markers through.

    Args:
        arr: Iterable of numbers, numeric strings and ``"."`` markers.

    Returns:
        A new list; numeric entries are truncated via ``int(float(x))``.

    Raises:
        ValueError: If an entry is neither ``"."`` nor convertible to float.
    """
    return [x if x == "." else int(float(x)) for x in arr]
def convertToBinary(arr, step_size=None):
    """Encode each row as 0/1 symbols describing how its values change.

    Every value is compared with the value ``step_size`` positions later:
    ``0`` is written when the earlier value is strictly larger (the series
    fell) and ``1`` otherwise (it rose or stayed the same).

    Args:
        arr: Iterable of sliceable rows containing numbers only.
        step_size: Non-negative distance between compared values. Defaults to
            the module-level ``step``.

    Returns:
        A list of lists; each inner list has ``len(row) - step_size`` symbols
        (none if the row is shorter than that).
    """
    if step_size is None:
        step_size = step
    result = []
    for row in arr:
        # zip stops at the shorter sequence, so no index past the end is ever
        # read. The earlier `i == limit` test skipped a single index only and
        # raised IndexError for any step larger than 1.
        later_values = row[step_size:]
        result.append(
            [0 if earlier - later > 0 else 1
             for earlier, later in zip(row, later_values)]
        )
    return result
def is_good_data(list, max_dots=None):
    """Tell whether a window contains few enough ``"."`` markers.

    Args:
        list: Sequence offering ``.count`` (the name is kept for callers).
        max_dots: Largest tolerated number of markers. Defaults to the
            module-level ``num_of_dots_allowed``.

    Returns:
        True when the marker count does not exceed the limit, else False.
    """
    if max_dots is None:
        max_dots = num_of_dots_allowed
    return list.count(".") <= max_dots
# print(binaryArrays)
def sliding_window(listArray, window_size):
    """Find the window closest to the end of ``listArray`` that is good data.

    The sequence is walked from its end towards its start; the first window
    of ``window_size`` items accepted by is_good_data() is returned in the
    original (oldest-first) order.

    Args:
        listArray: Sequence of values and ``"."`` markers.
        window_size: Number of consecutive items a window must hold.

    Returns:
        The accepted window as a list, or ``False`` when no window qualifies
        (callers rely on the falsy result).
    """
    newest_first = list(reversed(listArray))
    start_count = len(newest_first) - window_size + 1
    for start in range(start_count):
        candidate = newest_first[start:start + window_size]
        if not is_good_data(candidate):
            continue
        # The candidate is still newest-first; flip it back before returning.
        candidate.reverse()
        return candidate
    return False
def get_good_string(arr):  # search each row for most recent instance of good data
    """Collect, per row, the most recent window of usable data.

    Each row is searched with sliding_window() for ``windowSize + 1``
    consecutive values (one more than ``windowSize``, the module-level global).
    Rows without such a window are left out.

    Args:
        arr: Iterable of rows.

    Returns:
        A list with one window (list) for every row that had one.
    """
    result = []
    for row in arr:
        # Search once and reuse the answer; the row used to be scanned twice.
        good_list = sliding_window(row, windowSize + 1)
        if good_list:
            result.append(good_list)
    return result
def join_list_into_string(npArray):
    """Concatenate the items of every row into one string per row.

    Args:
        npArray: Iterable of rows; items are converted with ``str``.

    Returns:
        A list of strings, e.g. ``[[1, 0, 1]]`` becomes ``["101"]``.
    """
    return ["".join(map(str, row)) for row in npArray]
def get_frequencies(twoD_array):
    """Count how often each distinct item occurs.

    Args:
        twoD_array: Iterable of hashable items (the script passes strings).

    Returns:
        A plain dict mapping item -> number of occurrences, ordered by first
        appearance.
    """
    # Function-scope import keeps this block self-contained.
    from collections import Counter

    # One pass over the data instead of one full list.count() per distinct item.
    return dict(Counter(twoD_array))
def sort_dictionary(dict):
    """Return the dictionary's ``(key, value)`` pairs sorted by key.

    Args:
        dict: Mapping with mutually comparable keys (name kept for callers).

    Returns:
        A list of ``(key, value)`` tuples in ascending order.
    """
    # Tuples already sort by their natural order; the identity key was a no-op.
    return sorted(dict.items())
def get_frequencies_list(twoDList):
    """Extract the second field (the frequency) from every record.

    Args:
        twoDList: Iterable of indexable records such as ``(string, count)``;
            additional trailing fields are ignored.

    Returns:
        A list with ``record[1]`` for every record, in input order.
    """
    # The loop variable no longer shadows the builtin ``tuple``.
    return [record[1] for record in twoDList]
def get_complexities(list):
    """Compute a rounded complexity per record and attach it to the record.

    For every tuple in ``list`` the first field is passed to
    ``KC.calc_KC`` (external module, not shown here; presumably a complexity
    estimate - confirm against KC) and the result is rounded to one decimal.

    Args:
        list: Mutable list of tuples. It is modified in place: every entry
            is replaced by the same tuple with the complexity appended.

    Returns:
        The list of rounded complexities, in input order.
    """
    complexities = []
    for position, record in enumerate(list):
        rounded = np.round(KC.calc_KC(record[0]), 1)
        complexities.append(rounded)
        # Same length before and after, so replacing while enumerating is safe.
        list[position] = record + (rounded,)
    return complexities
# string freq complx
#sortedDict mutated=[... ('101111111111111', 48, 13.7), ('111111111111111', 895, 3.9)]
def get_log10(list):
    """Return the base-10 logarithm of every entry.

    Args:
        list: Sequence of numbers (name kept for callers).

    Returns:
        A Python list holding ``np.log10`` of each entry, in input order.
    """
    # One vectorised call replaces the per-element loop. The builtin ``list``
    # is shadowed by the parameter, hence the unpacking to build the result.
    return [*np.log10(np.asarray(list))]
def get_probabilities(list):
    """Normalise counts so that they sum to one.

    Args:
        list: Sequence of numbers (name kept for callers).

    Returns:
        A list with each entry divided by the total; empty input gives ``[]``.

    Raises:
        ZeroDivisionError: If the input is non-empty and its total is zero.
    """
    N = sum(list)
    return [num / N for num in list]
def get_Up_Bound(list):
    """Compute ``2 ** -k`` for every entry ``k``.

    Args:
        list: Sequence of numbers (name kept for callers).

    Returns:
        A float ndarray with one value per entry.
    """
    # Force a float array: NumPy refuses to raise integers to negative integer
    # powers, so integer input used to raise ValueError.
    exponents = np.asarray(list, dtype=float)
    return np.power(2.0, -exponents)
def get_kscaled(list, window_size=None):
    """Rescale complexities linearly onto the range ``[0, window_size]``.

    The smallest value maps to 0 and the largest to ``log2(2 ** window_size)``,
    which is ``window_size`` itself.

    Args:
        list: Non-empty sequence of numbers (name kept for callers).
        window_size: Upper end of the target range. Defaults to the
            module-level ``windowSize``.

    Returns:
        A float ndarray of scaled values. When all inputs are equal the
        scaling is undefined (0/0); zeros are returned instead of NaN.

    Raises:
        ValueError: If the input is empty (raised by ``np.min``).
    """
    if window_size is None:
        window_size = windowSize
    K = np.asarray(list, dtype=float)
    lowest = np.min(K)
    spread = np.max(K) - lowest
    if spread == 0:
        return np.zeros_like(K)
    return window_size * (K - lowest) / spread
def make_plot(list1, list2):  # plots np.log10(frequency) vs complexity
    """Show log10 probability against scaled complexity with an upper bound.

    Args:
        list1: Frequencies; converted to probabilities via get_probabilities().
        list2: Complexities; rescaled via get_kscaled().

    The title is built from the module-level ``windowSize``, ``years_skipped``
    and ``length_of_series``. The call ends with ``plt.show()``.
    """
    probability = get_probabilities(list1)
    x_values = get_kscaled(list2)
    bound = get_Up_Bound(x_values)  # upper bound derived from the scaled complexities
    y_values = get_log10(probability)

    plt.plot(
        x_values,
        np.log10(bound),
        "--",
        color="black",
        label="Upper Bound",
    )
    plt.plot(x_values, y_values, linestyle="", marker="o", color="blue")

    title = "{0}Chars, {1}years skipped, Number of series:{2}".format(
        windowSize, years_skipped, length_of_series
    )
    plt.title(title)
    plt.xlabel("Complexities")
    plt.ylabel("log10 P(x)")
    plt.show()
def add_complexities_to_string(listOfFrequencies, listOfComplexities):
    """Append the matching complexity to every frequency record.

    Args:
        listOfFrequencies: Sequence of tuples.
        listOfComplexities: Sequence indexed in parallel; extra entries are
            ignored.

    Returns:
        A new list of tuples, each extended by one field. The inputs are not
        modified.

    Raises:
        IndexError: If there are fewer complexities than frequency records.
    """
    return [
        record + (listOfComplexities[i],)
        for i, record in enumerate(listOfFrequencies)
    ]
def get_strings_of_tuples(twoDArray):
    """Render records as text: fields separated by spaces, one record per line.

    Args:
        twoDArray: Iterable of iterables; fields are converted with ``str``.

    Returns:
        A single string without a trailing newline; ``""`` for empty input.
    """
    return "\n".join(" ".join(map(str, record)) for record in twoDArray)
#1. remove labels and non key data
# 2. delete columns
# 3. sliding window to get good strings
# 4. convert np.array to binary
# 5. make plots
#! Test Suite: uncomment these and comment the line below to see some test data
#1. remove labels and non key data
# test = np.array(random.sample(range(100),100)).reshape(10,10)
# print(test)
# test = traverseTwoDArray(test)# Returns a new list of rows with two "." markers inserted at random positions in each row
# print(test,"\n\n")
# newDF = pd.DataFrame(test)
# print("converted list to a data frame\n",newDF,"\n\n")
# # 2. delete columns
# newDF = newDF.iloc[:,::years_skipped]
# print("delete every other row\n",newDF,"\n\n")
# newDF = newDF.to_numpy()
# print("convert the data frame to a numpy array\n",newDF,"\n\n")
# # 3. sliding window to get good strings
# goodStringsArray = get_good_string(newDF)
# print("Got the good strings out of the array\n",np.array(goodStringsArray),"\n\n")
# # 4. convert np.array to binary
# testResult = convertToBinary(goodStringsArray)
# print("getting all the binary strings from the np array\n",np.array(testResult),"Number of time series: ",len(testResult))
# print("Data Frame with every {years_skipped} column skipped:\n",df)
# ---------------------------------------------------------------------------
# Main pipeline, run once at import time with the parameters defined above.
# ---------------------------------------------------------------------------
# Drop every row that contains a missing (NaN) cell.
df.dropna(inplace=True)
df = df.iloc[:,::years_skipped]#Keep every years_skipped-th column (starting with the first) to make the data coarser
#! Open question: should columns be thinned before or after handling the ".." placeholders?
arrayOfArrays = df.to_numpy(copy=True)
# 1. Remove labels and other non-numeric cells
test = get_clean_data(arrayOfArrays)# Returns a new list of rows: numbers kept, ".." replaced by "."
# print("Cleaned data:\n", test,"\n\n","Number of series",len(test))
# print(type(test))
# 2. Sliding window: per row, the most recent run of windowSize + 1 good values
goodStringsArray = get_good_string(test)
# NOTE(review): the result of this expression is discarded; goodStringsArray stays a list of lists.
np.array(goodStringsArray)
# print("Got the good strings out of the array\n",goodStringsArray,"\n\n",goodStringsArray[0] ,"Number of series: ", len(goodStringsArray))
#! Idea: rows of all zeros might need removing as well because they skew the data
# 3. Convert each window to a list of 0/1 symbols
testResult = convertToBinary(goodStringsArray)
# print("getting all the binary strings from the np array\n",np.array(testResult),"\nNumber of time series: ",len(testResult))
# 4. Join each list of symbols into one binary string
listOfbinaryStrings = join_list_into_string(testResult)
# print("List of binary strings:\n",listOfbinaryStrings,"\nNumber of series: ",len(listOfbinaryStrings))
# 5. Count how often each distinct binary string occurs
frequencies = get_frequencies(listOfbinaryStrings)
# print(frequencies)
# 6. Order by ascending count. The sorted pairs take a round trip through a
#    dict before ending up as a list of (string, count) tuples.
sortedDict = dict(sorted(frequencies.items(),key=lambda item: item[1]))
sortedDict = list(sortedDict.items())
# print(sortedDict)
# 7. Complexities: get_complexities() returns them AND appends one to every
#    tuple of sortedDict in place.
complexities = get_complexities(sortedDict)#e.g. [53.1,41.1,35.1,...]
# print(complexities)
# print(sortedDict)#Mutated
#  layout after mutation: (string, frequency, complexity)
#  e.g. [... ('101111111111111', 48, 13.7), ('111111111111111', 895, 3.9)]
# 8. Frequencies in the same (ascending) order; index 1 of each tuple is the count
frequency_list = get_frequencies_list(sortedDict)#e.g. [...1,...59, 62, 288, 1370]
# Total number of series; make_plot() reads this global for the title.
length_of_series = sum(frequency_list)
# print("\nList of frequencies: ",frequency_list, "Number of Series:", sum(frequency_list), "| Character Size:",windowSize,"| Step is:",years_skipped)
#! 5. make plots
# Sweep over window sizes and column strides. The pipeline is re-run for every
# pair because get_good_string(), get_kscaled() and make_plot() read the
# module-level windowSize / years_skipped / length_of_series. Rebinding the
# globals alone re-plotted the one data set computed earlier under titles
# that did not describe it.
for x in range(5, 26):
    windowSize = x
    for y in range(1, 5):
        years_skipped = y
        # Start again from the untouched spreadsheet copy for this stride.
        df = originalData.dropna().iloc[:, ::years_skipped]
        test = get_clean_data(df.to_numpy(copy=True))
        goodStringsArray = get_good_string(test)
        testResult = convertToBinary(goodStringsArray)
        listOfbinaryStrings = join_list_into_string(testResult)
        frequencies = get_frequencies(listOfbinaryStrings)
        # (string, count) tuples ordered by ascending count.
        sortedDict = sorted(frequencies.items(), key=lambda item: item[1])
        if not sortedDict:
            # No row offers windowSize + 1 consecutive good values for this
            # combination, so there is nothing to plot.
            continue
        complexities = get_complexities(sortedDict)  # also extends the tuples in place
        frequency_list = get_frequencies_list(sortedDict)
        length_of_series = sum(frequency_list)
        make_plot(frequency_list, complexities)#! uncomment
# final_string_log10freq_complexity = add_complexities_to_string(sortedDict,complexities)#! uncomment
# final_string_log10freq_complexity = get_strings_of_tuples(final_string_log10freq_complexity)#000000000000000 1632 3.9\n000000000000000 1632 3.9...#! uncomment
# print(final_string_log10freq_complexity)
# with open("{0}Chars, {1}years skipped".format(windowSize,years_skipped), 'w') as f: #! uncomment
# f.write(final_string_log10freq_complexity)#! uncomment
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment