Source code for mmgp.postprocessing

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
#
import os
import pickle
import numpy as np
import yaml
from sklearn.metrics import r2_score
from tqdm import tqdm

from mmgp.utils import remove_file, reset_folder
from plaid.problem_definition import ProblemDefinition
from plaid.containers.dataset import Dataset


def _stack_samples(samples: list) -> np.ndarray:
    """Concatenate per-sample values into a single flat float vector."""
    # The leading empty float array keeps the float64 promotion of an
    # accumulation seeded with ``np.array([])``, while stacking only once
    # instead of re-copying the growing vector for every sample.
    return np.hstack([np.empty(0), *samples])


def _add_metric_section(res: dict, title: str, values: dict, names: list,
                        width: int, verbose: bool) -> None:
    """Store ``values[split][name]`` formatted as strings under ``res[title]``, printing them if verbose."""
    if verbose:
        print("===")
        print(title)
    res[title] = {}
    for split_name, split_values in values.items():
        if verbose:
            print(" " + split_name)
        res[title][split_name] = {}
        for name in names:
            out_string = "{:#.6g}".format(split_values[name])
            res[title][split_name][name] = out_string
            if verbose:
                print(name.ljust(width) + ": " + out_string)


def compute_metrics(configuration: dict, problem: ProblemDefinition) -> dict:
    """Compute, optionally print, and save error metrics of the predicted outputs.

    For the train and test splits, and for every output field and output
    scalar of ``problem``, two metrics are computed from the content of
    ``allPredictedData.pkl``:

    - a relative RMSE: the square root of the mean, over the samples of the
      split, of the squared relative error of each sample;
    - the R2 score over the values of all the samples of the split pooled
      into one vector.

    The result is written to ``metrics.yaml`` in the generated data folder
    (``remove_file`` is called on that path first) and returned.

    Args:
        configuration (dict): Settings of the run. The following keys are read:
            - 'generated_data_folder' (str): folder containing
              ``allPredictedData.pkl`` and receiving ``metrics.yaml``.
            - 'verbose' (bool): print the metrics when true.
            - 'train_set' (str): name of the training split.
            - 'test_set' (str): name of the testing split.
        problem (ProblemDefinition): Provides the sample indices of each split
            (``get_split``) and the names of the output scalars and fields.

    Returns:
        dict: ``res[metric_title][split_name][output_name]`` holding each
        metric formatted as a string with 6 significant digits. The titles
        are "rRMSE for fields", "rRMSE for scalars", "R2 for fields" and
        "R2 for scalars".

    Caution:
        This function will load all the predicted data. Make sure it has been
        created and is located correctly.
    """
    generated_data_folder = configuration['generated_data_folder']
    verbose = configuration['verbose']

    problem_split = {
        set_name: problem.get_split(set_name)
        for set_name in (configuration["train_set"], configuration["test_set"])
    }

    out_scalars_names = problem.out_scalars_names
    out_fields_names = problem.out_fields_names

    metrics_path = generated_data_folder + os.sep + "metrics.yaml"
    remove_file(metrics_path)

    # NOTE: pickle can execute arbitrary code while loading; only read files
    # produced by this pipeline.
    with open(generated_data_folder + os.sep + "allPredictedData.pkl", 'rb') as file:
        predicted_data = pickle.load(file)

    # Below this magnitude a reference is treated as zero and the error is
    # left un-normalised, to avoid dividing by (almost) zero.
    tolerance = 1.e-6

    if verbose:
        print("Compute metrics for each regressor:")

    reference_out_fields = predicted_data["referenceOutFields"]
    predicted_out_fields = predicted_data["predictedOutFields"]
    reference_out_scalars = predicted_data["referenceOutScalars"]
    predicted_out_scalars = predicted_data["predictedOutScalars"]

    rrmse_fields = {}
    rrmse_scalars = {}
    r2_fields = {}
    r2_scalars = {}

    for split_name, split_indices in problem_split.items():
        rrmse_fields[split_name] = {}
        r2_fields[split_name] = {}
        for fname in out_fields_names:
            rel_se = np.empty(len(split_indices))
            refs = []
            preds = []
            for i, index in enumerate(split_indices):
                ref = reference_out_fields[fname][index]
                pred = predicted_out_fields[fname][index]
                refs.append(ref)
                preds.append(pred)

                # Normalise by the largest reference magnitude of the sample.
                maxref = np.max(np.abs(ref))
                denom_field = 1. if maxref < tolerance else maxref
                reldif = (pred - ref) / denom_field
                rel_se[i] = (1 / ref.shape[0]) * (np.linalg.norm(reldif))**2

            rrmse_fields[split_name][fname] = np.sqrt(np.mean(rel_se))
            r2_fields[split_name][fname] = r2_score(
                _stack_samples(refs), _stack_samples(preds))

        rrmse_scalars[split_name] = {}
        r2_scalars[split_name] = {}
        for sname in out_scalars_names:
            rel_se = np.empty(len(split_indices))
            refs = []
            preds = []
            for i, index in enumerate(split_indices):
                ref = reference_out_scalars[sname][index]
                pred = predicted_out_scalars[sname][index]
                refs.append(ref)
                preds.append(pred)

                # Compare the magnitude, as done for the fields: a negative
                # reference must be normalised by its own value, not by 1.
                denom_scal = 1. if np.abs(ref) < tolerance else ref
                reldif = (pred - ref) / denom_scal
                rel_se[i] = reldif**2

            rrmse_scalars[split_name][sname] = np.sqrt(np.mean(rel_se))
            r2_scalars[split_name][sname] = r2_score(
                _stack_samples(refs), _stack_samples(preds))

    # Section order is kept in the YAML output (sort_keys=False below).
    res = {}
    _add_metric_section(res, "rRMSE for fields", rrmse_fields,
                        out_fields_names, 7, verbose)
    _add_metric_section(res, "rRMSE for scalars", rrmse_scalars,
                        out_scalars_names, 14, verbose)
    _add_metric_section(res, "R2 for fields", r2_fields,
                        out_fields_names, 7, verbose)
    _add_metric_section(res, "R2 for scalars", r2_scalars,
                        out_scalars_names, 14, verbose)

    with open(metrics_path, 'w') as file:
        yaml.dump(res, file, default_flow_style=False, sort_keys=False)

    return res
