import random
import shutil

import keras_tuner
import numpy as np
import pandas as pd
from keras import Input
from keras.src.losses import SparseCategoricalCrossentropy
from keras.src.metrics import F1Score, Precision, Recall, Accuracy, SparseCategoricalAccuracy
from pandas import ExcelWriter, DataFrame
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional, GRU
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from keras_tuner import RandomSearch
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# Number of epochs used by the tuner searches and the final refit.
# NOTE(review): the original line carried a trailing "#50" - presumably the
# value intended for full runs; confirm before long experiments.
epochs = 5

# Identifiers accepted for the ``model_type`` arguments below.
model_type_gru = 'GRU'
model_type_lstm = 'LSTM'
model_type_bilstm = 'BiLSTM'


# === Display functions ===

def display_warning_about_2020_data():
    """Print the reminder about how 2020 data must be used."""
    print("\n⚠️ Warning: 2020 data after February is excluded due to COVID-19.")
    print("✅ Only Jan and Feb 2020 are used for testing. Do not use them in training/validation.")


def _print_scenarios(scenarios):
    """Print one ``name: year-months, ...`` line per scenario."""
    for name, scenario in scenarios.items():
        parts = [f"{year}-{months}" for year, months in scenario['years_months']]
        print(f"  {name}: {', '.join(parts)}")


def display_warnings_for_scenarios(scenario_type, predefined_training_scenarios,
                                   predefined_validation_scenarios):
    """Print the predefined scenarios for the given scenario type.

    Args:
        scenario_type: ``"training"`` or ``"validation"``; any other value
            prints nothing.
        predefined_training_scenarios: Mapping of name to a dict whose
            ``'years_months'`` entry is an iterable of ``(year, months)`` pairs.
        predefined_validation_scenarios: Same structure, for validation.
    """
    if scenario_type == "training":
        print("\n⚠️ Predefined Training Scenarios (for reference only):")
        _print_scenarios(predefined_training_scenarios)
    elif scenario_type == "validation":
        print("\n⚠️ Predefined Validation Scenario:")
        _print_scenarios(predefined_validation_scenarios)


# === Data functions ===

def load_dataset(file_path):
    """Read the Excel file at ``file_path`` into a DataFrame."""
    return pd.read_excel(file_path)


def _select_year_months(df, scenario):
    """Return the slices of ``df`` matching each ``(year, months)`` pair."""
    return [
        df[(df['Year'] == year) & (df['Month'].isin(months))]
        for year, months in scenario
    ]


def _drop_calendar_columns(df, drop_day_of_week):
    """Drop 'Month', 'Year', 'date' and optionally 'DayOfWeek' from ``df``."""
    columns = ['Month', 'Year', 'date']
    if drop_day_of_week:
        columns.append('DayOfWeek')
    return df.drop(columns=columns)


def filter_data(df, scenario, ALLUSERS32_15MIN_WITHOUTREHOLD):
    """Select the rows of ``df`` covered by ``scenario`` and drop calendar columns.

    Args:
        df: DataFrame with at least 'Year', 'Month' and 'date' columns
            (and 'DayOfWeek' when the flag is set).
        scenario: Iterable of ``(year, months)`` pairs; ``months`` is a
            collection of month values.
        ALLUSERS32_15MIN_WITHOUTREHOLD: When truthy, 'DayOfWeek' is dropped
            in addition to 'Month', 'Year' and 'date'.

    Returns:
        The matching rows (original index preserved) without the calendar
        columns. An empty scenario yields an empty frame.
    """
    parts = _select_year_months(df, scenario)
    # Concatenate once instead of growing a frame inside the loop; fall back
    # to an empty slice so the column drop below still works.
    filtered = pd.concat(parts) if parts else df.iloc[0:0]
    return _drop_calendar_columns(filtered, ALLUSERS32_15MIN_WITHOUTREHOLD)


def filter_test_data(df, scenario):
    """Select the rows of ``df`` covered by ``scenario`` with a fresh index.

    Unlike ``filter_data`` no columns are dropped. An empty scenario yields
    an empty frame with the columns of ``df``.
    """
    parts = _select_year_months(df, scenario)
    if not parts:
        return df.iloc[0:0].reset_index(drop=True)
    return pd.concat(parts, ignore_index=True)


def prepare_user_data(df):
    """Split ``df`` into a dict mapping each distinct 'user' value to its rows."""
    users = df['user'].unique()
    return {user: df[df['user'] == user] for user in users}


