Elasticsearch 8.X 導(dǎo)出 CSV 多種方案,一網(wǎng)打盡!
1、問題來源
看到 Elasticsearch 數(shù)據(jù)導(dǎo)出需求,我的第一反應(yīng)是,好好的為啥要導(dǎo)出?
寫入的時候直接寫給定格式的文件如 CSV 不就可以了。
其實真實的業(yè)務(wù)場景,遠非我想的這么簡單。
Elasticsearch 作為存儲庫和檢索源,相關(guān)的輸入數(shù)據(jù)來源早已包羅萬象、幾乎“無所不能”。
如下圖所示:

關(guān)系型數(shù)據(jù)庫(MySQL、Oracle、PostgreSQL)、非關(guān)系型數(shù)據(jù)庫(MongoDB)、大數(shù)據(jù)引擎(Kafka、Spark、Hadoop、Hbase、Flink)、內(nèi)存數(shù)據(jù)庫(Redis)都可以導(dǎo)入 Elasticsearch。
原始數(shù)據(jù)經(jīng)過采集到寫入 Elasticsearch 之前往往經(jīng)過預(yù)處理、ETL(抽取、轉(zhuǎn)換、加載),核心檢索相關(guān)的數(shù)據(jù)落地存儲到 Elasticsearch。
某些特定的業(yè)務(wù)場景(比如:銀行業(yè)務(wù))需要導(dǎo)出 Elasticsearch 數(shù)據(jù),實際是需要導(dǎo)出已經(jīng)預(yù)處理過、已經(jīng)清洗過的 Elasticsearch 數(shù)據(jù)。
那么,問題來了?如何導(dǎo)出呢?
2、Elasticsearch 導(dǎo)出數(shù)據(jù)的方式
以 CSV 格式(導(dǎo)出數(shù)據(jù)格式)數(shù)據(jù)為例。
Elasticsearch 導(dǎo)出數(shù)據(jù)的方式有很多種,包含但不限于:
logstash_output_csv 類似 es2csv python 開源工具包導(dǎo)出 kibana 可視化導(dǎo)出 python、java或shell腳本等自己實現(xiàn)
我們逐個以 Elasticsearch 8.X 版本演示一下。
3、logstash_output_csv 導(dǎo)出
input {
elasticsearch {
hosts => "172.121.10.114:9200"
index => "tianyancha_index"
query => '
{
"query": {
"match_all": {}
}
}
'
ssl => "true"
user => "elastic"
password => "changeme"
ca_file => "/www/...省略.../certs/http_ca.crt"
}
}
output {
csv {
# elastic field name
fields => ["regist_id", "establishment_time", "enttype", "company_name", "company_type"]
# This is path where we store output.
path => "/www/...省略.../sync/tyc_export.csv"
}
}
結(jié)果如下:

生成 CSV 文件如下:
常見報錯信息:
[main] Pipeline error {:pipeline_id=>"main", :exception=>#<Manticore::ClientProtocolException: 172.21.0.14:9200 failed to respond>,
解決方案:開啟 ssl,默認為false。8.X 必須得手動開啟。
4、elasticsearch_tocsv 開源工具包導(dǎo)出
工具名稱:elasticsearch_tocsv 工具地址:https://pypi.org/project/elasticsearch-tocsv/ 工具安裝方式:
pip3 install elasticsearch-tocsv
工具依賴:python 3.8(含)以上版本。 工具實戰(zhàn):
elasticsearch_tocsv -p 9200 -ho 172.121.10.114 -u elastic -pw changeme -s True -cp '../config/certs/http_ca.crt' -i tianyancha_index -f "@regist_id,establishment_time,scope_business,address,registration_number"
參數(shù)含義:
-ho:Elasticsearch IP 地址 -p: Http 端口號 -u:用戶名 -pw:密碼 -cp:CRT證書地址 -s:SSL 認證,默認為false,8.X 需要開啟 -i:索引 -f:導(dǎo)出的字段
工具導(dǎo)出實現(xiàn)截圖:

類似工具很多,拿一個舉例,方便大家實操。
5、借助kibana 導(dǎo)出
1 分鐘視頻就可以搞定。
視頻如下,一看就會。
6、自己寫代碼導(dǎo)出
6.1 Python 程序?qū)С?/span>
簡單的 Python 程序?qū)崿F(xiàn)如下。
def client_init():
ssl_context = create_ssl_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
es = Elasticsearch(
hosts=[
"https://172.121.10.114:9200"
],
ssl_context=ssl_context,
http_auth=('elastic', 'changeme'),
use_ssl=True,
verify_certs=True,
)
return es
def tianyancha_search():
client =client_init()
s = Search(using=client, index="tianyancha_index") \
.query("match_all")
response = s.execute()
sample = response['hits']['hits']
with open( 'tianyancha_rst.csv', 'w', newline='' ) as csvfile:
spamwriter = csv.writer( csvfile, delimiter=',',
quotechar='|', quoting=csv.QUOTE_MINIMAL )
spamwriter.writerow( ['regist_id_new', 'company_name', 'business_starttime', 'scope_business'] )
for hit in sample:
# fill columns 1, 2, 3 with your data
col1 = hit._source.regist_id_new
col2 = hit._source.company_name
col3 = hit._source.business_starttime
col4 = hit._source.scope_business
spamwriter.writerow( [col1, col2, col3, col4] )
不復(fù)雜三段論:
1)連接 8.X Elasticsearch 集群; 2)遍歷索引獲取數(shù)據(jù) 3)解析數(shù)據(jù)寫入 CSV 文件。
這里只是簡單的 from + size 遍歷,數(shù)據(jù)量大可以改成 scroll 實現(xiàn)。
導(dǎo)出 CSV 結(jié)果如下:

6.2 Shell 腳本導(dǎo)出
curl -s -XGET -H "Content-Type:application/json" --cacert ../config/certs/http_ca.crt -u elastic:changeme 'https://172.121.10.114:9200/tianyancha_index/_search' -d '
{"from": 0,
"size": 2,
"query": {
"match_all": {}
}
}' | jq -r '["regist_id", "establishment_time", "scope_business", "address", "registration_number"],(.hits.hits[] |
[._source.regist_id // "", ._source.establishment_time // "", ._source.scope_business // "", ._source.address // "", ._source.registration_number // ""]) | @csv' > tyc_es2csv.csv
解釋一下:
jq 是 shell 腳本下的 json 解析工具。
["regist_id", ****, "registration_number"]代表以數(shù)組形式自定義輸出多項。
jq 使用細節(jié)可以查看幫助手冊:https://stedolan.github.io/jq/tutorial/
shell 腳本導(dǎo)出 CSV 如下:

7、小結(jié)
能導(dǎo)出 Elasticsearch 方案有 N 多種,本文僅是拋磚引玉。
導(dǎo)出方案如何選型?
根據(jù)業(yè)務(wù)需求,如果不想寫代碼可以借助第三方工具實現(xiàn)。
如果想使用 ELK 組件,推薦使用 logstash。
如果僅自己有針對的實現(xiàn),可以 Python 腳本、Shell 腳本都可以。
更多方案,歡迎留言交流。

