spark-streaming-kafka Package Source Code Analysis
Published: 2019-06-23




Please credit the original source when reposting.

 

Recently, colleagues using Spark Streaming needed to consume from our department's internal Kafka cluster. Because the official spark-streaming-kafka package cannot work with the company's Kafka permission system, we had to study its existing code before adapting it. The code examined in this article is the v1.6.1 tag of Spark on GitHub.

The official JavaKafkaWordCount and KafkaWordCount examples create the Kafka streaming consumer with the following calls, respectively:

JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

  

As you can see, both the Java and the Scala entry points go through the overloaded createStream methods implemented in KafkaUtils.

object KafkaUtils {
  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param ssc       StreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId   The group id for this consumer
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread
   * @param storageLevel  Storage level to use for storing the received objects
   *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      ssc: StreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[(String, String)] = {
    val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000")
    createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param ssc         StreamingContext object
   * @param kafkaParams Map of kafka configuration parameters,
   *                    see http://kafka.apache.org/08/configuration.html
   * @param topics      Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                    in its own thread.
   * @param storageLevel Storage level to use for storing the received objects
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam U type of Kafka message key decoder
   * @tparam T type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)] = {
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
   * @param jssc      JavaStreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)
   * @param groupId   The group id for this consumer
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt]
    ): JavaPairReceiverInputDStream[String, String] = {
    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*))
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param jssc      JavaStreamingContext object
   * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..).
   * @param groupId   The group id for this consumer.
   * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                  in its own thread.
   * @param storageLevel RDD storage level.
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream(
      jssc: JavaStreamingContext,
      zkQuorum: String,
      groupId: String,
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
    ): JavaPairReceiverInputDStream[String, String] = {
    createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
      storageLevel)
  }

  /**
   * Create an input stream that pulls messages from Kafka Brokers.
   * @param jssc      JavaStreamingContext object
   * @param keyTypeClass Key type of DStream
   * @param valueTypeClass value type of Dstream
   * @param keyDecoderClass Type of kafka key decoder
   * @param valueDecoderClass Type of kafka value decoder
   * @param kafkaParams Map of kafka configuration parameters,
   *                    see http://kafka.apache.org/08/configuration.html
   * @param topics  Map of (topic_name -> numPartitions) to consume. Each partition is consumed
   *                in its own thread
   * @param storageLevel RDD storage level.
   * @tparam K type of Kafka message key
   * @tparam V type of Kafka message value
   * @tparam U type of Kafka message key decoder
   * @tparam T type of Kafka message value decoder
   * @return DStream of (Kafka message key, Kafka message value)
   */
  def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
      jssc: JavaStreamingContext,
      keyTypeClass: Class[K],
      valueTypeClass: Class[V],
      keyDecoderClass: Class[U],
      valueDecoderClass: Class[T],
      kafkaParams: JMap[String, String],
      topics: JMap[String, JInt],
      storageLevel: StorageLevel
    ): JavaPairReceiverInputDStream[K, V] = {
    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)

    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)

    createStream[K, V, U, T](
      jssc.ssc,
      kafkaParams.asScala.toMap,
      Map(topics.asScala.mapValues(_.intValue()).toSeq: _*),
      storageLevel)
  }

 

The third and fourth (Java-facing) createStream overloads call the first one, and the first in turn calls the second, so every RDD data stream ultimately originates from this single line:

new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
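
Equivalently, a caller can reach that core overload directly. The following is a minimal sketch assembled from the source above; the ZooKeeper quorum, group id, topic map, and app name are illustrative assumptions, not values from the original article.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaCreateStreamDemo") // assumption: example app name
val ssc = new StreamingContext(conf, Seconds(2))
// The same kafkaParams the first overload builds from zkQuorum and groupId.
val kafkaParams = Map(
  "zookeeper.connect" -> "zk1:2181,zk2:2181", // assumption: example quorum
  "group.id" -> "demo-group",                 // assumption: example group id
  "zookeeper.connection.timeout.ms" -> "10000")
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("my-topic" -> 2), StorageLevel.MEMORY_AND_DISK_SER_2)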

Looking at the KafkaInputDStream class definition, we find that getReceiver can produce two receiver types: KafkaReceiver and ReliableKafkaReceiver.

def getReceiver(): Receiver[(K, V)] = {
  if (!useReliableReceiver) {
    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
  } else {
    new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
  }
}
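
For context, here is a paraphrased skeleton of the enclosing class, reconstructed from the constructor call shown earlier: the walEnabled flag computed in KafkaUtils.createStream arrives as the useReliableReceiver parameter that getReceiver tests.

import scala.reflect.ClassTag
import kafka.serializer.Decoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

// Paraphrased skeleton (v1.6.1): the fifth constructor argument selects the
// receiver implementation; getReceiver is the method quoted above.
private[streaming] class KafkaInputDStream[K: ClassTag, V: ClassTag,
    U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
    ssc_ : StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    useReliableReceiver: Boolean,
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[(K, V)](ssc_) {

  def getReceiver(): Receiver[(K, V)] = {
    if (!useReliableReceiver) {
      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    } else {
      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    }
  }
}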

KafkaReceiver is the simpler implementation. It calls Kafka's high-level consumer API to produce the message streams, and each stream is handed to a thread pool where a dedicated thread consumes it:

val topicMessageStreams = consumerConnector.createMessageStreams(
  topics, keyDecoder, valueDecoder)
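
As a rough illustration of that pattern outside Spark, here is a hedged sketch, not the actual KafkaReceiver body; the topic name, thread count, and connection settings are assumptions. Each KafkaStream blocks on its iterator, so every stream gets a thread of its own.

import java.util.Properties
import java.util.concurrent.Executors
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.serializer.StringDecoder
import kafka.utils.VerifiableProperties

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181") // assumption: local ZooKeeper
props.put("group.id", "demo-group")              // assumption: example group id
val connector = Consumer.create(new ConsumerConfig(props))

val keyDecoder = new StringDecoder(new VerifiableProperties(props))
val valueDecoder = new StringDecoder(new VerifiableProperties(props))
val topicMessageStreams = connector.createMessageStreams(
  Map("my-topic" -> 2), keyDecoder, valueDecoder)

// Each KafkaStream blocks on its iterator, so every stream gets its own thread.
val executorPool = Executors.newFixedThreadPool(2)
for (stream <- topicMessageStreams("my-topic")) {
  executorPool.submit(new Runnable {
    override def run(): Unit =
      // KafkaReceiver calls store((key, message)) here; this sketch just prints.
      stream.foreach(m => println(s"${m.key} -> ${m.message}"))
  })
}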

ReliableKafkaReceiver integrates Spark's write-ahead log (WAL) feature. Enabling it requires setting the SparkConf property spark.streaming.receiver.writeAheadLog.enable to true (the default is false).

This receiver first stores the received Kafka data in the write-ahead log and only then commits offsets to Kafka, which guarantees that no Kafka data is lost when the driver program runs into problems.
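
A minimal configuration sketch for enabling this mode follows; the app name, batch interval, and checkpoint path are illustrative assumptions. Note that the WAL segments are written under the checkpoint directory, so one must be set.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative only: turn on the receiver write-ahead log (default: false).
val conf = new SparkConf()
  .setAppName("ReliableKafkaDemo")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(10))
// WAL segments live under the checkpoint directory; the path is an assumption.
ssc.checkpoint("hdfs:///tmp/kafka-wal-checkpoint")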

 

 


Reposted from: https://my.oschina.net/xiaominmin/blog/1598402
