# Source code for causallib.preprocessing.transformers
"""
(C) Copyright 2019 IBM Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import warnings
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.impute import SimpleImputer as skImputer
from ..utils.stat_utils import which_columns_are_binary
from causallib.estimation import Matching
# TODO: Entire module might be redundant, now that scikit-learn supports missing values
# in its preprocessing: https://scikit-learn.org/stable/whats_new/v0.20.html#highlights
# The only support now needed is:
# 1) Transforming from numpy-array to pandas DataFrame in a pipeline, before specifying a causal model.
# 2) Possible generic support for causallib's additional `a` parameter, along with `X` and `y`.
class StandardScaler(BaseEstimator, TransformerMixin):
    """
    Standardize continuous features by removing the mean and scaling to unit variance while allowing nans.

        X = (X - X.mean()) / X.std()

    Only the columns that ``which_columns_are_binary`` does not flag as binary are scaled;
    all other columns pass through unchanged.
    """

    def __init__(self, with_mean=True, with_std=True, ignore_nans=True):
        """
        Args:
            with_mean (bool): Whether to center the data before scaling.
            with_std (bool): Whether to scale the data to unit variance.
            ignore_nans (bool): Whether to ignore NaNs during calculation.
        """
        self.with_mean = with_mean
        self.with_std = with_std
        self.ignore_nans = ignore_nans

    def fit(self, X, y=None):
        """
        Compute the mean and std to be used for later scaling.

        Args:
            X (pd.DataFrame): The data used to compute the mean and standard deviation used for later scaling
                              along the features axis (axis=0).
            y: Passthrough for ``Pipeline`` compatibility.

        Returns:
            StandardScaler: A fitted standard-scaler
        """
        continuous_features = self._get_relevant_features(X)
        self._feature_mask_ = continuous_features

        if self.with_mean:
            means = X.loc[:, continuous_features].mean(skipna=self.ignore_nans)
        else:
            # Neutral element, so that `mean_` is always defined after fitting.
            means = pd.Series(0, index=continuous_features)
        self.mean_ = means

        if self.with_std:
            scales = X.loc[:, continuous_features].std(skipna=self.ignore_nans)
            # A constant column has zero standard deviation; dividing by it would turn the
            # entire column into NaN/inf. Fall back to a unit scale so such columns are only
            # centered (the same convention scikit-learn's StandardScaler uses).
            # NaN scales are left as they are (NaN != 0 evaluates to True).
            scales = scales.where(scales != 0, 1.0)
        else:
            # Neutral element, so that `scale_` is always defined after fitting.
            scales = pd.Series(1, index=continuous_features)
        self.scale_ = scales

        return self

    def transform(self, X, y='deprecated'):
        """
        Perform standardization by centering and scaling

        Args:
            X (pd.DataFrame): array-like, shape [n_samples, n_features]. The data to standardize.
            y: Deprecated. Passing anything other than the default emits a ``DeprecationWarning``.

        Returns:
            pd.DataFrame: Scaled dataset.
        """
        # Taken from the sklearn implementation. Will probably need adjustment when a new scikit-learn version is out:
        if not isinstance(y, str) or y != 'deprecated':
            warnings.warn("The parameter y on transform() is deprecated since 0.19 and will be removed in 0.21",
                          DeprecationWarning)

        X = X.copy()  # type: pd.DataFrame  # never modify the caller's frame
        if self.with_mean:
            X.loc[:, self._feature_mask_] -= self.mean_
        if self.with_std:
            X.loc[:, self._feature_mask_] /= self.scale_
        return X

    def inverse_transform(self, X):
        """
        Scale back the data to the original representation

        Args:
            X (pd.DataFrame): array-like, shape [n_samples, n_features]. Previously scaled data.

        Returns:
            pd.DataFrame: Un-scaled dataset.
        """
        X = X.copy()  # type: pd.DataFrame  # never modify the caller's frame
        # Undo the steps of `transform` in reverse order: rescale first, then re-center.
        if self.with_std:
            X.loc[:, self._feature_mask_] *= self.scale_
        if self.with_mean:
            X.loc[:, self._feature_mask_] += self.mean_
        return X

    @staticmethod
    def _get_relevant_features(X):
        """
        Returns the names of the continuous (i.e. non-binary) features to operate on.

        Args:
            X (pd.DataFrame): array-like, shape [n_samples, n_features]. The data whose columns are inspected.

        Returns:
            pd.Index: a pd.Index with name of columns specifying which features to apply the transformation on.
        """
        # FIXME utilize sklearn.utils.multiclass.type_of_target()
        continuous_cols = X.columns[~which_columns_are_binary(X)]
        return continuous_cols
class MinMaxScaler(BaseEstimator, TransformerMixin):
    """
    Scales features to 0-1, allowing for NaNs.

        X_std = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
    """

    def __init__(self, only_binary_features=True, ignore_nans=True):
        """
        Args:
            only_binary_features (bool): Whether to apply only on binary features or across all.
            ignore_nans (bool): Whether to ignore NaNs during calculation.
        """
        self.only_binary_features = only_binary_features
        self.ignore_nans = ignore_nans

    def fit(self, X, y=None):
        """
        Compute the minimum and maximum to be used for later scaling.

        Args:
            X (pd.DataFrame): array-like, shape [n_samples, n_features]. The data used to compute the per-feature
                              minimum and maximum used for later scaling along the features axis (axis=0).
            y: Passthrough for ``Pipeline`` compatibility.

        Returns:
            MinMaxScaler: a fitted MinMaxScaler
        """
        feature_mask = self._get_relevant_features(X)
        self._feature_mask_ = feature_mask

        self.min_ = X.min(skipna=self.ignore_nans)[feature_mask]
        self.max_ = X.max(skipna=self.ignore_nans)[feature_mask]

        value_range = self.max_ - self.min_
        # A constant feature has max == min; dividing by that zero range would turn the whole
        # column into NaN/inf. Use a unit scale instead, so such a column is only shifted.
        # NaN ranges are left as they are (NaN != 0 evaluates to True).
        self.scale_ = value_range.where(value_range != 0, 1)

        return self

    def transform(self, X):
        """
        Scaling chosen features of X to the range of 0 - 1.

        Args:
            X (pd.DataFrame): array-like, shape [n_samples, n_features] Input data that will be transformed.

        Returns:
            pd.DataFrame: array-like, shape [n_samples, n_features]. Transformed data.
        """
        X = X.copy()  # type: pd.DataFrame  # never modify the caller's frame
        X.loc[:, self._feature_mask_] -= self.min_
        X.loc[:, self._feature_mask_] /= self.scale_
        return X

    def inverse_transform(self, X):
        """
        Undo the scaling of X, mapping the chosen features back to their original range.

        Args:
            X (pd.DataFrame): array-like, shape [n_samples, n_features] Input data that will be transformed.

        Returns:
            pd.DataFrame: array-like, shape [n_samples, n_features]. Transformed data.
        """
        # No warning for y, since there's no y variable.
        # This corresponds to function signature in scikit-learn's code base
        X = X.copy()  # type: pd.DataFrame  # never modify the caller's frame
        # Reverse of `transform`: rescale first, then shift back by the minimum.
        X.loc[:, self._feature_mask_] *= self.scale_
        X.loc[:, self._feature_mask_] += self.min_
        return X

    def _get_relevant_features(self, X):
        """
        Returns a binary mask specifying the features to operate on (either all features or binary features if
        self.only_binary_features is True).

        Args:
            X (pd.DataFrame): array-like, shape [n_samples, n_features]. The data whose columns are inspected.

        Returns:
            A boolean mask with one entry per column of X, specifying which features to apply the
            transformation on: the result of ``which_columns_are_binary(X)`` when only binary features
            are requested, otherwise an all-True ``np.ndarray``.
        """
        if self.only_binary_features:
            feature_mask = which_columns_are_binary(X)
        else:
            feature_mask = np.ones(X.shape[1], dtype=bool)
        return feature_mask
class Imputer(skImputer):
    """scikit-learn's ``SimpleImputer`` whose ``transform`` returns a ``pd.DataFrame``."""

    def transform(self, X):
        """
        Impute the missing values in X and return the result as a DataFrame.

        Args:
            X (pd.DataFrame): The data to impute.

        Returns:
            pd.DataFrame: Imputed data, carrying the index and the columns of X.
        """
        # Hand the DataFrame itself (rather than `X.values`) to scikit-learn: when the imputer
        # was fitted on a DataFrame, recent scikit-learn versions emit a "X does not have valid
        # feature names" warning on every call if a bare array is passed here.
        X_transformed = super().transform(X)
        # NOTE(review): this assumes the imputer outputs exactly one column per input column;
        # confirm for configurations that drop or add columns.
        return pd.DataFrame(X_transformed, index=X.index, columns=X.columns)
