# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
#
import os
import pickle
import time
import numpy as np
from plaid.containers.sample import Sample
from plaid.containers.dataset import Dataset
from plaid.problem_definition import ProblemDefinition
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm
from mmgp.dimensionality_reduction import DimensionalityReductor
from mmgp.utils import remove_file, load_backend
from Muscat.Bridges.CGNSBridge import CGNSToMesh
"""
In this file, the GPs are trained independently on low-dimensional pretreated data.
Rescalings are done in this file as well.
"""
[docs]
def read_proj_in_coord_fields(
sample: Sample, zone_name: str, base_name: str) -> np.ndarray:
"""Read and concatenate coordinate fields from a Sample object.
Args:
sample (Sample): The Sample object containing coordinate fields.
zone_name (str): The zone name of the coorinates fields.
base_name (str): The base name of the coordinate fields.
Returns:
np.ndarray: A NumPy array containing concatenated coordinate fields.
"""
coord_names = ["X", "Y", "Z"]
proj_in_coord_fields = np.array([])
for j in range(3):
coord_field = sample.get_field(
"coord_" + coord_names[j], zone_name, base_name)
if coord_field is not None:
proj_in_coord_fields = np.hstack(
(proj_in_coord_fields, coord_field))
return proj_in_coord_fields
[docs]
def train(configuration: dict, problem: ProblemDefinition) -> dict:
"""Train a model using provided data in the ProblemDefinition and configuration settings.
Args:
configuration (dict): A dictionary containing various parameters and settings for the training process. It should include the following keys:
- 'case_name' (str): The name or identifier for the specific case or scenario being analyzed.
- 'base_name' (str): The name of a specific base on which to train the model.
- 'zone_name' (str): The name of a specific zone on which to train the model.
- 'generated_data_folder' (str): A string representing the folder where the training data and model will be saved.
problem (ProblemDefinition): An instance of ProblemDefinition class containing the training data and problem-specific information, such as input and output names.
Caution:
This function will load the morphed and projected PLAID dataset. Make sure it has been created and is located correctly.
"""
generated_data_folder = configuration["generated_data_folder"]
case_name = configuration["case_name"]
base_name = configuration["base_name"]
zone_name = configuration["zone_name"]
verbose = configuration["verbose"]
train_set_name = configuration["train_set"]
training_set = problem.get_split(train_set_name)
in_scalars_names = problem.in_scalars_names
out_scalars_names = problem.out_scalars_names
out_fields_names = problem.out_fields_names
remove_file(generated_data_folder+os.sep+"trainingRegressionData.pkl")
# TODO: inline these 2 lines
morph_proj_dataset = Dataset()
morph_proj_dataset._load_from_dir_(generated_data_folder+os.sep+case_name+"_morphed_and_projected/dataset", verbose = verbose)
cgns_mesh = morph_proj_dataset[configuration['common_mesh_index']].get_mesh()
common_morphed_mesh = CGNSToMesh(cgns_mesh)
in_scalars_train = morph_proj_dataset.get_scalars_to_tabular(
scalar_names=in_scalars_names, sample_ids=training_set, as_nparray=True)
out_scalars_train = morph_proj_dataset.get_scalars_to_tabular(
scalar_names=out_scalars_names, sample_ids=training_set, as_nparray=True)
# load pretreatedData
n_train = len(training_set)
proj_in_coord_size = read_proj_in_coord_fields(
morph_proj_dataset[0], zone_name, base_name).shape[0]
proj_out_field_size = morph_proj_dataset[0].get_field(
"coord_X", zone_name, base_name).shape[0]
proj_in_coord_fields_train = np.empty((n_train, proj_in_coord_size))
proj_out_fields_train = {}
for fname in out_fields_names:
proj_out_fields_train[fname] = np.empty((n_train, proj_out_field_size))
for i in range(n_train):
index = training_set[i]
proj_in_coord_fields_train[i, :] = read_proj_in_coord_fields(
morph_proj_dataset[index], zone_name, base_name)
for fname in out_fields_names:
proj_out_fields_train[fname][i, :] = morph_proj_dataset[index].get_field(
fname, zone_name, base_name)
start = time.time()
in_coord_dim_red_info = configuration['dimensionality_reduction']['input_coord_fields']
pod_proj_in_coord_fields = DimensionalityReductor(algo=in_coord_dim_red_info['algo'],
options=in_coord_dim_red_info['options'],
common_morphed_mesh=common_morphed_mesh,
input_or_output='input')
reduced_proj_in_coord_fields = pod_proj_in_coord_fields.fit_transform(
proj_in_coord_fields_train)
# Construct a rescaling for the nongeometrical input scalars
scaler_in_scalar = StandardScaler()
rescaled_in_scalar = scaler_in_scalar.fit_transform(in_scalars_train)
# Create the ML low-dimensional input by concatenating the two previous
# inputs
X = np.hstack((reduced_proj_in_coord_fields, rescaled_in_scalar))
# Construct a rescaling for the input
scalerX = StandardScaler()
rescaledX = scalerX.fit_transform(X)
# Construct the snapshot-POD dimensionality reduction for the projection
# of each output field of interest onto the common mesh
pod_proj_out_fields = {}
scaler_Y_fields = {}
rescaled_Y_fields = {}
out_field_dim_red_info = configuration['dimensionality_reduction']['output_coord_fields']
for fname in out_fields_names:
pod_temp = DimensionalityReductor(algo=out_field_dim_red_info['algo'],
options=out_field_dim_red_info['options'],
common_morphed_mesh=common_morphed_mesh,
input_or_output='output')
y_temp = pod_temp.fit_transform(proj_out_fields_train[fname][:, :])
pod_proj_out_fields[fname] = pod_temp
scaler_Y_fields[fname] = StandardScaler()
rescaled_Y_fields[fname] = scaler_Y_fields[fname].fit_transform(y_temp)
# Compute scalars scalers
scaler_Y_scalars = {}
rescaled_Y_scalars = {}
for i, sname in enumerate(out_scalars_names):
scaler_Y_scalars[sname] = StandardScaler()
rescaled_Y_scalars[sname] = scaler_Y_scalars[sname].fit_transform(
out_scalars_train[:, i].reshape(-1, 1))
regression_info = configuration['regression']
Regressor = load_backend(regression_info["algo"])
# Train the GPs for the output fields
if verbose:
print("=============================")
print("Training Gaussian process")
print("---")
print("Training GPs for the output fields")
fields_regressors = {}
for fname in tqdm(out_fields_names, disable=not (verbose)):
fields_regressors[fname] = Regressor(
options=regression_info["options"])
fields_regressors[fname].fit(rescaledX, rescaled_Y_fields[fname])
# Train the GPs for the scalar outputs
if verbose:
print("---")
print("Training GPs for the output scalars")
scalars_regressors = {}
for sname in tqdm(out_scalars_names, disable=not (verbose)):
scalars_regressors[sname] = Regressor(
options=regression_info["options"])
scalars_regressors[sname].fit(rescaledX, rescaled_Y_scalars[sname])
if verbose:
print("Duration for complete training =", time.time() - start)
# Save the data constructing during the training workflow
training_regression_data = {}
training_regression_data["podProjInCoordFields"] = pod_proj_in_coord_fields
training_regression_data["scalerInScalar"] = scaler_in_scalar
training_regression_data["scalerX"] = scalerX
training_regression_data["podProjOutFields"] = pod_proj_out_fields
training_regression_data["scalerYFields"] = scaler_Y_fields
training_regression_data["scalerYScalars"] = scaler_Y_scalars
training_regression_data["fieldsRegressors"] = fields_regressors
training_regression_data["scalarsRegressors"] = scalars_regressors
with open(generated_data_folder+os.sep+"trainingRegressionData.pkl", 'wb') as file:
pickle.dump(training_regression_data, file)
return training_regression_data
[docs]
def infer(configuration: dict, problem: ProblemDefinition) -> dict:
"""Perform inference using the provided ProblemDefinition instance and configuration settings.
Args:
configuration (dict): A dictionary containing various parameters and settings for the inference process. It should include the following keys:
- 'init_dataset_location' (str): A string specifying the location or path to the initial PLAID dataset.
- 'generated_data_folder' (str): A string representing the folder where the inference results will be saved.
- 'case_name' (str): The name or identifier for the specific case or scenario being analyzed.
- 'base_name' (str): The name of a specific base on which to perform inference.
- 'zone_name' (str): The name of a specific zone on which to perform inference.
problem (ProblemDefinition): An object of the ProblemDefinition class containing the inference data and problem-specific information.
Caution:
This function will load the morphed and projected PLAID dataset and the initial PLAID dataset. Make sure it has been created and is located correctly.
"""
init_dataset_location = configuration['init_dataset_location']
generated_data_folder = configuration['generated_data_folder']
verbose = configuration['verbose']
case_name = configuration['case_name']
base_name = configuration["base_name"]
zone_name = configuration["zone_name"]
remove_file(generated_data_folder+os.sep+"allPredictedData.pkl")
dataset = Dataset()
dataset._load_from_dir_(init_dataset_location+os.sep+"dataset", verbose = verbose)
morph_proj_dataset = Dataset()
morph_proj_dataset._load_from_dir_(generated_data_folder+os.sep+case_name+"_morphed_and_projected/dataset", verbose = verbose)
FE_interpolation_operators = {}
for index in problem.get_all_indices():
file = open(generated_data_folder+os.sep+"FEInterpolationOperators/sample_"+str(index).zfill(9)+".pkl",'rb')
FE_interpolation_operators[index] = pickle.load(file)
file.close()
file = open(generated_data_folder+os.sep+"trainingRegressionData.pkl",'rb')
training_regression_data = pickle.load(file)
file.close()
pod_proj_in_coord_fields = training_regression_data["podProjInCoordFields"]
scaler_in_scalar = training_regression_data["scalerInScalar"]
scaler_x = training_regression_data["scalerX"]
pod_proj_out_fields = training_regression_data["podProjOutFields"]
scaler_Y_fields = training_regression_data["scalerYFields"]
scaler_Y_scalars = training_regression_data["scalerYScalars"]
fields_regressors = training_regression_data["fieldsRegressors"]
scalars_regressors = training_regression_data["scalarsRegressors"]
n_samples = len(dataset)
proj_in_coord_size = read_proj_in_coord_fields(
morph_proj_dataset[0], zone_name, base_name).shape[0]
in_scalars_names = problem.in_scalars_names
out_scalars_names = problem.out_scalars_names
out_fields_names = problem.out_fields_names
proj_in_coord_fields = np.empty((n_samples, proj_in_coord_size))
for index in range(n_samples):
proj_in_coord_fields[index, :] = read_proj_in_coord_fields(
morph_proj_dataset[index], zone_name, base_name)
in_scalars = morph_proj_dataset.get_scalars_to_tabular(
scalar_names=in_scalars_names, as_nparray=True)
# Reduce the dimension of the precompputed projected spatial coordinate
# fields using the precomputed snapshots-POD
reduced_proj_in_coord_fields = pod_proj_in_coord_fields.transform(
proj_in_coord_fields)
# Rescale the nongeometrical input scalars
rescaled_in_scalar = scaler_in_scalar.transform(in_scalars)
# Create the low-dimensional input by concatenating the two previous inputs
X = np.hstack((reduced_proj_in_coord_fields, rescaled_in_scalar))
# Rescale the input
rescaled_X = scaler_x.transform(X)
# ---------------------------------------------
if configuration["regression"]["uncertainties"]:
n_Monte_Carlo_samples = configuration["regression"]['number_Monte_Carlo_samples']
predict_uncertainties = True
else:
predict_uncertainties = False
if verbose:
print("=============================")
print("Predicting output fields:")
predicted_data = {}
reference_out_fields = {}
predicted_out_fields = {}
if predict_uncertainties:
predicted_out_fields_variance = {}
predicted_out_fields_quantile0_025 = {}
predicted_out_fields_quantile0_975 = {}
for fname in out_fields_names:
reference_out_fields[fname] = []
predicted_out_fields[fname] = []
if predict_uncertainties:
predicted_out_fields_variance[fname] = []
predicted_out_fields_quantile0_025[fname] = []
predicted_out_fields_quantile0_975[fname] = []
for i_sample in tqdm(range(n_samples), disable=not (verbose)):
local_inv_proj_operator = FE_interpolation_operators[i_sample]["invProjOperator"]
n_nodes = local_inv_proj_operator.shape[0]
n_node_common_mesh = local_inv_proj_operator.shape[1]
for fname in out_fields_names:
# Store reference
ref = dataset[i_sample].get_field(fname, zone_name, base_name)
reference_out_fields[fname].append(ref)
# Inverse rescale the field outputs and decompress the field
# outputs using the pretrained snapshot-POD
if (predict_uncertainties and n_Monte_Carlo_samples==0):
#Recovery of prediction mean and standard deviation
rescaled_Y_field,rescaled_Y_field_var = fields_regressors[fname].predict(
rescaled_X[i_sample, :].reshape(1, -1),return_var=True)
else:
#Recovery only of prediction mean
rescaled_Y_field=fields_regressors[fname].predict(
rescaled_X[i_sample, :].reshape(1, -1),return_var=False)
y = scaler_Y_fields[fname].inverse_transform(
rescaled_Y_field)
local_predicted_proj_out_fields = pod_proj_out_fields[fname].inverse_transform(y).reshape(-1,1)
pred = local_inv_proj_operator.dot(
local_predicted_proj_out_fields).squeeze()
predicted_out_fields[fname].append(pred)
if predict_uncertainties:
if (n_Monte_Carlo_samples==0):
# Analytical variance
nb_modes=rescaled_Y_field_var.shape[1]
K_GP=np.zeros((nb_modes, nb_modes))
np.fill_diagonal(K_GP,rescaled_Y_field_var) # GP covariance prediction
Sigma=np.zeros((nb_modes,nb_modes))
np.fill_diagonal(Sigma,np.sqrt(scaler_Y_fields[fname].var_))
K_rescale=np.dot(Sigma,np.dot(K_GP,Sigma)) # rescaling
L = np.transpose(pod_proj_out_fields[fname].training_data["reducedOrderBasis"]) # inverse POD operator
Phibarre_L=local_inv_proj_operator.dot(L) # projection operator multiplied by POD operator
var=np.zeros((Phibarre_L.shape[0],1))
for i in range(var.shape[0]): # prediction variance on each node
Phibarre_L_i=Phibarre_L[i,:].reshape(1,-1)
var[i,0]=np.dot(Phibarre_L_i,np.dot(K_rescale,Phibarre_L_i.T))[0,0]
predicted_out_fields_variance[fname].append(
var.squeeze()) # field variance on nodes
else:
# Monte Carlo variance with online formula
mean=np.zeros((n_nodes,1))
S2=np.zeros((n_nodes,1))
count=0
for i in range(n_Monte_Carlo_samples):
rescaled_Y_field_sample=fields_regressors[fname].predict_Monte_Carlo_draw(
X=rescaled_X[i_sample,:].reshape(1,-1)) # metamodel simulation
y_train_sample = scaler_Y_fields[fname].inverse_transform(
rescaled_Y_field_sample) # rescaling
local_predicted_proj_out_fields_sample = pod_proj_out_fields[fname].inverse_transform(
y_train_sample) # inverse POD
predicted_out_fields_sample = local_inv_proj_operator.dot(
np.transpose(local_predicted_proj_out_fields_sample)) # projection on mesh
count=count+1
delta1=predicted_out_fields_sample-mean
mean=mean+(delta1)/count
delta2=predicted_out_fields_sample-mean
S2=S2+delta1*delta2 # variance update
S2=S2/(n_Monte_Carlo_samples) # field variance on nodes
predicted_out_fields_variance[fname].append(
S2.squeeze())
predicted_out_fields_quantile0_025[fname].append(
pred-1.96*np.sqrt(predicted_out_fields_variance[fname][i_sample])
)
predicted_out_fields_quantile0_975[fname].append(
pred+1.96*np.sqrt(predicted_out_fields_variance[fname][i_sample])
)
predicted_data["referenceOutFields"] = reference_out_fields
predicted_data["predictedOutFields"] = predicted_out_fields
if verbose:
print("Predicting Scalars:")
reference_out_scalars = {}
predicted_out_scalars = {}
if predict_uncertainties:
predicted_out_scalars_variance = {}
predicted_out_scalars_quantile0_025 = {}
predicted_out_scalars_quantile0_975 = {}
for sname in out_scalars_names:
reference_out_scalars[sname] = []
predicted_out_scalars[sname] = []
if predict_uncertainties:
predicted_out_scalars_variance[sname] = []
predicted_out_scalars_quantile0_025[sname] = []
predicted_out_scalars_quantile0_975[sname] = []
for i_sample in tqdm(range(n_samples), disable=not (verbose)):
for sname in out_scalars_names:
# Store reference
ref = dataset[i_sample].get_scalar(sname)
reference_out_scalars[sname].append(ref)
# Inverse rescale the scalar outputs
if (predict_uncertainties and n_Monte_Carlo_samples==0):
#Recovery of prediction mean and standard deviation
rescaled_Y_scalar,rescaled_Y_scalar_var = scalars_regressors[sname].predict(
rescaled_X[i_sample, :].reshape(1, -1),return_var=True)
rescaled_Y_scalar_var=rescaled_Y_scalar_var[0,0]
else:
#Recovery only of prediction mean
rescaled_Y_scalar = scalars_regressors[sname].predict(
rescaled_X[i_sample, :].reshape(1, -1),return_var=False)
pred = scaler_Y_scalars[sname].inverse_transform(
rescaled_Y_scalar)[0, 0]
predicted_out_scalars[sname].append(pred)
if predict_uncertainties:
if (n_Monte_Carlo_samples==0):
# Analytical variance
sigma2=scaler_Y_scalars[sname].var_[0]
v_rescale=sigma2*rescaled_Y_scalar_var # rescaling
predicted_out_scalars_variance[sname].append(
v_rescale) # variance of scalar output
else:
# Monte Carlo variance with online formula
mean=0
S2=0
count=0
for i in range(n_Monte_Carlo_samples):
rescaled_Y_scalar_sample=scalars_regressors[sname].predict_Monte_Carlo_draw(
X=rescaled_X[i_sample,:].reshape(1,-1)) # metamodel simulation
predicted_out_scalars_sample = scaler_Y_scalars[sname].inverse_transform(
rescaled_Y_scalar_sample)[0,0] # rescaling
count=count+1
delta1=predicted_out_scalars_sample-mean
mean=mean+(delta1)/count
delta2=predicted_out_scalars_sample-mean
S2=S2+delta1*delta2 # variance update
S2=S2/(n_Monte_Carlo_samples) # variance on nodes
predicted_out_scalars_variance[sname].append(
S2)
predicted_out_scalars_quantile0_025[sname].append(
pred-1.96*np.sqrt(predicted_out_scalars_variance[sname][i_sample])
)
predicted_out_scalars_quantile0_975[sname].append(
pred+1.96*np.sqrt(predicted_out_scalars_variance[sname][i_sample])
)
predicted_data["referenceOutScalars"] = reference_out_scalars
predicted_data["predictedOutScalars"] = predicted_out_scalars
if predict_uncertainties:
predicted_data["predictedOutFieldsVariance"] = predicted_out_fields_variance
predicted_data["predictedOutFieldsQuantile0_025"] = predicted_out_fields_quantile0_025
predicted_data["predictedOutFieldsQuantile0_975"] = predicted_out_fields_quantile0_975
predicted_data["predictedOutScalarsVariance"] = predicted_out_scalars_variance
predicted_data["predictedOutScalarsQuantile0_025"] = predicted_out_scalars_quantile0_025
predicted_data["predictedOutScalarsQuantile0_975"] = predicted_out_scalars_quantile0_975
# Save the predicted fields data
with open(generated_data_folder+os.sep+"allPredictedData.pkl", 'wb') as file:
pickle.dump(predicted_data, file)
return predicted_data