"""Per-user sequence classification with a Keras-Tuner-tuned bidirectional LSTM.

The module filters a tabular dataset by (year, months) scenarios, groups rows
per user, builds fixed-length sliding windows, tunes and trains one model per
sequence length, and writes per-user test accuracy to an Excel workbook.
"""
import os
import shutil

import numpy as np
import pandas as pd
from pandas import ExcelWriter
import keras_tuner as kt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from keras_tuner import RandomSearch
from sklearn.metrics import accuracy_score


# === Display functions ===
def display_warning_about_2020_data():
    """Print the notice about which 2020 months are usable and for what."""
    print("\n⚠️ Warning: 2020 data after February is excluded due to COVID-19.")
    print("✅ Only Jan and Feb 2020 are used for testing. Do not use them in training/validation.")


def display_warnings_for_scenarios(scenario_type, predefined_training_scenarios, predefined_validation_scenarios):
    """Print the predefined scenarios for ``scenario_type``.

    Args:
        scenario_type: ``"training"`` or ``"validation"``; any other value
            prints nothing.
        predefined_training_scenarios: Mapping of name -> dict holding a
            ``'years_months'`` iterable of ``(year, months)`` pairs.
        predefined_validation_scenarios: Same structure, for validation.
    """
    if scenario_type == "training":
        header = "\n⚠️ Predefined Training Scenarios (for reference only):"
        scenarios = predefined_training_scenarios
    elif scenario_type == "validation":
        header = "\n⚠️ Predefined Validation Scenario:"
        scenarios = predefined_validation_scenarios
    else:
        return

    print(header)
    for name, scenario in scenarios.items():
        parts = [f"{year}-{months}" for year, months in scenario['years_months']]
        print(f" {name}: {', '.join(parts)}")


# === Data functions ===
def load_dataset(file_path):
    """Read the Excel file at ``file_path`` into a DataFrame."""
    return pd.read_excel(file_path)


def _select_year_months(df, scenario, ignore_index=False):
    """Return the rows of ``df`` matching any ``(year, months)`` pair in ``scenario``.

    All matching parts are concatenated in a single call. An empty scenario
    yields an empty frame that keeps ``df``'s columns, so callers can still
    drop or select columns from it.
    """
    parts = [
        df[(df['Year'] == year) & (df['Month'].isin(months))]
        for year, months in scenario
    ]
    if not parts:
        empty = df.iloc[0:0]
        return empty.reset_index(drop=True) if ignore_index else empty
    return pd.concat(parts, ignore_index=ignore_index)


def _drop_metadata_columns(df, drop_day_of_week):
    """Drop 'Month', 'Year' and 'date' (plus 'DayOfWeek' when requested)."""
    columns = ['Month', 'Year', 'date']
    if drop_day_of_week:
        columns.append('DayOfWeek')
    return df.drop(columns=columns)


def filter_data(df, scenario, ALLUSERS32_15MIN_WITHOUTREHOLD):
    """Select the scenario's rows and strip the calendar columns.

    Args:
        df: Frame with at least 'Year', 'Month' and 'date' columns.
        scenario: Iterable of ``(year, months)`` pairs; ``months`` is passed
            to ``Series.isin``.
        ALLUSERS32_15MIN_WITHOUTREHOLD: When truthy, 'DayOfWeek' is dropped
            as well. (The spelling differs from the evaluation functions'
            flag; it is kept because it is part of the call signature.)

    Returns:
        The matching rows (original index labels kept) without the calendar
        columns. An empty scenario gives an empty frame.
    """
    # Concatenate once instead of growing a frame seeded with an empty
    # DataFrame(): that pattern copies all accumulated rows on every
    # iteration and relies on deprecated empty-frame concat behaviour.
    selected = _select_year_months(df, scenario)
    return _drop_metadata_columns(selected, ALLUSERS32_15MIN_WITHOUTREHOLD)


def filter_test_data(df, scenario):
    """Select the scenario's rows, keeping every column and renumbering the index.

    An empty scenario returns an empty frame with ``df``'s columns instead of
    raising from ``pd.concat``.
    """
    return _select_year_months(df, scenario, ignore_index=True)


def prepare_user_data(df):
    """Split ``df`` into one frame per distinct value of the 'user' column.

    Returns:
        Dict mapping each user value to that user's rows.
    """
    # A stable sort keeps each user's rows in their incoming order; the
    # default quicksort may reorder them, which would corrupt the sliding
    # windows built from these frames later.
    df_sorted = df.sort_values(by='user', kind='mergesort').reset_index(drop=True)
    users = df_sorted['user'].unique()
    return {user: df_sorted[df_sorted['user'] == user] for user in users}


