SparkMLlib構(gòu)建機(jī)器學(xué)習(xí)回歸模型
3.2 回歸問(wèn)題
3.2.1 線性回歸
Spark MLlib 的線性回歸算法是一種廣泛使用的預(yù)測(cè)模型。它將輸入特征映射到連續(xù)的輸出值。這是通過(guò)訓(xùn)練模型來(lái)確定最佳擬合線性函數(shù)的系數(shù)完成的。
以下是一個(gè)使用 Spark MLlib 實(shí)現(xiàn)線性回歸的 Java 示例程序。
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class LinearRegressionExample {
public static void main(String[] args) {
// 創(chuàng)建一個(gè) SparkSession
SparkSession spark = SparkSession
.builder()
.appName("LinearRegressionExample")
.getOrCreate();
// 讀取數(shù)據(jù)集
Dataset<Row> data = spark.read()
.format("libsvm")
.load("data/mllib/sample_linear_regression_data.txt");
// 將數(shù)據(jù)集拆分為訓(xùn)練集和測(cè)試集
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
// 將特征列合并到一個(gè)向量列中
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"features"})
.setOutputCol("featuresVector");
// 定義線性回歸模型
LinearRegression lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8);
// // 將數(shù)據(jù)集擬合到線性回歸模型中
Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
//訓(xùn)練模型
LinearRegressionModel lrModel = lr.fit(trainingDataWithFeatures);
//打印線性回歸的系數(shù)和截距
System.out.println("系數(shù)Coefficients: "+lrModel.coefficients() + "");
System.out.println(" 截距Intercept: " + lrModel.intercept()+ "");
//總結(jié)訓(xùn)練集上的模型并打印出一些指標(biāo)。
LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
Dataset<Row> dataset = trainingSummary.predictions().select("prediction", "label", "featuresVector");
dataset.show(5);
spark.stop();
}
}
3.2.2 決策樹(shù)回歸
Spark MLlib 提供了決策樹(shù)回歸(Decision Tree Regression)算法來(lái)解決回歸問(wèn)題。決策樹(shù)回歸是一種基于樹(shù)結(jié)構(gòu)的非參數(shù)統(tǒng)計(jì)方法,能夠處理多維輸入和輸出,并且具有良好的可解釋性和魯棒性。
下面是一個(gè)簡(jiǎn)單的 Java 代碼示例,演示如何使用 Spark MLlib 的決策樹(shù)回歸算法對(duì)數(shù)據(jù)進(jìn)行訓(xùn)練和預(yù)測(cè):
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class DecisionTreeRegressionExample {
public static void main(String[] args) {
// 創(chuàng)建 SparkSession
SparkSession spark = SparkSession.builder()
.appName("DecisionTreeRegressionExample")
.master("local[*]")
.getOrCreate();
// 讀取數(shù)據(jù)集
Dataset<Row> data = spark.read().format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("path/to/your/data.csv");
// 定義特征列和標(biāo)簽列
String[] featureCols = data.columns();
String labelCol = "label";
// 將特征列轉(zhuǎn)換為向量
VectorAssembler assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features");
Dataset<Row> dataWithFeatures = assembler.transform(data).select("features", labelCol);
// 將數(shù)據(jù)集拆分為訓(xùn)練集和測(cè)試集
double[] weights = {0.7, 0.3};
Dataset<Row>[] datasets = dataWithFeatures.randomSplit(weights);
Dataset<Row> trainData = datasets[0];
Dataset<Row> testData = datasets[1];
// 創(chuàng)建決策樹(shù)回歸器
DecisionTreeRegressor dt = new DecisionTreeRegressor()
.setLabelCol(labelCol)
.setFeaturesCol("features");
// 創(chuàng)建 Pipeline
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { dt });
// 訓(xùn)練模型
PipelineModel model = pipeline.fit(trainData);
// 預(yù)測(cè)測(cè)試集
Dataset<Row> predictions = model.transform(testData);
// 顯示預(yù)測(cè)結(jié)果
predictions.show();
// 獲取訓(xùn)練好的決策樹(shù)模型
DecisionTreeRegressionModel dtModel = (DecisionTreeRegressionModel) model.stages()[0];
System.out.println("Learned regression tree model:\n" + dtModel.toDebugString());
// 停止 SparkSession
spark.stop();
}
}
這只是一個(gè)簡(jiǎn)單的示例,實(shí)際應(yīng)用中可能需要更復(fù)雜的特征工程和模型調(diào)整。另外,如果數(shù)據(jù)集過(guò)大,可能需要在集群上運(yùn)行以獲得更好的性能。
3.2.3 隨機(jī)森林回歸
Spark MLlib提供了隨機(jī)森林回歸算法,可以用于預(yù)測(cè)連續(xù)的數(shù)值型數(shù)據(jù)。隨機(jī)森林是一種集成學(xué)習(xí)算法,它基于決策樹(shù),通過(guò)隨機(jī)選擇樣本和特征來(lái)減少過(guò)擬合的風(fēng)險(xiǎn)。隨機(jī)森林回歸使用多個(gè)決策樹(shù)對(duì)數(shù)據(jù)進(jìn)行擬合和預(yù)測(cè),并取這些決策樹(shù)的平均值作為最終預(yù)測(cè)結(jié)果。本文將介紹如何使用Spark MLlib中的隨機(jī)森林回歸算法,并提供一個(gè)完整可運(yùn)行的Java示例。
示例說(shuō)明: 在這個(gè)示例中,我們將使用Spark MLlib中的隨機(jī)森林回歸算法,對(duì)一組汽車數(shù)據(jù)進(jìn)行建模,然后使用模型來(lái)預(yù)測(cè)汽車的燃油效率(MPG)。我們將使用UCI Machine Learning Repository中的Auto MPG數(shù)據(jù)集。該數(shù)據(jù)集包含8個(gè)輸入特征,如汽車的氣缸數(shù)、排量、馬力、重量等,以及一個(gè)輸出特征MPG,表示汽車的燃油效率。我們將使用70%的數(shù)據(jù)來(lái)訓(xùn)練模型,30%的數(shù)據(jù)用于測(cè)試模型性能。
1.準(zhǔn)備數(shù)據(jù) 我們需要下載Auto MPG數(shù)據(jù)集,將其保存為CSV文件,并將其加載到Spark DataFrame中。
數(shù)據(jù)集下載地址:
CSV文件格式如下:
mpg,cylinders,displacement,horsepower,weight,acceleration,modelyear,origin
18.0,8,307.0,130.0,3504.0,12.0,70,1
15.0,8,350.0,165.0,3693.0,11.5,70,1
其中,第一列為輸出特征MPG,后面的列為輸入特征。
2.構(gòu)建隨機(jī)森林回歸模型
// 創(chuàng)建隨機(jī)森林回歸模型
RandomForestRegressor rf = new RandomForestRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
.setNumTrees(10);
// 訓(xùn)練模型
RandomForestRegressionModel model = rf.fit(trainingData);
3.使用模型進(jìn)行預(yù)測(cè)
// 使用模型進(jìn)行預(yù)測(cè)
Dataset<Row> predictions = model.transform(testData);
4.評(píng)估模型性能
// 評(píng)估模型性能
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
下面是完整 Java 代碼示例:
import org.apache.spark.SparkConf;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class RandomForestRegressionExample {
public static void main(String[] args) {
// Create a Spark session
SparkConf conf = new SparkConf().setAppName("RandomForestRegressionExample").setMaster("local[*]");
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
// Load data
Dataset<Row> data = spark.read().format("libsvm").load("data/sample_libsvm_data.txt");
// Split the data into training and test sets
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// Define the feature column names
String[] featureCols = new String[data.schema().fieldNames().length - 1];
for (int i = 0; i < featureCols.length; i++) {
featureCols[i] = "feature" + (i + 1);
}
// Assemble features into a vector
VectorAssembler assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features");
Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
Dataset<Row> testDataWithFeatures = assembler.transform(testData);
// Create a RandomForestRegressor model
RandomForestRegressor rf = new RandomForestRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxDepth(5)
.setNumTrees(20);
// Set up a pipeline
Pipeline pipeline = new Pipeline().setStages(new RandomForestRegressor[]{rf});
// Set up a grid of hyperparameters to search over using 3-fold cross validation
ParamGridBuilder paramGridBuilder = new ParamGridBuilder()
.addGrid(rf.maxDepth(), new int[]{5, 10})
.addGrid(rf.numTrees(), new int[]{20, 50});
CrossValidator crossValidator = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator())
.setEstimatorParamMaps(paramGridBuilder.build())
.setNumFolds(3);
// Train the model using cross-validation
crossValidator.setSeed(12345);
CrossValidatorModel crossValidatorModel = crossValidator.fit(trainingDataWithFeatures);
// Evaluate the model on the test set
Dataset<Row> predictions = crossValidatorModel.transform(testDataWithFeatures);
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
// Stop the Spark session
spark.stop();
}
}
3.2.4 梯度提升回歸樹(shù)
Spark MLlib提供了梯度提升回歸樹(shù)(Gradient-Boosted Trees,GBT)的算法,它是一種強(qiáng)大的回歸模型,可以用于連續(xù)的數(shù)值預(yù)測(cè)。GBT在每一次迭代中,使用決策樹(shù)模型去擬合殘差值,然后將所有的模型的預(yù)測(cè)結(jié)果相加,得到最終的預(yù)測(cè)結(jié)果。
以下是一個(gè)使用Spark MLlib進(jìn)行梯度提升回歸樹(shù)的示例Java程序。這個(gè)程序?qū)⑹褂靡粋€(gè)數(shù)據(jù)集,該數(shù)據(jù)集包含了關(guān)于自行車租賃量的信息。它將使用梯度提升回歸樹(shù)來(lái)預(yù)測(cè)一天中自行車的租賃量。
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.*;
public class GBTRegressionDemo {
public static void main(String[] args) {
// 創(chuàng)建 SparkSession
SparkSession spark = SparkSession.builder()
.appName("GradientBoostedTreeRegressionDemo")
.master("local[*]")
.getOrCreate();
// 讀取數(shù)據(jù)集
Dataset<Row> data = spark.read().format("libsvm")
.load("data/sample_libsvm_data.txt");
// 將數(shù)據(jù)集劃分為訓(xùn)練集和測(cè)試集
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];
// 將特征向量合并為一個(gè)向量
VectorAssembler assembler = new VectorAssembler()
.setInputCols(trainingData.columns())
.setOutputCol("features");
Dataset<Row> trainingDataWithFeatures = assembler.transform(trainingData);
Dataset<Row> testDataWithFeatures = assembler.transform(testData);
// 創(chuàng)建梯度提升回歸樹(shù)模型
GBTRegressor gbt = new GBTRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxIter(10);
// 訓(xùn)練模型
GBTRegressionModel model = gbt.fit(trainingDataWithFeatures);
// 在測(cè)試集上進(jìn)行預(yù)測(cè)
Dataset<Row> predictions = model.transform(testDataWithFeatures);
// 評(píng)估模型
RegressionEvaluator evaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);
// 輸出模型的節(jié)點(diǎn)信息
System.out.println("Learned regression GBT model:\n" + model.toDebugString());
// 關(guān)閉 SparkSession
spark.close();
}
}
該程序與之前的程序類似,只是將 DecisionTreeRegressor 類替換為 GBTRegressor 類。需要注意的是,GBTRegressor 類在設(shè)置參數(shù)時(shí)需要設(shè)置 maxIter 參數(shù),表示最大迭代次數(shù)。同樣需要用 setFeaturesCol 方法設(shè)置特征列,用 setLabelCol 方法設(shè)置標(biāo)簽列。最后需要調(diào)用 fit 方法訓(xùn)練模型,然后用 transform 方法在測(cè)試集上進(jìn)行預(yù)測(cè)。最后用 RegressionEvaluator 類評(píng)估模型,并輸出模型的節(jié)點(diǎn)信息。
