Skip to content

Instantly share code, notes, and snippets.

@reefwing
Last active June 30, 2024 02:08
Show Gist options
  • Save reefwing/160b467a8a0e69809cb97f5a2bfefe22 to your computer and use it in GitHub Desktop.
Save reefwing/160b467a8a0e69809cb97f5a2bfefe22 to your computer and use it in GitHub Desktop.
Downloads and re-samples the LGHG2@n10C_to_25degC battery data
# Copyright (c) 2024 David Such
#
# This software is released under the MIT License.
# https://opensource.org/licenses/MIT
import os
import requests
import zipfile
import scipy.io
import pandas as pd
import numpy as np
# URL of the file to download
url = "https://data.mendeley.com/public-files/datasets/cp3473x7xv/files/ad7ac5c9-2b9e-458a-a91f-6f3da449bdfb/file_downloaded"

# Output folder that receives the extracted ZIP contents
output_folder = os.path.expanduser("~/Documents/GitHub/Embedded-AI/data/LGHG2@n10C_to_25degC")
os.makedirs(output_folder, exist_ok=True)

# Sub-folders used by the script: extracted Train/Test data and generated CSVs
train_folder = os.path.join(output_folder, "Train")
test_folder = os.path.join(output_folder, "Test")
preprocessed_folder = os.path.join(output_folder, 'Preprocessed')
os.makedirs(preprocessed_folder, exist_ok=True)

# Download and extract the data set unless both Train and Test already exist
if not os.path.exists(train_folder) or not os.path.exists(test_folder):
    print("Downloading LGHG2@n10C_to_25degC.zip (56 MB) ... ")
    # The archive is stored next to (not inside) the output folder
    download_folder = os.path.dirname(output_folder)
    filename = os.path.join(download_folder, "LGHG2@n10C_to_25degC.zip")
    # Stream to disk so the whole archive is never held in memory, and fail
    # loudly on an HTTP error instead of saving an error page as the ZIP file.
    # The timeout bounds connect/read stalls so the script cannot hang forever.
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(filename, 'wb') as file:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                file.write(chunk)
    with zipfile.ZipFile(filename, 'r') as zip_ref:
        zip_ref.extractall(output_folder)
# Define helper function to read .mat files
def read_mat_files(folder):
    """Load every ``.mat`` file found directly inside *folder*.

    Files are visited in sorted filename order. ``os.listdir`` makes no
    ordering guarantee, and callers index the returned list by position, so
    a deterministic order is required for the result to be reproducible.

    Args:
        folder: Path of the directory to scan (sub-directories are not
            searched).

    Returns:
        A list holding the ``scipy.io.loadmat`` result for each ``.mat``
        file, in sorted filename order. Empty if no ``.mat`` file exists.
    """
    return [
        scipy.io.loadmat(os.path.join(folder, filename))
        for filename in sorted(os.listdir(folder))
        if filename.endswith(".mat")
    ]
# Load every .mat file from the Train and Test folders into memory
fds_train = read_mat_files(train_folder)
fds_test = read_mat_files(test_folder)

# Pick the individual data sets out of the loaded lists by position.
# NOTE(review): the temperature in each name is assumed from the list
# position only - confirm it matches the order the files are loaded in.
train_data_full = fds_train[0]
(test_data_full_n10deg,
 test_data_full_0deg,
 test_data_full_10deg,
 test_data_full_25deg) = (fds_test[position] for position in range(4))

# Report the array shapes so the layout of the data is visible
print("Shape of train_data_full['X']: ", train_data_full['X'].shape)
print("Shape of train_data_full['Y']: ", train_data_full['Y'].shape)
print("Shape of test_data_full_n10deg['X']: ", test_data_full_n10deg['X'].shape)
print("Shape of test_data_full_n10deg['Y']: ", test_data_full_n10deg['Y'].shape)

# Inputs (X) and targets (Y) of the training set
X_train, Y_train = train_data_full['X'], train_data_full['Y']

# Column boundaries of the four consecutive training segments; each slice
# runs from one boundary up to (not including) the next
segment_bounds = (0, 184257, 337973, 510530, 669956)
idx0, idx10, idx25, idxN10 = (
    slice(lower, upper)
    for lower, upper in zip(segment_bounds[:-1], segment_bounds[1:])
)

# Cut each segment out of the training arrays along the column axis
X_idx0, Y_idx0 = X_train[:, idx0], Y_train[:, idx0]
X_idx10, Y_idx10 = X_train[:, idx10], Y_train[:, idx10]
X_idx25, Y_idx25 = X_train[:, idx25], Y_train[:, idx25]
X_idxN10, Y_idxN10 = X_train[:, idxN10], Y_train[:, idxN10]