def make_sequences(data, sequence_length):
    """Build sliding windows over one user's rows.

    Every column except 'user' is used as a feature and cast to ``int``.
    Window ``i`` covers rows ``i .. i + sequence_length - 1`` and is paired
    with the 'user' value of row ``i + sequence_length``.

    Returns:
        ``(x, y)`` as two Python lists of equal length; both are empty when
        ``data`` has at most ``sequence_length`` rows.
    """
    features = data.drop('user', axis=1).values.astype(int)
    labels = data['user'].values
    n_windows = len(features) - sequence_length
    x = [features[i:i + sequence_length] for i in range(n_windows)]
    y = [labels[i + sequence_length] for i in range(n_windows)]
    return x, y


def prepare_data_for_model(user_data, sequence_length):
    """Build, pool and deterministically shuffle the sequences of all users.

    Args:
        user_data: Mapping of user to that user's DataFrame (as produced by
            ``prepare_user_data``).
        sequence_length: Window length passed to ``make_sequences``.

    Returns:
        ``(x, y)`` as NumPy arrays, shuffled with a fixed seed (17).
    """
    x, y = [], []
    for data in user_data.values():
        x_new, y_new = make_sequences(data, sequence_length)
        x.extend(x_new)
        y.extend(y_new)

    # Shuffle one index list and apply it to both sequences. This gives the
    # same ordering as shuffling x and y separately with the same seed, but
    # keeps the x/y pairing explicit instead of coincidental.
    order = list(range(len(x)))
    random.Random(17).shuffle(order)
    x = np.array([x[i] for i in order])
    y = np.array([y[i] for i in order])
    return x, y


# === Model construction helpers ===

def _check_model_type(model_type):
    """Raise ``ValueError`` unless ``model_type`` is a supported identifier."""
    supported = (model_type_bilstm, model_type_lstm, model_type_gru)
    if model_type not in supported:
        raise ValueError(
            f"Unsupported model_type {model_type!r}; expected one of {supported}"
        )


def _make_recurrent_layer(model_type, units):
    """Return the recurrent layer for ``model_type`` with ``units`` units."""
    _check_model_type(model_type)
    if model_type == model_type_bilstm:
        return Bidirectional(LSTM(units=units))
    if model_type == model_type_lstm:
        return LSTM(units=units)
    return GRU(units=units)


# === Training & Validation ===

def train_models(user_data, user_data_val, sequence_lengths, tuner_dir="./working/tuner",
                 model_type=model_type_lstm):
    """Tune and fit one model per sequence length.

    For each sequence length a ``RandomSearch`` over units, dropout rate and
    learning rate is run (objective ``val_loss``); a model is then rebuilt
    from the best hyperparameters and fitted again. ``tuner_dir`` is deleted
    before the search starts.

    Args:
        user_data: Mapping of user to training DataFrame.
        user_data_val: Mapping of user to validation DataFrame.
        sequence_lengths: Iterable of window lengths to try.
        tuner_dir: Directory used by keras_tuner (removed first).
        model_type: One of the ``model_type_*`` identifiers.

    Returns:
        Dict mapping each trained sequence length to
        ``{'model': ..., 'best_hyperparameters': {...}}``. Lengths for which
        training or validation data is empty are skipped.

    Raises:
        ValueError: If ``model_type`` is not supported.
    """
    _check_model_type(model_type)
    best_models = {}
    early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
    lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, verbose=1)
    users = list(user_data.keys())
    shutil.rmtree(tuner_dir, ignore_errors=True)

    for sequence_length in sequence_lengths:
        print(f"\n=== Training for Sequence Length: {sequence_length} ===")
        X, y = prepare_data_for_model(user_data=user_data, sequence_length=sequence_length)
        X_val, y_val = prepare_data_for_model(user_data=user_data_val,
                                              sequence_length=sequence_length)
        if X.shape[0] == 0 or X_val.shape[0] == 0:
            print(f"⚠️ Skipped sequence length {sequence_length} due to insufficient data.")
            continue
        n_features = X.shape[2]

        # Bind the loop values as defaults so the closure cannot pick up a
        # later iteration's values.
        def build_model(hp, sequence_length=sequence_length, n_features=n_features):
            model = Sequential()
            model.add(Input((sequence_length, n_features)))
            model.add(_make_recurrent_layer(model_type, hp.Int('units', 32, 256, step=2)))
            model.add(Dropout(hp.Float('dropout_rate', 0.1, 0.5, step=0.1)))
            model.add(Dense(len(users), activation='softmax'))
            model.compile(
                optimizer=Adam(learning_rate=hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])),
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'],
            )
            return model

        tuner = RandomSearch(
            build_model,
            objective='val_loss',
            max_trials=30,
            executions_per_trial=2,
            directory=tuner_dir,
            project_name=f'lstm_seq_{sequence_length}',
        )
        tuner.search(X, y, epochs=epochs, validation_data=(X_val, y_val),
                     callbacks=[early_stopping, lr_scheduler], verbose=0)

        best_hps = tuner.get_best_hyperparameters(1)[0]
        best_model = tuner.hypermodel.build(best_hps)
        best_model.fit(X, y, epochs=epochs, validation_data=(X_val, y_val),
                       callbacks=[early_stopping, lr_scheduler], verbose=0)

        best_models[sequence_length] = {
            'model': best_model,
            'best_hyperparameters': {
                'units': best_hps.get('units'),
                'dropout_rate': best_hps.get('dropout_rate'),
                'learning_rate': best_hps.get('learning_rate'),
            },
        }
    return best_models


