Source code for mmgp.processing

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
#
import os
import pickle
import time

import numpy as np
from plaid.containers.sample import Sample
from plaid.containers.dataset import Dataset
from plaid.problem_definition import ProblemDefinition
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm

from mmgp.dimensionality_reduction import DimensionalityReductor
from mmgp.utils import remove_file, load_backend

from Muscat.Bridges.CGNSBridge import CGNSToMesh


"""
In this file, the GPs are trained independently on low-dimensional pretreated data.
Rescalings are done in this file as well.
"""


[docs] def read_proj_in_coord_fields( sample: Sample, zone_name: str, base_name: str) -> np.ndarray: """Read and concatenate coordinate fields from a Sample object. Args: sample (Sample): The Sample object containing coordinate fields. zone_name (str): The zone name of the coorinates fields. base_name (str): The base name of the coordinate fields. Returns: np.ndarray: A NumPy array containing concatenated coordinate fields. """ coord_names = ["X", "Y", "Z"] proj_in_coord_fields = np.array([]) for j in range(3): coord_field = sample.get_field( "coord_" + coord_names[j], zone_name, base_name) if coord_field is not None: proj_in_coord_fields = np.hstack( (proj_in_coord_fields, coord_field)) return proj_in_coord_fields
[docs] def train(configuration: dict, problem: ProblemDefinition) -> dict: """Train a model using provided data in the ProblemDefinition and configuration settings. Args: configuration (dict): A dictionary containing various parameters and settings for the training process. It should include the following keys: - 'case_name' (str): The name or identifier for the specific case or scenario being analyzed. - 'base_name' (str): The name of a specific base on which to train the model. - 'zone_name' (str): The name of a specific zone on which to train the model. - 'generated_data_folder' (str): A string representing the folder where the training data and model will be saved. problem (ProblemDefinition): An instance of ProblemDefinition class containing the training data and problem-specific information, such as input and output names. Caution: This function will load the morphed and projected PLAID dataset. Make sure it has been created and is located correctly. """ generated_data_folder = configuration["generated_data_folder"] case_name = configuration["case_name"] base_name = configuration["base_name"] zone_name = configuration["zone_name"] verbose = configuration["verbose"] train_set_name = configuration["train_set"] training_set = problem.get_split(train_set_name) in_scalars_names = problem.in_scalars_names out_scalars_names = problem.out_scalars_names out_fields_names = problem.out_fields_names remove_file(generated_data_folder+os.sep+"trainingRegressionData.pkl") # TODO: inline these 2 lines morph_proj_dataset = Dataset() morph_proj_dataset._load_from_dir_(generated_data_folder+os.sep+case_name+"_morphed_and_projected/dataset", verbose = verbose) cgns_mesh = morph_proj_dataset[configuration['common_mesh_index']].get_mesh() common_morphed_mesh = CGNSToMesh(cgns_mesh) in_scalars_train = morph_proj_dataset.get_scalars_to_tabular( scalar_names=in_scalars_names, sample_ids=training_set, as_nparray=True) out_scalars_train = morph_proj_dataset.get_scalars_to_tabular( scalar_names=out_scalars_names, sample_ids=training_set, as_nparray=True) # load pretreatedData n_train = len(training_set) proj_in_coord_size = read_proj_in_coord_fields( morph_proj_dataset[0], zone_name, base_name).shape[0] proj_out_field_size = morph_proj_dataset[0].get_field( "coord_X", zone_name, base_name).shape[0] proj_in_coord_fields_train = np.empty((n_train, proj_in_coord_size)) proj_out_fields_train = {} for fname in out_fields_names: proj_out_fields_train[fname] = np.empty((n_train, proj_out_field_size)) for i in range(n_train): index = training_set[i] proj_in_coord_fields_train[i, :] = read_proj_in_coord_fields( morph_proj_dataset[index], zone_name, base_name) for fname in out_fields_names: proj_out_fields_train[fname][i, :] = morph_proj_dataset[index].get_field( fname, zone_name, base_name) start = time.time() in_coord_dim_red_info = configuration['dimensionality_reduction']['input_coord_fields'] pod_proj_in_coord_fields = DimensionalityReductor(algo=in_coord_dim_red_info['algo'], options=in_coord_dim_red_info['options'], common_morphed_mesh=common_morphed_mesh, input_or_output='input') reduced_proj_in_coord_fields = pod_proj_in_coord_fields.fit_transform( proj_in_coord_fields_train) # Construct a rescaling for the nongeometrical input scalars scaler_in_scalar = StandardScaler() rescaled_in_scalar = scaler_in_scalar.fit_transform(in_scalars_train) # Create the ML low-dimensional input by concatenating the two previous # inputs X = np.hstack((reduced_proj_in_coord_fields, rescaled_in_scalar)) # Construct a rescaling for the input scalerX = StandardScaler() rescaledX = scalerX.fit_transform(X) # Construct the snapshot-POD dimensionality reduction for the projection # of each output field of interest onto the common mesh pod_proj_out_fields = {} scaler_Y_fields = {} rescaled_Y_fields = {} out_field_dim_red_info = configuration['dimensionality_reduction']['output_coord_fields'] for fname in out_fields_names: pod_temp = DimensionalityReductor(algo=out_field_dim_red_info['algo'], options=out_field_dim_red_info['options'], common_morphed_mesh=common_morphed_mesh, input_or_output='output') y_temp = pod_temp.fit_transform(proj_out_fields_train[fname][:, :]) pod_proj_out_fields[fname] = pod_temp scaler_Y_fields[fname] = StandardScaler() rescaled_Y_fields[fname] = scaler_Y_fields[fname].fit_transform(y_temp) # Compute scalars scalers scaler_Y_scalars = {} rescaled_Y_scalars = {} for i, sname in enumerate(out_scalars_names): scaler_Y_scalars[sname] = StandardScaler() rescaled_Y_scalars[sname] = scaler_Y_scalars[sname].fit_transform( out_scalars_train[:, i].reshape(-1, 1)) regression_info = configuration['regression'] Regressor = load_backend(regression_info["algo"]) # Train the GPs for the output fields if verbose: print("=============================") print("Training Gaussian process") print("---") print("Training GPs for the output fields") fields_regressors = {} for fname in tqdm(out_fields_names, disable=not (verbose)): fields_regressors[fname] = Regressor( options=regression_info["options"]) fields_regressors[fname].fit(rescaledX, rescaled_Y_fields[fname]) # Train the GPs for the scalar outputs if verbose: print("---") print("Training GPs for the output scalars") scalars_regressors = {} for sname in tqdm(out_scalars_names, disable=not (verbose)): scalars_regressors[sname] = Regressor( options=regression_info["options"]) scalars_regressors[sname].fit(rescaledX, rescaled_Y_scalars[sname]) if verbose: print("Duration for complete training =", time.time() - start) # Save the data constructing during the training workflow training_regression_data = {} training_regression_data["podProjInCoordFields"] = pod_proj_in_coord_fields training_regression_data["scalerInScalar"] = scaler_in_scalar training_regression_data["scalerX"] = scalerX training_regression_data["podProjOutFields"] = pod_proj_out_fields training_regression_data["scalerYFields"] = scaler_Y_fields training_regression_data["scalerYScalars"] = scaler_Y_scalars training_regression_data["fieldsRegressors"] = fields_regressors training_regression_data["scalarsRegressors"] = scalars_regressors with open(generated_data_folder+os.sep+"trainingRegressionData.pkl", 'wb') as file: pickle.dump(training_regression_data, file) return training_regression_data
[docs] def infer(configuration: dict, problem: ProblemDefinition) -> dict: """Perform inference using the provided ProblemDefinition instance and configuration settings. Args: configuration (dict): A dictionary containing various parameters and settings for the inference process. It should include the following keys: - 'init_dataset_location' (str): A string specifying the location or path to the initial PLAID dataset. - 'generated_data_folder' (str): A string representing the folder where the inference results will be saved. - 'case_name' (str): The name or identifier for the specific case or scenario being analyzed. - 'base_name' (str): The name of a specific base on which to perform inference. - 'zone_name' (str): The name of a specific zone on which to perform inference. problem (ProblemDefinition): An object of the ProblemDefinition class containing the inference data and problem-specific information. Caution: This function will load the morphed and projected PLAID dataset and the initial PLAID dataset. Make sure it has been created and is located correctly. """ init_dataset_location = configuration['init_dataset_location'] generated_data_folder = configuration['generated_data_folder'] verbose = configuration['verbose'] case_name = configuration['case_name'] base_name = configuration["base_name"] zone_name = configuration["zone_name"] remove_file(generated_data_folder+os.sep+"allPredictedData.pkl") dataset = Dataset() dataset._load_from_dir_(init_dataset_location+os.sep+"dataset", verbose = verbose) morph_proj_dataset = Dataset() morph_proj_dataset._load_from_dir_(generated_data_folder+os.sep+case_name+"_morphed_and_projected/dataset", verbose = verbose) FE_interpolation_operators = {} for index in problem.get_all_indices(): file = open(generated_data_folder+os.sep+"FEInterpolationOperators/sample_"+str(index).zfill(9)+".pkl",'rb') FE_interpolation_operators[index] = pickle.load(file) file.close() file = open(generated_data_folder+os.sep+"trainingRegressionData.pkl",'rb') training_regression_data = pickle.load(file) file.close() pod_proj_in_coord_fields = training_regression_data["podProjInCoordFields"] scaler_in_scalar = training_regression_data["scalerInScalar"] scaler_x = training_regression_data["scalerX"] pod_proj_out_fields = training_regression_data["podProjOutFields"] scaler_Y_fields = training_regression_data["scalerYFields"] scaler_Y_scalars = training_regression_data["scalerYScalars"] fields_regressors = training_regression_data["fieldsRegressors"] scalars_regressors = training_regression_data["scalarsRegressors"] n_samples = len(dataset) proj_in_coord_size = read_proj_in_coord_fields( morph_proj_dataset[0], zone_name, base_name).shape[0] in_scalars_names = problem.in_scalars_names out_scalars_names = problem.out_scalars_names out_fields_names = problem.out_fields_names proj_in_coord_fields = np.empty((n_samples, proj_in_coord_size)) for index in range(n_samples): proj_in_coord_fields[index, :] = read_proj_in_coord_fields( morph_proj_dataset[index], zone_name, base_name) in_scalars = morph_proj_dataset.get_scalars_to_tabular( scalar_names=in_scalars_names, as_nparray=True) # Reduce the dimension of the precompputed projected spatial coordinate # fields using the precomputed snapshots-POD reduced_proj_in_coord_fields = pod_proj_in_coord_fields.transform( proj_in_coord_fields) # Rescale the nongeometrical input scalars rescaled_in_scalar = scaler_in_scalar.transform(in_scalars) # Create the low-dimensional input by concatenating the two previous inputs X = np.hstack((reduced_proj_in_coord_fields, rescaled_in_scalar)) # Rescale the input rescaled_X = scaler_x.transform(X) # --------------------------------------------- if configuration["regression"]["uncertainties"]: n_Monte_Carlo_samples = configuration["regression"]['number_Monte_Carlo_samples'] predict_uncertainties = True else: predict_uncertainties = False if verbose: print("=============================") print("Predicting output fields:") predicted_data = {} reference_out_fields = {} predicted_out_fields = {} if predict_uncertainties: predicted_out_fields_variance = {} predicted_out_fields_quantile0_025 = {} predicted_out_fields_quantile0_975 = {} for fname in out_fields_names: reference_out_fields[fname] = [] predicted_out_fields[fname] = [] if predict_uncertainties: predicted_out_fields_variance[fname] = [] predicted_out_fields_quantile0_025[fname] = [] predicted_out_fields_quantile0_975[fname] = [] for i_sample in tqdm(range(n_samples), disable=not (verbose)): local_inv_proj_operator = FE_interpolation_operators[i_sample]["invProjOperator"] n_nodes = local_inv_proj_operator.shape[0] n_node_common_mesh = local_inv_proj_operator.shape[1] for fname in out_fields_names: # Store reference ref = dataset[i_sample].get_field(fname, zone_name, base_name) reference_out_fields[fname].append(ref) # Inverse rescale the field outputs and decompress the field # outputs using the pretrained snapshot-POD if (predict_uncertainties and n_Monte_Carlo_samples==0): #Recovery of prediction mean and standard deviation rescaled_Y_field,rescaled_Y_field_var = fields_regressors[fname].predict( rescaled_X[i_sample, :].reshape(1, -1),return_var=True) else: #Recovery only of prediction mean rescaled_Y_field=fields_regressors[fname].predict( rescaled_X[i_sample, :].reshape(1, -1),return_var=False) y = scaler_Y_fields[fname].inverse_transform( rescaled_Y_field) local_predicted_proj_out_fields = pod_proj_out_fields[fname].inverse_transform(y).reshape(-1,1) pred = local_inv_proj_operator.dot( local_predicted_proj_out_fields).squeeze() predicted_out_fields[fname].append(pred) if predict_uncertainties: if (n_Monte_Carlo_samples==0): # Analytical variance nb_modes=rescaled_Y_field_var.shape[1] K_GP=np.zeros((nb_modes, nb_modes)) np.fill_diagonal(K_GP,rescaled_Y_field_var) # GP covariance prediction Sigma=np.zeros((nb_modes,nb_modes)) np.fill_diagonal(Sigma,np.sqrt(scaler_Y_fields[fname].var_)) K_rescale=np.dot(Sigma,np.dot(K_GP,Sigma)) # rescaling L = np.transpose(pod_proj_out_fields[fname].training_data["reducedOrderBasis"]) # inverse POD operator Phibarre_L=local_inv_proj_operator.dot(L) # projection operator multiplied by POD operator var=np.zeros((Phibarre_L.shape[0],1)) for i in range(var.shape[0]): # prediction variance on each node Phibarre_L_i=Phibarre_L[i,:].reshape(1,-1) var[i,0]=np.dot(Phibarre_L_i,np.dot(K_rescale,Phibarre_L_i.T))[0,0] predicted_out_fields_variance[fname].append( var.squeeze()) # field variance on nodes else: # Monte Carlo variance with online formula mean=np.zeros((n_nodes,1)) S2=np.zeros((n_nodes,1)) count=0 for i in range(n_Monte_Carlo_samples): rescaled_Y_field_sample=fields_regressors[fname].predict_Monte_Carlo_draw( X=rescaled_X[i_sample,:].reshape(1,-1)) # metamodel simulation y_train_sample = scaler_Y_fields[fname].inverse_transform( rescaled_Y_field_sample) # rescaling local_predicted_proj_out_fields_sample = pod_proj_out_fields[fname].inverse_transform( y_train_sample) # inverse POD predicted_out_fields_sample = local_inv_proj_operator.dot( np.transpose(local_predicted_proj_out_fields_sample)) # projection on mesh count=count+1 delta1=predicted_out_fields_sample-mean mean=mean+(delta1)/count delta2=predicted_out_fields_sample-mean S2=S2+delta1*delta2 # variance update S2=S2/(n_Monte_Carlo_samples) # field variance on nodes predicted_out_fields_variance[fname].append( S2.squeeze()) predicted_out_fields_quantile0_025[fname].append( pred-1.96*np.sqrt(predicted_out_fields_variance[fname][i_sample]) ) predicted_out_fields_quantile0_975[fname].append( pred+1.96*np.sqrt(predicted_out_fields_variance[fname][i_sample]) ) predicted_data["referenceOutFields"] = reference_out_fields predicted_data["predictedOutFields"] = predicted_out_fields if verbose: print("Predicting Scalars:") reference_out_scalars = {} predicted_out_scalars = {} if predict_uncertainties: predicted_out_scalars_variance = {} predicted_out_scalars_quantile0_025 = {} predicted_out_scalars_quantile0_975 = {} for sname in out_scalars_names: reference_out_scalars[sname] = [] predicted_out_scalars[sname] = [] if predict_uncertainties: predicted_out_scalars_variance[sname] = [] predicted_out_scalars_quantile0_025[sname] = [] predicted_out_scalars_quantile0_975[sname] = [] for i_sample in tqdm(range(n_samples), disable=not (verbose)): for sname in out_scalars_names: # Store reference ref = dataset[i_sample].get_scalar(sname) reference_out_scalars[sname].append(ref) # Inverse rescale the scalar outputs if (predict_uncertainties and n_Monte_Carlo_samples==0): #Recovery of prediction mean and standard deviation rescaled_Y_scalar,rescaled_Y_scalar_var = scalars_regressors[sname].predict( rescaled_X[i_sample, :].reshape(1, -1),return_var=True) rescaled_Y_scalar_var=rescaled_Y_scalar_var[0,0] else: #Recovery only of prediction mean rescaled_Y_scalar = scalars_regressors[sname].predict( rescaled_X[i_sample, :].reshape(1, -1),return_var=False) pred = scaler_Y_scalars[sname].inverse_transform( rescaled_Y_scalar)[0, 0] predicted_out_scalars[sname].append(pred) if predict_uncertainties: if (n_Monte_Carlo_samples==0): # Analytical variance sigma2=scaler_Y_scalars[sname].var_[0] v_rescale=sigma2*rescaled_Y_scalar_var # rescaling predicted_out_scalars_variance[sname].append( v_rescale) # variance of scalar output else: # Monte Carlo variance with online formula mean=0 S2=0 count=0 for i in range(n_Monte_Carlo_samples): rescaled_Y_scalar_sample=scalars_regressors[sname].predict_Monte_Carlo_draw( X=rescaled_X[i_sample,:].reshape(1,-1)) # metamodel simulation predicted_out_scalars_sample = scaler_Y_scalars[sname].inverse_transform( rescaled_Y_scalar_sample)[0,0] # rescaling count=count+1 delta1=predicted_out_scalars_sample-mean mean=mean+(delta1)/count delta2=predicted_out_scalars_sample-mean S2=S2+delta1*delta2 # variance update S2=S2/(n_Monte_Carlo_samples) # variance on nodes predicted_out_scalars_variance[sname].append( S2) predicted_out_scalars_quantile0_025[sname].append( pred-1.96*np.sqrt(predicted_out_scalars_variance[sname][i_sample]) ) predicted_out_scalars_quantile0_975[sname].append( pred+1.96*np.sqrt(predicted_out_scalars_variance[sname][i_sample]) ) predicted_data["referenceOutScalars"] = reference_out_scalars predicted_data["predictedOutScalars"] = predicted_out_scalars if predict_uncertainties: predicted_data["predictedOutFieldsVariance"] = predicted_out_fields_variance predicted_data["predictedOutFieldsQuantile0_025"] = predicted_out_fields_quantile0_025 predicted_data["predictedOutFieldsQuantile0_975"] = predicted_out_fields_quantile0_975 predicted_data["predictedOutScalarsVariance"] = predicted_out_scalars_variance predicted_data["predictedOutScalarsQuantile0_025"] = predicted_out_scalars_quantile0_025 predicted_data["predictedOutScalarsQuantile0_975"] = predicted_out_scalars_quantile0_975 # Save the predicted fields data with open(generated_data_folder+os.sep+"allPredictedData.pkl", 'wb') as file: pickle.dump(predicted_data, file) return predicted_data