Please credit the original source when reposting.
Recently, colleagues using Spark Streaming needed to consume from our department's internal Kafka cluster. Because the official spark-streaming-kafka package cannot work with the permission system of the company's existing Kafka cluster, we had to study the original spark-streaming-kafka code in order to adapt it. The code examined in this article is from the v1.6.1 tag of Spark on GitHub.
The official JavaKafkaWordCount and KafkaWordCount examples create their Kafka streaming input DStreams with the following calls, respectively:
JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
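For context, here is a minimal sketch of how the Scala call above is typically wired into a complete job; the ZooKeeper quorum, consumer group, topic map and batch interval below are placeholder values rather than anything taken from the official examples.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object MiniKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("MiniKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))  // placeholder batch interval

    val zkQuorum = "zk1:2181,zk2:2181"     // placeholder ZooKeeper quorum
    val group    = "mini-kafka-wordcount"  // placeholder consumer group id
    val topicMap = Map("test" -> 1)        // topic name -> number of consumer threads

    // createStream returns a DStream of (message key, message value); keep only the value.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    lines.flatMap(_.split(" ")).map(word => (word, 1L)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}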
Both the Java and the Scala examples end up calling one of the overloaded createStream methods implemented in KafkaUtils:
object KafkaUtils {
  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param ssc          StreamingContext object
   * @param zkQuorum     Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId      The group id for this consumer
   * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                     in its own thread
   * @param storageLevel Storage level to use for storing the received objects
   *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param ssc          StreamingContext object
   * @param kafkaParams  Map of kafka configuration parameters,
   *                     see http://kafka.apache.org/08/configuration.html
   * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                     in its own thread.
   * @param storageLevel Storage level to use for storing the received objects
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam U type of Kafka message key decoder
   * @tparam T type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)] = {
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
   * @param jssc     JavaStreamingContext object
   * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId  The group id for this consumer
   * @param topics   Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                 in its own thread
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt]
    ): JavaPairReceiverInputDStream[String, String] = {
    createStream(jssc.ssc, zkQuorum, groupId,
      Map(topics.asScala.mapValues(_.intValue()).toSeq: _*))
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param jssc         JavaStreamingContext object
   * @param zkQuorum     Zookeeper quorum (hostname:port,hostname:port,..).
   * @param groupId      The group id for this consumer.
   * @param topics       Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                     in its own thread.
   * @param storageLevel RDD storage level.
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
    ): JavaPairReceiverInputDStream[String, String] = {
    createStream(jssc.ssc, zkQuorum, groupId,
      Map(topics.asScala.mapValues(_.intValue()).toSeq: _*), storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param jssc              JavaStreamingContext object
   * @param keyTypeClass      Key type of DStream
   * @param valueTypeClass    value type of Dstream
   * @param keyDecoderClass   Type of kafka key decoder
   * @param valueDecoderClass Type of kafka value decoder
   * @param kafkaParams       Map of kafka configuration parameters,
   *                          see http://kafka.apache.org/08/configuration.html
   * @param topics            Map of (topic_name -> numPartitions) to consume. Each partition
   *                          is consumed in its own thread
   * @param storageLevel      RDD storage level.
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam U type of Kafka message key decoder
   * @tparam T type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
      jssc: JavaStreamingContext,
      keyTypeClass: Class[K],
      valueTypeClass: Class[V],
      keyDecoderClass: Class[U],
      valueDecoderClass: Class[T],
      kafkaParams: JMap[String, String],
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
    ): JavaPairReceiverInputDStream[K, V] = {
    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
    createStream[K, V, U, T](
      jssc.ssc, kafkaParams.asScala.toMap,
      Map(topics.asScala.mapValues(_.intValue()).toSeq: _*), storageLevel)
  }
Of these overloads, the Java-facing third and fourth delegate to the first (Scala) createStream, the fifth calls the second directly, and the first in turn ends up calling the second. So every receiver-based DStream is ultimately produced by this single line:
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
Looking at the definition of KafkaInputDStream, its getReceiver method can return one of two receiver types: KafkaReceiver or ReliableKafkaReceiver.
def getReceiver(): Receiver[(K, V)] = {
  if (!useReliableReceiver) {
    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
  } else {
    new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
  }
}
KafkaReceiver is the simpler implementation. It uses Kafka's high-level consumer API to create the message streams, and each resulting stream is handed to a thread pool so that a dedicated thread consumes it:
val topicMessageStreams = consumerConnector.createMessageStreams(topics, keyDecoder, valueDecoder)
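The surrounding consumption logic follows the pattern sketched below. This is an illustrative paraphrase rather than the exact KafkaReceiver code; the handle callback stands in for the store() call that the real receiver makes for every record.

import java.util.concurrent.Executors

import kafka.consumer.{ConsumerIterator, KafkaStream}

// Each KafkaStream returned by createMessageStreams is drained by its own task on a
// fixed-size thread pool, one thread per requested consumer thread.
def consumeWithThreadPool[K, V](
    topicMessageStreams: Map[String, List[KafkaStream[K, V]]],
    numThreads: Int,
    handle: (K, V) => Unit): Unit = {
  val executorPool = Executors.newFixedThreadPool(numThreads)
  try {
    for (streams <- topicMessageStreams.values; stream <- streams) {
      executorPool.submit(new Runnable {
        override def run(): Unit = {
          val it: ConsumerIterator[K, V] = stream.iterator()
          while (it.hasNext()) {
            val record = it.next()
            handle(record.key(), record.message()) // KafkaReceiver calls store() here
          }
        }
      })
    }
  } finally {
    executorPool.shutdown() // let the worker threads drain their streams and then exit
  }
}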
ReliableKafkaReceiver works together with Spark's write-ahead log (WAL) feature. To enable it, set the SparkConf property spark.streaming.receiver.writeAheadLog.enable to true (the default is false).
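A minimal sketch of enabling it follows; the application name and checkpoint path are placeholders. Note that the WAL files are written under the streaming checkpoint directory, so a checkpoint directory must be set as well.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("ReliableKafkaWordCount")                         // placeholder app name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // default is "false"

val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("hdfs:///tmp/kafka-wal-checkpoint")              // placeholder; WAL files live here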
This receiver first writes the received Kafka data to the write-ahead log and only then commits the consumer offsets, which guarantees that no Kafka data is lost if the driver program fails.
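That ordering can be illustrated with the simplified receiver below. This is only a sketch of the idea, not the actual ReliableKafkaReceiver (which batches records with a BlockGenerator and commits the offsets of each stored block to ZooKeeper); storeBlockAndCommitOffsets and commitOffsets are hypothetical names.

import kafka.common.TopicAndPartition

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Simplified illustration of the "log first, commit offsets second" ordering.
class StoreThenCommitReceiver[K, V](level: StorageLevel) extends Receiver[(K, V)](level) {

  override def onStart(): Unit = { /* start the Kafka consumer threads (omitted) */ }
  override def onStop(): Unit = { /* shut the consumer down (omitted) */ }

  // Hypothetical handler invoked for each block of records pulled from Kafka.
  def storeBlockAndCommitOffsets(
      block: Iterator[(K, V)],
      offsets: Map[TopicAndPartition, Long]): Unit = {
    // With the write-ahead log enabled, this blocking store() call returns only
    // after the block has been persisted, so a later crash cannot lose the data.
    store(block)
    // Only now are the consumed offsets committed; unlogged data is re-read on restart.
    commitOffsets(offsets)
  }

  // Hypothetical stand-in for the receiver's ZooKeeper-based offset commit.
  private def commitOffsets(offsets: Map[TopicAndPartition, Long]): Unit = ()
}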