Scrapy Source Code Analysis: How Does Scrapy Complete a Crawl?
Entry Point
The execute method in cmdline.py locates the matching command instance, parses the command line, builds a CrawlerProcess instance, and then calls its crawl and start methods to begin crawling.
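Seen from a script, this entry path is only a few lines; a minimal sketch of driving a crawl programmatically (the spider name is hypothetical):

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

# build the process with the project's settings, schedule a spider, and run
process = CrawlerProcess(get_project_settings())
process.crawl('somespider')  # hypothetical spider name registered in the project
process.start()              # runs the Twisted reactor; blocks until the crawl finishes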
The crawl method ultimately invokes crawl on the Crawler instance, which hands control over to the Engine, while start sets up and runs the Twisted reactor, kicking off asynchronous scheduling. Crawler's crawl method:
@defer.inlineCallbacks
def crawl(self, *args, **kwargs):
    assert not self.crawling, "Crawling already taking place"
    self.crawling = True
    try:
        # create the spider instance
        self.spider = self._create_spider(*args, **kwargs)
        # create the engine
        self.engine = self._create_engine()
        # call the spider's start_requests to get the seed URLs
        start_requests = iter(self.spider.start_requests())
        # call the engine's open_spider and hand scheduling over to the engine
        yield self.engine.open_spider(self.spider, start_requests)
        yield defer.maybeDeferred(self.engine.start)
    except Exception:
        if six.PY2:
            exc_info = sys.exc_info()
        self.crawling = False
        if self.engine is not None:
            yield self.engine.close()
        if six.PY2:
            six.reraise(*exc_info)
        raise
Next, the spider's start_requests method. Spider is the parent class of the spider classes we write most often, and this method is defined in spiders/__init__.py:
def start_requests(self):
    # generate seed Request objects from the start_urls attribute
    for url in self.start_urls:
        yield self.make_requests_from_url(url)

def make_requests_from_url(self, url):
    # build a Request object
    return Request(url, dont_filter=True)
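In day-to-day work, this is the hook we override when seed requests need custom logic. A small sketch under assumed names (the spider name, URLs, and parse_page are all hypothetical):

import scrapy

class NewsSpider(scrapy.Spider):
    name = 'news'  # hypothetical spider name

    def start_requests(self):
        # build seed requests ourselves instead of listing start_urls
        for page in range(1, 4):
            url = 'http://example.com/list?page=%d' % page  # hypothetical URL
            yield scrapy.Request(url, callback=self.parse_page)

    def parse_page(self, response):
        # parsing logic would go here
        pass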
Building the Request
So this is where the start_urls attribute gets turned into Request objects. Here is the Request definition:
class Request(object_ref):
    def __init__(self, url, callback=None, method='GET', headers=None, body=None,
                 cookies=None, meta=None, encoding='utf-8', priority=0,
                 dont_filter=False, errback=None):
        # encoding
        self._encoding = encoding
        # HTTP method
        self.method = str(method).upper()
        # set the url
        self._set_url(url)
        # set the body
        self._set_body(body)
        assert isinstance(priority, int), "Request priority not an integer: %r" % priority
        # priority
        self.priority = priority
        assert callback or not errback, "Cannot use errback without a callback"
        # callback function
        self.callback = callback
        # error callback function
        self.errback = errback
        # cookies
        self.cookies = cookies or {}
        # build the headers
        self.headers = Headers(headers or {}, encoding=encoding)
        # whether this request may be filtered out as a duplicate
        self.dont_filter = dont_filter
        # extra metadata
        self._meta = dict(meta) if meta else None
The Request object is fairly simple: it encapsulates the request parameters, HTTP method, callbacks, and optional metadata. The start_requests and make_requests_from_url methods are the two hooks for building seed requests with custom logic.
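For instance, a seed request carrying a priority, custom headers, and extra metadata might be built like this (all field values are illustrative):

from scrapy import Request

req = Request(
    'http://example.com/item/1',      # hypothetical URL
    callback=parse_item,              # assumes a parse_item callable is in scope
    method='GET',
    headers={'User-Agent': 'my-bot'},
    meta={'retry_times': 0},          # arbitrary extra info carried with the request
    priority=10,                      # higher values are scheduled earlier
    dont_filter=False,                # leave duplicate filtering enabled
)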
Engine Scheduling
Back in crawl: once the seed requests are built, the engine's open_spider is called:
@defer.inlineCallbacks
def open_spider(self, spider, start_requests=(), close_if_idle=True):
    assert self.has_capacity(), "No free spider slot when opening %r" % \
        spider.name
    logger.info("Spider opened", extra={'spider': spider})
    # register the _next_request method for repeated scheduling
    nextcall = CallLaterOnce(self._next_request, spider)
    # initialize the scheduler
    scheduler = self.scheduler_cls.from_crawler(self.crawler)
    # run the seed requests through the spider middlewares
    start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
    # wrap everything in a Slot object
    slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
    self.slot = slot
    self.spider = spider
    # call the scheduler's open
    yield scheduler.open(spider)
    # call the scraper's open_spider
    yield self.scraper.open_spider(spider)
    # call the stats collector's open_spider
    self.crawler.stats.open_spider(spider)
    yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
    # kick off scheduling
    slot.nextcall.schedule()
    slot.heartbeat.start(5)
A CallLaterOnce is created here with the _next_request method registered into it. Its implementation:
class CallLaterOnce(object):
    # schedules a function repeatedly in the twisted reactor
    def __init__(self, func, *a, **kw):
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        # only reschedule once the previous call has fired
        if self._call is None:
            # register self with callLater
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        # self was registered above, so the reactor executes __call__
        self._call = None
        return self._func(*self._a, **self._kw)
This wraps a callable for asynchronous execution in Twisted's reactor: each call to schedule registers self with the reactor's callLater, the reactor later invokes __call__, and that in turn runs the registered function. Since the registered function is _next_request, this method keeps getting rescheduled until the program exits. Also note process_start_requests: you can define multiple spider middlewares, each overriding this method, and before scheduling begins the engine calls each of them in turn on the seed requests, so you can filter, transform, or otherwise process them with whatever logic you like.
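As a sketch of that hook, a spider middleware that drops seed requests outside an allowed domain could look like this (the class name and filtering rule are illustrative; it would be enabled through the SPIDER_MIDDLEWARES setting):

class AllowedDomainStartRequestsMiddleware(object):
    # hypothetical spider middleware: filter seed requests before scheduling
    def process_start_requests(self, start_requests, spider):
        for request in start_requests:
            if 'example.com' in request.url:  # illustrative rule
                yield request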
Scheduler
The Scheduler's open:
def open(self, spider):
    self.spider = spider
    # instantiate the priority queue
    self.mqs = self.pqclass(self._newmq)
    # if dqdir is defined, instantiate the disk-based queue as well
    self.dqs = self._dq() if self.dqdir else None
    # call the request fingerprint filter's open method
    return self.df.open()

def _dq(self):
    # instantiate the disk queue
    activef = join(self.dqdir, 'active.json')
    if exists(activef):
        with open(activef) as f:
            prios = json.load(f)
    else:
        prios = ()
    q = self.pqclass(self._newdq, startprios=prios)
    if q:
        logger.info("Resuming crawl (%(queuesize)d requests scheduled)",
                    {'queuesize': len(q)}, extra={'spider': self.spider})
    return q
In open, the scheduler instantiates the priority queue, decides whether to use a disk queue based on whether dqdir is configured, and finally calls the request fingerprint filter's open method, which is defined in the base class BaseDupeFilter:
class BaseDupeFilter(object):
    # filter base class; subclasses may override the methods below
    @classmethod
    def from_settings(cls, settings):
        return cls()

    def request_seen(self, request):
        # request filtering
        return False

    def open(self):
        # may be overridden to initialize the filter
        pass

    def close(self, reason):
        # may be overridden to shut the filter down
        pass

    def log(self, request, spider):
        pass
The default filter, RFPDupeFilter, implements the actual duplicate-request filtering; for now it is enough to know the class exists. How duplicates are filtered is covered below.
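If the default behavior doesn't fit, you can subclass BaseDupeFilter and point the DUPEFILTER_CLASS setting at your class. A minimal sketch that deduplicates on the raw URL only (the rule is illustrative; the import path may differ slightly across Scrapy versions):

from scrapy.dupefilters import BaseDupeFilter

class UrlOnlyDupeFilter(BaseDupeFilter):
    # hypothetical filter: two requests are equal if their URLs match exactly
    def __init__(self):
        self.seen = set()

    def request_seen(self, request):
        if request.url in self.seen:
            return True
        self.seen.add(request.url)
        return False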
Scraper
Next, the Scraper's open_spider method. As mentioned in an earlier article, the Scraper class is the bridge connecting the Engine, the Spider, and the Item Pipeline:
@defer.inlineCallbacks
def open_spider(self, spider):
    self.slot = Slot()
    # call open_spider on every pipeline
    yield self.itemproc.open_spider(spider)
The Scraper calls open_spider on every Pipeline; if we define multiple pipeline output classes, each can override open_spider to do its own initialization before output starts.
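For example, a pipeline might open an output file in open_spider and write items to it later (the file name and serialization are illustrative):

import json

class JsonLinesPipeline(object):
    # hypothetical pipeline; it would be registered in ITEM_PIPELINES
    def open_spider(self, spider):
        self.file = open('items.jl', 'w')  # per-crawl initialization

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        self.file.write(json.dumps(dict(item)) + '\n')
        return item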
Loop Scheduling
After open_spider finishes, nextcall.schedule() is finally called to start the loop, repeatedly executing the _next_request method registered above:
def _next_request(self, spider):
    # this method is scheduled over and over
    slot = self.slot
    if not slot:
        return
    # paused
    if self.paused:
        return
    # back out if necessary
    while not self._needs_backout(spider):
        # fetch a request from the scheduler;
        # note: there is none on the first pass, so we break out
        # and fall through to the logic below
        if not self._next_request_from_scheduler(spider):
            break
    # if start_requests has data and no backout is needed
    if slot.start_requests and not self._needs_backout(spider):
        try:
            # fetch the next seed request
            request = next(slot.start_requests)
        except StopIteration:
            slot.start_requests = None
        except Exception:
            slot.start_requests = None
            logger.error('Error while obtaining start requests',
                         exc_info=True, extra={'spider': spider})
        else:
            # call crawl, which actually puts the request into the scheduler's queue
            self.crawl(request, spider)
    # close the spider when idle
    if self.spider_is_idle(spider) and slot.close_if_idle:
        self._spider_idle(spider)

def _needs_backout(self, spider):
    # backing out depends on 4 conditions:
    # 1. has the Engine been stopped?
    # 2. is the slot closing?
    # 3. has the downloader exceeded its configured limit?
    # 4. has the scraper exceeded its limit for responses being processed?
    slot = self.slot
    return not self.running \
        or slot.closing \
        or self.downloader.needs_backout() \
        or self.scraper.slot.needs_backout()

def _next_request_from_scheduler(self, spider):
    slot = self.slot
    # pop the next request from the scheduler
    request = slot.scheduler.next_request()
    if not request:
        return
    # download it
    d = self._download(request, spider)
    # register the success, failure, and exit callbacks
    d.addBoth(self._handle_downloader_output, request, spider)
    d.addErrback(lambda f: logger.info('Error while handling downloader output',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.remove_request(request))
    d.addErrback(lambda f: logger.info('Error while removing request from slot',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    d.addBoth(lambda _: slot.nextcall.schedule())
    d.addErrback(lambda f: logger.info('Error while scheduling new request',
                                       exc_info=failure_to_exc_info(f),
                                       extra={'spider': spider}))
    return d
_next_request first calls _needs_backout to check whether it should back off. Backout happens in any of four cases: the engine has been stopped; the slot is closing; the downloader has more downloads in flight than its configured limit; or the Scraper has more response data being processed than its limit allows.
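The downloader-side limits come from the concurrency settings, so backout pressure is tunable; for example (values are illustrative):

# settings.py -- illustrative values
CONCURRENT_REQUESTS = 16             # downloader backs out above this many in-flight requests
CONCURRENT_REQUESTS_PER_DOMAIN = 8   # per-domain concurrency cap

The scraper side is bounded by its slot's max_active_size (roughly 5 MB of response data in this version of the code).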
Next, _next_request_from_scheduler which, as the name suggests, fetches a Request from the Scheduler. On the first pass nothing has been put into the Scheduler yet, so the loop breaks straight out and falls through to the logic below, which calls crawl, the method that actually puts the request into the Scheduler's queue, running it through the duplicate filter on the way in. On the following pass, _next_request_from_scheduler can fetch that request from the Scheduler and trigger the download. Here is crawl:
def crawl(self, request, spider):
    assert spider in self.open_spiders, \
        "Spider %r not opened when crawling: %s" % (spider.name, request)
    # put the request into the Scheduler's queue
    self.schedule(request, spider)
    # trigger the next scheduling round
    self.slot.nextcall.schedule()

def schedule(self, request, spider):
    self.signals.send_catch_log(signal=signals.request_scheduled,
            request=request, spider=spider)
    # call the scheduler's enqueue_request to put the request into its queue
    if not self.slot.scheduler.enqueue_request(request):
        self.signals.send_catch_log(signal=signals.request_dropped,
                                    request=request, spider=spider)
So crawl simply puts the request into the Scheduler's queue. Let's look at how a request is enqueued.
Request Enqueueing
The Scheduler's enqueue method:
def enqueue_request(self, request):
    # enqueue the request; return False if the dupe filter flags it as seen
    if not request.dont_filter and self.df.request_seen(request):
        self.df.log(request, self.spider)
        return False
    # did the disk queue accept it?
    dqok = self._dqpush(request)
    if dqok:
        self.stats.inc_value('scheduler/enqueued/disk', spider=self.spider)
    else:
        # no disk queue defined, fall back to the memory queue
        self._mqpush(request)
        self.stats.inc_value('scheduler/enqueued/memory', spider=self.spider)
    self.stats.inc_value('scheduler/enqueued', spider=self.spider)
    return True

def _dqpush(self, request):
    # is a disk queue defined?
    if self.dqs is None:
        return
    try:
        # convert the Request object to a dict
        reqd = request_to_dict(request, self.spider)
        # push it onto the disk queue
        self.dqs.push(reqd, -request.priority)
    except ValueError as e:  # non serializable request
        if self.logunser:
            msg = ("Unable to serialize request: %(request)s - reason:"
                   " %(reason)s - no more unserializable requests will be"
                   " logged (stats being collected)")
            logger.warning(msg, {'request': request, 'reason': e},
                           exc_info=True, extra={'spider': self.spider})
            self.logunser = False
        self.stats.inc_value('scheduler/unserializable',
                             spider=self.spider)
        return
    else:
        return True

def _mqpush(self, request):
    # push onto the memory queue
    self.mqs.push(request, -request.priority)
If the Scheduler was created with a jobdir, the disk queue is used; otherwise requests go into the memory queue, which is the default.
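The disk queue is what makes pause-and-resume crawling possible; enabling it is just a matter of setting JOBDIR (the directory name is hypothetical):

# settings.py -- persist the scheduler queue so the crawl can be paused and resumed
JOBDIR = 'crawls/somespider-1'

The same setting can also be passed on the command line with -s JOBDIR=... .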
Fingerprint Filtering
RFPDupeFilter's request_seen:
def request_seen(self, request):
    # generate the request fingerprint
    fp = self.request_fingerprint(request)
    # if the fingerprint is already in the fingerprint set, it's a duplicate
    if fp in self.fingerprints:
        return True
    # otherwise record the fingerprint
    self.fingerprints.add(fp)
    # if a path was given at instantiation, also write the fingerprint to file
    if self.file:
        self.file.write(fp + os.linesep)

def request_fingerprint(self, request):
    # delegate to request_fingerprint in utils.request
    return request_fingerprint(request)
The request_fingerprint logic in utils.request looks like this:
def request_fingerprint(request, include_headers=None):
    """Generate the request fingerprint"""
    # optionally include headers in the fingerprint
    if include_headers:
        include_headers = tuple(to_bytes(h.lower())
                                 for h in sorted(include_headers))
    cache = _fingerprint_cache.setdefault(request, {})
    if include_headers not in cache:
        # compute the fingerprint with the sha1 algorithm
        fp = hashlib.sha1()
        fp.update(to_bytes(request.method))
        fp.update(to_bytes(canonicalize_url(request.url)))
        fp.update(request.body or b'')
        if include_headers:
            for hdr in include_headers:
                if hdr in request.headers:
                    fp.update(hdr)
                    for v in request.headers.getlist(hdr):
                        fp.update(v)
        cache[include_headers] = fp.hexdigest()
    return cache[include_headers]
So each Request object gets a fingerprint, computed here with the SHA1 algorithm and recorded in the fingerprint set. Before any request is enqueued, the set is consulted: if the fingerprint already exists, the request is considered a duplicate and is not enqueued again. And as the first check in enqueue_request shows, setting dont_filter to True on a Request instance is all it takes to crawl the same request again, which is very flexible.
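Because the URL is canonicalized before hashing, requests that differ only in query-parameter order produce the same fingerprint. A quick sketch using the same helper (this import path matches the Scrapy versions discussed here; newer releases moved to a fingerprinter class):

from scrapy import Request
from scrapy.utils.request import request_fingerprint

r1 = Request('http://example.com/item?a=1&b=2')
r2 = Request('http://example.com/item?b=2&a=1')  # same URL, parameters reordered
assert request_fingerprint(r1) == request_fingerprint(r2)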
Downloading Requests
Back in _next_request_from_scheduler: once the scheduler's next_request has popped a request from the queue, the actual network download begins with _download:
def _download(self, request, spider):
    # download the request
    slot = self.slot
    slot.add_request(request)

    def _on_success(response):
        # success callback; the result must be a Request or a Response
        assert isinstance(response, (Response, Request))
        if isinstance(response, Response):
            # if the download produced a Response, return the Response
            response.request = request
            logkws = self.logformatter.crawled(request, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            self.signals.send_catch_log(signal=signals.response_received, \
                response=response, request=request, spider=spider)
        return response

    def _on_complete(_):
        # once this download finishes, trigger the next scheduling round
        slot.nextcall.schedule()
        return _

    # hand the request to the Downloader
    dwld = self.downloader.fetch(request, spider)
    # register the success callback
    dwld.addCallbacks(_on_success)
    # register the completion callback
    dwld.addBoth(_on_complete)
    return dwld
The Downloader's fetch:
def fetch(self, request, spider):
    def _deactivate(response):
        # remove the record once the download finishes
        self.active.remove(request)
        return response

    # record the request as in-flight before downloading
    self.active.add(request)
    # call the downloader middleware manager's download, registering
    # self._enqueue_request as the actual download function
    dfd = self.middleware.download(self._enqueue_request, request, spider)
    # register the completion callback
    return dfd.addBoth(_deactivate)
fetch calls the downloader middleware manager's download method, passing _enqueue_request as the function to run once the middlewares are done. Here is that download method:
def download(self, download_func, request, spider):
    @defer.inlineCallbacks
    def process_request(request):
        # run each downloader middleware's process_request, if defined
        for method in self.methods['process_request']:
            response = yield method(request=request, spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                    'Middleware %s.process_request must return None, Response or Request, got %s' % \
                    (six.get_method_self(method).__class__.__name__, response.__class__.__name__)
            # if a middleware returned a value, return that result directly
            if response:
                defer.returnValue(response)
        # if no middleware returned a value, run the registered function,
        # i.e. the Downloader's _enqueue_request
        defer.returnValue((yield download_func(request=request, spider=spider)))

    @defer.inlineCallbacks
    def process_response(response):
        assert response is not None, 'Received None in process_response'
        if isinstance(response, Request):
            defer.returnValue(response)
        # run each downloader middleware's process_response, if defined
        for method in self.methods['process_response']:
            response = yield method(request=request, response=response,
                                    spider=spider)
            assert isinstance(response, (Response, Request)), \
                'Middleware %s.process_response must return Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if isinstance(response, Request):
                defer.returnValue(response)
        defer.returnValue(response)

    @defer.inlineCallbacks
    def process_exception(_failure):
        exception = _failure.value
        # run each downloader middleware's process_exception, if defined
        for method in self.methods['process_exception']:
            response = yield method(request=request, exception=exception,
                                    spider=spider)
            assert response is None or isinstance(response, (Response, Request)), \
                'Middleware %s.process_exception must return None, Response or Request, got %s' % \
                (six.get_method_self(method).__class__.__name__, type(response))
            if response:
                defer.returnValue(response)
        defer.returnValue(_failure)

    # register the execution, error, and exit callbacks
    deferred = mustbe_deferred(process_request, request)
    deferred.addErrback(process_exception)
    deferred.addCallback(process_response)
    return deferred
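The assertions above spell out the middleware contract: process_request may return None (continue down the chain), a Response (skip the download entirely), or a Request (send it back through scheduling). A sketch of a downloader middleware built on that contract (the class name, header, and retry rule are illustrative; it would be enabled through DOWNLOADER_MIDDLEWARES):

class CustomHeadersMiddleware(object):
    # hypothetical downloader middleware
    def process_request(self, request, spider):
        request.headers.setdefault('X-Trace-Id', 'demo')  # illustrative header
        return None  # None: let the next middleware / the real download run

    def process_response(self, request, response, spider):
        if response.status in (502, 503):
            # returning a Request re-enqueues it through the scheduler
            return request.replace(dont_filter=True)
        return response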
In process_request, each middleware can transform, inspect, or validate the Request, after which the real network download is started via the first argument, download_func, which here is the Downloader's _enqueue_request method:
def _enqueue_request(self, request, spider):
    # add to the download queue
    key, slot = self._get_slot(request, spider)
    request.meta['download_slot'] = key

    def _deactivate(response):
        slot.active.remove(request)
        return response

    slot.active.add(request)
    deferred = defer.Deferred().addBoth(_deactivate)
    # the download queue
    slot.queue.append((request, deferred))
    # process the download queue
    self._process_queue(spider, slot)
    return deferred

def _process_queue(self, spider, slot):
    if slot.latercall and slot.latercall.active():
        return
    # if a download delay is configured, defer processing the queue
    now = time()
    delay = slot.download_delay()
    if delay:
        penalty = delay - now + slot.lastseen
        if penalty > 0:
            slot.latercall = reactor.callLater(penalty, self._process_queue, spider, slot)
            return
    # process the download queue
    while slot.queue and slot.free_transfer_slots() > 0:
        slot.lastseen = now
        # pop a pending request from the download queue
        request, deferred = slot.queue.popleft()
        # start the download
        dfd = self._download(slot, request, spider)
        dfd.chainDeferred(deferred)
        # apply the delay
        if delay:
            self._process_queue(spider, slot)
            break

def _download(self, slot, request, spider):
    # register a call to the handlers' download_request
    dfd = mustbe_deferred(self.handlers.download_request, request, spider)

    # register the download-finished callback
    def _downloaded(response):
        self.signals.send_catch_log(signal=signals.response_downloaded,
                                    response=response,
                                    request=request,
                                    spider=spider)
        return response
    dfd.addCallback(_downloaded)
    slot.transferring.add(request)

    def finish_transferring(_):
        slot.transferring.remove(request)
        # once the download finishes, process the queue again
        self._process_queue(spider, slot)
        return _
    return dfd.addBoth(finish_transferring)
And self.handlers.download_request:
def download_request(self, request, spider):
    # get the request's scheme
    scheme = urlparse_cached(request).scheme
    # look up the download handler for that scheme
    handler = self._get_handler(scheme)
    if not handler:
        raise NotSupported("Unsupported URL scheme '%s': %s" %
                           (scheme, self._notconfigured[scheme]))
    # start the download and return the result
    return handler.download_request(request, spider)

def _get_handler(self, scheme):
    # resolve the download handler for a scheme;
    # handlers for http, https, ftp, etc. are defined in the default settings
    if scheme in self._handlers:
        return self._handlers[scheme]
    if scheme in self._notconfigured:
        return None
    if scheme not in self._schemes:
        self._notconfigured[scheme] = 'no handler available for that scheme'
        return None
    path = self._schemes[scheme]
    try:
        # instantiate the download handler
        dhcls = load_object(path)
        dh = dhcls(self._crawler.settings)
    except NotConfigured as ex:
        self._notconfigured[scheme] = str(ex)
        return None
    except Exception as ex:
        logger.error('Loading "%(clspath)s" for scheme "%(scheme)s"',
                     {"clspath": path, "scheme": scheme},
                     exc_info=True,  extra={'crawler': self._crawler})
        self._notconfigured[scheme] = str(ex)
        return None
    else:
        self._handlers[scheme] = dh
    return self._handlers[scheme]
The request's scheme selects the matching download handler; the handlers defined in the default settings are:
DOWNLOAD_HANDLERS_BASE = {
    'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler',
    'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler',
    's3': 'scrapy.core.downloader.handlers.s3.S3DownloadHandler',
    'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',
}
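A project can add or replace handlers through the DOWNLOAD_HANDLERS setting, which is merged on top of this base; for example (the handler path and scheme are hypothetical):

# settings.py
DOWNLOAD_HANDLERS = {
    # hypothetical custom handler for an imaginary 'sftp' scheme
    'sftp': 'myproject.handlers.SFTPDownloadHandler',
}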
Each handler's download_request method performs the actual network download. We won't walk through every handler implementation here; simply put, you can think of a handler as a wrapped-up download library: give it a URL and it returns the download result. On the middleware side, process_exception lets each middleware define its own exception-handling logic, and process_response lets each middleware post-process the download result before it is finally returned. Note that the process_request methods run through the middlewares in order, while process_response and process_exception run in reverse order; DownloaderMiddlewareManager's _add_middleware method shows how these method chains are registered. Back in ExecutionEngine's _next_request_from_scheduler, we saw _handle_downloader_output registered as a callback, the logic that handles the download result:
def _handle_downloader_output(self, response, request, spider):
    # the download result must be a Request, Response, or Failure
    assert isinstance(response, (Request, Response, Failure)), response
    # if it's a Request, call crawl again to run the Scheduler's enqueue logic
    if isinstance(response, Request):
        self.crawl(response, spider)
        return
    # if it's a Response or Failure, call the scraper's enqueue_scrape for
    # further processing, mainly interaction with the Spiders and Pipelines
    d = self.scraper.enqueue_scrape(response, request, spider)
    d.addErrback(lambda f: logger.error('Error while enqueuing downloader output',
                                        exc_info=failure_to_exc_info(f),
                                        extra={'spider': spider}))
    return d
If the result is a Request instance, it goes straight back into the Scheduler's request queue; if it is a Response or Failure instance, the Scraper's enqueue_scrape method takes over for further processing.
Handling the Download Result
Here is the Scraper's enqueue_scrape; let's see how the Scraper component handles the remaining logic:
def enqueue_scrape(self, response, request, spider):
    # add to the scrape processing queue
    slot = self.slot
    dfd = slot.add_response_request(response, request)

    def finish_scraping(_):
        slot.finish_response(response, request)
        self._check_if_closing(spider, slot)
        self._scrape_next(spider, slot)
        return _
    dfd.addBoth(finish_scraping)
    dfd.addErrback(
        lambda f: logger.error('Scraper bug processing %(request)s',
                               {'request': request},
                               exc_info=failure_to_exc_info(f),
                               extra={'spider': spider}))
    self._scrape_next(spider, slot)
    return dfd
def _scrape_next(self, spider, slot):
    while slot.queue:
        # take a pending task from the Scraper queue
        response, request, deferred = slot.next_response_request_deferred()
        self._scrape(response, request, spider).chainDeferred(deferred)

def _scrape(self, response, request, spider):
    assert isinstance(response, (Response, Failure))
    # continue processing in _scrape2
    dfd = self._scrape2(response, request, spider)
    # register the error callback
    dfd.addErrback(self.handle_spider_error, request, response, spider)
    # register the exit callback
    dfd.addCallback(self.handle_spider_output, request, response, spider)
    return dfd

def _scrape2(self, request_result, request, spider):
    # if the result is not a Failure, call the spider middleware
    # manager's scrape_response
    if not isinstance(request_result, Failure):
        return self.spidermw.scrape_response(
            self.call_spider, request_result, request, spider)
    else:
        # otherwise call call_spider directly
        dfd = self.call_spider(request_result, request, spider)
        return dfd.addErrback(
            self._log_download_errors, request_result, request, spider)
The result lands in the Scraper's processing queue, tasks are pulled off that queue, and if the result is not a Failure, the spider middleware manager's scrape_response method is called:
def scrape_response(self, scrape_func, response, request, spider):
    fname = lambda f:'%s.%s' % (
            six.get_method_self(f).__class__.__name__,
            six.get_method_function(f).__name__)

    def process_spider_input(response):
        # run each spider middleware's process_spider_input
        for method in self.methods['process_spider_input']:
            try:
                result = method(response=response, spider=spider)
                assert result is None, \
                        'Middleware %s must returns None or ' \
                        'raise an exception, got %s ' \
                        % (fname(method), type(result))
            except:
                return scrape_func(Failure(), request, spider)
        # after all the process_spider_input methods, run call_spider
        return scrape_func(response, request, spider)

    def process_spider_exception(_failure):
        # run each spider middleware's process_spider_exception
        exception = _failure.value
        for method in self.methods['process_spider_exception']:
            result = method(response=response, exception=exception, spider=spider)
            assert result is None or _isiterable(result), \
                'Middleware %s must returns None, or an iterable object, got %s ' % \
                (fname(method), type(result))
            if result is not None:
                return result
        return _failure

    def process_spider_output(result):
        # run each spider middleware's process_spider_output
        for method in self.methods['process_spider_output']:
            result = method(response=response, result=result, spider=spider)
            assert _isiterable(result), \
                'Middleware %s must returns an iterable object, got %s ' % \
                (fname(method), type(result))
        return result

    # run process_spider_input
    dfd = mustbe_deferred(process_spider_input, response)
    # register the error callback
    dfd.addErrback(process_spider_exception)
    # register the exit callback
    dfd.addCallback(process_spider_output)
    return dfd
Calling Back into the Spider
The call_spider method is where our own spider class finally gets called back:
def call_spider(self, result, request, spider):
    # call back into the spider module
    result.request = request
    dfd = defer_result(result)
    # register request.callback as the callback; if none is defined,
    # fall back to the spider's parse method
    dfd.addCallbacks(request.callback or spider.parse, request.errback)
    return dfd.addCallback(iterate_spider_output)
So parse is the first callback when a Request defines none of its own. From here on, the spider receives download results through whatever callback methods we define, and those callbacks are also executed at this point.
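From the spider's side, that wiring looks like this (the spider, selectors, and callbacks are illustrative):

import scrapy

class ItemSpider(scrapy.Spider):
    name = 'items'  # hypothetical
    start_urls = ['http://example.com/list']

    def parse(self, response):
        # default callback: runs for requests with no explicit callback
        for href in response.css('a::attr(href)').extract():
            yield scrapy.Request(response.urljoin(href),
                                 callback=self.parse_item,
                                 errback=self.on_error)

    def parse_item(self, response):
        yield {'url': response.url}

    def on_error(self, failure):
        # the errback receives a twisted Failure
        self.logger.error(repr(failure))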
Handling Output
The Scraper then calls handle_spider_output to process the spider's output:
def handle_spider_output(self, result, request, response, spider):
    # process the spider's output
    if not result:
        return defer_succeed(None)
    it = iter_errback(result, self.handle_spider_error, request, response, spider)
    # register _process_spidermw_output
    dfd = parallel(it, self.concurrent_items,
        self._process_spidermw_output, request, response, spider)
    return dfd
def _process_spidermw_output(self, output, request, response, spider):
    # process each Request/Item the spider module returns
    if isinstance(output, Request):
        # if it's a Request, put it back into the Scheduler's request queue
        self.crawler.engine.crawl(request=output, spider=spider)
    elif isinstance(output, (BaseItem, dict)):
        # if it's a BaseItem or dict
        self.slot.itemproc_size += 1
        # call the pipelines' process_item
        dfd = self.itemproc.process_item(output, spider)
        dfd.addBoth(self._itemproc_finished, output, response, spider)
        return dfd
    elif output is None:
        pass
    else:
        typename = type(output).__name__
        logger.error('Spider must return Request, BaseItem, dict or None, '
                     'got %(typename)r in %(request)s',
                     {'request': request, 'typename': typename},
                     extra={'spider': spider})
So the spider's output should be Request or BaseItem (or dict) instances: a Request re-enters the Scheduler's request queue, while a BaseItem is handed to the pipeline manager, which runs each pipeline's process_item in turn. In other words, to output results we only need to define a Pipeline class and override this one method. The ItemPipelineManager logic:
class ItemPipelineManager(MiddlewareManager):
    component_name = 'item pipeline'

    @classmethod
    def _get_mwlist_from_settings(cls, settings):
        return build_component_list(settings.getwithbase('ITEM_PIPELINES'))

    def _add_middleware(self, pipe):
        super(ItemPipelineManager, self)._add_middleware(pipe)
        if hasattr(pipe, 'process_item'):
            self.methods['process_item'].append(pipe.process_item)

    def process_item(self, item, spider):
        # call each pipeline's process_item in turn
        return self._process_chain('process_item', item, spider)
ItemPipelineManager is itself a middleware manager, much like the downloader and spider middleware managers we met earlier: any pipeline that defines process_item gets called in sequence. Finally, _itemproc_finished:
def _itemproc_finished(self, output, item, response, spider):
    self.slot.itemproc_size -= 1
    if isinstance(output, Failure):
        ex = output.value
        # if a pipeline raised DropItem, discard this result
        if isinstance(ex, DropItem):
            logkws = self.logformatter.dropped(item, ex, response, spider)
            logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
            return self.signals.send_catch_log_deferred(
                signal=signals.item_dropped, item=item, response=response,
                spider=spider, exception=output.value)
        else:
            logger.error('Error processing %(item)s', {'item': item},
                         exc_info=failure_to_exc_info(output),
                         extra={'spider': spider})
    else:
        logkws = self.logformatter.scraped(output, response, spider)
        logger.log(*logformatter_adapter(logkws), extra={'spider': spider})
        return self.signals.send_catch_log_deferred(
            signal=signals.item_scraped, item=output, response=response,
            spider=spider)
So to discard a result inside a Pipeline, just raise a DropItem exception and Scrapy handles the rest. Any Request output re-enters the request queue and waits for the engine's next scheduling round, that is, another call to ExecutionEngine's _next_request, and this continues until the request queue has no new work and the whole program exits.
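For example, a pipeline that drops items missing a required field might look like this (the field name is illustrative):

from scrapy.exceptions import DropItem

class RequiredFieldsPipeline(object):
    # hypothetical pipeline; registered via ITEM_PIPELINES
    def process_item(self, item, spider):
        if not item.get('url'):  # illustrative required field
            raise DropItem('missing url in %r' % (item,))
        return item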
CrawlSpider
The CrawlSpider class, which we also use a lot in practice, simply inherits from Spider and overrides the parse method (which is exactly why you must not override parse when subclassing it), combining it with Rule classes to extract follow-up Requests automatically.
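A typical usage sketch (the spider name and URL patterns are illustrative); note the callback is parse_item, never parse:

from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor

class SiteSpider(CrawlSpider):
    name = 'site'  # hypothetical
    start_urls = ['http://example.com/']
    rules = (
        # follow pagination links; hand detail pages to parse_item
        Rule(LinkExtractor(allow=r'/page/\d+'), follow=True),
        Rule(LinkExtractor(allow=r'/item/\d+'), callback='parse_item'),
    )

    def parse_item(self, response):
        yield {'url': response.url}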
Summary
To recap: execute in cmdline.py builds a CrawlerProcess and calls crawl and start; Crawler's crawl creates the spider and the engine and hands the seed requests over; the engine then loops on _next_request, pushing requests through the Scheduler's queue (guarded by the fingerprint filter), through the Downloader and its middleware chain, and finally through the Scraper, which routes the spider's output either back to the Scheduler as new Requests or into the Item Pipelines as results, until the queues run dry and the program exits.