Source code for mmgp.dimensionality_reduction

# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
#
from typing import Tuple

import numpy as np
from Muscat.FE import FETools as FT
from Muscat.Containers import Mesh
from scipy import sparse

[docs] available_dimensionality_reduction_algos = ["SnapshotPOD"]
"""List of available dimensionality reduction algorithms """
[docs] class DimensionalityReductor(object): """The DimensionalityReductor class is designed to perform dimensionality reduction on a set of data using different algorithms. """ def __init__(self, algo: str, options: dict, common_morphed_mesh: Mesh, input_or_output: str): """Initializes an instance of the DimensionalityReductor class. Args: algo (str): The name of the dimensionality reduction algorithm to use. options (dict[str: Union[int, str]]): A dictionary of options and settings for the algorithm with fields 'number_of_modes' and 'correlation_type'. common_morphed_mesh (Mesh): The mesh containing the data for dimensionality reduction. input_or_output (str): Specifies whether the data should be reduced based on input or output. Raises: AssertionError: If the specified algorithm is not supported. Example: .. code-block:: python from mmgp.dimensionality_reduction import DimensionalityReductor options = { 'number_of_modes': 8, 'correlation_type': 'mass_matrix' } dim_reductor = Regressor('SnapshotPOD', options) """ assert algo in available_dimensionality_reduction_algos, "Dimensionality reduction algo " + \ str(algo) + " not available"
[docs] self.algo: str = algo
[docs] self.options: dict = options
[docs] self.common_morphed_mesh: Mesh = common_morphed_mesh
[docs] self.input_or_output: str = input_or_output
[docs] self.training_data: dict = None
[docs] def fit_transform(self, snapshots: np.ndarray) -> np.ndarray: """Fits and transforms the dimensionality reduction model to the data. Args: snapshots (np.ndarray): The input data snapshots to be reduced. Returns: np.ndarray: The reduced data in the form of generalized coordinates. """ if self.algo == "SnapshotPOD": nb_modes = self.options["number_of_modes"] number_of_snapshots = snapshots.shape[0] number_of_dofs = snapshots.shape[1] if self.options["correlation_type"] == "mass_matrix": if self.input_or_output == 'input': correlation_operator = FT.ComputeL2ScalarProductMatrix( self.common_morphed_mesh, numberOfComponents=self.common_morphed_mesh.GetPointsDimensionality()) else: correlation_operator = FT.ComputeL2ScalarProductMatrix( self.common_morphed_mesh, numberOfComponents=1) else: correlation_operator = sparse.eye(number_of_dofs) correlation_matrix = np.zeros( (number_of_snapshots, number_of_snapshots)) mat_vec_products = np.zeros((number_of_dofs, number_of_snapshots)) for i, snapshot1 in enumerate(snapshots): mat_vec_product = correlation_operator.dot(snapshot1) mat_vec_products[:, i] = mat_vec_product for j, snapshot2 in enumerate(snapshots): if j <= i and j < number_of_snapshots: correlation_matrix[i, j] = np.dot( mat_vec_product, snapshot2) eigen_values_red, eigen_vectors_red = truncated_SVD_sym_lower( correlation_matrix, nb_modes=nb_modes) nbe_POD_modes = eigen_values_red.shape[0] change_of_basis_matrix = np.zeros( (nbe_POD_modes, number_of_snapshots)) for j in range(nbe_POD_modes): change_of_basis_matrix[j, :] = eigen_vectors_red[:, j] / np.sqrt(eigen_values_red[j]) reduced_order_basis = np.dot(change_of_basis_matrix, snapshots) generalized_coordinates = np.dot( reduced_order_basis, mat_vec_products).T self.training_data = { "reducedOrderBasis": reduced_order_basis, "correlationOperator": correlation_operator } return generalized_coordinates
[docs] def transform(self, snapshots: np.ndarray) -> np.ndarray: """Apply dimensionality reduction to a set of data 'snapshots' Args: snapshots (np.ndarray): The input data snapshots that needs to be transformed Returns: np.ndarray: The transformed data or dataset resulting from the dimensionality reduction operation. """ if self.algo == "SnapshotPOD": reduced_order_basis = self.training_data["reducedOrderBasis"] correlation_operator = self.training_data["correlationOperator"] return np.dot(reduced_order_basis, correlation_operator.dot(snapshots.T)).T
[docs] def inverse_transform( self, reduced_dimension_representation: np.ndarray) -> np.ndarray: """Reverse the dimensionality reduction transformation applied to a reduced-dimensional representation of data. Args: reduced_dimension_representation (np.ndarray): The reduced-dimensional representation of the data Returns: np.ndarray: The reconstructed data resulting from the inverse transformation. """ if self.algo == "SnapshotPOD": reduced_order_basis = self.training_data["reducedOrderBasis"] return np.dot(reduced_dimension_representation, reduced_order_basis)
[docs] def truncated_SVD_sym_lower(matrix: sparse.csr, epsilon: float = None, nb_modes: int = None) -> Tuple[np.ndarray, np.ndarray]: """ Computes a truncated singular value decomposition of a symetric definite matrix in scipy.sparse.csr format. Only the lower triangular part needs to be defined Parameters ---------- matrix : scipy.sparse.csr the input matrix epsilon : float the truncation tolerence, determining the number of keps eigenvalues nb_modes : int the number of keps eigenvalues Returns ------- np.ndarray kept eigenvalues, of size (numberOfEigenvalues) np.ndarray kept eigenvectors, of size (numberOfEigenvalues, numberOfSnapshots) """ if epsilon is not None and nb_modes is not None: # pragma: no cover raise ("cannot specify both epsilon and nb_modes") eigen_values, eigen_vectors = np.linalg.eigh(matrix, UPLO="L") idx = eigen_values.argsort()[::-1] eigen_values = eigen_values[idx] eigen_vectors = eigen_vectors[:, idx] if nb_modes is None: if epsilon is None: nb_modes = matrix.shape[0] else: nb_modes = 0 bound = (epsilon ** 2) * eigen_values[0] for e in eigen_values: if e > bound: nb_modes += 1 id_max2 = 0 bound = (1 - epsilon ** 2) * np.sum(eigen_values) temp = 0 for e in eigen_values: temp += e if temp < bound: id_max2 += 1 nb_modes = max(nb_modes, id_max2) if nb_modes > matrix.shape[0]: print("nb_modes taken to max possible value of " + str(matrix.shape[0]) + " instead of provided value " + str(nb_modes)) nb_modes = matrix.shape[0] index = np.where(eigen_values <= 0) if len(eigen_values[index]) > 0: if index[0][0] < nb_modes: print("removing numerical noise from eigenvalues, nb_modes is set to " + str(index[0][0]) + " instead of " + str(nb_modes)) nb_modes = index[0][0] return eigen_values[0:nb_modes], eigen_vectors[:, 0:nb_modes]