def export_predictions(configuration: dict, problem: ProblemDefinition)->Dataset:
    """Add the predicted outputs to the samples of the initial dataset and save the result.

    The initial dataset is loaded from ``<init_dataset_location>/dataset`` and
    the predictions from ``<generated_data_folder>/allPredictedData.pkl``.
    For every sample, each output field and output scalar of ``problem`` is
    added under the name ``<name>_predicted``. When uncertainties are enabled,
    ``<name>_variance``, ``<name>_quantile_0.025`` and
    ``<name>_quantile_0.975`` are added as well. The samples are collected in
    a new dataset saved in
    ``<generated_data_folder>/<case_name>_predicted/dataset``; the
    ``<case_name>_predicted`` folder is passed to ``reset_folder`` first.

    Args:
        configuration (dict): Settings of the run. The following keys are read:
            - 'zone_name' and 'base_name': forwarded to ``Sample.add_field``.
            - 'verbose' (bool): forwarded to the dataset load/save calls and
              enables the progress bar.
            - 'generated_data_folder' (str): folder containing
              ``allPredictedData.pkl`` and receiving the exported dataset.
            - 'case_name' (str): prefix of the exported folder name.
            - 'init_dataset_location' (str): folder containing the initial
              dataset.
            - 'regression' (dict): its 'uncertainties' entry selects whether
              variances and quantiles are exported.
        problem (ProblemDefinition): Provides the names of the output fields
            and output scalars.

    Returns:
        Dataset: The dataset holding the samples enriched with the predictions.
    """
    zone_name = configuration["zone_name"]
    base_name = configuration["base_name"]
    verbose = configuration['verbose']

    # Start from an empty export folder so stale predictions cannot remain.
    predicted_folder = configuration["generated_data_folder"] + os.sep + \
        configuration["case_name"] + "_predicted"
    reset_folder(predicted_folder)

    # Load the problem and dataset
    out_fields_names = problem.out_fields_names
    out_scalars_names = problem.out_scalars_names

    dataset = Dataset()
    dataset._load_from_dir_(
        configuration['init_dataset_location'] + os.sep + "dataset",
        verbose = verbose)

    # Load the predicted data.
    # NOTE: pickle can execute arbitrary code while loading; only read files
    # produced by this pipeline.
    with open(configuration["generated_data_folder"] + os.sep + "allPredictedData.pkl", 'rb') as file:
        predicted_data = pickle.load(file)

    # Ordered (name suffix, field values, scalar values) triplets to export;
    # the values are indexed as values[output_name][i_sample].
    exported = [
        ("_predicted",
         predicted_data["predictedOutFields"],
         predicted_data["predictedOutScalars"]),
    ]
    if configuration["regression"]["uncertainties"]:
        exported += [
            ("_variance",
             predicted_data["predictedOutFieldsVariance"],
             predicted_data["predictedOutScalarsVariance"]),
            ("_quantile_0.025",
             predicted_data["predictedOutFieldsQuantile0_025"],
             predicted_data["predictedOutScalarsQuantile0_025"]),
            ("_quantile_0.975",
             predicted_data["predictedOutFieldsQuantile0_975"],
             predicted_data["predictedOutScalarsQuantile0_975"]),
        ]

    predicted_dataset = Dataset()

    if verbose:
        print("Building predicted_dataset")

    for i_sample in tqdm(range(len(dataset)), disable=not (verbose)):
        # The sample object of the loaded dataset is enriched in place.
        sample = dataset[i_sample]

        # export fields
        for suffix, fields, _ in exported:
            for fname in out_fields_names:
                sample.add_field(
                    fname + suffix, fields[fname][i_sample], zone_name, base_name)

        # export scalars
        for suffix, _, scalars in exported:
            for sname in out_scalars_names:
                sample.add_scalar(sname + suffix, scalars[sname][i_sample])

        predicted_dataset.add_sample(sample)

    predicted_dataset._save_to_dir_(
        predicted_folder + os.sep + "dataset", verbose = verbose)

    return predicted_dataset