def train_models_v2(user_data, user_data_val, sequence_length, model_type):
    """Run a ``RandomSearch`` maximising validation precision; return the best model.

    The tuner directory ``./working/tuner/<model_type>`` is deleted first.

    Args:
        user_data: Mapping of user to training DataFrame.
        user_data_val: Mapping of user to validation DataFrame.
        sequence_length: Window length passed to ``prepare_data_for_model``.
        model_type: One of the ``model_type_*`` identifiers.

    Returns:
        The best model found by the tuner.

    Raises:
        ValueError: If ``model_type`` is not supported.
    """
    _check_model_type(model_type)
    tuner_dir = "./working/tuner/" + model_type
    val_metric = 'val_precision'
    # Precision must be maximised. With the default mode='auto' some Keras
    # versions treat any monitor without "acc" in its name as "lower is
    # better", which would stop early / cut the LR on improvement.
    early_stopping = EarlyStopping(monitor=val_metric, mode='max', patience=3,
                                   restore_best_weights=True)
    lr_scheduler = ReduceLROnPlateau(monitor=val_metric, mode='max', factor=0.5, patience=2)
    shutil.rmtree(tuner_dir, ignore_errors=True)

    x, y = prepare_data_for_model(user_data=user_data, sequence_length=sequence_length)
    x_val, y_val = prepare_data_for_model(user_data=user_data_val,
                                          sequence_length=sequence_length)
    n_features = x.shape[2]
    users = list(user_data.keys())

    def build_model(hp):
        units_hp = hp.Int('units', 2, 8, step=2, sampling="log")
        model = Sequential()
        model.add(Input((sequence_length, n_features)))
        model.add(_make_recurrent_layer(model_type, units_hp))
        model.add(Dropout(hp.Float('dropout_rate', 0.1, 0.2, step=0.1)))
        model.add(Dense(len(users), activation='softmax'))
        model.compile(
            optimizer=Adam(learning_rate=hp.Choice('learning_rate', [1e-5])),
            loss='sparse_categorical_crossentropy',
            # Name the metric explicitly so the logged key is always
            # 'val_precision', even when the model is rebuilt per trial.
            # NOTE(review): Precision/Accuracy are fed integer labels and
            # softmax outputs here; confirm these metrics are meaningful for
            # sparse multi-class targets (SparseCategoricalAccuracy is the
            # sparse-label accuracy metric).
            metrics=[
                Precision(name='precision'),
                Accuracy(),
            ],
        )
        return model

    tuner = RandomSearch(
        build_model,
        objective=keras_tuner.Objective(val_metric, direction="max"),
        max_trials=120,
        directory=tuner_dir,
    )
    tuner.search(x, y, epochs=epochs, validation_data=(x_val, y_val),
                 callbacks=[early_stopping, lr_scheduler])
    return tuner.get_best_models(num_models=1)[0]


