Scrapy是非常有名的一款爬虫开发框架。如果是小型项目或者简单爬取一些数据,可以基于Scrapy迅速搭建一个从爬取到数据解析清晰的全流程方案。Scrapy的核心是基于Twisted的事件驱动。因此整个数据爬取、入栈、出栈、提取和清洗都是基于异步事件的。在研究Scrapy的调度机制之前,首先需要了解Twisted,特别是Twisted的几个非常重要的概念。否则,Scrapy的调度器代码看起来真的是非常吃力,很难理解。
Twisted介绍
Twisted在Python中是一个非常著名的异步事件驱动网络框架。在网上搜了一大圈,发现这篇文章介绍的还是挺不错的《Python Twisted介绍》(建议只看其中的两张图)。另外,比较推荐这本书《Twisted Network Programming Essentials》,里面讲得也比较清楚。想要读懂Scrapy的调度引擎不用把整个Twisted都看完,只需要了解几个关键的概念,特别是Reactor和Deferreds。如果要具体到函数的话,包括LoopingCall、CallLater、以及与Deferred相关的使用方法。
Reactor是什么?
根据官方文档的描述,Reactor是Twisted的核心。基于不同的操作系统,reactor实现的方式是不同的,如下图所示(点击详情)。但是对于我们使用reactor构造上层应用来说,不太需要了解底层的实现细节。
。
Scrapy的核心是基于Twisted事件驱动的定时任务。分别使用Twisted Reactor的LoopingCall和CallLater。
LoopingCall
LoopingCall是使用Reactor定义和执行定时任务的一种方式,可以重复执行一个定时任务。以下一个简单的示例代码,定时每1秒执行一次:
from twisted.internet import task from twisted.internet import reactor def runEverySecond(): print "a second has passed" l = task.LoopingCall(runEverySecond) l.start(1.0) # call every second # l.stop() will stop the looping calls reactor.run()
CallLater
CallLater相对简单,就是定时执行一次。
from twisted.internet import reactor def f(s): print "this will run 3.5 seconds after it was scheduled: %s" % s reactor.callLater(3.5, f, "hello, world") # f() will only be called if the event loop is started. reactor.run()
详情可以参考:
Reactor计划任务
LoopingCall说明文档
CallLater说明文档
deferred是什么?
Deferred对象是Twisted中用来管理回调函数序列的。一个Deferred可以添加若干个回调函数,分为两个回调链,callback
和errback
。当一个异步调用成功以后,可以调用Deferred
对象的callback
函数,同时会按照添加顺序依次调用callback链上注册的所有回调函数。同理,当异步调用发生错误,可以调用Deferred
对象的errback
函数,从而依次调用errback链上注册的所有回调函数。如图所示:
以下两点在Scrapy中出现多次,如果不理解的话,能难看懂Scrapy中整个调用逻辑:
- 如果Deferred的回调链中某一个回调函数返回一个Deferred对象,那么剩下的回调会暂停,等待新的Deferred对象返回结果后继续执行;如文档中描述:
- 一个Deferred对象被链到另一个Deferred对象上,它的所有
callback
和errback
回调链都会等待新的Deferred对象返回结果;详情可以参考Chaining Deferreds。
If a callback (or errback) returns another Deferred, this Deferred will be chained to it (and further callbacks will not run until that Deferred has a result).
基于Twisted定时任务的Scrapy任务调度机制
Scrapy的调度器是engine.py,在爬虫启动过程中会调用open_spider
函数。样例代码如下(具体可以看engine.py源代码):
该函数主要做以下几件事:
- 定义nextcall,这是调度器定时执行的任务。该任务是一个CallLaterOnce对象,该对象的
schedule
方法会初始化一个callLater定时任务。该方法保证了在任何时间_next_request
只会被计划执行一次。
def schedule(self, delay=0): from twisted.internet import reactor if self._call is None: self._call = reactor.callLater(delay, self)
class Slot: def __init__(self, start_requests, close_if_idle, nextcall, scheduler): self.closing = False self.inprogress = set() # requests in progress self.start_requests = iter(start_requests) self.close_if_idle = close_if_idle self.nextcall = nextcall self.scheduler = scheduler self.heartbeat = task.LoopingCall(nextcall.schedule)
到目前位置,Scrapy的调度器已经完成了第一阶段定时任务的初始化。理论上task.LoopingCall
每5秒钟会调用一次nextcall
,由nextcall.schedule()在创建一个reactor.callLater的定时任务,若干秒(由delay决定,默认为0)后执行_next_request。在_next_request中会下载队列中的任务。这里有一个重要的知识点,Scrapy使用twister的异步下载,因此调用下载任务返回一个deferred对象。
基于Deferred对象异步回调的Scrapy任务调度机制
Scrapy下载页面内容是异步处理的。因此调用_download
方法以后会返回一个deferred
对象。并且为该deferred
对象添加各种callback和errback回调函数。当该下载任务成功完成或者报错会有相对应的处理。下面让我们来看看默认的Scrapy调度器为deferred
对象添加哪些回调:
上图右下部分列出了所有的callback和errback回调函数。看起来非常简单,实际上callback回调函数_handle_downloader_output
返回了一个新的deferred
对象。而这个deferred
对象是调用了scraper.py
里面的enqueue_scrape
方法返回的。因此后面的回调函数都要等这个新的deferred
对象返回执行结果。
新的deferred
对象经过多个方法调用的层层穿透,最后是在call_spider
方法中创建并返回的。然而这个最新的deferred
对象并没有真正做任何异步任务,只是使用reactor.callLater
创建一个100毫秒后调用callback
或者errback
的定时任务。因此,下载完成后的整个callback
调用链的目的是:下载完成以后等待100毫秒调用spider的callback(默认callback是spider._parse)解析下载的内容。
可能有朋友会问最后那个deferred
对象的设计目的是什么?直接从回调函数_handle_downloader_output
同步调用spider进行结果处理不好么?我的理解是这个设计可能是为了在spider中实现异步数据处理,例如把数据通过网络保存到数据库中。

扫码联系船长