【機(jī)器學(xué)習(xí)】在大數(shù)據(jù)上使用PySpark進(jìn)行K-Means
編譯 | VK
來源 | Towards Data Science

如果你不熟悉K Means聚類,我建議你閱讀下面的文章。本文主要研究數(shù)據(jù)并行和聚類,大數(shù)據(jù)上的K-Means聚類。
https://towardsdatascience.com/unsupervised-learning-techniques-using-python-k-means-and-silhouette-score-for-clustering-d6dd1f30b660
關(guān)于聚類
聚類是一種無監(jiān)督的學(xué)習(xí)技術(shù),簡而言之,你處理的是數(shù)據(jù),沒有任何關(guān)于目標(biāo)屬性或因變量的信息。
聚類的一般思想是在數(shù)據(jù)中發(fā)現(xiàn)一些內(nèi)在的結(jié)構(gòu),通常被稱為相似對象的簇。該算法研究數(shù)據(jù)以識別這些簇,使得簇中的每個(gè)成員更接近簇中的另一個(gè)成員(較低的簇內(nèi)距離),而遠(yuǎn)離不同簇中的另一個(gè)成員(較高的簇間距離)。
聚類適合哪里?
你們大多數(shù)人都熟悉現(xiàn)實(shí)生活中的這些例子:
客戶細(xì)分-廣泛用于目標(biāo)營銷 圖像分割-識別景觀 推薦引擎
背景
K-Means聚類,使用歐氏距離形式的相似性度量,通常被稱為分裂聚類或分區(qū)聚類。
K均值的基本思想是從每個(gè)數(shù)據(jù)點(diǎn)都屬于一個(gè)簇,然后根據(jù)用戶輸入K(或聚類數(shù))將它們分成更小的簇。每個(gè)簇都有一個(gè)稱為質(zhì)心的中心。質(zhì)心總數(shù)總是等于K。該算法迭代地尋找數(shù)據(jù)點(diǎn)并將它們分配給最近的簇。
一旦所有數(shù)據(jù)點(diǎn)被分配到各自的質(zhì)心(這里代表每個(gè)簇),質(zhì)心值將被重新計(jì)算,過程將重復(fù),直到簇達(dá)到收斂標(biāo)準(zhǔn)。
質(zhì)心只不過是每個(gè)簇的新平均值(例如,由客戶A、B、C組成的簇,平均支出為100、200、300,籃子大小為10、15和20,質(zhì)心分別為200和15)。收斂準(zhǔn)則是衡量簇的穩(wěn)定性的一個(gè)指標(biāo),即任意兩次迭代之間的簇內(nèi)距離在給定的閾值范圍內(nèi)不變。
Pypark有什么不同嗎
在我們討論為什么PySpark不是基于Sklearn的算法之前,讓我們先討論一下PySpark中的過程有什么不同。
在使用PySpark構(gòu)建任何聚類算法時(shí),都需要執(zhí)行一些數(shù)據(jù)轉(zhuǎn)換。讓我們先理解數(shù)據(jù),用于分析的數(shù)據(jù)可以在這里找到。
https://www.kaggle.com/arjunbhasin2013/ccdata
數(shù)據(jù)
該數(shù)據(jù)集由超過6個(gè)月的9K名活躍信用卡持卡人及其交易和賬戶屬性組成。其想法是制定一個(gè)客戶細(xì)分的營銷策略。

使用Pypark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(‘Clustering using K-Means’).getOrCreate()
data_customer=spark.read.csv('CC General.csv', header=True, inferSchema=True)
data_customer.printSchema()

