Source code for mmgp.dimensionality_reduction
# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
#
from typing import Tuple
import numpy as np
from Muscat.FE import FETools as FT
from Muscat.Containers import Mesh
from scipy import sparse
"""List of available dimensionality reduction algorithms
"""
[docs]
class DimensionalityReductor(object):
"""The DimensionalityReductor class is designed to perform dimensionality reduction
on a set of data using different algorithms.
"""
def __init__(self, algo: str, options: dict,
common_morphed_mesh: Mesh, input_or_output: str):
"""Initializes an instance of the DimensionalityReductor class.
Args:
algo (str): The name of the dimensionality reduction algorithm to use.
options (dict[str: Union[int, str]]): A dictionary of options and settings for the algorithm with fields 'number_of_modes' and 'correlation_type'.
common_morphed_mesh (Mesh): The mesh containing the data for dimensionality reduction.
input_or_output (str): Specifies whether the data should be reduced based on input or output.
Raises:
AssertionError: If the specified algorithm is not supported.
Example:
.. code-block:: python
from mmgp.dimensionality_reduction import DimensionalityReductor
options = { 'number_of_modes': 8, 'correlation_type': 'mass_matrix' }
dim_reductor = Regressor('SnapshotPOD', options)
"""
assert algo in available_dimensionality_reduction_algos, "Dimensionality reduction algo " + \
str(algo) + " not available"
[docs]
def fit_transform(self, snapshots: np.ndarray) -> np.ndarray:
"""Fits and transforms the dimensionality reduction model to the data.
Args:
snapshots (np.ndarray): The input data snapshots to be reduced.
Returns:
np.ndarray: The reduced data in the form of generalized coordinates.
"""
if self.algo == "SnapshotPOD":
nb_modes = self.options["number_of_modes"]
number_of_snapshots = snapshots.shape[0]
number_of_dofs = snapshots.shape[1]
if self.options["correlation_type"] == "mass_matrix":
if self.input_or_output == 'input':
correlation_operator = FT.ComputeL2ScalarProductMatrix(
self.common_morphed_mesh, numberOfComponents=self.common_morphed_mesh.GetPointsDimensionality())
else:
correlation_operator = FT.ComputeL2ScalarProductMatrix(
self.common_morphed_mesh, numberOfComponents=1)
else:
correlation_operator = sparse.eye(number_of_dofs)
correlation_matrix = np.zeros(
(number_of_snapshots, number_of_snapshots))
mat_vec_products = np.zeros((number_of_dofs, number_of_snapshots))
for i, snapshot1 in enumerate(snapshots):
mat_vec_product = correlation_operator.dot(snapshot1)
mat_vec_products[:, i] = mat_vec_product
for j, snapshot2 in enumerate(snapshots):
if j <= i and j < number_of_snapshots:
correlation_matrix[i, j] = np.dot(
mat_vec_product, snapshot2)
eigen_values_red, eigen_vectors_red = truncated_SVD_sym_lower(
correlation_matrix, nb_modes=nb_modes)
nbe_POD_modes = eigen_values_red.shape[0]
change_of_basis_matrix = np.zeros(
(nbe_POD_modes, number_of_snapshots))
for j in range(nbe_POD_modes):
change_of_basis_matrix[j,
:] = eigen_vectors_red[:,
j] / np.sqrt(eigen_values_red[j])
reduced_order_basis = np.dot(change_of_basis_matrix, snapshots)
generalized_coordinates = np.dot(
reduced_order_basis, mat_vec_products).T
self.training_data = {
"reducedOrderBasis": reduced_order_basis,
"correlationOperator": correlation_operator
}
return generalized_coordinates
[docs]
def transform(self, snapshots: np.ndarray) -> np.ndarray:
"""Apply dimensionality reduction to a set of data 'snapshots'
Args:
snapshots (np.ndarray): The input data snapshots that needs to be transformed
Returns:
np.ndarray: The transformed data or dataset resulting from the dimensionality reduction operation.
"""
if self.algo == "SnapshotPOD":
reduced_order_basis = self.training_data["reducedOrderBasis"]
correlation_operator = self.training_data["correlationOperator"]
return np.dot(reduced_order_basis,
correlation_operator.dot(snapshots.T)).T
[docs]
def inverse_transform(
self, reduced_dimension_representation: np.ndarray) -> np.ndarray:
"""Reverse the dimensionality reduction transformation applied to a reduced-dimensional representation of data.
Args:
reduced_dimension_representation (np.ndarray): The reduced-dimensional representation of the data
Returns:
np.ndarray: The reconstructed data resulting from the inverse transformation.
"""
if self.algo == "SnapshotPOD":
reduced_order_basis = self.training_data["reducedOrderBasis"]
return np.dot(reduced_dimension_representation,
reduced_order_basis)
[docs]
def truncated_SVD_sym_lower(matrix: sparse.csr, epsilon: float = None,
nb_modes: int = None) -> Tuple[np.ndarray, np.ndarray]:
"""
Computes a truncated singular value decomposition of a symetric definite
matrix in scipy.sparse.csr format. Only the lower triangular part needs
to be defined
Parameters
----------
matrix : scipy.sparse.csr
the input matrix
epsilon : float
the truncation tolerence, determining the number of keps eigenvalues
nb_modes : int
the number of keps eigenvalues
Returns
-------
np.ndarray
kept eigenvalues, of size (numberOfEigenvalues)
np.ndarray
kept eigenvectors, of size (numberOfEigenvalues, numberOfSnapshots)
"""
if epsilon is not None and nb_modes is not None: # pragma: no cover
raise ("cannot specify both epsilon and nb_modes")
eigen_values, eigen_vectors = np.linalg.eigh(matrix, UPLO="L")
idx = eigen_values.argsort()[::-1]
eigen_values = eigen_values[idx]
eigen_vectors = eigen_vectors[:, idx]
if nb_modes is None:
if epsilon is None:
nb_modes = matrix.shape[0]
else:
nb_modes = 0
bound = (epsilon ** 2) * eigen_values[0]
for e in eigen_values:
if e > bound:
nb_modes += 1
id_max2 = 0
bound = (1 - epsilon ** 2) * np.sum(eigen_values)
temp = 0
for e in eigen_values:
temp += e
if temp < bound:
id_max2 += 1
nb_modes = max(nb_modes, id_max2)
if nb_modes > matrix.shape[0]:
print("nb_modes taken to max possible value of " +
str(matrix.shape[0]) +
" instead of provided value " +
str(nb_modes))
nb_modes = matrix.shape[0]
index = np.where(eigen_values <= 0)
if len(eigen_values[index]) > 0:
if index[0][0] < nb_modes:
print("removing numerical noise from eigenvalues, nb_modes is set to " +
str(index[0][0]) + " instead of " + str(nb_modes))
nb_modes = index[0][0]
return eigen_values[0:nb_modes], eigen_vectors[:, 0:nb_modes]