<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          分布式機器學習原理及實戰(zhàn)(Pyspark)

          共 6473字,需瀏覽 13分鐘

           ·

          2021-06-12 17:14

          一、大數(shù)據(jù)框架及Spark介紹

          1.1 大數(shù)據(jù)框架

          大數(shù)據(jù)(Big Data)是指無法在一定時間內(nèi)用常規(guī)軟件工具對其內(nèi)容進行抓取、管理和處理的數(shù)據(jù)集合。大數(shù)據(jù)技術,是指從各種各樣類型的數(shù)據(jù)中,快速獲得有價值信息的能力。

          自2003年Google公布了3篇大數(shù)據(jù)奠基性論文,為大數(shù)據(jù)存儲及分布式處理的核心問題提供了思路:非結構化文件分布式存儲(GFS)、分布式計算(MapReduce)及結構化數(shù)據(jù)存儲(BigTable),并奠定了現(xiàn)代大數(shù)據(jù)技術的理論基礎,而后大數(shù)據(jù)技術便快速發(fā)展,誕生了很多日新月異的技術。歸納現(xiàn)有大數(shù)據(jù)框架解決的核心問題及相關技術主要為:

          • 分布式存儲的問題:有GFS,HDFS等,使得大量的數(shù)據(jù)能橫跨成百上千臺機器;
          • 大數(shù)據(jù)計算的問題:有MapReduce、Spark批處理、Flink流處理等,可以分配計算任務給各個計算節(jié)點(機器);
          • 結構化數(shù)據(jù)存儲及查詢的問題:有Hbase、Bigtable等,可以快速獲取/存儲結構化的鍵值數(shù)據(jù);
          • 大數(shù)據(jù)挖掘的問題:有Hadoop的mahout,spark的ml等,可以使用分布式機器學習算法挖掘信息;

          1.2 Spark的介紹

          Spark是一個分布式內(nèi)存批計算處理框架,Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node組成。對于每個Spark應用程序,Worker Node上存在一個Executor進程,Executor進程中包括多個Task線程。在執(zhí)行具體的程序時,Spark會將程序拆解成一個任務DAG(有向無環(huán)圖),再根據(jù)DAG決定程序各步驟執(zhí)行的方法。該程序先分別從textFile和HadoopFile讀取文件,經(jīng)過一些列操作后再進行join,最終得到處理結果。PySpark是Spark的Python API,通過Pyspark可以方便地使用 Python編寫 Spark 應用程序, 其支持 了Spark 的大部分功能,例如 Spark SQL、DataFrame、Streaming、MLLIB(ML)和 Spark Core。

          二、PySpark分布式機器學習

          2.1 PySpark機器學習庫

          Pyspark中支持兩個機器學習庫:mllib及ml,區(qū)別在于ml主要操作的是DataFrame,而mllib操作的是RDD,即二者面向的數(shù)據(jù)集不一樣。相比于mllib在RDD提供的基礎操作,ml在DataFrame上的抽象級別更高,數(shù)據(jù)和操作耦合度更低。

          注:mllib在后面的版本中可能被廢棄,本文示例使用的是ml庫。

          pyspark.ml訓練機器學習庫有三個主要的抽象類:Transformer、Estimator、Pipeline。

          • Transformer主要對應feature子模塊,實現(xiàn)了算法訓練前的一系列的特征預處理工作,例如MinMaxScaler、word2vec、onehotencoder等,對應操作為transform;
          # 舉例:特征加工
          from pyspark.ml.feature import VectorAssembler
          featuresCreator = VectorAssembler(
              inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
              outputCol='features'
          )

          • Estimator對應各種機器學習算法,主要為分類、回歸、聚類和推薦算法4大類,具體可選算法大多在sklearn中均有對應,對應操作為fit;
          # 舉例:分類模型
          from pyspark.ml.classification import LogisticRegression

          logistic = LogisticRegression(featuresCol=featuresCreator.getOutputCol(),
                                          labelCol='INFANT_ALIVE_AT_REPORT')
          • Pipeline可將一些列轉換和訓練過程串聯(lián)形成流水線。
          # 舉例:創(chuàng)建流水線
          from pyspark.ml import Pipeline

          pipeline = Pipeline(stages=[encoder, featuresCreator, logistic]) # 特征編碼,特征加工,載入LR模型
          # 擬合模型
          train, test = data.randomSplit([0.7,0.3],seed=123)
          model = pipeline.fit(train)

          2.2 PySpark分布式機器學習原理

          在分布式訓練中,用于訓練模型的工作負載會在多個微型處理器之間進行拆分和共享,這些處理器稱為工作器節(jié)點,通過這些工作器節(jié)點并行工作以加速模型訓練。分布式訓練可用于傳統(tǒng)的 ML 模型,但更適用于計算和時間密集型任務,如用于訓練深度神經(jīng)網(wǎng)絡。分布式訓練有兩種主要類型:數(shù)據(jù)并行及模型并行,主要代表有Spark ML,Parameter Server和TensorFlow。

          spark的分布式訓練的實現(xiàn)為數(shù)據(jù)并行:按行對數(shù)據(jù)進行分區(qū),從而可以對數(shù)百萬甚至數(shù)十億個實例進行分布式訓練。以其核心的梯度下降算法為例: 

          1、首先對數(shù)據(jù)劃分至各計算節(jié)點; 

          2、把當前的模型參數(shù)廣播到各個計算節(jié)點(當模型參數(shù)量較大時會比較耗帶寬資源);

          3、各計算節(jié)點進行數(shù)據(jù)抽樣得到mini batch的數(shù)據(jù),分別計算梯度,再通過treeAggregate操作匯總梯度,得到最終梯度gradientSum; 

          4、利用gradientSum更新模型權重(這里采用的阻斷式的梯度下降方式,當各節(jié)點有數(shù)據(jù)傾斜時,每輪的時間取決于最慢的節(jié)點。這是Spark并行訓練效率較低的主要原因)。

          PySpark項目實戰(zhàn)

          注:單純拿Pyspark練練手,可無需配置Pyspark集群,直接本地配置下單機Pyspark,也可以使用線上spark集群(如: community.cloud.databricks.com)。

          本項目通過PySpark實現(xiàn)機器學習建模全流程:包括數(shù)據(jù)的載入,數(shù)據(jù)分析,特征加工,二分類模型訓練及評估。

          #!/usr/bin/env python
          # coding: utf-8


          #  初始化SparkSession
          from pyspark.sql import SparkSession

          spark = SparkSession.builder.appName("Python Spark RF example").config("spark.some.config.option""some-value").getOrCreate()

          # 加載數(shù)據(jù)
          df = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("./data.csv",header=True)

          from pyspark.sql.functions import *
          # 數(shù)據(jù)基本信息分析
          df.dtypes # Return df column names and data types
          df.show()  #Display the content of df
          df.head()  #Return first n rows
          df.first()  #Return first row 
          df.take(2)  #Return the first n rows
          df.schema   # Return the schema of df
          df.columns # Return the columns of df
          df.count()  #Count the number of rows in df
          df.distinct().count()  #Count the number of distinct rows in df
          df.printSchema()  #Print the schema of df
          df.explain()  #Print the (logical and physical)  plans
          df.describe().show()  #Compute summary statistics 
          df.groupBy('Survived').agg(avg("Age"),avg("Fare")).show()  # 聚合分析
          df.select(df.Sex, df.Survived==1).show()  # 帶條件查詢 
          df.sort("Age", ascending=False).collect() # 排序
          # 特征加工
          df = df.dropDuplicates()   # 刪除重復值
          df = df.na.fill(value=0)  # 缺失填充值
          df = df.na.drop()        # 或者刪除缺失值
          df = df.withColumn('isMale', when(df['Sex']=='male',1).otherwise(0)) # 新增列:性別0 1
          df = df.drop('_c0','Name','Sex'# 刪除姓名、性別、索引列

          # 設定特征/標簽列
          from pyspark.ml.feature import VectorAssembler
          ignore=['Survived']
          vectorAssembler = VectorAssembler(inputCols=[x for x in df.columns  
                            if x not in ignore], outputCol = 'features')
          new_df = vectorAssembler.transform(df)
          new_df = new_df.select(['features''Survived'])

          # 劃分測試集訓練集
          train, test = new_df.randomSplit([0.75, 0.25], seed = 12345)

          # 模型訓練
          from pyspark.ml.classification import LogisticRegression

          lr = LogisticRegression(featuresCol = 'features'
                                   labelCol='Survived')
          lr_model = lr.fit(test)

          # 模型評估
          from pyspark.ml.evaluation import BinaryClassificationEvaluator

          predictions = lr_model.transform(test)
          auc = BinaryClassificationEvaluator().setLabelCol('Survived')
          print('AUC of the model:' + str(auc.evaluate(predictions)))
          print('features weights', lr_model.coefficientMatrix)

          文章首發(fā)于算法進階,公眾號閱讀原文可訪問GitHub項目源碼


          瀏覽 103
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  男女操逼网站国产 | 狠狠干2025 | 国产一级卖婬片AAAAA揪痧 | 玖玖精品电影 | 日韩电影中文字幕一级黄片 |