屬性可以分為三大類??蛻粜畔ⅲㄖ麈I為CUST_ID)、帳戶信息(余額、余額頻率、購買、信用額度、使用期限等)和交易(購買頻率、付款、預(yù)付現(xiàn)金等)。
data_customer=data_customer.na.drop()
所考慮的所有屬性都是數(shù)字或離散數(shù)字,因此我們需要使用向量匯編器(Vector Assembler)將它們轉(zhuǎn)換為特征。向量匯編器是一種轉(zhuǎn)換器,它將一組特征轉(zhuǎn)換為單個(gè)向量列,通常稱為特征數(shù)組,這里的特征是列。
customer id不會(huì)用于聚類。我們首先使用.columns提取所需的列,將其作為輸入傳遞給Vector Assembler,然后使用transform將輸入列轉(zhuǎn)換為一個(gè)稱為feature的向量列。
from pyspark.ml.feature import VectorAssembler
data_customer.columns
assemble=VectorAssembler(inputCols=[
'BALANCE',
'BALANCE_FREQUENCY',
'PURCHASES',
'ONEOFF_PURCHASES',
'INSTALLMENTS_PURCHASES',
'CASH_ADVANCE',
'PURCHASES_FREQUENCY',
'ONEOFF_PURCHASES_FREQUENCY',
'PURCHASES_INSTALLMENTS_FREQUENCY',
'CASH_ADVANCE_FREQUENCY',
'CASH_ADVANCE_TRX',
'PURCHASES_TRX',
'CREDIT_LIMIT',
'PAYMENTS',
'MINIMUM_PAYMENTS',
'PRC_FULL_PAYMENT',
'TENURE'], outputCol='features')
assembled_data=assemble.transform(data_customer)
assembled_data.show(2)

既然所有的列都被轉(zhuǎn)換成一個(gè)單一的特征向量,我們就需要對數(shù)據(jù)進(jìn)行標(biāo)準(zhǔn)化,使它們具有可比的規(guī)模。例如,BALANCE可以是10-1000,而BALANCE_FREQUENCY可以是0-1。
歐幾里德距離總是在大尺度上受到更大的影響,因此對變量進(jìn)行標(biāo)準(zhǔn)化是非常重要的。
from pyspark.ml.feature import StandardScaler
scale=StandardScaler(inputCol='features',outputCol='standardized')
data_scale=scale.fit(assembled_data)
data_scale_output=data_scale.transform(assembled_data)
data_scale_output.show(2)

既然我們的數(shù)據(jù)已經(jīng)標(biāo)準(zhǔn)化了,我們就可以開發(fā)K均值算法了。
K-means是最常用的聚類算法之一,用于將數(shù)據(jù)分簇到預(yù)定義數(shù)量的聚類中。
spark.mllib包括k-means++方法的一個(gè)并行化變體,稱為kmeans||。KMeans函數(shù)來自pyspark.ml.clustering,包括以下參數(shù):
k是用戶指定的簇?cái)?shù) maxIterations是聚類算法停止之前的最大迭代次數(shù)。請注意,如果簇內(nèi)距離的變化不超過上面提到的epsilon值,迭代將停止,而不考慮最大迭代次數(shù) initializationMode指定質(zhì)心的隨機(jī)初始化或通過k-means||初始化(類似于k-means++) epsilon決定k-均值收斂的距離閾值 initialModel是一簇可選的群集質(zhì)心,用戶可以將其作為輸入提供。如果使用此參數(shù),算法只運(yùn)行一次,將點(diǎn)分配到最近的質(zhì)心
train(k=4, maxIterations=20, minDivisibleClusterSize=1.0, seed=-1888008604)是默認(rèn)值。
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
silhouette_score=[]
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized', \
metricName='silhouette', distanceMeasure='squaredEuclidean')
for i in range(2,10):
KMeans_algo=KMeans(featuresCol='standardized', k=i)
KMeans_fit=KMeans_algo.fit(data_scale_output)
output=KMeans_fit.transform(data_scale_output)
score=evaluator.evaluate(output)
silhouette_score.append(score)
print("Silhouette Score:",score)
可視化分?jǐn)?shù)。注意,以前版本的K Means有computeScore,它計(jì)算聚類內(nèi)距離的總和,但在spark3.0.0中被棄用。
輪廓分?jǐn)?shù)使用ClusteringEvaluator,它測量一個(gè)簇中的每個(gè)點(diǎn)與相鄰簇中的點(diǎn)的接近程度,從而幫助判斷簇是否緊湊且間隔良好
# 可視化輪廓分?jǐn)?shù)
import matplotlib.pyplot as plt
fig, ax = plt.subplots(1,1, figsize =(8,6))
ax.plot(range(2,10),silhouette_score)
ax.set_xlabel(‘k’)
ax.set_ylabel(‘cost’)