# Print the segment shapes as a sanity check on the extraction
print(f'X_idx0 shape: {X_idx0.shape}, Y_idx0 shape: {Y_idx0.shape}')
print(f'X_idx10 shape: {X_idx10.shape}, Y_idx10 shape: {Y_idx10.shape}')
print(f'X_idx25 shape: {X_idx25.shape}, Y_idx25 shape: {Y_idx25.shape}')
print(f'X_idxN10 shape: {X_idxN10.shape}, Y_idxN10 shape: {Y_idxN10.shape}')
# Resample and compute new moving averages
def _trailing_mean(values, window):
    """Return, for each index i, the mean of values[max(0, i-window+1) : i+1]."""
    count = values.shape[0]
    means = np.empty(count)
    for i in range(count):
        means[i] = np.mean(values[max(0, i - window + 1):i + 1])
    return means


def resample_and_compute_moving_averages(X, Y, step=100, window=6):
    """Down-sample X and Y column-wise and rebuild the moving-average rows.

    Every ``step``-th column of ``X`` and ``Y`` is kept. Rows 3 and 4 of the
    resampled ``X`` (average voltage / average current) are then replaced by
    trailing means of rows 0 and 1 respectively, computed over the resampled
    columns. The first ``window - 1`` columns average over however many
    samples are available so far.

    The input arrays are left untouched: ``X`` is copied before its average
    rows are overwritten, because a plain strided slice is a view and writing
    through it would modify the caller's array.

    Args:
        X: 2-D array with at least 5 rows, indexed as ``X[row, column]``.
        Y: 2-D array indexed as ``Y[row, column]``, resampled with the same
            step as ``X``.
        step: Keep every ``step``-th column. Defaults to 100.
        window: Maximum number of resampled samples in each trailing mean.
            Defaults to 6 (the current sample plus the five before it).

    Returns:
        A ``(X_resampled, Y_resampled)`` tuple. ``X_resampled`` is a new
        array; ``Y_resampled`` is a strided view of ``Y``.
    """
    # Copy so that writing the new averages below cannot alias back into X
    X_resampled = X[:, ::step].copy()
    Y_resampled = Y[:, ::step]

    avg_voltage_idx = 3  # The 4th row (index 3) is average voltage
    avg_current_idx = 4  # The 5th row (index 4) is average current

    # Both means are computed from rows 0 and 1 before anything is written
    new_avg_voltage = _trailing_mean(X_resampled[0], window)
    new_avg_current = _trailing_mean(X_resampled[1], window)

    X_resampled[avg_voltage_idx, :] = new_avg_voltage
    X_resampled[avg_current_idx, :] = new_avg_current
    return X_resampled, Y_resampled
# Down-sample the training set and rebuild its moving-average rows
X_train_resampled, Y_train_resampled = resample_and_compute_moving_averages(X_train, Y_train)

# Column headers shared by every exported table: the X rows followed by Y
column_names = ['Voltage', 'Current', 'Temperature', 'Average Voltage', 'Average Current', 'SOC']

# Export the training set as CSV, one row per resampled sample
train_df = pd.DataFrame(
    np.vstack((X_train_resampled, Y_train_resampled)).T,
    columns=column_names,
)
train_csv_path = os.path.join(preprocessed_folder, 'resampled_training_data.csv')
train_df.to_csv(train_csv_path, index=False)

# Down-sample each test set and export it under the label at the same position
test_data_files = ['n10degC', '0degC', '10degC', '25degC']
resampled_test_data_shapes = {}
for i, test_data_full in enumerate(fds_test):
    X_test_resampled, Y_test_resampled = resample_and_compute_moving_averages(
        test_data_full['X'], test_data_full['Y']
    )
    test_df = pd.DataFrame(
        np.vstack((X_test_resampled, Y_test_resampled)).T,
        columns=column_names,
    )
    label = test_data_files[i]
    test_df.to_csv(os.path.join(preprocessed_folder, f'resampled_test_data_{label}.csv'), index=False)
    # Remember the shapes so they can be reported once all files are written
    resampled_test_data_shapes[label] = (X_test_resampled.shape, Y_test_resampled.shape)

# Report the resampled shapes as a sanity check
print(f'Training data shape after resampling: X={X_train_resampled.shape}, Y={Y_train_resampled.shape}')
for test_file, (x_shape, y_shape) in resampled_test_data_shapes.items():
    print(f'{test_file} test data shape after resampling: X={x_shape}, Y={y_shape}')

# Stack X and Y again into a separate table used only for the preview below
data_resampled = np.vstack((X_train_resampled, Y_train_resampled))
df_resampled = pd.DataFrame(data_resampled.T, columns=column_names)

# Show the first 8 rows without the index column
print(df_resampled.head(8).to_string(index=False))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment