From batch to online/stream
A quick overview of batch learning
If you've already delved into machine learning, then you shouldn't have any difficulty picking up incremental learning. If you are somewhat new to machine learning, then do not worry! The point of this notebook in particular is to introduce simple notions. We'll also start to show how River fits in and explain how to use it.
The whole point of machine learning is to learn from data. In supervised learning you want to learn how to predict a target \(y\) given a set of features \(X\). In unsupervised learning, by contrast, there is no target, and the goal is rather to identify patterns and trends in the features \(X\). At this point most people tend to imagine \(X\) as a somewhat big table where each row is an observation and each column is a feature, and they would be quite right. Learning from tabular data is part of what's called batch learning, which basically means that all of the data is available to our learning algorithm at once. Multiple libraries have been created to handle the batch learning regime, with one of the most prominent being Python's scikit-learn.
As a simple example of batch learning let's say we want to learn to predict whether a woman has breast cancer or not. We'll use the breast cancer dataset available with scikit-learn. We'll learn to map a set of features to a binary decision using a logistic regression. Like many other models based on numerical weights, logistic regression is sensitive to the scale of the features. Rescaling the data so that each feature has mean 0 and variance 1 is generally considered good practice. We can apply the rescaling and fit the logistic regression sequentially in an elegant manner using a Pipeline. To measure the performance of the model we'll evaluate the average ROC AUC score using 5-fold cross-validation.
from sklearn import datasets
from sklearn import linear_model
from sklearn import metrics
from sklearn import model_selection
from sklearn import pipeline
from sklearn import preprocessing

# Load the data
dataset = datasets.load_breast_cancer()
X, y = dataset.data, dataset.target

# Define the steps of the model
model = pipeline.Pipeline([
    ('scale', preprocessing.StandardScaler()),
    ('log_reg', linear_model.LogisticRegression(solver='lbfgs'))
])

# Define a deterministic cross-validation procedure
cv = model_selection.KFold(n_splits=5, shuffle=True, random_state=42)

# Compute the ROC AUC score on each fold
scorer = metrics.make_scorer(metrics.roc_auc_score)
scores = model_selection.cross_val_score(model, X, y, scoring=scorer, cv=cv)

# Display the average score and its standard deviation
print(f'ROC AUC: {scores.mean():.3f} (± {scores.std():.3f})')
ROC AUC: 0.975 (± 0.011)
This might be a lot to take in if you're not accustomed to scikit-learn, but it probably isn't if you are. Batch learning basically boils down to:
- Loading (and preprocessing) the data
- Fitting a model to the data
- Computing the performance of the model on unseen data
This is pretty standard and is maybe how most people imagine a machine learning pipeline. However, this way of proceeding has certain downsides. First of all, your laptop would crash if the load_breast_cancer function returned a dataset whose size exceeded your available amount of RAM. Sometimes you can use some tricks to get around this. For example, by optimizing the data types and by using sparse representations when applicable, you can potentially save precious gigabytes of RAM. However, like many tricks this only goes so far. If your dataset weighs hundreds of gigabytes then you won't go far without some special hardware. One solution is to do out-of-core learning; that is, to use algorithms that learn from data presented in chunks or mini-batches. If you want to go down this road then take a look at Dask and Spark's MLlib.
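To make the idea of processing data in chunks concrete, here is a minimal sketch in plain Python. It is not how Dask or Spark work internally; the helper names `iter_chunks` and `chunked_mean` are hypothetical, and a generator stands in for a file that is too large to load at once. The only point is that each chunk can be discarded once it has been folded into a small running state.

```python
from itertools import islice


def iter_chunks(rows, chunk_size):
    """Yield successive lists of at most `chunk_size` rows from any iterable."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


def chunked_mean(rows, chunk_size):
    """Compute a mean while holding only one chunk in memory at a time."""
    total, count = 0.0, 0
    for chunk in iter_chunks(rows, chunk_size):
        # Fold the chunk into the running state, then let it go
        total += sum(chunk)
        count += len(chunk)
    return total / count


# A generator stands in for a data source that doesn't fit in RAM
values = (float(i) for i in range(1, 1001))
result = chunked_mean(values, chunk_size=64)
print(result)
```

A learning algorithm that supports mini-batches follows the same pattern, except that the running state is a set of model weights rather than a sum and a count.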
Another issue with the batch learning regime is that it can't elegantly learn from new data. Indeed if new data is made available, then the model has to learn from scratch with a new dataset composed of the old data and the new data. This is particularly annoying in a real situation where you might have new incoming data every week, day, hour, minute, or even second. For example if you're building a recommendation engine for an e-commerce app, then you're probably training your model from scratch every week or so. As your app grows in popularity, so does the dataset you're training on. This will lead to longer and longer training times and might require a hardware upgrade.
A final downside that isn't very easy to grasp concerns the manner in which features are extracted. Every time you want to train your model you first have to extract features. The trick is that some features might not be accessible at the particular point in time you are at. For example, some attributes in your data warehouse may get overwritten with time. In other words, the features pertaining to a particular observation may no longer all be available, whereas they were a week ago. This happens more often than not in real scenarios, and unless you have a sophisticated data engineering pipeline you will encounter these issues at some point.
A hands-on introduction to incremental learning
Incremental learning is also often called online learning or stream learning, but if you google online learning a lot of the results will point to educational websites. Hence, the terms "incremental learning" and "stream learning" (from which River derives its name) are preferred. The point of incremental learning is to fit a model to a stream of data. In other words, the data isn't available in its entirety, but rather the observations are provided one by one. As an example let's stream through the dataset used previously.
for xi, yi in zip(X, y):
    # This is where the model learns
    pass
In this case we're iterating over a dataset that is already in memory, but we could just as well stream from a CSV file, a Kafka stream, an SQL query, etc. If we look at xi we can notice that it is a numpy.ndarray.
xi
array([7.760e+00, 2.454e+01, 4.792e+01, 1.810e+02, 5.263e-02, 4.362e-02,
       0.000e+00, 0.000e+00, 1.587e-01, 5.884e-02, 3.857e-01, 1.428e+00,
       2.548e+00, 1.915e+01, 7.189e-03, 4.660e-03, 0.000e+00, 0.000e+00,
       2.676e-02, 2.783e-03, 9.456e+00, 3.037e+01, 5.916e+01, 2.686e+02,
       8.996e-02, 6.444e-02, 0.000e+00, 0.000e+00, 2.871e-01, 7.039e-02])
River by design works with dicts. We believe that dicts are more enjoyable to program with than numpy.ndarrays, at least when single observations are concerned. dicts bring the added benefit that each feature can be accessed by name rather than by position.
for xi, yi in zip(X, y):
    xi = dict(zip(dataset.feature_names, xi))
    pass

xi
{
    np.str_('mean radius'): np.float64(7.76),
    np.str_('mean texture'): np.float64(24.54),
    np.str_('mean perimeter'): np.float64(47.92),
    np.str_('mean area'): np.float64(181.0),
    np.str_('mean smoothness'): np.float64(0.05263),
    np.str_('mean compactness'): np.float64(0.04362),
    np.str_('mean concavity'): np.float64(0.0),
    np.str_('mean concave points'): np.float64(0.0),
    np.str_('mean symmetry'): np.float64(0.1587),
    np.str_('mean fractal dimension'): np.float64(0.05884),
    np.str_('radius error'): np.float64(0.3857),
    np.str_('texture error'): np.float64(1.428),
    np.str_('perimeter error'): np.float64(2.548),
    np.str_('area error'): np.float64(19.15),
    np.str_('smoothness error'): np.float64(0.007189),
    np.str_('compactness error'): np.float64(0.00466),
    np.str_('concavity error'): np.float64(0.0),
    np.str_('concave points error'): np.float64(0.0),
    np.str_('symmetry error'): np.float64(0.02676),
    np.str_('fractal dimension error'): np.float64(0.002783),
    np.str_('worst radius'): np.float64(9.456),
    np.str_('worst texture'): np.float64(30.37),
    np.str_('worst perimeter'): np.float64(59.16),
    np.str_('worst area'): np.float64(268.6),
    np.str_('worst smoothness'): np.float64(0.08996),
    np.str_('worst compactness'): np.float64(0.06444),
    np.str_('worst concavity'): np.float64(0.0),
    np.str_('worst concave points'): np.float64(0.0),
    np.str_('worst symmetry'): np.float64(0.2871),
    np.str_('worst fractal dimension'): np.float64(0.07039)
}
Conveniently, River's stream module has an iter_sklearn_dataset function that we can use instead.
from river import stream

for xi, yi in stream.iter_sklearn_dataset(datasets.load_breast_cancer()):
    pass
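The same one-dict-per-observation shape is easy to obtain from other sources. As a rough illustration, the sketch below streams rows from a CSV source using only the standard library. The function name `iter_csv_rows`, the column names, and the toy values are assumptions made for this example; an in-memory buffer stands in for a real file.

```python
import csv
import io

# Stand-in for a file on disk; a real file handle would work the same way.
# The values are toy data for illustration only.
raw = io.StringIO(
    "mean radius,mean texture,target\n"
    "7.76,24.54,1\n"
    "20.6,29.33,0\n"
)


def iter_csv_rows(buffer, target):
    """Yield (features, label) pairs one row at a time."""
    for row in csv.DictReader(buffer):
        # Separate the label from the features and cast the strings to numbers
        y = int(row.pop(target))
        x = {name: float(value) for name, value in row.items()}
        yield x, y


observations = list(iter_csv_rows(raw, target="target"))
for xi, yi in observations:
    print(xi, yi)
```

Only one row is held in memory at a time inside the generator; the call to `list` is there purely so the example can print the result.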
The simple fact that we are getting the data as a stream means that we can't do a lot of things the same way as in a batch setting. For example let's say we want to scale the data so that it has mean 0 and variance 1, as we did earlier. To do so we simply have to subtract the mean of each feature from each value and then divide the result by the standard deviation of the feature. The problem is that we can't possibly know the values of the mean and the standard deviation before actually going through all the data! One way to proceed would be to do a first pass over the data to compute the necessary values and then scale the values during a second pass. The problem is that this defeats our purpose, which is to learn by only looking at the data once. Although this might seem rather restrictive, it reaps sizable benefits down the road.
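The way we do feature scaling in River involves computing running statistics (also known as moving statistics). The idea is that we use a data structure that estimates the mean and updates itself when it is provided with a value. The same goes for the variance (and thus the standard deviation). For example, if we denote \(\mu_t\) the mean and \(n_t\) the count at any moment \(t\), and \(x\) the newly observed value, then updating the mean can be done as so:
\[
\begin{cases}
n_{t+1} = n_t + 1 \\
\mu_{t+1} = \mu_t + \frac{x - \mu_t}{n_{t+1}}
\end{cases}
\]
Likewise, the running variance can be computed as so:
\[
\begin{cases}
s_{t+1} = s_t + (x - \mu_t) \times (x - \mu_{t+1}) \\
\sigma_{t+1} = \frac{s_{t+1}}{n_{t+1}}
\end{cases}
\]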
where \(s_t\) is a running sum of squares and \(\sigma_t\) is the running variance at time \(t\). This might seem a tad more involved than the batch algorithms you learn in school, but it is rather elegant. Implementing this in Python is not too difficult. For example let's compute the running mean and variance of the 'mean area' variable.
n, mean, sum_of_squares, variance = 0, 0, 0, 0

for xi, yi in stream.iter_sklearn_dataset(datasets.load_breast_cancer()):
    n += 1
    old_mean = mean
    mean += (xi['mean area'] - mean) / n
    sum_of_squares += (xi['mean area'] - old_mean) * (xi['mean area'] - mean)
    variance = sum_of_squares / n

print(f'Running mean: {mean:.3f}')
print(f'Running variance: {variance:.3f}')
Running mean: 654.889
Running variance: 123625.903
Let's compare this with numpy. But remember, numpy requires access to all the data at once.
import numpy as np
i = list(dataset.feature_names).index('mean area')
print(f'True mean: {np.mean(X[:, i]):.3f}')
print(f'True variance: {np.var(X[:, i]):.3f}')
True mean: 654.889
True variance: 123625.903
The results seem to be exactly the same! The twist is that the running statistics won't be very accurate for the first few observations. In general though this doesn't matter too much. Some would even go as far as to say that this discrepancy is beneficial and acts as some sort of regularization...
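To see what "not very accurate for the first few observations" means, here is a small sketch that records the running mean after every value and compares it with the mean of the full list. The helper name `running_means` and the toy values are assumptions made for illustration.

```python
def running_means(values):
    """Return the running mean after each value, using the incremental update."""
    n, mean, history = 0, 0.0, []
    for x in values:
        n += 1
        mean += (x - mean) / n
        history.append(mean)
    return history


values = [2.0, 4.0, 9.0, 5.0, 5.0]
history = running_means(values)
true_mean = sum(values) / len(values)

# Distance between each intermediate estimate and the final answer
errors = [abs(m - true_mean) for m in history]

print(history)
print(errors)
```

The first estimate is simply the first value seen, so early estimates can be far from the final mean. The last estimate coincides with the batch result, as in the comparison with numpy.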
Now the idea is that we can compute the running statistics of each feature and scale them as they come along. The way to do this with River is to use the StandardScaler class from the preprocessing module, as so:
from river import preprocessing

scaler = preprocessing.StandardScaler()

for xi, yi in stream.iter_sklearn_dataset(datasets.load_breast_cancer()):
    scaler.learn_one(xi)
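To give a feel for what such a scaler has to keep track of, here is a toy version built on the running mean and variance updates shown earlier. It is only a sketch under simplifying assumptions and is not River's implementation; the class name `RunningScaler` is hypothetical, and the `learn_one` and `transform_one` method names merely mirror the River convention.

```python
import math


class RunningScaler:
    """Toy online standardizer: one running mean and variance per feature."""

    def __init__(self):
        self.counts = {}
        self.means = {}
        self.sums_of_squares = {}

    def learn_one(self, x):
        """Update the running statistics of each feature present in x."""
        for name, value in x.items():
            n = self.counts.get(name, 0) + 1
            old_mean = self.means.get(name, 0.0)
            new_mean = old_mean + (value - old_mean) / n
            self.counts[name] = n
            self.means[name] = new_mean
            self.sums_of_squares[name] = (
                self.sums_of_squares.get(name, 0.0)
                + (value - old_mean) * (value - new_mean)
            )

    def transform_one(self, x):
        """Scale x using the statistics gathered so far."""
        scaled = {}
        for name, value in x.items():
            n = self.counts.get(name, 0)
            variance = self.sums_of_squares.get(name, 0.0) / n if n else 0.0
            std = math.sqrt(variance)
            # Fall back to 0 while the feature has no spread yet
            scaled[name] = (value - self.means.get(name, 0.0)) / std if std > 0 else 0.0
        return scaled


toy_scaler = RunningScaler()
for obs in [{'a': 1.0}, {'a': 3.0}]:
    toy_scaler.learn_one(obs)

print(toy_scaler.transform_one({'a': 3.0}))
```

The state grows with the number of features, not with the number of observations, which is what makes this approach suitable for streams.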
Now that we are scaling the data, we can start doing some actual machine learning. We're going to implement an online logistic regression. Because all the data isn't available at once, we are obliged to do what is called stochastic gradient descent (SGD), which is a popular research topic and has a lot of variants. SGD is commonly used to train neural networks. The idea is that at each step we compute the loss between the prediction and the truth. We then calculate the gradient, which is simply the set of derivatives of the loss with respect to each weight of the model. Once we have obtained the gradient, we can update the weights by moving them in the opposite direction of the gradient. The amount by which the weights are moved depends on a learning rate, which is typically set by the user. Different optimizers have different ways of managing the weight update, and some handle the learning rate implicitly. Online logistic regression can be done in River with the LogisticRegression class from the linear_model module. We'll be using plain and simple SGD using the SGD optimizer from the optim module. During training we'll store the truth and the predicted probability for each observation, and compute the ROC AUC at the end.
from river import linear_model
from river import optim

scaler = preprocessing.StandardScaler()
optimizer = optim.SGD(lr=0.01)
log_reg = linear_model.LogisticRegression(optimizer)

y_true = []
y_pred = []

for xi, yi in stream.iter_sklearn_dataset(datasets.load_breast_cancer(), shuffle=True, seed=42):

    # Scale the features
    scaler.learn_one(xi)
    xi_scaled = scaler.transform_one(xi)

    # Test the current model on the new "unobserved" sample
    yi_pred = log_reg.predict_proba_one(xi_scaled)
    # Train the model with the new sample
    log_reg.learn_one(xi_scaled, yi)

    # Store the truth and the prediction
    y_true.append(yi)
    y_pred.append(yi_pred[True])

# Here metrics is still scikit-learn's metrics module imported earlier
print(f'ROC AUC: {metrics.roc_auc_score(y_true, y_pred):.3f}')
ROC AUC: 0.990
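The weight update described above can be written out by hand. The following is a minimal sketch of one SGD step for logistic regression with the log loss, in plain Python and on dict features. It is not River's implementation, and the names `sigmoid`, `predict_proba`, `sgd_step` and `toy_stream` are assumptions made for this example.

```python
import math


def sigmoid(z):
    # Clamp the input to avoid overflow in math.exp
    z = max(-30.0, min(30.0, z))
    return 1.0 / (1.0 + math.exp(-z))


def predict_proba(weights, bias, x):
    """Probability of the positive class for one observation."""
    z = bias + sum(weights.get(name, 0.0) * value for name, value in x.items())
    return sigmoid(z)


def sgd_step(weights, bias, x, y, lr=0.01):
    """Update the weights in place for one observation and return the new bias."""
    # For the log loss, the derivative with respect to the linear output is p - y
    error = predict_proba(weights, bias, x) - y
    for name, value in x.items():
        # Move each weight in the opposite direction of its gradient
        weights[name] = weights.get(name, 0.0) - lr * error * value
    return bias - lr * error


# Toy stream: positive feature values belong to class 1, negative ones to class 0
toy_stream = [({'x': 1.0}, 1), ({'x': -1.0}, 0)] * 200

weights, bias = {}, 0.0
for xi, yi in toy_stream:
    bias = sgd_step(weights, bias, xi, yi, lr=0.1)

print(predict_proba(weights, bias, {'x': 1.0}))
print(predict_proba(weights, bias, {'x': -1.0}))
```

Each observation is used once and then discarded; the only state carried from one step to the next is the weights and the bias.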
The ROC AUC is higher than the one obtained from the cross-validation of scikit-learn's logistic regression. However, the two evaluation procedures differ, so to make things really comparable it would be nice to use the same cross-validation procedure. River has a compat module that contains utilities for making River compatible with other Python libraries. We'll be using its convert_river_to_sklearn function so that scikit-learn can treat a River model as one of its own estimators. We'll also be using Pipeline to encapsulate the logic of the StandardScaler and the LogisticRegression in one single object.
from river import compat
from river import compose

# We define a Pipeline, exactly like we did earlier for sklearn
model = compose.Pipeline(
    ('scale', preprocessing.StandardScaler()),
    ('log_reg', linear_model.LogisticRegression())
)

# We make the Pipeline compatible with sklearn
model = compat.convert_river_to_sklearn(model)

# We compute the CV scores using the same CV scheme and the same scoring
scores = model_selection.cross_val_score(model, X, y, scoring=scorer, cv=cv)

# Display the average score and its standard deviation
print(f'ROC AUC: {scores.mean():.3f} (± {scores.std():.3f})')
ROC AUC: 0.964 (± 0.016)
This time the ROC AUC score is lower, which is what we would expect. Indeed, when the whole dataset is available up front, online learning generally isn't as accurate as batch learning. However it all depends on what you're interested in. If you're only interested in predicting the next observation then the online learning regime would be better. That's why it's a bit hard to compare both approaches: they're suited to different scenarios.
Going further
Here are a few resources if you want to do some reading:
- Online learning -- Wikipedia
- What is online machine learning? -- Max Pagels
- Introduction to Online Learning -- USC course
- Online Methods in Machine Learning -- MIT course
- Online Learning: A Comprehensive Survey
- Streaming 101: The world beyond batch
- Machine learning for data streams
- Data Stream Mining: A Practical Approach