class PropensityTransformer(BaseEstimator, TransformerMixin):
    """Expose the propensity score predicted by a learner as a DataFrame column."""

    def __init__(self, learner, include_covariates=False):
        """Transform covariates by adding/replacing with the propensity score.

        Args:
            learner (sklearn.estimator) : A learner implementing `fit` and
                `predict_proba` to use for predicting the propensity score.
            include_covariates (bool) : Whether to return the original
                covariates alongside the "propensity" column.
        """
        self.learner = learner
        self.include_covariates = include_covariates

    def transform(self, X, treatment_values=None):
        """Append propensity or replace covariates with propensity.

        Args:
            X (pd.DataFrame): A DataFrame of samples to transform. It is passed
                as-is to the learner's `predict_proba`, so its columns should
                match the ones the learner was trained on; otherwise the
                results will not be valid.
            treatment_values (Any | None): Selector applied to the columns of
                the learner's `predict_proba` output, i.e. the treatment
                value to extract the propensity for. If not specified,
                column 1 is used, which matches the usual treatment (A=1)
                control (A=0) setting.

        Returns:
            pd.DataFrame : DataFrame with a "propensity" column.
                If "include_covariates" is `True`, it will include all of the
                original features plus "propensity", else it will only have the
                "propensity" column.
        """
        if treatment_values is None:
            treatment_values = 1

        probabilities = self.learner.predict_proba(X)
        propensity = pd.DataFrame(
            probabilities[:, treatment_values],
            index=X.index,
            columns=["propensity"],
        )

        if not self.include_covariates:
            return propensity
        return X.join(propensity)
