手把手教你使用Flask搭建ES搜索引擎(預(yù)備篇)
回復(fù)“書籍”即可獲贈(zèng)Python從入門到進(jìn)階共10本電子書
/1 前言/
Elasticsearch 是一個(gè)開源的搜索引擎,建立在一個(gè)全文搜索引擎庫 Apache Lucene? 基礎(chǔ)之上。

那么如何實(shí)現(xiàn) Elasticsearch和 Python 的對接成為我們所關(guān)心的問題了 (怎么什么都要和 Python 關(guān)聯(lián)啊)。
/2 Python 交互/
所以,Python 也就提供了可以對接 Elasticsearch的依賴庫。
pip install elasticsearch
初始化連接一個(gè) Elasticsearch 操作對象。
def __init__(self, index_type: str, index_name: str, ip="127.0.0.1"):
# self.es = Elasticsearch([ip], http_auth=('username', 'password'), port=9200)
self.es = Elasticsearch("localhost:9200")
self.index_type = index_type
self.index_name = index_name
默認(rèn)端口 9200,初始化前請確保本地已搭建好 Elasticsearch的所屬環(huán)境。
根據(jù) ID 獲取文檔數(shù)據(jù)
def get_doc(self, uid):
return self.es.get(index=self.index_name, id=uid)
插入文檔數(shù)據(jù)
def insert_one(self, doc: dict):
self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
def insert_array(self, docs: list):
for doc in docs:
self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
搜索文檔數(shù)據(jù)
def search(self, query, count: int = 30):
dsl = {
"query": {
"multi_match": {
"query": query,
"fields": ["title", "content", "link"]
}
},
"highlight": {
"fields": {
"title": {}
}
}
}
match_data = self.es.search(index=self.index_name, body=dsl, size=count)
return match_data
def __search(self, query: dict, count: int = 20): # count: 返回的數(shù)據(jù)大小
results = []
params = {
'size': count
}
match_data = self.es.search(index=self.index_name, body=query, params=params)
for hit in match_data['hits']['hits']:
results.append(hit['_source'])
return results
刪除文檔數(shù)據(jù)
def delete_index(self):
try:
self.es.indices.delete(index=self.index_name)
except:
pass
好啊,封裝 search 類也是為了方便調(diào)用,整體貼一下。
from elasticsearch import Elasticsearch
class elasticSearch():
def __init__(self, index_type: str, index_name: str, ip="127.0.0.1"):
# self.es = Elasticsearch([ip], http_auth=('elastic', 'password'), port=9200)
self.es = Elasticsearch("localhost:9200")
self.index_type = index_type
self.index_name = index_name
def create_index(self):
if self.es.indices.exists(index=self.index_name) is True:
self.es.indices.delete(index=self.index_name)
self.es.indices.create(index=self.index_name, ignore=400)
def delete_index(self):
try:
self.es.indices.delete(index=self.index_name)
except:
pass
def get_doc(self, uid):
return self.es.get(index=self.index_name, id=uid)
def insert_one(self, doc: dict):
self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
def insert_array(self, docs: list):
for doc in docs:
self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
def search(self, query, count: int = 30):
dsl = {
"query": {
"multi_match": {
"query": query,
"fields": ["title", "content", "link"]
}
},
"highlight": {
"fields": {
"title": {}
}
}
}
match_data = self.es.search(index=self.index_name, body=dsl, size=count)
return match_data
嘗試一下把 Mongodb 中的數(shù)據(jù)插入到 ES 中。
import json
from datetime import datetime
import pymongo
from app.elasticsearchClass import elasticSearch
client = pymongo.MongoClient('127.0.0.1', 27017)
db = client['spider']
sheet = db.get_collection('Spider').find({}, {'_id': 0, })
es = elasticSearch(index_type="spider_data",index_name="spider")
es.create_index()
for i in sheet:
data = {
'title': i["title"],
'content':i["data"],
'link': i["link"],
'create_time':datetime.now()
}
es.insert_one(doc=data)
到 ES 中查看一下,啟動(dòng) elasticsearch-head 插件。
如果是 npm 安裝的那么 cd 到根目錄之后直接 npm run start 就跑起來了。
本地訪問 http://localhost:9100/