def _build_windows(features, labels, sequence_length):
    """Return sliding windows over ``features`` and the label following each.

    Window ``i`` covers rows ``i .. i + sequence_length - 1`` and is paired
    with ``labels[i + sequence_length]``. Nothing is produced when there are
    not more rows than ``sequence_length``.
    """
    n_windows = len(features) - sequence_length
    windows = [features[i:i + sequence_length] for i in range(n_windows)]
    targets = [labels[i + sequence_length] for i in range(n_windows)]
    return windows, targets


def prepare_data_for_model(user_data, sequence_length):
    """Build model inputs from per-user frames.

    Args:
        user_data: Dict of user -> frame containing a 'user' column; all other
            columns are used as features.
        sequence_length: Number of consecutive rows per window.

    Returns:
        ``(X, y)`` NumPy arrays. Both have shape ``(0,)`` when no user has
        more rows than ``sequence_length``.
    """
    X, y = [], []
    for data in user_data.values():
        features = data.drop(columns=['user']).values
        labels = data['user'].values
        windows, targets = _build_windows(features, labels, sequence_length)
        X.extend(windows)
        y.extend(targets)
    return np.array(X), np.array(y)


# === Training & Validation ===
def train_models(user_data, user_data_val, sequence_lengths=None, tuner_dir="./working/tuner"):
    """Tune and train one bidirectional LSTM classifier per sequence length.

    Args:
        user_data: Dict of user -> training frame (see ``prepare_user_data``).
        user_data_val: Dict of user -> validation frame.
        sequence_lengths: Sequence lengths to train for; defaults to ``[20]``.
        tuner_dir: Keras Tuner working directory. It is deleted at the start
            of every call so earlier trials are not reused.

    Returns:
        Dict mapping each trained sequence length to
        ``{'model': ..., 'best_hyperparameters': {...}}``. Sequence lengths
        that produce no training or validation windows are skipped.

    NOTE(review): the output layer has ``len(users)`` units and the loss is
    sparse categorical cross-entropy, so the 'user' labels presumably need to
    be integers in ``[0, len(users))`` -- confirm against the dataset.
    """
    # None sentinel instead of a mutable list default shared across calls.
    if sequence_lengths is None:
        sequence_lengths = [20]

    best_models = {}
    early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
    lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, verbose=1)
    users = list(user_data.keys())
    shutil.rmtree(tuner_dir, ignore_errors=True)

    for sequence_length in sequence_lengths:
        print(f"\n=== Training for Sequence Length: {sequence_length} ===")
        X, y = prepare_data_for_model(user_data=user_data, sequence_length=sequence_length)
        X_val, y_val = prepare_data_for_model(user_data=user_data_val, sequence_length=sequence_length)

        if X.shape[0] == 0 or X_val.shape[0] == 0:
            print(f"⚠️ Skipped sequence length {sequence_length} due to insufficient data.")
            continue

        n_features = X.shape[2]

        def build_model(hp):
            """Build and compile one candidate model from hyperparameters ``hp``."""
            model = Sequential()
            # NOTE(review): step=2 gives a very fine-grained 'units' grid for
            # a 30-trial random search -- confirm this is intended.
            model.add(Bidirectional(LSTM(units=hp.Int('units', 32, 256, step=2),
                                         input_shape=(sequence_length, n_features))))
            model.add(Dropout(hp.Float('dropout_rate', 0.1, 0.5, step=0.1)))
            model.add(Dense(len(users), activation='softmax'))
            model.compile(
                optimizer=Adam(learning_rate=hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy']
            )
            return model

        tuner = RandomSearch(
            build_model,
            objective='val_loss',
            max_trials=30,
            executions_per_trial=2,
            directory=tuner_dir,
            project_name=f'lstm_seq_{sequence_length}'
        )
        tuner.search(X, y, epochs=30, validation_data=(X_val, y_val),
                     callbacks=[early_stopping, lr_scheduler], verbose=1)

        # Rebuild from the best hyperparameters and train a fresh model.
        best_hps = tuner.get_best_hyperparameters(1)[0]
        best_model = tuner.hypermodel.build(best_hps)
        best_model.fit(X, y, epochs=30, validation_data=(X_val, y_val),
                       callbacks=[early_stopping, lr_scheduler], verbose=0)

        best_models[sequence_length] = {
            'model': best_model,
            'best_hyperparameters': {
                'units': best_hps.get('units'),
                'dropout_rate': best_hps.get('dropout_rate'),
                'learning_rate': best_hps.get('learning_rate')
            }
        }

    return best_models


# === Evaluation ===
def evaluate_models(best_models, df_test, sequence_lengths, output_excel_path,
                    ALLUSERS32_15MIN_WITHOUTTHREHOLD, total_users=32):
    """Evaluate every trained model on the test set and write one sheet each.

    Args:
        best_models: Output of ``train_models``.
        df_test: Test frame still containing the calendar columns.
        sequence_lengths: Sequence lengths to evaluate; those without an entry
            in ``best_models`` are skipped.
        output_excel_path: Destination workbook path.
        ALLUSERS32_15MIN_WITHOUTTHREHOLD: When truthy, 'DayOfWeek' is dropped
            from the test frame as well.
        total_users: Denominator of the reported success rate (default 32).
    """
    print("\n🧪 Evaluating on Test Data...")
    lengths_to_evaluate = [length for length in sequence_lengths if length in best_models]
    if not lengths_to_evaluate:
        # Opening the writer without writing any sheet typically fails when
        # the workbook is saved, so stop before creating it.
        print("⚠️ No trained models available for the requested sequence lengths; nothing to evaluate.")
        return

    with ExcelWriter(output_excel_path) as writer:
        for sequence_length in lengths_to_evaluate:
            evaluate_model_on_test_data(
                best_models[sequence_length]['model'],
                df_test.copy(),
                sequence_length,
                writer,
                ALLUSERS32_15MIN_WITHOUTTHREHOLD,
                total_users=total_users,
            )


def evaluate_model_on_test_data(model, test_df, sequence_length, excel_writer,
                                ALLUSERS32_15MIN_WITHOUTTHREHOLD, total_users=32):
    """Score ``model`` per user on ``test_df`` and write the results sheet.

    For each user, sliding windows are built from that user's rows, predicted
    classes are compared with the 'user' labels, and the accuracy is recorded.
    A final 'TOTAL' row reports how many users exceeded 50% accuracy.

    Args:
        model: Trained model exposing ``predict(X, verbose=0)``.
        test_df: Test frame containing 'user' and the calendar columns.
        sequence_length: Window length the model was trained with.
        excel_writer: Open ``ExcelWriter``; sheet ``SeqLen_<sequence_length>``
            is written to it.
        ALLUSERS32_15MIN_WITHOUTTHREHOLD: When truthy, 'DayOfWeek' is dropped
            as well.
        total_users: Denominator of the success rate (default 32, the value
            that was previously hard-coded).
    """
    test_df = _drop_metadata_columns(test_df, ALLUSERS32_15MIN_WITHOUTTHREHOLD)
    # Stable sort so each user's rows keep their incoming order.
    test_df = test_df.sort_values(by='user', kind='mergesort').reset_index(drop=True)
    users = test_df['user'].unique()

    results = []
    accuracy_above_50 = 0

    for user in users:
        user_df = test_df[test_df['user'] == user]
        if len(user_df) <= sequence_length:
            print(f"Skipping User {user} (not enough data for sequence length {sequence_length})")
            continue

        user_features = user_df.drop(columns=['user']).values
        user_labels = user_df['user'].values
        windows, targets = _build_windows(user_features, user_labels, sequence_length)
        X = np.array(windows)
        y_true = np.array(targets)

        y_pred = model.predict(X, verbose=0)
        y_pred_classes = np.argmax(y_pred, axis=1)

        unique_pred, counts_pred = np.unique(y_pred_classes, return_counts=True)
        label_counts_pred = dict(zip(unique_pred, counts_pred))
        unique_true, counts_true = np.unique(y_true, return_counts=True)
        label_counts_true = dict(zip(unique_true, counts_true))

        acc = accuracy_score(y_true, y_pred_classes)
        if acc > 0.5:
            accuracy_above_50 += 1

        results.append({
            'User': user,
            'Accuracy (%)': acc * 100,
            'Predicted Class Distribution': str(label_counts_pred),
            'Actual Class Distribution': str(label_counts_true)
        })

        print(f"\n=== User {user} ===")
        print(f"✅ Accuracy: {acc * 100:.2f}%")
        print("📊 Predicted Class Distribution:", label_counts_pred)
        print("📌 Actual Class Distribution: ", label_counts_true)

    # Guard the division so a zero denominator reports 0% instead of raising.
    final_accuracy_percent = (accuracy_above_50 / total_users) * 100 if total_users else 0.0

    print(f"\n🟩 Final Evaluation Summary for Sequence Length {sequence_length}:")
    print(f"Users with >50% Accuracy: {accuracy_above_50} / {total_users}")
    print(f"✅ Final Success Rate: {final_accuracy_percent:.2f}%")

    results.append({
        'User': 'TOTAL',
        'Accuracy (%)': '',
        'Predicted Class Distribution': f'Users >50% Acc: {accuracy_above_50}/{total_users}',
        'Actual Class Distribution': f'Success Rate: {final_accuracy_percent:.2f}%'
    })

    df_results = pd.DataFrame(results)
    df_results.to_excel(excel_writer, sheet_name=f"SeqLen_{sequence_length}", index=False)