之前我们已经介绍过了Scrapy Cluster中有三大组件,Kafka,Redis和Scrapy spider。Kafka是一种高吞吐量的分布式发布订阅消息系统。每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。如果使用默认的配置,在Scrapy Cluster中Kafka有3个基本的topic:
- demo.incoming
- demo.crawled_firehose
- demo.outbound_firehose
我们在scrapy-cluser-1.2.1/kafka-monitor
下打开Git Bash输入:
python kafkadump.py list
该命令行能够返回kafka系统中的Topic列表。
2019-06-10 11:14:58,785 [kafkadump] INFO: Connected to test:9092 Topics: - demo.outbound_firehose - demo.incoming - demo.crawled_firehose - __consumer_offsets
就是我们说的3个基本topic(__consumer_offsets
会在文末作出注释)。下面我们就来详细介绍一下这3个topic。
demo.incoming
官方文档中有这样一句话:
All incoming requests to the cluster go through the demo.incoming kafka topic.
意思是所有传入爬虫集群的请求都会进入名为demo.incoming的topic。更通俗的讲,所有我们要feed给Scrapy Cluster的消息,无论是通过命令行还是其它方式发送,都会进入demo.incoming。此topic接收合法的JSON请求,并由Kafka Monitor来消费。详情见:详解Scrapy Cluster中Kafka与Redis的消息生产和消费
demo.crawled_firehose
在Scrapy Cluster中又一个KafkaPipeline,默认情况下,由Crawlers/Spiders爬取的所有数据最终都会推送到名为demo.crawled_firehose
的topic中。感兴趣的朋友可以看一下code>pipelines.py文件中KafkaPipeline
类的process_item()
方法。
def process_item(self, item, spider): try: self.logger.debug("Processing item in KafkaPipeline") datum = dict(item) datum["timestamp"] = self._get_time() prefix = self.topic_prefix try: if self.use_base64: datum['body'] = base64.b64encode(bytes(datum['body'], 'utf-8')) message = ujson.dumps(datum, sort_keys=True) except: message = 'json failed to parse' firehose_topic = "{prefix}.crawled_firehose".format(prefix=prefix) future = self.producer.send(firehose_topic, message) future.add_callback(self._kafka_success, datum, spider) future.add_errback(self._kafka_failure, datum, spider) if self.appid_topics: appid_topic = "{prefix}.crawled_{appid}".format( prefix=prefix, appid=datum["appid"]) future2 = self.producer.send(appid_topic, message) future2.add_callback(self._kafka_success, datum, spider) future2.add_errback(self._kafka_failure, datum, spider) except KafkaTimeoutError: self.logger.warning("Caught KafkaTimeoutError exception") return item
代码中消息会推送到"{prefix}.crawled_firehose"
的topic中,其中prefix
部分的内容由settings.py
中设定(可以在localsettings.py中覆盖),默认为demo。也即demo.crawled_firehose
。
demo.outbound_firehose
所有的Action,Stop,Expire和Statistics的请求结果都会被推送到名为demo.outbound_firehose的topic中。此类消息由Redis Monitor的插件kafka_base_monitor.py
中的KafkaBaseMonitor
生产。该类为:
expire_monitor.py
info_monitor.py
stats_monitor.py
stop_monitor.py
zookeeper_monitor.py
以上几个类的父类,也就是这些类都会调用KafkaBaseMonitor
中的_send_to_kafka()
方法向Kafka生产消息。以下是KafkaBaseMonitor
类中的一段代码:
def _send_to_kafka(self, master): ''' Sends the message back to Kafka @param master: the final dict to send @returns: True if successfully sent to kafka ''' appid_topic = "{prefix}.outbound_{appid}".format( prefix=self.topic_prefix, appid=master['appid']) firehose_topic = "{prefix}.outbound_firehose".format( prefix=self.topic_prefix) try: # dont want logger in outbound kafka message if self.use_appid_topics: f1 = self.producer.send(appid_topic, master) f1.add_callback(self._kafka_success) f1.add_errback(self._kafka_failure) f2 = self.producer.send(firehose_topic, master) f2.add_callback(self._kafka_success) f2.add_errback(self._kafka_failure) return True except Exception as ex: message = "An exception '{0}' occured while sending a message " \ "to kafka. Arguments:\n{1!r}" \ .format(type(ex).__name__, ex.args) self.logger.error(message) return False
可以看出由KafkaBaseMonitor
生产的消息目标topic为"{prefix}.outbound_firehose"
,其中prefix部分的内容由settings.py中设定,默认为demo。也即demo.outbound_firehose。
__consumer_offsets
此topic是Kafka中的一个默认的内部topic,由Kafka自动创建的,用户一般感觉不到这个topic,其中保存的是Kafka新版本consumer的位移信息。其中保存三类消息,分别是:
- Consumer group组元数据消息
- Consumer group位移消息
- Tombstone消息
注意
本文着重介绍的为Scrapy Cluster中Kafka的3个基本的topic,如果为Redis Monitor和Crawler配置了应用程序ID特殊topic,还可以有如下topic:
- demo.crawled_
- demo.outbound_
以上所有内容都是在生产环境下的topic,如果是在线集成测试,产生的则是虚拟Kafka topic,以免干扰生产环境的topic。demo.incoming,demo.crawled_firehose和demo.outbound_firehose分别对应测试环境下的demo.incoming_test,demo_test.crawled_firehose和demo_test.outbound_firehose。
扫码联系船长