求助,关于利用SSL进行Kafkadnf尚未进行安全认证证的一些问题

Storm Kafka Integration (0.10.x+)官方文档翻译:storm与kafka整合 - jeanFlower的博客 - CSDN博客
Storm Kafka Integration (0.10.x+)官方文档翻译:storm与kafka整合
Storm Kafka Integration (0.10.x+)
Apache Kafka版本0.10以上
向kafka写数据作为拓扑的一部分
你可以创建一个org.apache.storm.kafka.bolt.KafkaBolt的实例,并将其作为一个组件添加到你的拓扑上,或者如果你正在使用trident你可以使用
org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory
和org.apache.storm.kafka.trident.TridentKafkaUpdater.
你需要实现下面两个接口:
TupleToKafkaMapper 和 TridentTupleToKafkaMapper
这些接口有两个方法定义:
K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);
顾名思义,这些方法被调用映射一个tuple到一个kafka key和kafka message。
如果你只需要一个字段作为key,一个字段作为value,那么你可以使用提供的FieldNameBasedTupleToKafkaMapper.java实现。
在KafkaBolt里,如果使用默认构造函数构造FieldNameBasedTupleToKafkaMapper,则实现始终会查找具有字段名称“key”和“message”的字段,以实现向后兼容性的原因。
或者,您也可以使用非默认构造函数指定不同的key和message字段。
在TridentKafkaState中,您必须指定key和message的字段名称,因为没有默认构造函数。
在构造FieldNameBasedTupleToKafkaMapper的实例时应指定这些。
KafkaTopicSelector 和trident KafkaTopicSelector
这个接口只有一个方法
public interface KafkaTopicSelector {
String getTopics(Tuple/TridentTuple tuple);
这个接口的实现应该返回要发布tuple的key/message映射的主题。您可以返回一个null,该消息将被忽略。如果你有一个静态主题名称,那么你可以
使用DefaultTopicSelector.java并在构造函数中设置主题的名称。
FieldNameTopicSelector 和 FieldIndexTopicSelector可以被使用来选择一个topic应该去发布一个tuple到哪。(select the topic should to publish a tuple to.)
用户只需要在tuple本身中指定topic名称的字段名称或字段索引。
当topic名称未找到时,Field * TopicSelector将会将消息写入默认topic。请保证默认的topic已经被创建。
指定Kafka生产者属性
您可以通过调用KafkaBolt.withProducerProperties()和TridentKafkaStateFactory.withProducerProperties()来提供Storm拓扑中的所有生产者属性。
生产者的重要的配置属性包括:
metadata.broker.list
request.required.acks
producer.type
serializer.class
这些也被定义在org.apache.kafka.clients.producer.ProducerConfig里
使用通配符topic匹配(Using wildcard kafka topic match)
您可以通过添加以下配置来进行通配符主题匹配
Config config = new Config(); config.put("kafka.topic.wildcard.match",true);
之后你可以制定一个通配符主题去匹配。例如clickstream.*.log.
这将会匹配如下所有的流 clickstream.my.log, clickstream.cart.log 等等。
for the bolt:
TopologyBuilder builder = new TopologyBuilder();
Fields fields = new Fields("key", "message");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
spout.setCycle(true);
builder.setSpout("spout", spout, 5);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.mon.serialization.StringSerializer");
props.put("value.serializer", "org.mon.serialization.StringSerializer");
KafkaBolt bolt = new KafkaBolt()
.withProducerProperties(props)
.withTopicSelector(new DefaultTopicSelector("test"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
Config conf = new Config();
StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
For Trident:
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.mon.serialization.StringSerializer");
props.put("value.serializer", "org.mon.serialization.StringSerializer");
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(props)
.withKafkaTopicSelector(new DefaultTopicSelector("test"))
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
从kafka读数据(Spouts)
spout的实现使用KafkaSpoutConfig类来配置。这个类使用Build模式,可以通过调用其中一个Builds构造函数或通过调用KafkaSpoutConfig类中的静态方法构建器来启动。
创建构建器的构造函数或静态方法需要几个key values(稍后可以更改),但是启动一个spout所需的最小配置。
bootstrapServers与Kafka Consumer Property “bootstrap.servers”是相同的。
spout将消耗的主题可以是特定topic名称(1个或更多)的集合或正则表达式Pattern,它指定匹配该正则表达式的任何主题将被使用。
在构造函数的情况下,您可能还需要指定键解串器和值解串器。这是为了通过使用Java泛型来保证类型安全。默认值为StringDeserializer,
可以通过调用setKeyDeserializer 和/或 setValueDeserializer来覆盖。
如果这些设置为null,代码将回退到kafka属性中设置的内容,但是最好在这里明确表示,再次使用泛型来维护类型安全性。
有几个关键配置要注意
setFirstPollOffsetStrategy:允许您设置从哪里开始使用数据.这在故障恢复和首次启动spout的情况下都被使用。允许的值包括:
EARLIEST:无论以前的提交如何,kafka spout会轮询从分区的第一个偏移开始的记录
LATEST:无论以前的提交如何,kafka spout轮询(具有大于分区中最后一个偏移量的)偏移量的记录
UNCOMMITTED_EARLIEST(默认):kafka spout从最后提交的偏移量(如果有的话)中轮询记录,如果没有提交任何偏移量,则表现为EARLIEST。
UNCOMMITTED_LATEST:kafka spout从最后提交的偏移量(如果有的话)中轮询记录,如果没有提交任何偏移量,则表现为LATEST。
setRecordTranslator:允许您修改spout如何将Kafka Consume Record转换为tuple,以及将发布该元组的流。
默认情况下,”topic”, “partition”, “offset”, “key” 和 “value”将被发送到“默认”流。
如果要根据主题将条目输出到不同的流,则storm提供ByTopicRecordTranslator。有关如何使用这些的更多示例,请参见下文。
setProp:可以用来设置没有方便方法的kafka属性。
setGroupId:让你设置kafka消费者组属性“group.id”的id。
setSSLKeystore 和 setSSLTruststore:允许您配置SSL身份验证。
API是用java 8 lambda表达式写的,它可以与java7及以下版本配合使用。
创建一个简单的不安全Spout
以下将消费所有的发布到“topic”的事件,并发送他们到MyBolt有着字段”topic”, “partition”, “offset”, “key”, “value”。
final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout&&(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic").build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
通配符topics
通配符主题将从指定代理列表中存在的所有主题消耗,并匹配该模式。所以在下面的例子中,”topic”, “topic_foo” and “topic_bar”将都会匹配”topic.*”,
但是”not_my_topic”不匹配。
final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout&&(KafkaSpoutConfig.builder("127.0.0.1:" + port, pile("topic.*")).build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
Multiple Streams(多个流)
这使用了java 8 lambda表达式。
final TopologyBuilder tp = new TopologyBuilder();
ByTopicRecordTranslator byTopic = new ByTopicRecordTranslator&&( (r) -& new Values(r.topic(), r.key(), r.value()), new Fields("topic", "key", "value"), "STREAM_1");
byTopic.forTopic("topic_2", (r) -& new Values(r.key(), r.value()), new Fields("key", "value"), "STREAM_2");
tp.setSpout("kafka_spout", new KafkaSpout&&(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic_1", "topic_2", "topic_3").build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout", "STREAM_1");
tp.setBolt("another", new myOtherBolt()).shuffleGrouping("kafka_spout", "STREAM_2");
final TridentTopology tridentTopology = new TridentTopology();
final Stream spoutStream = tridentTopology.newStream("kafkaSpout",
new KafkaTridentSpoutOpaque&&(KafkaSpoutConfig.builder("127.0.0.1:" + port, pile("topic.*")).build()))
.parallelismHint(1)
Trident不支持多个流,并将忽略为输出设置的任何流。然而,如果每个输出主题的字段不相同,它将抛出异常,而不会继续。
Custom RecordTranslators(自定义RecordTranslators)(高级)
在大多数情况下,内置的SimpleRecordTranslator和ByTopicRecordTranslator应该能覆盖您的用例。
如果您遇到需要定制的情况,则本文档将介绍如何正确执行此操作,以及一些不太明显的类。
适用的要点是使用ConsumerRecord并将其转换为可以发出的List 。不明显的是如何告诉spout将其发射到特定的流。为此,
您将需要返回一个org.apache.storm.kafka.spout.KafkaTuple的实例。这提供了一个routedTo方法,它将说明tuple应该去哪个特定的流。
例如:return new KafkaTuple(1, 2, 3, 4).routedTo("bar");
Will cause the tuple to be emitted on the “bar” stream(将导致元组在“bar”流中发出)
在编写自定义record translators时要小心,因为就像在storm spout中,它需要自我一致。stream方法应该返回一组完整的流,这个转换器将会尝试发送。
另外getFieldsFor应为每个流返回一个有效的Fields对象。
如果您正在为Trident执行此操作,则值必须位于通过应用该流的Fields对象中的每个字段返回的List中,否则trident可能会抛出异常。
手动分区控制(高级)
默认情况下,Kafka将自动将分区分配给当前的一组分支。它处理很多事情,但在某些情况下,您可能需要手动分配分区。
当spout go down 并重新启动时,这可能会导致更少的churn(搅拌),但是如果没有完成,可能会导致很多问题。
这都可以通过子类化Subscription来处理,我们有几个实现,您可以查看有关如何执行此操作的示例。
ManualPartitionNamedSubscription 和 ManualPartitionPatternSubscription。请使用这些或实现你自己的时候要小心。
我的热门文章
即使是一小步也想与你分享kafka是一种分布式的,基于发布订阅的消息系统。具有以下几个方面的特性。
1.能够提供常数时间的消息持久化及访问性能。
2.高吞吐率。廉价的商用机器上能够达到每秒100k条的消息传输。
3.支持kafka server间的消息消息分区,分布式消费,分区内消息的顺序性。
4.支持水平扩展。
5.支持离线数据处理和实时数据处理。
kafka的拓扑结构:
1.producer:消息生产者。
2.consumer:消息消费者。
3.broker:kafka集群由一个或者多个服务器组成。服务器被称为broker。消息由producer发送到broker。consumer从borker中消费消息。
4.Toptic:消息主题。每条发送到kafka集群的消息都有一个Topic,物理上不同topic的消息分开存储 。逻辑上一个topic的消息保存于一个或多个broker上。
5.partition:消息分区。每个topic包括一个或多个partition
6.consumer group:每个consumer属于特定的group,可以为每个consumer指定group name,不指定,则属于默认的group。
& & & & & & & & & & & & & & & & & & & & & & & & & & & & & kafka拓扑结构图
从图中我么可以看出。kafka集群由若干producer,consumer grouper,broker,zookeeper组成。kafka通过zookeeper来管理集群的配置,以及在consumer发生变化进行reblance。
topic & partion
topic在逻辑上可以被理解为一个队列,消息必须指明它的topic,可以理解为消息必须指定放到哪一个队列中。为了提高kafka的吞吐率,物理上把topic分成一个或多个partion,每个partion物理上对应一个文件夹。该文件夹下存储该partition下的消息及索引文件。
若创建两个topic,topic1和topic2,每个topic对应有13个和19个分区,其中集群中共有8个结点,则集群中会创建32个文件夹。如下图所示:
每个日志文件都是一个log entry序列,每个log entry序列包含一个四字节整形值(消息长度,1+4+n),一字节magic value,四字节的crc校验码,n字节的消息体长度组成。每条消息都有在当前partition下的唯一的64字节的offset。它指明了消息的的存储位置,磁盘上消息的存储格式如下:
message length : 4 bytes (value: 1+4+n)
"magic" value : 1 byte
crc : 4 bytes
payload : n bytes
这个log entries并非由一个文件组成,而是分成多个segment,每个segment以该segment下的第一条消息的offset命名并以kafka为后缀。另外会有一个索引文件,他标明每个segment下的log entry的offset的范围,如下图所示:kafka高吞吐率的一个很重要的保证就是消息会被顺序写到partition中。如下图所示:对于传统的消息系统,通常会删除已经消费过的消息,kafka会保存已经消费的消息。并且根据实际情况对已经消费的消息提供两种删除策略,分别是基于消息的消费时间以及partition文件的大小。我们可以通过配置文件$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据。配置如下所示:
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false对于consumer消费的消息,消息的offset由consumer来控制,对于kafka来说消息时无状态的。kafka也不保证一个消息只由consumer group的一个consumer来消费,从而不需要锁机制,这也是kafka高吞吐率的一个重要保证。push&pullkafka采用push机制来推送消息到broker,pull机制来消费消息,push与pull机制各由优缺点。kafka采取pull机制消费消息可以简化broker的设计,push机制采取尽快的投递消息,这样很可能导致consumer来不及处理消息从而导致网络拥塞或者拒绝服务,通过consumer自己来控制何时消费消息。即可批量消费又可逐条消费,能够选择不同的提交方式,从而实现不同传输语义。消息递送的保证机制。
At most once 消息可能会丢,但绝不会重复传输
At least one 消息绝不会丢,但可能会重复传输
Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。截止到目前(Kafka 0.8.2版本,),这一Feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从Producer到broker是确保了At least once,可通过设置Producer异步发送实现At most once)。
当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。截止到目前(Kafka 0.8.2版本,),这一Feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从Producer到broker是确保了At least once,可通过设置Producer异步发送实现At most once)。
读完消息先commit再处理消息。这种模式下,如果Consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
读完消息先处理再commit。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多使用场景下,消息都有一个主键,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(笔者认为这种说法比较牵强,毕竟它不是Kafka本身提供的机制,主键本身也并不能完全保证操作的幂等性。而且实际上我们说delivery guarantee 语义是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们不应该把处理过程的特性——如是否幂等性,当成Kafka本身的Feature)
如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,Consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)
总之,Kafka默认保证At least once,并且允许通过设置Producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。
阅读(...) 评论()ELK+Kafka和遇到的问题 – 80h大叔
CAPTCHA Code*}

我要回帖

更多关于 您尚未进行安全认证 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信