基于Twisted的Scrapy异步调度器

Scrapy是非常有名的一款爬虫开发框架。如果是小型项目或者简单爬取一些数据,可以基于Scrapy迅速搭建一个从爬取到数据解析清晰的全流程方案。Scrapy的核心是基于Twisted的事件驱动。因此整个数据爬取、入栈、出栈、提取和清洗都是基于异步事件的。在研究Scrapy的调度机制之前,首先需要了解Twisted,特别是Twisted的几个非常重要的概念。否则,Scrapy的调度器代码看起来真的是非常吃力,很难理解。

Twisted介绍

Twisted在Python中是一个非常著名的异步事件驱动网络框架。在网上搜了一大圈,发现这篇文章介绍的还是挺不错的《Python Twisted介绍》(建议只看其中的两张图)。另外,比较推荐这本书《Twisted Network Programming Essentials》,里面讲得也比较清楚。想要读懂Scrapy的调度引擎不用把整个Twisted都看完,只需要了解几个关键的概念,特别是Reactor和Deferreds。如果要具体到函数的话,包括LoopingCall、CallLater、以及与Deferred相关的使用方法。

Reactor是什么?

根据官方文档的描述,Reactor是Twisted的核心。基于不同的操作系统,reactor实现的方式是不同的,如下图所示(点击详情)。但是对于我们使用reactor构造上层应用来说,不太需要了解底层的实现细节。
reactor functionality

Scrapy的核心是基于Twisted事件驱动的定时任务。分别使用Twisted Reactor的LoopingCall和CallLater。

LoopingCall
LoopingCall是使用Reactor定义和执行定时任务的一种方式,可以重复执行一个定时任务。以下一个简单的示例代码,定时每1秒执行一次:

from twisted.internet import task
from twisted.internet import reactor

def runEverySecond():
    print "a second has passed"

l = task.LoopingCall(runEverySecond)
l.start(1.0) # call every second

# l.stop() will stop the looping calls
reactor.run()

CallLater
CallLater相对简单,就是定时执行一次。

from twisted.internet import reactor

def f(s):
    print "this will run 3.5 seconds after it was scheduled: %s" % s

reactor.callLater(3.5, f, "hello, world")

# f() will only be called if the event loop is started.
reactor.run()

详情可以参考:
Reactor计划任务
LoopingCall说明文档
CallLater说明文档

deferred是什么?

Deferred对象是Twisted中用来管理回调函数序列的。一个Deferred可以添加若干个回调函数,分为两个回调链,callbackerrback。当一个异步调用成功以后,可以调用Deferred对象的callback函数,同时会按照添加顺序依次调用callback链上注册的所有回调函数。同理,当异步调用发生错误,可以调用Deferred对象的errback函数,从而依次调用errback链上注册的所有回调函数。如图所示:
deferred-process说明

以下两点在Scrapy中出现多次,如果不理解的话,能难看懂Scrapy中整个调用逻辑:

  • 如果Deferred的回调链中某一个回调函数返回一个Deferred对象,那么剩下的回调会暂停,等待新的Deferred对象返回结果后继续执行;如文档中描述:
  • If a callback (or errback) returns another Deferred, this Deferred will be chained to it (and further callbacks will not run until that Deferred has a result).

  • 一个Deferred对象被链到另一个Deferred对象上,它的所有callbackerrback回调链都会等待新的Deferred对象返回结果;详情可以参考Chaining Deferreds

基于Twisted定时任务的Scrapy任务调度机制

Scrapy的调度器是engine.py,在爬虫启动过程中会调用open_spider函数。样例代码如下(具体可以看engine.py源代码):
open_spider

该函数主要做以下几件事:

  • 定义nextcall,这是调度器定时执行的任务。该任务是一个CallLaterOnce对象,该对象的schedule方法会初始化一个callLater定时任务。该方法保证了在任何时间_next_request只会被计划执行一次。
  •     def schedule(self, delay=0):
            from twisted.internet import reactor
            if self._call is None:
                self._call = reactor.callLater(delay, self)
    
  • 初始化爬虫等变量
  • 初始化Slot
  • class Slot:
        def __init__(self, start_requests, close_if_idle, nextcall, scheduler):
            self.closing = False
            self.inprogress = set()  # requests in progress
            self.start_requests = iter(start_requests)
            self.close_if_idle = close_if_idle
            self.nextcall = nextcall
            self.scheduler = scheduler
            self.heartbeat = task.LoopingCall(nextcall.schedule)
    
  • 调用Slot中nextcall的schedule(),注册一个定时任务。
  • 调用Slot中heartbeat,启动循环定时任务(task.LoopingCall),这里可以看到是每5秒钟激活一次

到目前位置,Scrapy的调度器已经完成了第一阶段定时任务的初始化。理论上task.LoopingCall每5秒钟会调用一次nextcall,由nextcall.schedule()在创建一个reactor.callLater的定时任务,若干秒(由delay决定,默认为0)后执行_next_request。在_next_request中会下载队列中的任务。这里有一个重要的知识点,Scrapy使用twister的异步下载,因此调用下载任务返回一个deferred对象。
scrapy定时调度

基于Deferred对象异步回调的Scrapy任务调度机制

Scrapy下载页面内容是异步处理的。因此调用_download方法以后会返回一个deferred对象。并且为该deferred对象添加各种callback和errback回调函数。当该下载任务成功完成或者报错会有相对应的处理。下面让我们来看看默认的Scrapy调度器为deferred对象添加哪些回调:
deferred对象callback和errback队列

上图右下部分列出了所有的callback和errback回调函数。看起来非常简单,实际上callback回调函数_handle_downloader_output返回了一个新的deferred对象。而这个deferred对象是调用了scraper.py里面的enqueue_scrape方法返回的。因此后面的回调函数都要等这个新的deferred对象返回执行结果。

新的deferred对象经过多个方法调用的层层穿透,最后是在call_spider方法中创建并返回的。然而这个最新的deferred对象并没有真正做任何异步任务,只是使用reactor.callLater创建一个100毫秒后调用callback或者errback的定时任务。因此,下载完成后的整个callback调用链的目的是:下载完成以后等待100毫秒调用spider的callback(默认callback是spider._parse)解析下载的内容。

Scraper中deferred对象调用链

可能有朋友会问最后那个deferred对象的设计目的是什么?直接从回调函数_handle_downloader_output同步调用spider进行结果处理不好么?我的理解是这个设计可能是为了在spider中实现异步数据处理,例如把数据通过网络保存到数据库中。

Captain QR Code

扫码联系船长

发表回复

您的电子邮箱地址不会被公开。