import numpy as np
import numpy.typing as npt
from pythonsi.node import Data
from typing import Optional, Literal, Tuple
from pythonsi.util import intersect, solve_quadratic_inequality
import warnings
class SequentialFeatureSelection:
r"""Sequential feature selection with selective inference support.
This class performs sequential feature selection in a greedy manner,
either by adding features (forward selection) or removing them
(backward selection) to construct a subset of relevant features.
The number of selected features can be determined automatically using
model selection criteria such as AIC, BIC, or adjusted R².
In addition, the class provides support for performing selective
inference on the chosen feature set, allowing valid statistical
inference after the selection process.
The selection problem for each step is to find the feature that
maximizes the fit improvement (e.g., minimizes the residual sum of squares).
For forward selection, at step $k$, the next feature is chosen by solving:
.. math::
    j_k = \mathop{\arg \min}_{j \notin \mathcal{A}_{k-1}} \; \min_{\boldsymbol{\beta}} \|\mathbf{y} - \mathbf{X}_{\mathcal{A}_{k-1} \cup \{j\}}\boldsymbol{\beta}\|_2^2
where $\mathcal{A}_{k-1}$ is the active set from the previous step and
$\mathbf{X}_{\mathcal{A}}$ denotes the submatrix of $\mathbf{X}$ whose columns are indexed by $\mathcal{A}$.
Parameters
----------
n_features_to_select : int, optional
Number of features to select. If `None`, the number of features is
determined automatically by the `criterion`. Default is `None`.
direction : Literal["forward", "backward"], optional
The direction of the selection process. 'forward' adds features
and 'backward' removes them. Default is 'forward'.
criterion : Literal["aic", "bic", "adj_r2"], optional
The model selection criterion to use. If `None`, the selection
stops when `n_features_to_select` is reached. Default is `None`.
Attributes
----------
x_node : Data or None
Input feature matrix node
y_node : Data or None
Input response vector node
n_features_to_select : int or None
Number of features to select, as specified by the user.
n_features_to_select_ : int or None
The actual number of features selected. This may differ from
`n_features_to_select` if a criterion is used.
direction : Literal["forward", "backward"]
The direction of feature selection.
criterion : Literal["aic", "bic", "adj_r2"] or None
The criterion used for automatic selection.
active_set_node : Data
Output node containing selected feature indices.
active_set : array-like or None
Indices of the selected features.
interval : list or None
Feasible interval from the last inference call.
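Examples
--------
A minimal sketch of direct use on plain NumPy arrays (the synthetic data,
shapes and seed below are illustrative assumptions, not part of the API)::

    import numpy as np

    rng = np.random.default_rng(0)
    X = rng.normal(size=(100, 10))
    beta = np.zeros((10, 1))
    beta[[2, 5]] = [[1.5], [-2.0]]
    y = X @ beta + rng.normal(size=(100, 1))

    sfs = SequentialFeatureSelection(direction="forward", criterion="aic")
    sfs.fit(X, y)
    print(sfs.active_set)             # indices selected under the AIC criterion
    print(sfs.n_features_to_select_)  # number of features actually selected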
"""
def __init__(
self,
n_features_to_select: Optional[int] = None,
direction: Literal["forward", "backward"] = "forward",
criterion: Optional[Literal["aic", "bic", "adj_r2"]] = None,
):
if criterion is not None and n_features_to_select is not None:
warnings.warn(
"'n_features_to_select' is ignored when 'criterion' is set. "
"The selection process will stop based on the chosen criterion."
)
# Input for Sequential Selection
self.x_node = None
self.y_node = None
self.covariance = None
self.n_features_to_select = n_features_to_select
self.n_features_to_select_ = None
self.direction = direction
self.criterion = criterion
# Output for Sequential Selection
self.active_set_node = Data(self)
self.interval = None
self.active_set = None
def __call__(self):
r"""Execute sequential feature selection on stored data.
Returns
-------
active_set : array-like, shape (k,)
Indices of selected features.
"""
x = self.x_node()
y = self.y_node()
active_set = self.forward(x=x, y=y, sigma=self.covariance)
self.active_set_node.update(active_set)
return active_set
def run(self, x: Data, y: Data, covariance: Optional[Data] = None) -> Data:
r"""Configure sequential selection with input data and return active set node.
Parameters
----------
x : Data
Node wrapping the feature matrix of shape (n, p).
y : Data
Node wrapping the response vector of shape (n, 1).
covariance : Data or array-like, shape (n, n), optional
Covariance matrix of the residuals.
Returns
-------
active_set_node : Data
Node containing selected feature indices.
"""
self.covariance = covariance
self.x_node = x
self.y_node = y
return self.active_set_node
def forward(self, x, y, sigma=None):
r"""Fit the sequential feature selection model.
Performs the sequential selection process based on the configured
`direction` and either the `n_features_to_select` or the `criterion`.
Parameters
----------
X : array-like, shape (n, p)
Feature matrix.
y : array-like, shape (n, 1)
Response vector.
sigma : array-like, shape (n, n), optional
Covariance matrix of the residuals.
"""
self.fit(x, y, sigma)
return self.active_set
def fit(
self,
X,
y,
sigma=None,
):
r"""Fit the sequential feature selection model.
Performs the sequential selection process based on the configured
`direction` and either the `n_features_to_select` or the `criterion`.
Parameters
----------
X : array-like, shape (n, p)
Feature matrix.
y : array-like, shape (n, 1)
Response vector.
sigma : array-like, shape (n, n), optional
Covariance matrix of the residuals.
"""
n_samples, n_features = X.shape
if self.criterion is not None:
if sigma is None:
sigma = np.eye(n_samples)
if self.criterion == "aic":
self.active_set = self._AIC(y, X, sigma, self.direction)
elif self.criterion == "bic":
self.active_set = self._BIC(y, X, sigma, self.direction)
elif self.criterion == "adj_r2":
self.active_set = self._adjR2(y, X, self.direction)
else:
if self.n_features_to_select is None:
self.n_features_to_select = n_features // 2
if self.n_features_to_select >= n_features:
raise ValueError("n_features_to_select must be < n_features.")
self.active_set, _ = self._stepwise_selection(
y, X, self.n_features_to_select, self.direction
)
self.n_features_to_select_ = len(self.active_set)
return self
def inference(self, z: float) -> list:
r"""Compute the feasible interval of the sequential feature selection for the parametrized data at z.
Calculates the feasible interval for the given parameter `z`,
ensuring the selected feature set remains the same within this interval.
The interval is determined by the stepwise selection process and, if
applicable, the chosen model selection criterion.
Parameters
----------
z : float
Inference parameter value.
Returns
-------
final_interval : list
Feasible interval [lower, upper] for z.
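Notes
-----
The ``a`` and ``b`` returned by ``self.y_node.inference(z)`` are used below
as the affine parametrization of the response, ``y(z) = a + b * z``. Every
selection (and criterion) event is therefore encoded as a quadratic
inequality in ``z`` via ``solve_quadratic_inequality``, and the resulting
intervals are intersected into a single interval, which is cached in
``self.interval`` for reuse on subsequent calls.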
"""
if self.interval is not None and self.interval[0] <= z <= self.interval[1]:
self.active_set_node.parametrize(data=self.active_set_data)
return self.interval
x, _, _, interval_x = self.x_node.inference(z)
y, a, b, interval_y = self.y_node.inference(z)
active_set = self.forward(x, y)
lst_SELECk, lst_P = self.list_residualvec(x, y)
self.active_set_node.parametrize(data=active_set)
itvCriterion = [-np.inf, np.inf]
if self.criterion is not None:
Sigmatilde = np.eye(x.shape[0])
if self.criterion == "aic":
itvCriterion = self.interval_AIC(
x, lst_P, len(self.active_set), a, b, Sigmatilde, z
)
elif self.criterion == "bic":
itvCriterion = self.interval_BIC(
x, lst_P, len(self.active_set), a, b, Sigmatilde, z
)
elif self.criterion == "adj_r2":
itvCriterion = self.interval_adjr2(
x, lst_P, len(self.active_set), a, b, z
)
itv_stepwise = self._interval_stepwisefs(
x, len(self.active_set), lst_SELECk, lst_P, a, b, z
)
final_interval = intersect(interval_x, interval_y)
final_interval = intersect(
final_interval, intersect(itvCriterion, itv_stepwise)
)
self.active_set_node.parametrize(data=active_set)
self.interval = final_interval
self.active_set_data = active_set
return final_interval
def list_residualvec(self, X, Y) -> Tuple[list, list]:
r"""Generate the active set and residual projection matrix for every step.
Parameters
----------
X : array-like, shape (n, p)
Feature matrix.
Y : array-like, shape (n, 1)
Response vector.
Returns
-------
lst_SELEC_k : list
A list of active sets at each step.
lst_Portho : list
A list of orthogonal ("residual-maker") projection matrices at each step.
"""
# Build, for each step, the matrix that multiplies Y to produce the
# residual vector of the best k-subset.
lst_Portho = []
lst_SELEC_k = []
i = np.identity(Y.shape[0])
s = None
if self.direction == "forward":
s = 0
elif self.direction == "backward":
s = 1
for k in range(s, X.shape[1] + 1):
selec_k, _ = self._stepwise_selection(Y, X, k, self.direction)
lst_SELEC_k.append(selec_k)
X_Mk = X[:, selec_k].copy()
lst_Portho.append(
i - np.dot(np.dot(X_Mk, np.linalg.inv(np.dot(X_Mk.T, X_Mk))), X_Mk.T)
)
if self.direction == "backward":
lst_SELEC_k.reverse()
return lst_SELEC_k, lst_Portho
def _stepwise_selection(self, Y, X, k, direc):
r"""Performs a single stepwise feature selection process.
Parameters
----------
Y : array-like, shape (n, 1)
Response vector.
X : array-like, shape (n, p)
Feature matrix.
k : int
Number of features to select.
direc : Literal["forward", "backward"]
Direction of selection.
Returns
-------
selection : array-like, shape (k,)
Indices of the selected features.
rsdv : array-like
Residual vector.
"""
if direc == "forward":
selection = []
rest = list(range(X.shape[1]))
rss = np.linalg.norm(Y) ** 2
rsdv = None
for i in range(1, k + 1):
rss = np.inf
sele = selection.copy()
selection.append(None)
for feature in rest:
if feature not in selection:
# select necessary data
X_temp = X[:, sorted(sele + [feature])].copy()
# create linear model
# calculate rss of model
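# NOTE: `SequentialFeatureSelection.RSS` is defined elsewhere in this
# module (not shown in this listing). From its use here it is assumed to
# least-squares-fit Y on the selected columns and return the pair
# (residual sum of squares, residual vector), roughly
# rsdv = Y - X_temp @ pinv(X_temp) @ Y and rss = float(rsdv.T @ rsdv).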
rss_temp, rsdv_temp = SequentialFeatureSelection.RSS(Y, X_temp)
# choose feature having minimum rss and append to selection
if rss > rss_temp:
rss = rss_temp
rsdv = rsdv_temp
selection.pop()
selection.append(feature)
return selection, rsdv
elif direc == "backward":
p = X.shape[1]
selection = list(range(p))
if k == p:
rss, rsdv = SequentialFeatureSelection.RSS(Y, X)
return selection, rsdv
for i in range(p - k):
rss = np.inf
for j in selection:
sele = [x for x in selection if x != j]
X_temp = X[:, sele].copy()
rss_temp, rsdv_temp = SequentialFeatureSelection.RSS(Y, X_temp)
if rss > rss_temp:
rss = rss_temp
rsdv = rsdv_temp
jselec = j
selection = [x for x in selection if x != jselec]
return selection, rsdv
else:
raise ValueError("direction must be 'forward' or 'backward'")
def _AIC(self, Y, X, Sigma, direc):
r"""Performs selection based on the AIC criterion.
Returns
-------
bset : array-like
Indices of the selected features.
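Notes
-----
For each candidate size ``k`` (``1 <= k <= p``) the score minimized here is
``rsdv.T @ Sigma @ rsdv + 2 * k``, where ``rsdv`` is the residual vector of
the best ``k``-feature model; ``_BIC`` below is identical except that the
penalty is ``k * np.log(n)``.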
"""
AIC = np.inf
n, p = X.shape
for i in range(1, p + 1):
sset, rsdv = self._stepwise_selection(Y, X, i, direc)
aic = rsdv.T.dot(Sigma.dot(rsdv)) + 2 * i
if aic < AIC:
bset = sset
AIC = aic
return bset
def _BIC(self, Y, X, Sigma, direc):
r"""Performs selection based on the BIC criterion.
Returns
-------
bset : array-like
Indices of the selected features.
"""
BIC = np.inf
n, p = X.shape
for i in range(1, p + 1):
sset, rsdv = self._stepwise_selection(Y, X, i, direc)
bic = rsdv.T.dot(Sigma.dot(rsdv)) + i * np.log(n)
if bic < BIC:
bset = sset
BIC = bic
return bset
def _adjR2(self, Y, X, direc):
r"""Performs selection based on the adjusted R-squared criterion.
Returns
-------
bset : array-like
Indices of the selected features.
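Notes
-----
For each candidate size ``k`` the score maximized here is
``1 - (RSS / (n - k - 1)) / (TSS / (n - 1))``, i.e. the adjusted R-squared
of the best ``k``-feature model.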
"""
AdjR2 = -np.inf
n, p = X.shape
TSS = np.linalg.norm(Y - np.mean(Y)) ** 2
for i in range(1, p + 1):
sset, rsdv = self._stepwise_selection(Y, X, i, direc)
RSS = np.linalg.norm(rsdv) ** 2
adjr2 = 1 - (RSS / (n - i - 1)) / (TSS / (n - 1))
if adjr2 > AdjR2:
bset = sset
AdjR2 = adjr2
return bset
def _interval_stepwisefs(self, X, K, lst_SELEC_k, lst_Portho, a, b, z):
r"""Calculates the feasible interval for the stepwise selection.
Returns
-------
intervals : list
The feasible interval.
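Notes
-----
For forward selection, the event "feature ``j_k`` was added at step ``k``"
means that the squared norm of the residual projection of the chosen set is
no larger than that of any competing set ``A_{k-1} + [j]``. Substituting the
parametrization ``y = a + b * z`` turns each comparison into a quadratic in
``z`` with constant, linear and quadratic coefficients ``g1``, ``g2`` and
``g3`` as built below; ``solve_quadratic_inequality`` reduces each one to an
interval, and the intervals are intersected over all steps and competitors.
The backward branch is analogous, comparing the removed feature against
every other candidate removal.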
"""
n_sample, n_fea = X.shape
intervals = [-np.inf, np.inf]
if self.direction == "forward":
i = np.identity(n_sample)
for step in range(1, K + 1):
X_jk = X[:, sorted(lst_SELEC_k[step])].copy()
Pjk = i - np.dot(
np.dot(X_jk, np.linalg.inv(np.dot(X_jk.T, X_jk))), X_jk.T
)
Pjk_a = Pjk.dot(a)
Pjk_b = Pjk.dot(b)
for j in range(n_fea):
if j not in lst_SELEC_k[step]:
Mj = lst_SELEC_k[step - 1] + [j]
X_j = X[:, sorted(Mj)].copy()
Pj = i - np.dot(
np.dot(X_j, np.linalg.inv(np.dot(X_j.T, X_j))), X_j.T
)
Pj_a = Pj.dot(a)
Pj_b = Pj.dot(b)
g1 = Pjk_a.T.dot(Pjk_a) - Pj_a.T.dot(Pj_a)
g2 = (
Pjk_a.T.dot(Pjk_b)
+ Pjk_b.T.dot(Pjk_a)
- Pj_a.T.dot(Pj_b)
- Pj_b.T.dot(Pj_a)
)
g3 = Pjk_b.T.dot(Pjk_b) - Pj_b.T.dot(Pj_b)
g1, g2, g3 = g1.item(), g2.item(), g3.item()
itv = solve_quadratic_inequality(g3, g2, g1, z)
intervals = intersect(intervals, itv)
return intervals
elif self.direction == "backward":
i = np.identity(n_sample)
for step in range(1, n_fea - K + 1):
jk = [x for x in lst_SELEC_k[step - 1] if x not in lst_SELEC_k[step]]
jk = jk[0]
X_jk = X[:, lst_SELEC_k[step]].copy()
Pjk = i - np.dot(
np.dot(X_jk, np.linalg.inv(np.dot(X_jk.T, X_jk))), X_jk.T
)
Pjk_a = Pjk.dot(a)
Pjk_b = Pjk.dot(b)
for j in lst_SELEC_k[step - 1]:
if j != jk:
Mj = [i for i in lst_SELEC_k[step - 1] if i != j]
X_j = X[:, Mj].copy()
Pj = i - np.dot(
np.dot(X_j, np.linalg.inv(np.dot(X_j.T, X_j))), X_j.T
)
Pj_a = Pj.dot(a)
Pj_b = Pj.dot(b)
g1 = Pjk_a.T.dot(Pjk_a) - Pj_a.T.dot(Pj_a)
g2 = (
Pjk_a.T.dot(Pjk_b)
+ Pjk_b.T.dot(Pjk_a)
- Pj_a.T.dot(Pj_b)
- Pj_b.T.dot(Pj_a)
)
g3 = Pjk_b.T.dot(Pjk_b) - Pj_b.T.dot(Pj_b)
g1, g2, g3 = g1.item(), g2.item(), g3.item()
itv = solve_quadratic_inequality(g3, g2, g1, z)
intervals = intersect(intervals, itv)
return intervals
def interval_AIC(self, X, Portho, K, a, b, Sigma, z):
r"""Calculates the feasible interval for the AIC criterion.
Returns
-------
intervals : list
The feasible interval.
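Notes
-----
The event encoded here is that the selected model size attains the smallest
AIC value among all candidate sizes. With ``y = a + b * z``, each pairwise
comparison against another size yields a quadratic in ``z`` (the ``g1``,
``g2``, ``g3`` terms below, with penalty difference ``2 * (K - step)``), and
the resulting intervals are intersected. ``interval_BIC`` and
``interval_adjr2`` follow the same pattern with their respective penalties.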
"""
n_sample, n_fea = X.shape
if self.direction == "forward":
iteration = range(1, n_fea + 1)
K = K
elif self.direction == "backward":
iteration = range(0, n_fea)
K = K - 1
Pka = Portho[K].dot(a)
Pkb = Portho[K].dot(b)
intervals = [-np.inf, np.inf]
for step in iteration:
if step != K:
Pja = Portho[step].dot(a)
Pjb = Portho[step].dot(b)
g1 = (
Pka.T.dot(Sigma.dot(Pka))
- Pja.T.dot(Sigma.dot(Pja))
+ 2 * (K - step)
)
g2 = (
Pka.T.dot(Sigma.dot(Pkb))
+ Pkb.T.dot(Sigma.dot(Pka))
- Pja.T.dot(Sigma.dot(Pjb))
- Pjb.T.dot(Sigma.dot(Pja))
)
g3 = Pkb.T.dot(Sigma.dot(Pkb)) - Pjb.T.dot(Sigma.dot(Pjb))
g1, g2, g3 = g1.item(), g2.item(), g3.item()
itv = solve_quadratic_inequality(g3, g2, g1, z)
intervals = intersect(intervals, itv)
return intervals
def interval_BIC(self, X, Portho, K, a, b, Sigma, z):
r"""Calculates the feasible interval for the BIC criterion.
Returns
-------
intervals : list
The feasible interval.
"""
n_sample, n_fea = X.shape
if self.direction == "forward":
iteration = range(1, n_fea + 1)
K = K
elif self.direction == "backward":
iteration = range(0, n_fea)
K = K - 1
Pka = Portho[K].dot(a)
Pkb = Portho[K].dot(b)
intervals = [-np.inf, np.inf]
for step in iteration:
if step != K:
Pja = Portho[step].dot(a)
Pjb = Portho[step].dot(b)
g1 = (
Pka.T.dot(Sigma.dot(Pka))
- Pja.T.dot(Sigma.dot(Pja))
+ np.log(n_sample) * (K - step)
)
g2 = (
Pka.T.dot(Sigma.dot(Pkb))
+ Pkb.T.dot(Sigma.dot(Pka))
- Pja.T.dot(Sigma.dot(Pjb))
- Pjb.T.dot(Sigma.dot(Pja))
)
g3 = Pkb.T.dot(Sigma.dot(Pkb)) - Pjb.T.dot(Sigma.dot(Pjb))
g1, g2, g3 = g1.item(), g2.item(), g3.item()
itv = solve_quadratic_inequality(g3, g2, g1, z)
intervals = intersect(intervals, itv)
return intervals
def interval_adjr2(self, X, Portho, K, a, b, z):
r"""Calculates the feasible interval for the adjusted R-squared criterion.
Returns
-------
intervals : list
The feasible interval.
"""
n_sample, n_fea = X.shape
ljk = 1 / (n_sample - K - 1)
if self.direction == "forward":
iteration = range(1, n_fea + 1)
K = K
elif self.direction == "backward":
iteration = range(0, n_fea)
K = K - 1
Pka = Portho[K].dot(a)
Pkb = Portho[K].dot(b)
intervals = [-np.inf, np.inf]
for step in iteration:
if step != K:
# Portho[step] holds `step` features for forward selection and
# `step + 1` features for backward selection.
n_feats_j = step if self.direction == "forward" else step + 1
lj = 1 / (n_sample - n_feats_j - 1)
Pja = Portho[step].dot(a)
Pjb = Portho[step].dot(b)
g1 = ljk * Pka.T.dot(Pka) - lj * Pja.T.dot(Pja)
g2 = (
ljk * Pka.T.dot(Pkb)
+ ljk * Pkb.T.dot(Pka)
- lj * Pja.T.dot(Pjb)
- lj * Pjb.T.dot(Pja)
)
g3 = ljk * Pkb.T.dot(Pkb) - lj * Pjb.T.dot(Pjb)
g1, g2, g3 = g1.item(), g2.item(), g3.item()
itv = solve_quadratic_inequality(g3, g2, g1, z)
intervals = intersect(intervals, itv)
return intervals