def train_one_model(train_data, val_data, n_batch, n_epochs, n_neurons, l_rate,
                    sequence_length, model_type):
    """Train a single model epoch by epoch and record metrics after each epoch.

    Args:
        train_data: Mapping of user to training DataFrame.
        val_data: Mapping of user to validation DataFrame.
        n_batch: Batch size for fitting and prediction.
        n_epochs: Number of single-epoch ``fit`` calls.
        n_neurons: Units of the recurrent layer.
        l_rate: Adam learning rate.
        sequence_length: Window length passed to ``prepare_data_for_model``.
        model_type: One of the ``model_type_*`` identifiers.

    Returns:
        DataFrame with one row per epoch and the columns ``train_acc``,
        ``test_acc``, ``train_p``, ``test_p``, ``train_r``, ``test_r``,
        ``train_f1``, ``test_f1`` (``test_*`` is measured on ``val_data``).

    Raises:
        ValueError: If ``model_type`` is not supported.
    """
    _check_model_type(model_type)
    x, y = prepare_data_for_model(user_data=train_data, sequence_length=sequence_length)
    # The validation sequences do not change between epochs; build them once.
    x_val, y_val = prepare_data_for_model(user_data=val_data, sequence_length=sequence_length)
    n_features = x.shape[2]
    users = list(train_data.keys())

    model = Sequential()
    model.add(Input(shape=(sequence_length, n_features), batch_size=n_batch))
    model.add(_make_recurrent_layer(model_type, n_neurons))
    # TODO: add another dense layer
    model.add(Dense(len(users), activation='softmax'))
    model.compile(
        optimizer=Adam(learning_rate=l_rate),
        loss=SparseCategoricalCrossentropy(),
        metrics=[SparseCategoricalAccuracy()],
    )

    metric_names = ('acc', 'p', 'r', 'f1')
    records = {f'{split}_{name}': [] for name in metric_names for split in ('train', 'test')}
    for _ in range(n_epochs):
        model.fit(x, y, batch_size=n_batch, epochs=1, verbose=0, shuffle=False)
        train_scores = _evaluate_arrays(model, x, y, n_batch)
        val_scores = _evaluate_arrays(model, x_val, y_val, n_batch)
        for name, train_value, val_value in zip(metric_names, train_scores, val_scores):
            records[f'train_{name}'].append(train_value)
            records[f'test_{name}'].append(val_value)

    history = DataFrame()
    for column, values in records.items():
        history[column] = values
    return history


def _evaluate_arrays(model, x, y_true, batch_size):
    """Predict on ``x`` and score the arg-max classes against ``y_true``."""
    y_pred = model.predict(x, verbose=0, batch_size=batch_size)
    y_pred_classes = np.argmax(y_pred, axis=1)
    return eval_metrics(y_true=y_true, y_pred=y_pred_classes)


def evaluate(model, df, sequence_length, batch_size):
    """Score ``model`` on the sequences built from ``df``.

    Args:
        model: Object with a Keras-style ``predict`` method.
        df: Mapping of user to DataFrame (passed to ``prepare_data_for_model``).
        sequence_length: Window length.
        batch_size: Batch size used for prediction.

    Returns:
        ``(accuracy, precision, recall, f1)`` as returned by ``eval_metrics``.
    """
    x, y_true = prepare_data_for_model(user_data=df, sequence_length=sequence_length)
    return _evaluate_arrays(model, x, y_true, batch_size)


def eval_metrics(y_true, y_pred):
    """Return ``(accuracy, precision, recall, f1)``; all but accuracy are weighted averages."""
    f1 = f1_score(y_true=y_true, y_pred=y_pred, average='weighted')
    p = precision_score(y_true=y_true, y_pred=y_pred, average='weighted')
    r = recall_score(y_true=y_true, y_pred=y_pred, average='weighted')
    acc = accuracy_score(y_true=y_true, y_pred=y_pred)
    return acc, p, r, f1


# === Evaluation ===

def evaluate_models(best_models, df_test, sequence_lengths, output_excel_path,
                    ALLUSERS32_15MIN_WITHOUTTHREHOLD, expected_user_count=32):
    """Evaluate every trained model on the test data and write one sheet per length.

    Args:
        best_models: Result of ``train_models``.
        df_test: Test DataFrame (still containing the calendar columns).
        sequence_lengths: Sequence lengths to evaluate; lengths missing from
            ``best_models`` are skipped.
        output_excel_path: Destination Excel file.
        ALLUSERS32_15MIN_WITHOUTTHREHOLD: Forwarded to
            ``evaluate_model_on_test_data``.
        expected_user_count: Denominator of the reported success rate.
    """
    print("\n🧪 Evaluating on Test Data...")
    with ExcelWriter(output_excel_path) as writer:
        for sequence_length in sequence_lengths:
            if sequence_length not in best_models:
                continue
            evaluate_model_on_test_data(
                best_models[sequence_length]['model'],
                df_test.copy(),
                sequence_length,
                writer,
                ALLUSERS32_15MIN_WITHOUTTHREHOLD,
                expected_user_count=expected_user_count,
            )