發(fā)現(xiàn)新加的 spider 數(shù)據(jù)文檔確實(shí)已經(jīng)進(jìn)去了。
/3 爬蟲入庫/
要想實(shí)現(xiàn) ES 搜索,首先要有數(shù)據(jù)支持,而海量的數(shù)據(jù)往往來自爬蟲。
為了節(jié)省時(shí)間,編寫一個(gè)最簡單的爬蟲,抓取 百度百科。
簡單粗暴一點(diǎn),先 遞歸獲取 很多很多的 url 鏈接
import requests
import re
import time
exist_urls = []
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36',
}
def get_link(url):
try:
response = requests.get(url=url, headers=headers)
response.encoding = 'UTF-8'
html = response.text
link_lists = re.findall('.*?<a target=_blank href="/item/([^:#=<>]*?)".*?</a>', html)
return link_lists
except Exception as e:
pass
finally:
exist_urls.append(url)
# 當(dāng)爬取深度小于10層時(shí),遞歸調(diào)用主函數(shù),繼續(xù)爬取第二層的所有鏈接
def main(start_url, depth=1):
link_lists = get_link(start_url)
if link_lists:
unique_lists = list(set(link_lists) - set(exist_urls))
for unique_url in unique_lists:
unique_url = 'https://baike.baidu.com/item/' + unique_url
with open('url.txt', 'a+') as f:
f.write(unique_url + '\n')
f.close()
if depth < 10:
main(unique_url, depth + 1)
if __name__ == '__main__':
start_url = 'https://baike.baidu.com/item/%E7%99%BE%E5%BA%A6%E7%99%BE%E7%A7%91'
main(start_url)把全部 url 存到 url.txt 文件中之后,然后啟動(dòng)任務(wù)。
# parse.pyfrom celery import Celery
import requests
from lxml import etree
import pymongo
app = Celery('tasks', broker='redis://localhost:6379/2')
client = pymongo.MongoClient('localhost',27017)
db = client['baike']
@app.task
def get_url(link):
item = {}
headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'}
res = requests.get(link,headers=headers)
res.encoding = 'UTF-8'
doc = etree.HTML(res.text)
content = doc.xpath("http://div[@class='lemma-summary']/div[@class='para']//text()")
print(res.status_code)
print(link,'\t','++++++++++++++++++++')
item['link'] = link
data = ''.join(content).replace(' ', '').replace('\t', '').replace('\n', '').replace('\r', '')
item['data'] = data
if db['Baike'].insert(dict(item)):
print("is OK ...")
else:
print('Fail')
run.py 飛起來
from parse import get_url
def main(url):
result = get_url.delay(url)
return result
def run():
with open('./url.txt', 'r') as f:
for url in f.readlines():
main(url.strip('\n'))
if __name__ == '__main__':
run()黑窗口鍵入
celery -A parse worker -l info -P gevent -c 10
哦豁 !! 你居然使用了 Celery 任務(wù)隊(duì)列,gevent 模式,-c 就是10個(gè)線程刷刷刷就干起來了,速度杠杠的 !!
啥?分布式? 那就加多幾臺(tái)機(jī)器啦,直接把代碼拷貝到目標(biāo)服務(wù)器,通過 redis 共享隊(duì)列協(xié)同多機(jī)抓取。
這里是先將數(shù)據(jù)存儲(chǔ)到了 MongoDB 上(個(gè)人習(xí)慣),你也可以直接存到 ES 中,但是單條單條的插入速度堪憂(接下來會(huì)講到優(yōu)化,哈哈)。
使用前面的例子將 Mongo 中的數(shù)據(jù)批量導(dǎo)入到 ES 中,OK !!!

到這一個(gè)簡單的數(shù)據(jù)抓取就已經(jīng)完畢了。
好啦,現(xiàn)在 ES 中已經(jīng)有了數(shù)據(jù)啦,接下來就應(yīng)該是 Flask web 的操作啦,當(dāng)然,Django,FastAPI 也很優(yōu)秀。嘿嘿,你喜歡 !!
關(guān)于FastAPI 的文章可以看這個(gè)系列文章:
1、(入門篇)簡析Python web框架FastAPI——一個(gè)比Flask和Tornada更高性能的API 框架
2、(進(jìn)階篇)Python web框架FastAPI——一個(gè)比Flask和Tornada更高性能的API 框架
3、(完結(jié)篇)Python web框架FastAPI——一個(gè)比Flask和Tornada更高性能的API 框架
/4 Flask 項(xiàng)目結(jié)構(gòu)/

這樣一來前期工作就差不多了,接下來剩下的工作主要集中于 Flask 的實(shí)際開發(fā)中,蓄力中 ?。?br>
------------------- End -------------------
往期精彩文章推薦:
手把手用Python教你如何發(fā)現(xiàn)隱藏wifi
手把手教你用Python做個(gè)可視化的“剪刀石頭布”小游戲
手把手用Python網(wǎng)絡(luò)爬蟲帶你爬取全國著名高校附近酒店評論

歡迎大家點(diǎn)贊,留言,轉(zhuǎn)發(fā),轉(zhuǎn)載,感謝大家的相伴與支持
想加入Python學(xué)習(xí)群請?jiān)诤笈_(tái)回復(fù)【入群】
萬水千山總是情,點(diǎn)個(gè)【在看】行不行
/今日留言主題/
隨便說一兩句吧~
