
Created a file with the non-Jupyter version of the code.

Moved all imports and all function definitions to the top.
Changed the paths to relative paths within the repo.
Otherwise the code is unchanged.
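
A quick sketch (not part of the commit) of what the relative paths assume: the script expects to be launched from the repository root, where the dataset and output locations used below resolve. The pre-flight check is only illustrative.

import os

# Illustrative pre-flight check before running: python non_jupyter_version.py
# (paths taken from the script in this commit)
assert os.path.exists('./Datasets/ALLUSERS32_15MIN_WITHOUTTHREHOLD.xlsx'), "dataset not found"
os.makedirs('./working', exist_ok=True)  # keras-tuner state and evaluation_results.xlsx are written under ./working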
master
Robert Rabbe committed 2 days ago
commit bf73e4b8b8
  1. Datasets/ALLUSERS32_15MIN_WITHOUTTHREHOLD.xlsx (BIN)
  2. main.py (0)
  3. non_jupyter_version.py (+296)


non_jupyter_version.py (+296)

@@ -0,0 +1,296 @@
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from pandas import ExcelWriter
import shutil
import os
import keras_tuner as kt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, GRU, Bidirectional
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from keras_tuner import RandomSearch
from sklearn.metrics import accuracy_score
# === Clean previous tuning directory ===
shutil.rmtree("./working/tuner", ignore_errors=True)
# === Load dataset ===
file_path = './Datasets/ALLUSERS32_15MIN_WITHOUTTHREHOLD.xlsx'
df = pd.read_excel(file_path)
# === Helper functions for scenario selection ===
def get_user_input_for_scenario(scenario_type):
    print(f"\nPlease define your custom {scenario_type} scenario:")
    years_input = input(f"Enter {scenario_type} years (comma-separated, e.g., 2017,2018): ").strip()
    years = list(map(int, years_input.split(',')))
    years_months = []
    for year in years:
        months_input = input(f"Enter months for year {year} (comma-separated, e.g., 1,2,3): ").strip()
        months = list(map(int, months_input.split(',')))
        years_months.append((year, months))
    return years_months
def display_warning_about_2020_data():
    print("\n⚠️ Warning: 2020 data after February is excluded due to COVID-19.")
    print("✅ Only Jan and Feb 2020 are used for testing. Do not use them in training/validation.")
def display_warnings_for_scenarios(scenario_type):
    if scenario_type == "training":
        print("\n⚠️ Predefined Training Scenarios (for reference only):")
        for name, scenario in predefined_training_scenarios.items():
            parts = [f"{year}-{months}" for year, months in scenario['years_months']]
            print(f" {name}: {', '.join(parts)}")
    elif scenario_type == "validation":
        print("\n⚠️ Predefined Validation Scenario:")
        for name, scenario in predefined_validation_scenarios.items():
            parts = [f"{year}-{months}" for year, months in scenario['years_months']]
            print(f" {name}: {', '.join(parts)}")
        print(" - This uses Oct, Nov, Dec of 2019")
predefined_training_scenarios = {
    "Scenario 1": {"years_months": [(2018, list(range(1, 13))), (2019, list(range(1, 10)))]},
    "Scenario 2": {"years_months": [(2017, list(range(1, 13))), (2018, list(range(1, 13))), (2019, list(range(1, 10)))]}
}
predefined_validation_scenarios = {
    "Scenario A": {"years_months": [(2019, [10, 11, 12])]}
}
# === Filter and preprocess data ===
def filter_data(df, scenario):
    filtered = pd.DataFrame()
    for year, months in scenario:
        filtered = pd.concat([filtered, df[(df['Year'] == year) & (df['Month'].isin(months))]])
    return filtered.drop(columns=['Month', 'Year', 'date', 'DayOfWeek'])
# === Get test scenario input ===
def get_user_input_for_test():
    print("\n=== Testing Scenario Setup ===")
    print("⚠️ Only January and February of 2020 were used for testing in predefined setup.")
    print("⚠️ Avoid using 2020 data after February due to COVID-19 impact.\n")
    years_input = input("Enter test years (comma-separated, e.g., 2020): ").strip()
    years = list(map(int, years_input.split(',')))
    years_months = []
    for year in years:
        months_input = input(f"Enter months for year {year} (comma-separated, e.g., 1,2): ").strip()
        months = list(map(int, months_input.split(',')))
        years_months.append((year, months))
    return years_months
def filter_test_data(df, scenario):
    data_parts = []
    for year, months in scenario:
        part = df[(df['Year'] == year) & (df['Month'].isin(months))]
        data_parts.append(part)
    return pd.concat(data_parts, ignore_index=True)
def evaluate_model_on_test_data(model, test_df, sequence_length, excel_writer):
    print("\n🧪 Evaluating on Test Data...")
    test_df = test_df.drop(columns=['Month', 'Year', 'date', 'DayOfWeek'])
    test_df = test_df.sort_values(by='user').reset_index(drop=True)
    users = test_df['user'].unique()
    results = []
    accuracy_above_50 = 0
    for user in users:
        user_df = test_df[test_df['user'] == user]
        X, y_true = [], []
        user_features = user_df.drop(columns=['user']).values
        user_labels = user_df['user'].values
        if len(user_df) <= sequence_length:
            print(f"Skipping User {user} (not enough data for sequence length {sequence_length})")
            continue
        for i in range(len(user_df) - sequence_length):
            seq_x = user_features[i:i + sequence_length]
            seq_y = user_labels[i + sequence_length]
            X.append(seq_x)
            y_true.append(seq_y)
        X = np.array(X)
        y_true = np.array(y_true)
        if len(X) == 0:
            continue
        y_pred = model.predict(X, verbose=0)
        y_pred_classes = np.argmax(y_pred, axis=1)
        unique_pred, counts_pred = np.unique(y_pred_classes, return_counts=True)
        label_counts_pred = dict(zip(unique_pred, counts_pred))
        unique_true, counts_true = np.unique(y_true, return_counts=True)
        label_counts_true = dict(zip(unique_true, counts_true))
        acc = accuracy_score(y_true, y_pred_classes)
        if acc > 0.5:
            accuracy_above_50 += 1
        # Append result to list
        results.append({
            'User': user,
            'Accuracy (%)': acc * 100,
            'Predicted Class Distribution': str(label_counts_pred),
            'Actual Class Distribution': str(label_counts_true)
        })
        print(f"\n=== User {user} ===")
        print(f"✅ Accuracy: {acc * 100:.2f}%")
        print("📊 Predicted Class Distribution:", label_counts_pred)
        print("📌 Actual Class Distribution: ", label_counts_true)
    final_accuracy_percent = (accuracy_above_50 / 32) * 100
    print(f"\n🟩 Final Evaluation Summary for Sequence Length {sequence_length}:")
    print(f"Users with >50% Accuracy: {accuracy_above_50} / 32")
    print(f"✅ Final Success Rate: {final_accuracy_percent:.2f}%")
    # Append overall stats as a new row
    results.append({
        'User': 'TOTAL',
        'Accuracy (%)': '',
        'Predicted Class Distribution': f'Users >50% Acc: {accuracy_above_50}/32',
        'Actual Class Distribution': f'Success Rate: {final_accuracy_percent:.2f}%'
    })
    # Save results to Excel sheet
    df_results = pd.DataFrame(results)
    df_results.to_excel(excel_writer, sheet_name=f"SeqLen_{sequence_length}", index=False)
# === Get user-defined training and validation scenarios ===
print("=== Training Scenario Setup ===")
display_warning_about_2020_data()
display_warnings_for_scenarios("training")
training_scenario = get_user_input_for_scenario("training")
print("\n=== Validation Scenario Setup ===")
display_warning_about_2020_data()
display_warnings_for_scenarios("validation")
validation_scenario = get_user_input_for_scenario("validation")
data = filter_data(df, training_scenario)
data_val = filter_data(df, validation_scenario)
# === Organize by user ===
df_sorted = data.sort_values(by='user').reset_index(drop=True)
df_sorted_val = data_val.sort_values(by='user').reset_index(drop=True)
users = df_sorted['user'].unique()
users_val = df_sorted_val['user'].unique()
user_data = {user: df_sorted[df_sorted['user'] == user] for user in users}
user_data_val = {user: df_sorted_val[df_sorted_val['user'] == user] for user in users_val}
# === Callbacks ===
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, verbose=1)
# === Model tuning and training loop ===
best_models = {}
for sequence_length in range(20, 30, 5):
    print(f"\n=== Training for Sequence Length: {sequence_length} ===")
    # Training data
    X, y = [], []
    for user, data in user_data.items():
        features = data.drop('user', axis=1).values
        labels = data['user'].values
        for i in range(len(features) - sequence_length):
            X.append(features[i:i + sequence_length])
            y.append(labels[i + sequence_length])
    X = np.array(X)
    y = np.array(y)
    # Validation data
    X_val, y_val = [], []
    for user, data in user_data_val.items():
        features = data.drop('user', axis=1).values
        labels = data['user'].values
        for i in range(len(features) - sequence_length):
            X_val.append(features[i:i + sequence_length])
            y_val.append(labels[i + sequence_length])
    X_val = np.array(X_val)
    y_val = np.array(y_val)
    if X.shape[0] == 0 or X_val.shape[0] == 0:
        print(f"⚠️ Skipped sequence length {sequence_length} due to insufficient data.")
        continue
    n_features = X.shape[2]

    def build_model(hp):
        model = Sequential()
        model.add(Bidirectional(LSTM(units=hp.Int('units', 32, 256, step=2),
                                     input_shape=(sequence_length, n_features))))
        model.add(Dropout(hp.Float('dropout_rate', 0.1, 0.5, step=0.1)))
        model.add(Dense(len(users), activation='softmax'))
        model.compile(
            optimizer=Adam(learning_rate=hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
            loss='sparse_categorical_crossentropy',
            metrics=['accuracy']
        )
        return model

    tuner = RandomSearch(
        build_model,
        objective='val_loss',
        max_trials=30,
        executions_per_trial=2,
        directory='./working/tuner',
        project_name=f'lstm_seq_{sequence_length}'
    )
    tuner.search(X, y, epochs=30, validation_data=(X_val, y_val),
                 callbacks=[early_stopping, lr_scheduler], verbose=1)
    best_hps = tuner.get_best_hyperparameters(1)[0]
    best_model = tuner.hypermodel.build(best_hps)
    best_model.fit(X, y, epochs=30, validation_data=(X_val, y_val),
                   callbacks=[early_stopping, lr_scheduler], verbose=0)
    best_models[sequence_length] = {
        'model': best_model,
        'best_hyperparameters': {
            'units': best_hps.get('units'),
            'dropout_rate': best_hps.get('dropout_rate'),
            'learning_rate': best_hps.get('learning_rate')
        }
    }
# === Run evaluation for each trained sequence length ===
test_scenario = get_user_input_for_test()
test_data = filter_test_data(df, test_scenario)
output_excel_path = "./working/evaluation_results.xlsx"
with ExcelWriter(output_excel_path) as writer:
    for sequence_length, result in best_models.items():
        print(f"\n🔍 Testing Model for Sequence Length: {sequence_length}")
        evaluate_model_on_test_data(
            result['model'],
            test_data.copy(),
            sequence_length,
            writer  # 👈 pass the writer
        )
print(f"\n✅ All evaluations completed. Results saved to: {output_excel_path}")