import shutil

import numpy as np
import pandas as pd
from pandas import ExcelWriter

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from keras_tuner import RandomSearch
from sklearn.metrics import accuracy_score

# === Display functions ===
def display_warning_about_2020_data():
    print("\n⚠️ Warning: 2020 data after February is excluded due to COVID-19.")
    print("✅ Only Jan and Feb 2020 are used for testing. Do not use them in training/validation.")


def display_warnings_for_scenarios(scenario_type, predefined_training_scenarios, predefined_validation_scenarios):
    if scenario_type == "training":
        print("\n⚠️ Predefined Training Scenarios (for reference only):")
        for name, scenario in predefined_training_scenarios.items():
            parts = [f"{year}-{months}" for year, months in scenario['years_months']]
            print(f" {name}: {', '.join(parts)}")
    elif scenario_type == "validation":
        print("\n⚠️ Predefined Validation Scenario:")
        for name, scenario in predefined_validation_scenarios.items():
            parts = [f"{year}-{months}" for year, months in scenario['years_months']]
            print(f" {name}: {', '.join(parts)}")

# === Data functions ===
def load_dataset(file_path):
    return pd.read_excel(file_path)


def filter_data(df, scenario, ALLUSERS32_15MIN_WITHOUT_THRESHOLD):
    parts = []
    for year, months in scenario:
        parts.append(df[(df['Year'] == year) & (df['Month'].isin(months))])
    filtered = pd.concat(parts)
    # The "without threshold" variant also carries a DayOfWeek column,
    # which is dropped along with the other calendar columns.
    if ALLUSERS32_15MIN_WITHOUT_THRESHOLD:
        return filtered.drop(columns=['Month', 'Year', 'date', 'DayOfWeek'])
    else:
        return filtered.drop(columns=['Month', 'Year', 'date'])


def filter_test_data(df, scenario):
    data_parts = []
    for year, months in scenario:
        part = df[(df['Year'] == year) & (df['Month'].isin(months))]
        data_parts.append(part)
    return pd.concat(data_parts, ignore_index=True)


def prepare_user_data(df):
    df_sorted = df.sort_values(by='user').reset_index(drop=True)
    users = df_sorted['user'].unique()
    return {user: df_sorted[df_sorted['user'] == user] for user in users}
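

# Example of the (year, months) structure that filter_data and
# filter_test_data expect (illustrative values only; the real scenarios
# are defined by the caller):
#
#   scenario = [(2018, [1, 2, 3]), (2019, [10, 11, 12])]
#
# Each tuple pairs a year with the list of month numbers to keep for
# that year.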
# === Training & Validation ===
def train_models(user_data, user_data_val, sequence_lengths=[20], tuner_dir="./working/tuner"):
best_models = {}
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, verbose=1)
users = list(user_data.keys())
shutil.rmtree(tuner_dir, ignore_errors=True)
for sequence_length in sequence_lengths:
print(f"\n=== Training for Sequence Length: {sequence_length} ===")
X, y = [], []
for user, data in user_data.items():
features = data.drop('user', axis=1).values
labels = data['user'].values
for i in range(len(features) - sequence_length):
X.append(features[i:i + sequence_length])
y.append(labels[i + sequence_length])
X = np.array(X)
y = np.array(y)
X_val, y_val = [], []
for user, data in user_data_val.items():
features = data.drop('user', axis=1).values
labels = data['user'].values
for i in range(len(features) - sequence_length):
X_val.append(features[i:i + sequence_length])
y_val.append(labels[i + sequence_length])
X_val = np.array(X_val)
y_val = np.array(y_val)
if X.shape[0] == 0 or X_val.shape[0] == 0:
print(f"⚠️ Skipped sequence length {sequence_length} due to insufficient data.")
continue
n_features = X.shape[2]
def build_model(hp):
model = Sequential()
model.add(Bidirectional(LSTM(units=hp.Int('units', 32, 256, step=2),
input_shape=(sequence_length, n_features))))
model.add(Dropout(hp.Float('dropout_rate', 0.1, 0.5, step=0.1)))
model.add(Dense(len(users), activation='softmax'))
model.compile(
optimizer=Adam(learning_rate=hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
loss='sparse_categorical_crossentropy',
metrics=['accuracy']
)
return model
tuner = RandomSearch(
build_model,
objective='val_loss',
max_trials=30,
executions_per_trial=2,
directory=tuner_dir,
project_name=f'lstm_seq_{sequence_length}'
)
tuner.search(X, y, epochs=30, validation_data=(X_val, y_val),
callbacks=[early_stopping, lr_scheduler], verbose=1)
best_hps = tuner.get_best_hyperparameters(1)[0]
best_model = tuner.hypermodel.build(best_hps)
best_model.fit(X, y, epochs=30, validation_data=(X_val, y_val),
callbacks=[early_stopping, lr_scheduler], verbose=0)
best_models[sequence_length] = {
'model': best_model,
'best_hyperparameters': {
'units': best_hps.get('units'),
'dropout_rate': best_hps.get('dropout_rate'),
'learning_rate': best_hps.get('learning_rate')
}
}
return best_models
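

# NOTE (assumption): the classifier head is Dense(len(users)) with
# sparse_categorical_crossentropy, so the 'user' column must hold integer
# labels in the range [0, len(users)). If the raw IDs are arbitrary, a
# minimal sketch of a one-time encoding with scikit-learn's LabelEncoder:
#
#   from sklearn.preprocessing import LabelEncoder
#   encoder = LabelEncoder()
#   df['user'] = encoder.fit_transform(df['user'])
#
# Fit the encoder once on the full dataset so the training, validation,
# and test splits share the same label mapping.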

# === Evaluation ===
def evaluate_models(best_models, df_test, sequence_lengths, output_excel_path, ALLUSERS32_15MIN_WITHOUT_THRESHOLD):
    print("\n🧪 Evaluating on Test Data...")
    with ExcelWriter(output_excel_path) as writer:
        for sequence_length in sequence_lengths:
            if sequence_length not in best_models:
                continue
            evaluate_model_on_test_data(best_models[sequence_length]['model'], df_test.copy(),
                                        sequence_length, writer, ALLUSERS32_15MIN_WITHOUT_THRESHOLD)


def evaluate_model_on_test_data(model, test_df, sequence_length, excel_writer, ALLUSERS32_15MIN_WITHOUT_THRESHOLD):
    if ALLUSERS32_15MIN_WITHOUT_THRESHOLD:
        test_df = test_df.drop(columns=['Month', 'Year', 'date', 'DayOfWeek'])
    else:
        test_df = test_df.drop(columns=['Month', 'Year', 'date'])
    test_df = test_df.sort_values(by='user').reset_index(drop=True)
    users = test_df['user'].unique()
    total_users = len(users)
    results = []
    accuracy_above_50 = 0
    for user in users:
        user_df = test_df[test_df['user'] == user]
        if len(user_df) <= sequence_length:
            print(f"Skipping User {user} (not enough data for sequence length {sequence_length})")
            continue
        user_features = user_df.drop(columns=['user']).values
        user_labels = user_df['user'].values
        # Slide a window over this user's rows, exactly as in training.
        X, y_true = [], []
        for i in range(len(user_df) - sequence_length):
            X.append(user_features[i:i + sequence_length])
            y_true.append(user_labels[i + sequence_length])
        X = np.array(X)
        y_true = np.array(y_true)
        if len(X) == 0:
            continue
        y_pred = model.predict(X, verbose=0)
        y_pred_classes = np.argmax(y_pred, axis=1)
        unique_pred, counts_pred = np.unique(y_pred_classes, return_counts=True)
        label_counts_pred = dict(zip(unique_pred, counts_pred))
        unique_true, counts_true = np.unique(y_true, return_counts=True)
        label_counts_true = dict(zip(unique_true, counts_true))
        acc = accuracy_score(y_true, y_pred_classes)
        if acc > 0.5:
            accuracy_above_50 += 1
        results.append({
            'User': user,
            'Accuracy (%)': acc * 100,
            'Predicted Class Distribution': str(label_counts_pred),
            'Actual Class Distribution': str(label_counts_true)
        })
        print(f"\n=== User {user} ===")
        print(f"✅ Accuracy: {acc * 100:.2f}%")
        print("📊 Predicted Class Distribution:", label_counts_pred)
        print("📌 Actual Class Distribution: ", label_counts_true)
    # Summarise over the users actually present in the test set (32 in the
    # ALLUSERS32 dataset) rather than a hard-coded constant.
    final_accuracy_percent = (accuracy_above_50 / total_users) * 100
    print(f"\n🟩 Final Evaluation Summary for Sequence Length {sequence_length}:")
    print(f"Users with >50% Accuracy: {accuracy_above_50} / {total_users}")
    print(f"✅ Final Success Rate: {final_accuracy_percent:.2f}%")
    results.append({
        'User': 'TOTAL',
        'Accuracy (%)': '',
        'Predicted Class Distribution': f'Users >50% Acc: {accuracy_above_50}/{total_users}',
        'Actual Class Distribution': f'Success Rate: {final_accuracy_percent:.2f}%'
    })
    df_results = pd.DataFrame(results)
    df_results.to_excel(excel_writer, sheet_name=f"SeqLen_{sequence_length}", index=False)
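

# --- Illustrative driver: a minimal sketch of how the functions above fit
# together. The file paths, scenario names, and (year, months) values are
# placeholders, not the actual experiment configuration.
if __name__ == "__main__":
    ALLUSERS32_15MIN_WITHOUT_THRESHOLD = True

    predefined_training_scenarios = {
        "Scenario A": {"years_months": [(2018, [1, 2, 3, 4, 5, 6])]},
    }
    predefined_validation_scenarios = {
        "Validation": {"years_months": [(2019, [11, 12])]},
    }
    # Only Jan/Feb 2020 for testing, per the COVID-19 warning above.
    test_scenario = [(2020, [1, 2])]

    display_warning_about_2020_data()
    display_warnings_for_scenarios("training", predefined_training_scenarios,
                                   predefined_validation_scenarios)

    df = load_dataset("dataset.xlsx")  # placeholder path
    df_train = filter_data(df, predefined_training_scenarios["Scenario A"]["years_months"],
                           ALLUSERS32_15MIN_WITHOUT_THRESHOLD)
    df_val = filter_data(df, predefined_validation_scenarios["Validation"]["years_months"],
                         ALLUSERS32_15MIN_WITHOUT_THRESHOLD)
    df_test = filter_test_data(df, test_scenario)

    user_data = prepare_user_data(df_train)
    user_data_val = prepare_user_data(df_val)

    sequence_lengths = [20]
    best_models = train_models(user_data, user_data_val, sequence_lengths)
    evaluate_models(best_models, df_test, sequence_lengths,
                    "results.xlsx", ALLUSERS32_15MIN_WITHOUT_THRESHOLD)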