def evaluate_model_on_test_data(model, test_df, sequence_length, excel_writer,
                                ALLUSERS32_15MIN_WITHOUTTHREHOLD, expected_user_count=32):
    """Evaluate ``model`` per user and write the results to an Excel sheet.

    For every user with more than ``sequence_length`` rows, sliding windows
    are built, classes are predicted, and accuracy plus the predicted and
    actual class distributions are recorded and printed. A final 'TOTAL' row
    reports how many users exceeded 50% accuracy.

    Args:
        model: Object with a Keras-style ``predict`` method.
        test_df: Test DataFrame including 'user' and the calendar columns.
        sequence_length: Window length.
        excel_writer: Open ``ExcelWriter``; the sheet is named
            ``SeqLen_<sequence_length>``.
        ALLUSERS32_15MIN_WITHOUTTHREHOLD: When truthy, 'DayOfWeek' is dropped
            in addition to 'Month', 'Year' and 'date'.
        expected_user_count: Denominator of the success rate (default 32,
            the value previously hard-coded).

    Raises:
        ValueError: If ``expected_user_count`` is not positive.
    """
    if expected_user_count <= 0:
        raise ValueError("expected_user_count must be positive")

    test_df = _drop_calendar_columns(test_df, ALLUSERS32_15MIN_WITHOUTTHREHOLD)
    test_df = test_df.sort_values(by='user').reset_index(drop=True)

    results = []
    accuracy_above_50 = 0
    for user in test_df['user'].unique():
        user_df = test_df[test_df['user'] == user]
        # NOTE(review): unlike make_sequences, features are not cast to int
        # here - confirm the test columns already have the training dtype.
        user_features = user_df.drop(columns=['user']).values
        user_labels = user_df['user'].values

        n_windows = len(user_df) - sequence_length
        if n_windows <= 0:
            print(f"Skipping User {user} (not enough data for sequence length {sequence_length})")
            continue

        X = np.array([user_features[i:i + sequence_length] for i in range(n_windows)])
        y_true = np.array([user_labels[i + sequence_length] for i in range(n_windows)])

        y_pred = model.predict(X, verbose=0)
        y_pred_classes = np.argmax(y_pred, axis=1)

        # How often each class was predicted.
        unique_pred, counts_pred = np.unique(y_pred_classes, return_counts=True)
        label_counts_pred = dict(zip(unique_pred, counts_pred))
        # How often each class should have been predicted (one class per user).
        unique_true, counts_true = np.unique(y_true, return_counts=True)
        label_counts_true = dict(zip(unique_true, counts_true))

        # Fraction of correctly classified windows.
        acc = accuracy_score(y_true, y_pred_classes)
        if acc > 0.5:
            accuracy_above_50 += 1

        results.append({
            'User': user,
            'Accuracy (%)': acc * 100,
            'Predicted Class Distribution': str(label_counts_pred),
            'Actual Class Distribution': str(label_counts_true),
        })
        print(f"\n=== User {user} ===")
        print(f"✅ Accuracy: {acc * 100:.2f}%")
        print("📊 Predicted Class Distribution:", label_counts_pred)
        print("📌 Actual Class Distribution:   ", label_counts_true)

    final_accuracy_percent = (accuracy_above_50 / expected_user_count) * 100
    print(f"\n🟩 Final Evaluation Summary for Sequence Length {sequence_length}:")
    print(f"Users with >50% Accuracy: {accuracy_above_50} / {expected_user_count}")
    print(f"✅ Final Success Rate: {final_accuracy_percent:.2f}%")

    results.append({
        'User': 'TOTAL',
        'Accuracy (%)': '',
        'Predicted Class Distribution': f'Users >50% Acc: {accuracy_above_50}/{expected_user_count}',
        'Actual Class Distribution': f'Success Rate: {final_accuracy_percent:.2f}%',
    })
    df_results = pd.DataFrame(results)
    df_results.to_excel(excel_writer, sheet_name=f"SeqLen_{sequence_length}", index=False)