Spark常见问题

Spark Streaming消费Kafka数据

首先考虑Spark Streaming消费Kafka数据时,如何保证数据不丢失

参考资料:Recent Evolution of Zero Data Loss Guarantee in Spark Streaming With Kafka
Improvements to Kafka integration of Spark Streaming

丢失的情况包括:

  1. Spark Streaming正在读取Kafka的数据,突然挂掉;
  2. Spark Streaming读取完Kafka的数据,在保存offset前(包括保存到Kafka的zookeeper或者某种持久化存储)挂掉。

Spark Streaming从1.3以后提供2种方式消费Kafka数据,一种利用Receiver读取Kafka数据,生成DStream,然后供后续的task进行处理;另一种通过Driver计算每批次要消费的offset范围,由task的Executor通过Kafka简单API直接消费

问题:两种方式产生的DStream中的RDD进行partition的逻辑各是什么?第二种方式Executor的个数是否和Kafka主题的partition数目一致?

下面分别看看两种方式有哪些容错手段:
方式一:利用Receiver
代码如下:

1
2
3
4
5
import org.apache.spark.streaming.kafka._
...
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

首先,保证Receiver的可靠性:
图1. 利用Receiver消费Kafka数据

  1. 首先Receiver从Kafka读取数据
  2. 得到一段数据,生成RDD(?)之后,数据复制到2个Executor
  3. 告知Driver该段Block的所有ID
  4. 更新Zookeeper中的offset信息

Receiver挂掉重启后,可根据Zookeeper中的offset信息继续读取Kafka消息

其次,保证Driver的可靠性:
图1. Driver开启checkpoint

开启checkpoint机制后,driver会把以下信息保存到HDFS等可靠存储上:

  1. 配置信息
  2. 代码
  3. 一些排队等待处理但没有完成的RDD(仅仅是metadata,而不是data)

这样,在Driver挂掉重启后,能根据以上信息重新构造Driver继续运行

但即使做了上述的2项设置,仍然存在丢失数据的可能,主要是以下的场景:
回到图1的场景下

  1. 在第2步,数据正确复制到2个Executor,并缓存在它们的内存中;
  2. Receiver告知Kafka消息已消费(更新了ZK中的offset)
  3. Executor开始处理缓存在内存中的数据
  4. 这时,Driver挂了
  5. 由于Spark的机制,Driver挂了之后,由它启动的所有Executor都要kill掉
  6. 在处理数据的Executor进程被kill掉,缓存的内存自然也没有了,但由于Receiver认为这些数据已经消费了,因此这部分数据无法再恢复。

于是,从Spark1.2开始引入一个WAL(Write ahead log)的东东,简单来说就是在Receiver把数据复制到Executor前,先写到可靠存储上,如图:
图3. 加上WAL的完整处理逻辑

自此,再也不用担心数据会丢失了。

方式二:Direct Stream
图4. 使用Direct Stream

  1. 一开始Driver负责从ZK查询每个topic/partition的最新offset
  2. 计算下个batch的offset范围
  3. 将offset范围交给Executor
  4. Executor利用Kafka的SimpleConsumer API访问Kafka,消费消息
  5. Executor计算成功后通知Driver,Driver会将offset信息保存到checkpoint中

Exactly-once Spark Streaming from Apache Kafka——对内部实现说得比较清楚

BUT,如果要保证数据要且只能消费一次呢?

设想一下如果在Receiver更新ZK之前挂掉了,如下图:
图5. Receiver更新ZK之前挂掉的情况

  1. Kafka的消息已经成功通过WAL写进HDFS
  2. Spark Streaming也已经成功处理完这些数据
  3. Receiver更新ZK的offset之前挂掉!
  4. Receiver重启恢复,从WAL中恢复上次消费的数据
  5. 继续消费Kafka消息,但由于Receiver使用的是Kafka的高级API,会根据ZK中记录的已消费的offset往后继续消费,因此有部分消息会重复消费计算

最后考虑性能

WAL有2个缺点:

  1. 由于从Receiver取到的数据要先持久化,因此会降低消费的吞吐量(一个解决办法就是增加Receiver),代码如下:

    1
    2
    3
    4
    5
    val kafkaParams: Map[String, String] = Map("group.id" -> "terran", /* ignore rest */)
    val numStreams = 5
    val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(..., kafkaParams) }
    val unifiedStream = streamingContext.union(kafkaStreams)
    unifiedStream.print()
  2. 增加存储空间:同一份数据在Kafka有一份,在HDFS也有一份

参考资料1
参考资料2

  1. 创建多个Input DStreams
    1
    2
    3
    4
    5
    val kafkaParams: Map[String, String] = Map("group.id" -> "terran", /* ignore rest */)
    val numStreams = 5
    val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(..., kafkaParams) }
    val unifiedStream = streamingContext.union(kafkaStreams)
    unifiedStream.print()

按以上方式创建的5个Input DStream,都是Kafka消费组terran的成员,它们可以共同消费Kafka特定主题的消息,在KafkaUtils.createStream方法中,可以设置单个Input DStream的消费者线程数

  1. 读取方式
    http://group.jobbole.com/15559/

##
https://github.com/jacksu/utils4s/blob/master/spark-knowledge/md/spark_streaming%E4%BD%BF%E7%94%A8kafka%E4%BF%9D%E8%AF%81%E6%95%B0%E6%8D%AE%E9%9B%B6%E4%B8%A2%E5%A4%B1.md

RDD的partition

参考文档