我更喜歡用K=7,在那里可以觀察到輪廓分?jǐn)?shù)的局部最大值。什么值的K是好的沒有正確的答案。
我們可以使用描述性統(tǒng)計(jì)和其他圖表來檢查,這點(diǎn)在SkLearn和PCA上實(shí)現(xiàn)更方便。我們中的大多數(shù)人更喜歡研究肘部圖,而不是輪廓分?jǐn)?shù),但PySpark有它的優(yōu)點(diǎn)。
為什么是Pypark?
PySpark在執(zhí)行K均值聚類時(shí)使用數(shù)據(jù)并行或結(jié)果并行的概念。
假設(shè)你需要為墨爾本節(jié)禮日活動(dòng)開展有針對性的營銷活動(dòng),并且你希望接觸到具有不同購買屬性的20萬客戶。想象一下在本地系統(tǒng)上運(yùn)行K Means的多次迭代。對于K=5,需要計(jì)算的距離度量數(shù)為5 x 200K=1百萬。100萬個(gè)這樣的度量需要計(jì)算30次才能滿足收斂標(biāo)準(zhǔn),即3000萬個(gè)距離(歐幾里德距離)。處理這樣的場景需要大量的計(jì)算能力和時(shí)間。
數(shù)據(jù)并行性
數(shù)據(jù)并行所做的是,通過將數(shù)據(jù)集劃分為更小的分區(qū),從一開始就創(chuàng)建并行性。另一方面,結(jié)果并行是基于目標(biāo)聚類的。例如:
D=記錄數(shù){X1,X2,…,Xn}
k=簇?cái)?shù)
P=處理器數(shù){P1,P2,…,Pm}
C=初始質(zhì)心{C1,C2,…,Ck}
數(shù)據(jù)D被P個(gè)處理器分割。每個(gè)處理器處理一簇記錄(由spark配置決定)。初始質(zhì)心值C在每個(gè)處理器之間共享 現(xiàn)在每個(gè)處理器都有質(zhì)心信息。處理器計(jì)算它們的記錄到這些質(zhì)心的距離,并通過將數(shù)據(jù)點(diǎn)分配到最近的質(zhì)心來形成局部聚類 完成步驟2后,主進(jìn)程將存儲P個(gè)處理器上每個(gè)聚類的記錄總數(shù)和計(jì)數(shù),以供將來參考 一旦一次迭代完成,來自處理器的信息被交換,主進(jìn)程計(jì)算更新的質(zhì)心并再次在P個(gè)處理器之間共享它們,即,主進(jìn)程更新質(zhì)心,并與處理器重新共享信息 這個(gè)過程不斷迭代直到收斂。一旦滿足收斂條件,主進(jìn)程就收集本地簇并將它們組合成一個(gè)全局聚類
想象一下,將20萬條記錄分成3個(gè)處理器,每個(gè)處理器有約70萬條記錄。這就是分布式處理的用武之地,以減少數(shù)據(jù)量,同時(shí)確保完整的結(jié)果。
結(jié)果并行性
例如:
D=記錄數(shù){X1,X2,…,Xn}
k=簇?cái)?shù)
P=處理器數(shù){P1,P2,…,Pm}
C=初始質(zhì)心{C1,C2,…,Ck}
數(shù)據(jù)D被P個(gè)處理器分割,然后在每個(gè)處理器內(nèi)排序。每個(gè)處理器處理一組記錄(由spark配置決定) 初始質(zhì)心值C被初始化,并在這些處理器中的每一個(gè)處理器之間進(jìn)行分割/共享(即,與所有質(zhì)心值在所有處理器之間共享的數(shù)據(jù)并行性不同,這里,我們將一個(gè)質(zhì)心值傳遞給一個(gè)處理器) 現(xiàn)在每個(gè)處理器都有一個(gè)中心。計(jì)算這些點(diǎn)到這些質(zhì)心的距離。對于處理器中的數(shù)據(jù)點(diǎn):如果它們更接近處理器的質(zhì)心,則將它們分配給該簇,否則如果它們更接近屬于其他處理器的質(zhì)心,則將數(shù)據(jù)點(diǎn)移動(dòng)到新處理器 重復(fù),直到收斂。
有用的鏈接
https://spark.apache.org/docs/latest/mllib-clustering.html https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeansModel https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.ClusteringEvaluator https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/evaluation.html
感謝閱讀。
往期精彩回顧
本站qq群851320808,加入微信群請掃碼:
