
Pipelines

Pipelines are an integral part of River. We encourage their use and employ them in many of the examples throughout this documentation.

The compose.Pipeline contains all the logic for building and applying pipelines. A pipeline is essentially a list of estimators that are applied in sequence. The only requirement is that the first n - 1 steps be transformers. The last step can be a regressor, a classifier, a clusterer, a transformer, etc. Here is an example:

from river import compose
from river import linear_model
from river import preprocessing
from river import feature_extraction

model = compose.Pipeline(
    preprocessing.StandardScaler(),
    feature_extraction.PolynomialExtender(),
    linear_model.LinearRegression()
)

You can also use the | operator, like so:

model = (
    preprocessing.StandardScaler() |
    feature_extraction.PolynomialExtender() |
    linear_model.LinearRegression()
)

Or, equivalently:

model = preprocessing.StandardScaler() 
model |= feature_extraction.PolynomialExtender()
model |= linear_model.LinearRegression()

You can visualize a pipeline by simply displaying it:

model
[Interactive visualization of the pipeline: StandardScaler → PolynomialExtender → LinearRegression, with each step's parameters viewable.]

compose.Pipeline inherits from base.Estimator, which means it has a learn_one method. You might expect learn_one to update every step, but that's not actually what happens. Instead, the transformers are updated when predict_one (or predict_proba_one, for that matter) is called. Indeed, in online machine learning, the unsupervised parts of a model can be updated as soon as a sample arrives: there is no need to wait for the ground truth in order to update estimators that don't depend on it. In other words, in a pipeline, learn_one updates the supervised parts, whilst predict_one updates the unsupervised parts. It's important to be aware of this behavior, as it is quite different from what is done in other libraries that rely on batch machine learning.

Here is a small example to illustrate the previous point:

from river import datasets

dataset = datasets.TrumpApproval()
x, y = next(iter(dataset))
x, y
({'ordinal_date': 736389,
  'gallup': 43.843213,
  'ipsos': 46.19925042857143,
  'morning_consult': 48.318749,
  'rasmussen': 44.104692,
  'you_gov': 43.636914000000004},
 43.75505)

Let us call predict_one, which will update each transformer, but won't update the linear regression.

model.predict_one(x)
0.0

The prediction is 0 because every weight of the linear regression is still equal to 0.

model['StandardScaler'].means
defaultdict(float,
            {'ordinal_date': 736389.0,
             'gallup': 43.843213,
             'ipsos': 46.19925042857143,
             'morning_consult': 48.318749,
             'rasmussen': 44.104692,
             'you_gov': 43.636914000000004})

As we can see, the means of each feature have been updated, even though we called predict_one and not learn_one.
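
Calling learn_one, on the other hand, updates the supervised part of the pipeline. As a minimal sketch, reusing the sample from above:

model.learn_one(x, y)

# The linear regression has now been updated, so the prediction
# should no longer be exactly 0.
model.predict_one(x)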

Note that if you call transform_one with a pipeline whose last step is not a transformer, then the output of the last transformer (which is thus the penultimate step) will be returned:

model.transform_one(x)
{'ordinal_date': 0.0,
 'gallup': 0.0,
 'ipsos': 0.0,
 'morning_consult': 0.0,
 'rasmussen': 0.0,
 'you_gov': 0.0,
 'ordinal_date*ordinal_date': 0.0,
 'gallup*ordinal_date': 0.0,
 'ipsos*ordinal_date': 0.0,
 'morning_consult*ordinal_date': 0.0,
 'ordinal_date*rasmussen': 0.0,
 'ordinal_date*you_gov': 0.0,
 'gallup*gallup': 0.0,
 'gallup*ipsos': 0.0,
 'gallup*morning_consult': 0.0,
 'gallup*rasmussen': 0.0,
 'gallup*you_gov': 0.0,
 'ipsos*ipsos': 0.0,
 'ipsos*morning_consult': 0.0,
 'ipsos*rasmussen': 0.0,
 'ipsos*you_gov': 0.0,
 'morning_consult*morning_consult': 0.0,
 'morning_consult*rasmussen': 0.0,
 'morning_consult*you_gov': 0.0,
 'rasmussen*rasmussen': 0.0,
 'rasmussen*you_gov': 0.0,
 'you_gov*you_gov': 0.0}
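
In practice, the two methods are interleaved in a single loop over the data stream, a pattern known as progressive validation. Here is a minimal sketch using metrics.MAE to keep a running score, reusing the model and dataset defined above:

from river import metrics

metric = metrics.MAE()

for x, y in dataset:
    # predict_one updates the unsupervised parts of the pipeline
    y_pred = model.predict_one(x)
    metric.update(y, y_pred)
    # learn_one updates the supervised parts
    model.learn_one(x, y)

River also provides evaluate.progressive_val_score, which runs this loop for you.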

In many cases, you might want to connect a step to multiple steps. For instance, you might want to extract different kinds of features from a single input. An elegant way to do this is to use a compose.TransformerUnion. Essentially, the latter is a list of transformers whose results will be merged into a single dict when transform_one is called. As an example, let's say that we want to apply a feature_extraction.RBFSampler as well as a feature_extraction.PolynomialExtender. This can be done like so:

model = (
    preprocessing.StandardScaler() |
    (feature_extraction.PolynomialExtender() + feature_extraction.RBFSampler()) |
    linear_model.LinearRegression()
)

model
[Interactive visualization of the pipeline: StandardScaler → (PolynomialExtender + RBFSampler) → LinearRegression, with each step's parameters viewable.]

Note that the + symbol acts as shorthand notation for creating a compose.TransformerUnion, which means that we could have declared the above pipeline like so:

model = (
    preprocessing.StandardScaler() |
    compose.TransformerUnion(
        feature_extraction.PolynomialExtender(),
        feature_extraction.RBFSampler()
    ) |
    linear_model.LinearRegression()
)

Pipelines provide the benefit of removing a lot of cruft by taking care of tedious details for you. They also let you clearly define what steps your model is made of. Finally, having your model in a single object means that you can move it around more easily. Note that you can include user-defined functions in a pipeline by using a compose.FuncTransformer.
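
For instance, here is a minimal sketch where log_scale is a made-up function that log-scales each feature before the rest of the pipeline runs:

import math

def log_scale(x):
    # Hypothetical user-defined feature transformer.
    return {f'log_{k}': math.log(abs(v) + 1) for k, v in x.items()}

model = (
    compose.FuncTransformer(log_scale) |
    preprocessing.StandardScaler() |
    linear_model.LinearRegression()
)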