# Gist by @rafiq, created January 9, 2022
from datetime import date
import pandas as pd
import random
import numpy as np
import KC  # local module, assumed to provide calc_KC (a Lempel-Ziv-style complexity estimate)
import matplotlib.pyplot as plt
today = date.today()
current_date = today.strftime("%b-%d-%Y")
windowSize = 11  # Length of a "good data" string
num_of_dots_allowed = 0  # How many dots are allowed in the "good data"
years_skipped = 1
step = 1  # How many data points we skip (granulate the data); currently unused
file_paths_list = [
    "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/timeSerierEnv/World_Health_Data/Data_Extract_From_Health_Nutrition_and_Population_Statistics.xlsx",
    "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/timeSerierEnv/World_Development_Data/Data_Extract_From_World_Development_Indicators.xlsx",
    "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/timeSerierEnv/Nitrous_Oxide_Data/Data_Extract_From_World_Development_Indicators.xlsx",
    "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/timeSerierEnv/Jobs_Data/Data_Extract_From_Jobs.xlsx",
    "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/timeSerierEnv/Global_Economics_Data/Data_Extract_From_Global_Economic_Monitor_(GEM).xlsx",
    "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/timeSerierEnv/Energy_Data/Energy and Mining 50 series 200 plus entities World Bank.xlsx",
    "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/timeSerierEnv/Education_Statistics/Data_Extract_From_Education_Statistics_-_All_Indicators.xlsx",
    "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/timeSerierEnv/ALLDATACOMBINED/Combined Data Jan-03-2022.xlsx",
]
def combine_csv_data(OneDArrayOfPathNames):
    # Concatenate the first sheet of each workbook, keeping the header row only
    # from the first file. Uses the module-level current_date and save_path
    # (save_path is set inside the main loop below).
    files = [pd.ExcelFile(name) for name in OneDArrayOfPathNames]
    frames = [x.parse(x.sheet_names[0], header=None, index_col=None) for x in files]
    frames[1:] = [df[1:] for df in frames[1:]]
    combined = pd.concat(frames)
    combined.to_excel("{1}Combined Data {0}.xlsx".format(current_date, save_path), header=False, index=False)
# combine_csv_data(file_paths_list)  #! Uncomment to make a combined excel file
def get_clean_data(arr):
    # Keep numeric cells; replace missing-value markers ('..') with a single '.'
    result = []
    for row in arr:
        series = []
        for item in row:
            if isinstance(item, (int, float)):
                series.append(item)
            if item == '..':
                series.append(".")
        result.append(series)
    return result
def insert_to_random_index(array, characters, no_of_reps):  # Just for test data
    for _ in range(no_of_reps):
        array = list(array)
        array.insert(random.randint(0, len(array)), characters)
    return array
def traverseTwoDArray(twoDArr):  # Just for test data
    # Returns a new 2-D list with '..' markers inserted at random positions
    result = []
    for row in twoDArr:
        temp = insert_to_random_index(row, "..", 2)
        result.append(temp)
    return result
def convertToBinary(arr):  # Write '1' where the series does not decrease (next value >= current), '0' where it drops
    result = []
    for row in arr:
        temp = []
        limit = len(row) - 1
        for i, x in enumerate(row):
            if i == limit:
                continue
            diff = 0 if x - row[i + 1] > 0 else 1
            temp.append(diff)
        result.append(temp)
    return result
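# Illustrative check (values made up for the example): for the row [3, 1, 2, 5]
# the steps are 3->1 (drop, 0), 1->2 (rise, 1) and 2->5 (rise, 1).
assert convertToBinary([[3, 1, 2, 5]]) == [[0, 1, 1]]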
# Series value becomes 1 if value is > mean of the series, 0 otherwise
def MoreLessMean(X):
    binary_series = []
    for row in X:
        row_ = row - np.mean(row)
        b = ''.join([str(int(r > 0)) for r in row_])
        binary_series.append(b)
    return binary_series
# Series value becomes 1 if value is > median of the series, 0 otherwise
def MoreLessMedian(X):
    binary_series = []
    for row in X:
        row_ = row - np.median(row)
        b = ''.join([str(int(r > 0)) for r in row_])
        binary_series.append(b)
    return binary_series
# Normalise the series to be between 0 and 1, then set "1" if > 0.5
def MoreLessHalf(X):
    binary_series = []
    for row in X:
        row_ = row - np.min(row)
        row_ = row_ / np.max(row_)  # row_ has values between 0 and 1 now (assumes a non-constant row)
        b = ''.join([str(int(r > 0.5)) for r in row_])
        binary_series.append(b)
    return binary_series
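# Illustrative check (made-up row): the mean of [1, 2, 3, 4] is 2.5, so only 3
# and 4 map to '1'; the median and min-max variants agree on this row.
assert MoreLessMean([np.array([1, 2, 3, 4])]) == ['0011']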
# Subtract the linear trend, then binarise the first differences of the residual
def LinearDetrendDiff(X):
    binary_series = []
    for row in X:
        t = np.arange(len(row))
        fit = np.polyfit(t, row, 1)  # fit linear func y = mx + b to the series; fit = [m, b]
        row_ = row - np.polyval(fit, t)  # residual after removing the fitted trend
        b = ''.join([str(int(val > 0)) for val in np.diff(row_)])
        binary_series.append(b)
    return binary_series
# Binarise whether each value sits above the fitted linear trend
def AboveLinearDetrend(X):
    binary_series = []
    for row in X:
        t = np.arange(len(row))
        fit = np.polyfit(t, row, 1)  # fit linear func y = mx + b to the series; fit = [m, b]
        row_ = row - np.polyval(fit, t)
        b = ''.join([str(int(r > 0)) for r in row_])
        binary_series.append(b)
    return binary_series
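# Illustrative check (made-up row): the least-squares line through [0, 2, 1, 3]
# is y = 0.8*t + 0.3, so the residuals [-0.3, 0.9, -0.9, 0.3] alternate sign.
assert AboveLinearDetrend([np.array([0, 2, 1, 3])]) == ['0101']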
def K_scaled_all_series(listOfTuples):
    result = []
    for item in listOfTuples:
        result.append(item[2])
    return result
def is_good_data(values):
    # A window is "good" only if every element is a real number (no strings, no NaNs)
    for element in values:
        if not isinstance(element, (int, float)) or np.isnan(element):
            return False
    return True
def sliding_window(listArray, window_size):
    # Scan from the most recent value backwards for the latest clean window
    reversed_array = list(reversed(listArray))
    for i in range(len(reversed_array) - window_size + 1):
        temp = list(reversed_array[i:i + window_size])  # window of exactly window_size values
        if is_good_data(temp):
            return list(reversed(temp))
    return False
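# Illustrative check (made-up row): with a window of 3, the most recent run of
# clean values in [1, '.', 2, 3, 4] is [2, 3, 4]; with no clean run it returns False.
assert sliding_window([1, '.', 2, 3, 4], 3) == [2, 3, 4]
assert sliding_window([1, '.', 2], 3) is False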
def has_string(values):
    for x in values:
        if isinstance(x, str):
            return True
    return False
def get_good_string(arr):  # Search each row for the most recent instance of good data
    result = []
    for row in arr:
        good_list = sliding_window(row, windowSize)
        if good_list:
            result.append(good_list)
    return result
def join_list_into_string(npArray):
    result = []
    for row in npArray:
        temp = "".join(map(str, row))
        result.append(temp)
    return result
def get_frequencies(list_of_strings):
    return dict((x, list_of_strings.count(x)) for x in set(list_of_strings))
def sort_dictionary(d):
    return sorted(d.items(), key=lambda x: x)
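# Illustrative check: counts of each distinct binary pattern (made-up patterns).
assert get_frequencies(['01', '01', '10']) == {'01': 2, '10': 1}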
def get_frequencies_list(twoDList):
    result = []
    for item in twoDList:
        result.append(item[1])
    return result
def get_complexities(tuples):
    # Also mutates the input in place: each (string, freq) tuple gains its complexity
    result = []
    for i, x in enumerate(tuples):
        k = KC.calc_KC(x[0])
        k = np.round(k, 1)
        result.append(k)
        tuples[i] = x + (k,)
    return result
#                        string          freq  complexity
# sortedDict mutated = [... ('101111111111111', 48, 13.7), ('111111111111111', 895, 3.9)]
def get_log10(values):
    newArray = np.array(values)
    result = []
    for x in newArray:
        result.append(np.log10(x))
    return result
def get_probabilities(counts):
    result = []
    N = sum(counts)
    for num in counts:
        temp = num / N
        result.append(temp)
    return result
def get_Up_Bound(complexities):
    newList = np.array(complexities)
    Up_Bound = 2 ** -newList  # element-wise 2**(-K); plotted as an upper bound on P(x) in make_plot
    return Up_Bound
def get_kscaled(complexities):  #? Does this change the order of the output?
    K = np.array(complexities)
    # a_set = set(K)
    # N = len(a_set)
    N = 2 ** windowSize
    K_scaled = np.log2(N) * (K - np.min(K)) / (np.max(K) - np.min(K))  # rescale K onto [0, log2(N)] = [0, windowSize]
    return K_scaled
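# Illustrative check (made-up complexities): [2, 4, 6] rescales linearly onto
# [0, log2(2**windowSize)] = [0, 11] with the default windowSize of 11.
assert np.allclose(get_kscaled([2, 4, 6]), [0.0, 5.5, 11.0])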
def get_min_max_freq_patterns(listArray):
    # Flatten (complexity, [[max_pattern, max_freq], [min_pattern, min_freq]])
    # into [complexity, pattern, freq] rows, one for the max and one for the min
    result = []
    for item in listArray:
        complexity = item[0]
        pattern_of_max = item[1][0][0]
        pattern_of_min = item[1][1][0]
        max_freq = item[1][0][1]
        min_freq = item[1][1][1]
        result.append([complexity, pattern_of_max, max_freq])
        result.append([complexity, pattern_of_min, min_freq])
    return result
def make_plot(list1, list2, function, with_annotations=False):  # plots np.log10(frequency) vs complexity
    probability = get_probabilities(list1)
    scaledComplexities = np.round(get_kscaled(list2), 1)  # make the upper bound from the complexities
    upperBound = get_Up_Bound(scaledComplexities)
    log10List = np.round(get_log10(probability), 3)
    normalListOfFrequencies = np.round(probability, 3)
    upperBound = np.log10(upperBound)
    prediction_success_rate = PredictWhichIsHigherProb(probability, list2)
    prediction_success_rate_list.append(prediction_success_rate)
    prediction_success_rate_list.append("\n")
    temp = []
    for i, row in enumerate(scaledComplexities):
        temp.append([sortedDict[i][0], normalListOfFrequencies[i], scaledComplexities[i]])
    unique_complexities = get_unique_complexities(temp)
    temp2 = []
    for i, row in enumerate(scaledComplexities):
        temp2.append([sortedDict[i][0], log10List[i], scaledComplexities[i]])
    log10_unique_complexities = get_unique_complexities(temp2)
    font = {  # make a copy of the things that will change
        "size": 15,
        "weight": "bold",
    }
    plt.rc("font", **font)
    plt.plot(scaledComplexities, upperBound, "-", color="black", label=f'Success Rate: {prediction_success_rate}')
    plt.plot(scaledComplexities, log10List, linestyle="", marker="o", color="blue", ms=12)
    plt.gcf().set_size_inches(15, 9)
    plt.ylim((min(log10List) - 0.25, 0))
    plt.title("{0}Chars, {1}years skipped, Number of series:{2} {3}".format(windowSize, years_skipped, length_of_series, data_short_name), fontsize=label_fontsize)
    plt.xlabel("Complexities [Binarise Method: {0}]".format(function.__name__), fontsize=label_fontsize)
    plt.ylabel(r'$\log_{10}P(x)$', fontsize=label_fontsize)
    unique_complexities.sort(key=lambda x: x[0])
    list_of_pattern_complex_freq = get_min_max_freq_patterns(unique_complexities)
    # Returns a list of [complexity, pattern, freq] rows
    string_unique_complexities = get_strings_of_tuples(list_of_pattern_complex_freq)  #! uncomment
    if windowSize == 20:
        every_other_complexity = log10_unique_complexities[::3]
        rotation_degrees = 60
    elif windowSize >= 10:
        every_other_complexity = log10_unique_complexities[::2]
        rotation_degrees = 60
    else:
        every_other_complexity = log10_unique_complexities[::2]
        rotation_degrees = 0
    if with_annotations:
        for item in every_other_complexity:  #? This may be why the annotations were all placed at the same spot; the positioning code may belong in this loop, or the loop should go.
            complexity = item[0]
            pattern_of_max = item[1][0][0]
            pattern_of_min = item[1][1][0]
            max_freq = item[1][0][1]
            min_freq = item[1][1][1]
            text1 = plt.annotate(
                pattern_of_max,
                xy=(complexity, max_freq), xytext=(complexity + 15, max_freq + 15),
                textcoords='offset points', ha='left', va='bottom',
                bbox=dict(boxstyle='round,pad=0.5', fc='white', alpha=1),
                arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=0.3'),
                annotation_clip=False, rotation=rotation_degrees
            )
            text2 = plt.annotate(
                pattern_of_min,
                xy=(complexity, min_freq), xytext=(complexity - 25, min_freq - 25),
                textcoords='offset points', ha='right', va='bottom',
                bbox=dict(boxstyle='round,pad=0.5', fc='0.9', alpha=1),
                arrowprops=dict(arrowstyle='->', connectionstyle='angle3,angleA=0,angleB=90'),
                annotation_clip=False, rotation=rotation_degrees
            )
            text1.set_fontsize(label_fontsize)
            text2.set_fontsize(label_fontsize)
            # text1.set_fontsize(annotation_fontsize)
            # text2.set_fontsize(annotation_fontsize)
    all_data_labels = []
    labels = []
    x_array = []
    y_array = []
    for item in sortedDict:
        all_data_labels.append(item[0])
    for x, y in every_other_complexity:  # unique_complexities
        x_array.append(x)
        y_array.append(y[0][1])
        labels.append(y[0][0])
    for x, y in every_other_complexity:  # unique_complexities
        x_array.append(x)
        y_array.append(y[1][1])
        labels.append(y[1][0])
    # e.g. ['11111111111111111111', 0.0, -0.382, -1.791]
    # with open("{4}{0}Chars_{1}_Pattern_Complexity_Max_and_Min_{3}_{2}.txt".format(windowSize, func.__name__, current_date, data_short_name, save_path), 'w') as f:  #! uncomment
    #     f.write(string_unique_complexities)  #! uncomment
    # print(get_strings_of_tuples(list_of_pattern_complex_freq))
    # plt.savefig('{4}Scatter_Plot_{0}chars_{1}_{2}_{3}_Every {5} Year'.format(windowSize, func.__name__, data_short_name, current_date, save_path, years_skipped))  #! Uncomment
    plt.show()  #! Uncomment
def make_histogram(listOfAllComplexities):
    plt.hist(listOfAllComplexities, ec='black', color='orange', log=True, density=True, bins=5)
    plt.xlabel(r'$\tilde{K}(x)$')
    plt.ylabel('Frequency', labelpad=20)
    ax = plt.gca()
    ax.tick_params(axis="both", which="both", labelsize=11)
    plt.title('Hist_{0}chars_{1}_{2}'.format(windowSize, func.__name__, data_short_name))
    # plt.savefig('{4}Hist_{0}chars_{1}_{2}_{3}_Every {5} Year'.format(windowSize, func.__name__, data_short_name, current_date, save_path, years_skipped))  #! Uncomment
    plt.show()
def add_complexities_to_string(listOfFrequencies, listOfComplexities):
    result = []
    for i, x in enumerate(listOfFrequencies):
        temp = x + (listOfComplexities[i],)
        result.append(temp)
    return result
def get_strings_of_tuples(twoDArray):
    result = []
    for x in twoDArray:
        temp = " ".join(map(str, x))
        result.append(temp)
    result = "\n".join(result)
    return result
def get_unique_complexities(listOfTuples):
    # For each distinct complexity, track the [pattern, freq] pairs with the
    # highest and the lowest frequency seen so far
    seen = {}
    for listArray in listOfTuples:
        currComplexity = listArray[2]
        if currComplexity not in seen:
            #                       [pattern, freq] of max,      [pattern, freq] of min
            seen[currComplexity] = [[listArray[0], listArray[1]], [listArray[0], listArray[1]]]
        else:
            seen[currComplexity][0] = [listArray[0], listArray[1]] if seen[currComplexity][0][1] < listArray[1] else seen[currComplexity][0]
            seen[currComplexity][1] = [listArray[0], listArray[1]] if seen[currComplexity][1][1] > listArray[1] else seen[currComplexity][1]
    return list(seen.items())
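# Illustrative check (made-up rows of [pattern, freq, complexity]): at
# complexity 5.0 the max entry keeps '0101' (freq 10) and the min keeps '1010' (freq 3).
assert get_unique_complexities([['0101', 10, 5.0], ['1010', 3, 5.0], ['0000', 40, 2.0]]) == \
    [(5.0, [['0101', 10], ['1010', 3]]), (2.0, [['0000', 40], ['0000', 40]])]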
def PredictWhichIsHigherProb(P, K):
    # Test the simplicity-bias prediction: the series with the lower complexity K
    # should be the one with the higher probability P
    assert (len(P) == len(K))
    assert (np.abs(sum(P) - 1) < 0.001)
    success_rate = []
    for samps in range(10000):  # sample repeatedly to test our predictions
        # pick a random series x and a random series y according to their probabilities
        indx = np.random.choice(np.arange(len(K)), p=P)
        indy = np.random.choice(np.arange(len(K)), p=P)
        if K[indx] < K[indy]:
            # predict x is more likely than y (or equal)
            success_rate.append(1 * (P[indx] >= P[indy]))
        elif K[indy] < K[indx]:
            success_rate.append(1 * (P[indy] >= P[indx]))
        elif K[indy] == K[indx]:
            # if the complexities are the same, flip a coin to predict which has higher probability
            success_rate.append(1 * (np.random.rand() > 0.5))
    return np.sum(success_rate) / len(success_rate)
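# Stochastic, so shown as a comment rather than an assert (made-up inputs): with
# P = [0.9, 0.1] and K = [3.0, 7.0] the simpler series is also the more probable
# one, so every cross-draw prediction succeeds, while same-index draws (equal
# complexity, probability 0.82) fall back to a coin flip. Expected success rate:
# about 0.18 * 1 + 0.82 * 0.5 = 0.59.
# PredictWhichIsHigherProb(np.array([0.9, 0.1]), [3.0, 7.0])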
for file in file_paths_list:
    fileArray = file.split("/")
    fileName = file
    data_short_name = fileArray[len(fileArray) - 2]
    save_path = "/".join(fileArray[:-1]) + "/"  # trailing slash: save paths are prepended directly in the format strings below
    print("file name: ", fileName, "\ndata short name: ", data_short_name, "\nsave path: ", save_path)
    # fileName = "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/timeSerierEnv/Energy_Data/Energy and Mining 50 series 200 plus entities World Bank.xlsx"  # insert file name here
    # save_path = "/Users/rafiqkamal/Desktop/Data_Science/timeSeries/timeSerierEnv/Energy_Data/"  # Put desired file path here
    # data_short_name = "Energy Data"  #! Change with new data set
    originalData = pd.read_excel(fileName)
    df = originalData.copy()
    label_fontsize = 20
    prediction_success_rate_list = []
    # CONTENTS:
    # 1. remove labels and non key data
    # 2. delete columns
    # 3. sliding window to get good strings
    # 4. convert np.array to binary
    # 5. make plots
    #! Test Suite: uncomment these and comment the line below to see some test data
    ### 1. remove labels and non key data
    # test = np.array(random.sample(range(100), 100)).reshape(10, 10)
    # print(test)
    # test = traverseTwoDArray(test)  # Returns a copy with '..' markers added
    # print("\n Added some '..' to the data to simulate the real data \n", test, "\n\n")
    ### 2. sliding window to get good strings
    # goodStringsArray = get_good_string(test)
    # print("Got the good strings out of the array\n", np.array(goodStringsArray), "\n\n")
    ### 3. delete columns
    # newDF = pd.DataFrame(goodStringsArray)
    # print("converted list to a data frame\n", newDF, "\n\n")
    # newDF = newDF.iloc[:, ::years_skipped]
    # print("delete every other column\n", newDF, "\n\n")
    # newDF = newDF.to_numpy()
    # print("convert the data frame to a numpy array\n", newDF, "\n\n")
    ### 4. convert np.array to binary
    # testResult = get_good_string(newDF)
    # print("getting all the binary strings from the np array\n", np.array(testResult), "Number of time series: ", len(testResult))
    # df.dropna(inplace=True)  #! This deleted MOST (98%) of our data
    arrayOfArrays = df.to_numpy(copy=True)
    binarise_array = [MoreLessMean]  #! Other options: LinearDetrendDiff, convertToBinary,
    # AboveLinearDetrend, MoreLessHalf, MoreLessMedian
    # for func in binarise_array:  #! Uncomment
    #     for x in [11]:  #! Uncomment 5,10,11,15,20
    #         for y in [1]:  #! Uncomment
    #             years_skipped = y  #! Uncomment
    #             windowSize = x  #! Uncomment
    #             #! remove labels and non key data: delete columns
    #             #! sliding window to get good strings
    #             test = get_good_string(arrayOfArrays)  # Finds strings with all numbers  #! Uncomment
    #             # print("Cleaned data:\n", test, "\n\n", "Number of series", len(test))
    #             # print(test[0], test)
    #             newDF = pd.DataFrame(test)  #! uncomment
    #             # print("converted list to a data frame\n", newDF, "\n\n")
    #             skippedDataArray = newDF.iloc[:, ::years_skipped]  #! uncomment  # Remove every other column to make the data more coarse
    #             new = skippedDataArray.to_numpy()
    #             # print("convert the data frame to a numpy array\n", test, "\n\n", type(test))
    #             # goodStringsArray = get_good_string(test)
    #             # print("Got the good strings out of the array\n", goodStringsArray, "\n\n", goodStringsArray[0], "Number of series: ", len(goodStringsArray))
    #             #! I think we should remove the arrays of all zeros too because they are skewing the data
    #             # stringArray = np.array(testResult)
    #             # print(stringArray.dtype.type is str)
    #             # print(get_good_string(testResult))  # This shows that it is now checking for strings too
    #             #! convert to binary
    #             testResult = func(new)
    #             # print("getting all the binary strings from the np array\n", "\nNumber of time series: ", len(testResult))
    #             # print(testResult)
    #             # Make a list of binary strings
    #             listOfbinaryStrings = join_list_into_string(testResult)
    #             # print("\nNumber of series: ", len(listOfbinaryStrings))
    #             #! Get frequencies and complexities
    #             frequencies = get_frequencies(listOfbinaryStrings)
    #             # print(frequencies)
    #             #! Sort frequencies
    #             sortedDict = dict(sorted(frequencies.items(), key=lambda item: item[1]))
    #             sortedDict = list(sortedDict.items())
    #             # print(sortedDict)  # Mutated below by get_complexities
    #             #                        string          freq  complexity
    #             # sortedDict mutated = [... ('101111111111111', 48, 13.7), ('111111111111111', 895, 3.9)]
    #             # print("Length of the sorted Dictionary", len(sortedDict))
    #             #! get complexities
    #             complexities = get_complexities(sortedDict)  # [53.1, 41.1, 35.1, ...]  #! uncomment
    #             list_of_all_series_complexities = K_scaled_all_series(sortedDict)
    #             #! sort object of frequencies
    #             frequency_list = get_frequencies_list(sortedDict)  # [... 1, ... 59, 62, 288, 1370]  #! uncomment
    #             length_of_series = sum(frequency_list)
    #             # print("Number of Series:", length_of_series, "| Character Size:", windowSize, "| Step is:", years_skipped)
    #             #! 5. make plots and histogram
    #             # make_plot(frequency_list, complexities, func, False)  #! uncomment
    #             make_plot(frequency_list, complexities, func, True)  #! uncomment
    #             make_histogram(list_of_all_series_complexities)  #! uncomment
    #             # print(list_of_all_series_complexities)
    #             #! 6. Save results in a text file
    #             final_string_log10freq_complexity = get_strings_of_tuples(sortedDict)  #! uncomment
    #             # e.g. "000000000000000 1632 3.9\n000000000000000 1632 3.9..."
    #             # print(final_string_log10freq_complexity)
    #             with open("{3}{0}Chars_{1}_Pattern_Freq_Complexity_{2}.txt".format(windowSize, func.__name__, current_date, save_path), 'w') as f:  #! uncomment
    #                 f.write(final_string_log10freq_complexity)  #! uncomment
    #             with open("{4}Prediction_Success_Rate_{0} Chars_{1}_{3}_{2}.txt".format(windowSize, func.__name__, current_date, data_short_name, save_path), 'w') as f:  #! uncomment
    #                 f.write("".join(map(str, prediction_success_rate_list)))  #! uncomment