Scrapy Cluster中Kafka Topic详解

之前我们已经介绍过了Scrapy Cluster中有三大组件,Kafka,Redis和Scrapy spider。Kafka是一种高吞吐量的分布式发布订阅消息系统。每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。如果使用默认的配置,在Scrapy Cluster中Kafka有3个基本的topic:

  • demo.incoming
  • demo.crawled_firehose
  • demo.outbound_firehose

我们在scrapy-cluser-1.2.1/kafka-monitor下打开Git Bash输入:

python kafkadump.py list

该命令行能够返回kafka系统中的Topic列表。

2019-06-10 11:14:58,785 [kafkadump] INFO: Connected to test:9092
Topics:
- demo.outbound_firehose
- demo.incoming
- demo.crawled_firehose
- __consumer_offsets

就是我们说的3个基本topic(__consumer_offsets会在文末作出注释)。下面我们就来详细介绍一下这3个topic。

demo.incoming

官方文档中有这样一句话:

All incoming requests to the cluster go through the demo.incoming kafka topic.

意思是所有传入爬虫集群的请求都会进入名为demo.incoming的topic。更通俗的讲,所有我们要feed给Scrapy Cluster的消息,无论是通过命令行还是其它方式发送,都会进入demo.incoming。此topic接收合法的JSON请求,并由Kafka Monitor来消费。详情见:详解Scrapy Cluster中Kafka与Redis的消息生产和消费

demo.crawled_firehose

在Scrapy Cluster中又一个KafkaPipeline,默认情况下,由Crawlers/Spiders爬取的所有数据最终都会推送到名为demo.crawled_firehose的topic中。感兴趣的朋友可以看一下code>pipelines.py文件中KafkaPipeline类的process_item()方法。

    def process_item(self, item, spider):
        try:
            self.logger.debug("Processing item in KafkaPipeline")
            datum = dict(item)
            datum["timestamp"] = self._get_time()
            prefix = self.topic_prefix

            try:
                if self.use_base64:
                    datum['body'] = base64.b64encode(bytes(datum['body'], 'utf-8'))
                message = ujson.dumps(datum, sort_keys=True)
            except:
                message = 'json failed to parse'

            firehose_topic = "{prefix}.crawled_firehose".format(prefix=prefix)
            future = self.producer.send(firehose_topic, message)
            future.add_callback(self._kafka_success, datum, spider)
            future.add_errback(self._kafka_failure, datum, spider)

            if self.appid_topics:
                appid_topic = "{prefix}.crawled_{appid}".format(
                        prefix=prefix, appid=datum["appid"])
                future2 = self.producer.send(appid_topic, message)
                future2.add_callback(self._kafka_success, datum, spider)
                future2.add_errback(self._kafka_failure, datum, spider)

        except KafkaTimeoutError:
            self.logger.warning("Caught KafkaTimeoutError exception")

        return item

代码中消息会推送到"{prefix}.crawled_firehose"的topic中,其中prefix部分的内容由settings.py中设定(可以在localsettings.py中覆盖),默认为demo。也即demo.crawled_firehose

demo.outbound_firehose

所有的Action,Stop,Expire和Statistics的请求结果都会被推送到名为demo.outbound_firehose的topic中。此类消息由Redis Monitor的插件kafka_base_monitor.py中的KafkaBaseMonitor生产。该类为:

  • expire_monitor.py
  • info_monitor.py
  • stats_monitor.py
  • stop_monitor.py
  • zookeeper_monitor.py

以上几个类的父类,也就是这些类都会调用KafkaBaseMonitor中的_send_to_kafka()方法向Kafka生产消息。以下是KafkaBaseMonitor类中的一段代码:

    def _send_to_kafka(self, master):
        '''
        Sends the message back to Kafka
        @param master: the final dict to send
        @returns: True if successfully sent to kafka
        '''
        appid_topic = "{prefix}.outbound_{appid}".format(
                                                    prefix=self.topic_prefix,
                                                    appid=master['appid'])
        firehose_topic = "{prefix}.outbound_firehose".format(
                                                    prefix=self.topic_prefix)
        try:
            # dont want logger in outbound kafka message
            if self.use_appid_topics:
                f1 = self.producer.send(appid_topic, master)
                f1.add_callback(self._kafka_success)
                f1.add_errback(self._kafka_failure)
            f2 = self.producer.send(firehose_topic, master)
            f2.add_callback(self._kafka_success)
            f2.add_errback(self._kafka_failure)

            return True
        except Exception as ex:
            message = "An exception '{0}' occured while sending a message " \
                "to kafka. Arguments:\n{1!r}" \
                .format(type(ex).__name__, ex.args)
            self.logger.error(message)

        return False

可以看出由KafkaBaseMonitor生产的消息目标topic为"{prefix}.outbound_firehose",其中prefix部分的内容由settings.py中设定,默认为demo。也即demo.outbound_firehose。

__consumer_offsets

此topic是Kafka中的一个默认的内部topic,由Kafka自动创建的,用户一般感觉不到这个topic,其中保存的是Kafka新版本consumer的位移信息。其中保存三类消息,分别是:

  • Consumer group组元数据消息
  • Consumer group位移消息
  • Tombstone消息

注意

本文着重介绍的为Scrapy Cluster中Kafka的3个基本的topic,如果为Redis Monitor和Crawler配置了应用程序ID特殊topic,还可以有如下topic:

  • demo.crawled_
  • demo.outbound_

以上所有内容都是在生产环境下的topic,如果是在线集成测试,产生的则是虚拟Kafka topic,以免干扰生产环境的topic。demo.incoming,demo.crawled_firehose和demo.outbound_firehose分别对应测试环境下的demo.incoming_test,demo_test.crawled_firehose和demo_test.outbound_firehose。

Captain QR Code

扫码联系船长

发表回复

您的电子邮箱地址不会被公开。