class MatchingTransformer:
    """Restrict a dataset to the samples that were successfully matched.

    A thin wrapper delegating the actual matching to an underlying `Matching`
    object, stored in the `matching` attribute.
    """

    def __init__(
        self,
        propensity_transform=None,
        caliper=None,
        with_replacement=True,
        n_neighbors=1,
        matching_mode="both",
        metric="mahalanobis",
        knn_backend="sklearn",
    ):
        """Transform data by removing poorly matched samples.

        Args:
            propensity_transform (causallib.transformers.PropensityTransformer):
                an object for data preprocessing which adds the propensity
                score as a feature (default: None)
            caliper (float) : maximal distance for a match to be accepted. If
                not defined, all matches will be accepted. If defined, some
                samples may not be matched and their outcomes will not be
                estimated. (default: None)
            with_replacement (bool): whether samples can be used multiple times
                for matching. If set to False, the matching process will optimize
                the linear sum of distances between pairs of treatment and
                control samples and only `min(N_treatment, N_control)` samples
                will be estimated. Matching with no replacement does not make
                use of the `fit` data and is therefore not implemented for
                out-of-sample data (default: True)
            n_neighbors (int) : number of nearest neighbors to include in match.
                Must be 1 if `with_replacement` is `False.` If larger than 1, the
                estimate is calculated using the `regress_agg_function` or
                `classify_agg_function` across the `n_neighbors`. Note that when
                the `caliper` variable is set, some samples will have fewer than
                `n_neighbors` matches. (default: 1).
            matching_mode (str) : Direction of matching: `treatment_to_control`,
                `control_to_treatment` or `both` to indicate which set should
                be matched to which. All sets are cross-matched in `match`
                and when `with_replacement` is `False` all matching modes
                coincide. With replacement there is a difference.
            metric (str) : Distance metric string for calculating distance
                between samples. Note: if an external built `knn_backend`
                object with a different metric is supplied, `metric` needs to
                be changed to reflect that, because `Matching` will set its
                inverse covariance matrix if "mahalanobis" is set. (default:
                "mahalanobis", also supported: "euclidean")
            knn_backend (str or callable) : Backend to use for nearest neighbor
                search. Options are "sklearn" or a callable which returns an
                object implementing `fit`, `kneighbors` and `set_params`
                like the sklearn `NearestNeighbors` object. (default: "sklearn").
        """
        self.matching = Matching(
            propensity_transform=propensity_transform,
            caliper=caliper,
            with_replacement=with_replacement,
            n_neighbors=n_neighbors,
            matching_mode=matching_mode,
            metric=metric,
            knn_backend=knn_backend,
        )

    def fit(self, X, a, y):
        """Fit data to transform

        This function loads the data for matching and must be called before
        `transform`. For convenience, consider using `fit_transform`.

        Args:
            X (pd.DataFrame): DataFrame of shape (n,m) containing m covariates
                for n samples.
            a (pd.Series): Series of shape (n,) containing discrete treatment
                values for the n samples.
            y (pd.Series): Series of shape (n,) containing outcomes for
                the n samples.

        Returns:
            self (MatchingTransformer) : Fitted object
        """
        self.matching.fit(X, a, y)
        return self

    def transform(self, X, a, y):
        """Transform data by restricting it to samples which are matched

        Following a matching process, not all of the samples will find matches.
        Transforming the data by only allowing samples in treatment that have
        close matches in control, or in control that have close matches in
        treatment can make other causal methods more effective. This function
        will call `match` on the underlying Matching object.

        The attribute `matching_mode` changes the behavior of this function.
        If set to `control_to_treatment` each control will attempt to find a
        match among the treated, hence the transformed data will have a maximum
        size of N_c + min(N_c,N_t).
        If set to `treatment_to_control`, each treatment will attempt to find a
        match among the control and the transformed data will have a maximum
        size of N_t + min(N_c,N_t).
        If set to `both`, both matching operations will be executed and if a
        sample succeeds in either direction it will be included, hence the
        maximum size of the transformed data will be `len(X)`.

        If `with_replacement` is `False`, `matching_mode` does not change the
        behavior. There will be up to `min(N_c,N_t)` samples in
        the returned DataFrame, regardless.

        Args:
            X (pd.DataFrame): DataFrame of shape (n,m) containing m covariates
                for n samples.
            a (pd.Series): Series of shape (n,) containing discrete treatment
                values for the n samples.
            y (pd.Series): Series of shape (n,) containing outcomes for
                the n samples.

        Raises:
            NotImplementedError: Raised if a value of attribute `matching_mode`
                other than the supported values is set.

        Returns:
            Xm (pd.DataFrame): Covariates of samples that were matched
            am (pd.Series): Treatment values of samples that were matched
            ym (pd.Series): Outcome values of samples that were matched
        """
        self.matching.match(X, a, use_cached_result=True)
        matched_sample_indices = self.find_indices_of_matched_samples(X, a)
        # The same selector is applied to all three objects, keeping them aligned.
        X = X.loc[matched_sample_indices]
        a = a.loc[matched_sample_indices]
        y = y.loc[matched_sample_indices]
        return X, a, y

    def find_indices_of_matched_samples(self, X, a):
        """Find indices of samples which matched successfully.

        The result is derived from the weights reported by the underlying
        Matching object (`matches_to_weights`), filtered according to its
        `matching_mode`.

        Args:
            X (pd.DataFrame): Covariates of samples. Currently unused; kept
                for interface compatibility.
            a (pd.Series): Treatment assignments. Currently unused; kept
                for interface compatibility.

        Returns:
            pd.Series: boolean mask marking the matched samples, to be
                passed to `X.loc`
        """
        matching_weights = self.matching.matches_to_weights()
        return self._filter_matching_weights_by_mode(matching_weights)

    def _filter_matching_weights_by_mode(self, matching_weights):
        """Turn matching weights into a boolean mask, according to `matching_mode`.

        Args:
            matching_weights (pd.DataFrame): Weights with the columns
                `control_to_treatment` and `treatment_to_control`.

        Raises:
            NotImplementedError: If `matching_mode` of the underlying Matching
                object is not one of the supported values.

        Returns:
            pd.Series: boolean mask, True wherever the relevant weight is non-zero.
        """
        matching_mode = self.matching.matching_mode
        if matching_mode == "control_to_treatment":
            matches_mask = matching_weights.control_to_treatment
        elif matching_mode == "treatment_to_control":
            matches_mask = matching_weights.treatment_to_control
        elif matching_mode == "both":
            # A sample counts as matched if it has weight in either direction.
            matches_mask = matching_weights.sum(axis=1)
        else:
            raise NotImplementedError(
                "Matching mode {} not supported".format(matching_mode))
        return matches_mask.astype(bool)

    def fit_transform(self, X, a, y):
        """Match data and return matched subset.

        This is a convenience method, calling `fit` and `transform` at once.
        For details, see documentation of each function.

        Args:
            X (pd.DataFrame): DataFrame of shape (n,m) containing m covariates
                for n samples.
            a (pd.Series): Series of shape (n,) containing discrete treatment
                values for the n samples.
            y (pd.Series): Series of shape (n,) containing outcomes for
                the n samples.

        Returns:
            Xm (pd.DataFrame): Covariates of samples that were matched
            am (pd.Series): Treatment values of samples that were matched
            ym (pd.Series): Outcome values of samples that were matched
        """
        self.fit(X, a, y)
        return self.transform(X, a, y)

    def set_params(self, **kwargs):
        """Set parameters of matching engine. Supported parameters are:

        Keyword Args:
            propensity_transform (causallib.transformers.PropensityTransformer):
                an object for data preprocessing which adds the propensity
                score as a feature (default: None)
            caliper (float) : maximal distance for a match to be accepted
                (default: None)
            with_replacement (bool): whether samples can be used multiple times
                for matching (default: True)
            n_neighbors (int) : number of nearest neighbors to include in match.
                Must be 1 if `with_replacement` is False (default: 1).
            matching_mode (str) : Direction of matching: `treatment_to_control`,
                `control_to_treatment` or `both` to indicate which set should
                be matched to which. All sets are cross-matched in `match`
                and without replacement there is no difference in outcome,
                but with replacement there is a difference and it impacts
                the results of `transform`.
            metric (str) : Distance metric string for calculating
                distance between samples (default: "mahalanobis",
                also supported: "euclidean")
            knn_backend (str or callable) : Backend to use for nearest neighbor
                search. Options are "sklearn" or a callable which returns an
                object implementing `fit`, `kneighbors` and `set_params` like
                the sklearn `NearestNeighbors` object. (default: "sklearn").

        Returns:
            self: (MatchingTransformer) object with new parameters set
        """
        supported_params = (
            "propensity_transform",
            "caliper",
            "n_neighbors",
            "metric",
            "with_replacement",
            "matching_mode",
            "knn_backend",
        )
        for key, value in kwargs.items():
            if key in supported_params:
                # Parameters live on the wrapped Matching object, not on the transformer.
                setattr(self.matching, key, value)
            else:
                # Unknown keys are reported but deliberately not treated as an error.
                warnings.warn(
                    "Received unsupported parameter: {}. Nothing done.".format(key))
        return self