Essential Knowledge | A Thorough Guide to Apache Pulsar, the High-Performance Messaging Platform
Contents
- Introduction to Pulsar
- Key features of Pulsar
- Pulsar vs Kafka
- Pulsar architecture
- Pulsar messaging
- Pulsar Schema
- Pulsar Functions
- Pulsar Connectors
- Pulsar Deployment
- Pulsar Admin
- Pulsar Manager
- Pulsar Flink
- More resources
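What is Pulsar?
Apache Pulsar is a top-level project of the Apache Software Foundation and a next-generation, cloud-native, distributed messaging and streaming platform. It combines messaging, storage, and lightweight functional computing in a single system, and its architecture separates compute from storage. Pulsar supports multi-tenancy, persistent storage, and data replication across data centers and regions. It provides strong consistency, high throughput, low latency, and highly scalable stream storage.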
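Key features of Pulsar
- A single Pulsar instance natively supports multiple clusters and seamlessly replicates messages between clusters across data centers.
- Very low publish latency and end-to-end latency.
- Scales seamlessly to more than one million topics.
- Simple client APIs for Java, Go, Python, and C++.
- Multiple subscription modes for topics (exclusive, shared, and failover).
- Guaranteed message delivery through persistent message storage provided by Apache BookKeeper.
- Stream-native data processing through Pulsar Functions, a lightweight serverless computing framework.
- Pulsar IO, a serverless connector framework built on Pulsar Functions, makes it easier to move data into and out of Apache Pulsar.
- Tiered storage offloads data from hot storage to cold/long-term storage (such as S3 and GCS) as the data ages.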
Pulsar vs Kafka
The links below lead to a detailed report comparing Pulsar and Kafka:
https://streamnative.io/en/blog/tech/2020-07-08-pulsar-vs-kafka-part-1
https://streamnative.io/zh/blog/tech/2020-07-22-pulsar-vs-kafka-part-2
Benchmarks (StreamNative)
Sources:
https://mp.weixin.qq.com/s/UZJTOEpzX8foUJv9XMJxOw
https://streamnative.io/en/blog/tech/2020-11-09-benchmark-pulsar-kafka-performance
https://streamnative.io/whitepaper/benchmark-pulsar-vs-kafka
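- Throughput
With the same durability guarantees as Kafka, Pulsar reaches 605 MB/s publish and end-to-end throughput (the same as Kafka) and 3.5 GB/s catch-up read throughput (3.5 times higher than Kafka). Pulsar's throughput is not affected by more partitions or by changes to the durability level. Kafka's throughput, by contrast, suffers severely when either one changes.
- Latency
Across all test cases (different numbers of subscriptions, different numbers of topics, and different durability guarantees), Pulsar's latency is significantly lower than Kafka's. Pulsar's P99 latency stays between 5 and 15 milliseconds. Kafka's P99 latency can reach several seconds and varies greatly with the number of topics, the number of subscriptions, and the durability guarantee.
- Multi-language clients (C/C++, Python, Java, Go, ...)
- Management tools (Pulsar Manager vs Kafka Manager)
- Built-in stream processing (Pulsar Functions vs Kafka Streams)
- Rich integrations (Pulsar Connectors)
- Exactly-once processing
- Log compaction
- Multi-tenancy (Pulsar)
- Security management (Pulsar)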
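Architecture
Pulsar uses a software architecture that separates storage from compute. In the messaging space, Pulsar was the first open-source project to deliver a cloud-native architecture with separate storage and compute. Because the broker layer stores no data, this architecture offers higher availability and more flexible scaling and management. It also avoids data rebalancing and catch-up.
In Apache Pulsar's layered architecture, every node in the serving layer (brokers) and in the storage layer (BookKeeper) is a peer. Brokers only serve messages and store no data. As a result, both layers can add nodes instantly and recover seamlessly from node failures.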
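Persistent storage
Pulsar uses BookKeeper, a distributed log storage system, as its storage component. Its underlying storage model is the log.
Pulsar stores all unacknowledged (that is, unprocessed) messages on multiple BookKeeper servers called "bookies".
BookKeeper keeps data consistent through quorum voting. Unlike a master/slave model, every BookKeeper node is a peer, and each piece of data is written concurrently to a configured number of storage nodes.
A topic is actually a stream of ledgers. A ledger is itself a log, so a sequence of child logs (ledgers) makes up a parent log (the topic).
Ledgers are appended to a topic, and entries (a message or a batch of messages) are appended to ledgers. Once closed, a ledger is immutable. The ledger is the smallest unit of deletion: individual entries cannot be deleted, only entire ledgers.
Ledgers are further divided into fragments. A fragment is the smallest unit of distribution in a BookKeeper cluster.
Each ledger (made up of one or more fragments) can be replicated across multiple BookKeeper nodes (bookies) for disaster recovery and better read performance. Each fragment is replicated on a different set of bookies, provided enough bookies are available.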
conf/bookkeeper.conf
#############################################################################
## Server parameters
#############################################################################
# Directories BookKeeper outputs its write ahead log.
# Could define multi directories to store write ahead logs, separated by ','.
journalDirectories=/data/appData/pulsar/bookkeeper/journal

#############################################################################
## Ledger storage settings
#############################################################################
# Directory Bookkeeper outputs ledger snapshots
# could define multi directories to store snapshots, separated by ','
ledgerDirectories=/data/appData/pulsar/bookkeeper/ledgers
conf/broker.conf
### --- Managed Ledger --- ###

# Number of bookies to use when creating a ledger
managedLedgerDefaultEnsembleSize=2

# Number of copies to store for each message
managedLedgerDefaultWriteQuorum=2

# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=2
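The three managed-ledger settings correspond to BookKeeper's ensemble size (E), write quorum (Qw), and ack quorum (Qa). The class below is a simplified, self-contained model of round-robin striping, and the name LedgerStripingModel is hypothetical (this is not BookKeeper's source code). In the model, entry e is written to Qw consecutive bookies starting at e mod E, and a write completes once Qa of them confirm it. With the values above (2/2/2), every entry lands on both bookies of the ensemble, and each write waits for both acknowledgments.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (hypothetical, not BookKeeper source) of how a ledger
 * stripes entries across an ensemble of bookies.
 */
class LedgerStripingModel {

    /** Indexes (0..ensembleSize-1) of the bookies that store the given entry. */
    static List<Integer> writeSet(long entryId, int ensembleSize, int writeQuorum) {
        if (writeQuorum < 1 || writeQuorum > ensembleSize) {
            throw new IllegalArgumentException("require 1 <= writeQuorum <= ensembleSize");
        }
        List<Integer> bookies = new ArrayList<>();
        for (int k = 0; k < writeQuorum; k++) {
            // Start at entryId mod E and take Qw consecutive bookies, wrapping around
            bookies.add((int) ((entryId + k) % ensembleSize));
        }
        return bookies;
    }

    /** A write completes once at least ackQuorum bookies have confirmed it. */
    static boolean isWriteComplete(int confirmations, int writeQuorum, int ackQuorum) {
        if (ackQuorum < 1 || ackQuorum > writeQuorum) {
            throw new IllegalArgumentException("require 1 <= ackQuorum <= writeQuorum");
        }
        return confirmations >= ackQuorum;
    }

    public static void main(String[] args) {
        // E=3, Qw=2: a larger ensemble than in broker.conf so the striping is visible
        for (long entry = 0; entry < 4; entry++) {
            System.out.println("entry " + entry + " -> bookies " + writeSet(entry, 3, 2));
        }
    }
}
```

With E=3 and Qw=2, the main method places entries 0, 1, 2, 3 on bookies [0, 1], [1, 2], [2, 0], [0, 1]. Each bookie therefore holds only part of the ledger, which is how a larger ensemble spreads the write load.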
Metadata storage
Both Pulsar and BookKeeper use Apache ZooKeeper to store metadata and to monitor node health.
$ $PULSAR_HOME/bin/pulsar zookeeper-shell
> ls /
[admin, bookies, counters, ledgers, loadbalance, managed-ledgers, namespace, pulsar, schemas, stream, zookeeper]
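Messaging
Pulsar uses the publish-subscribe (pub-sub) pattern. Producers publish messages to topics. Consumers subscribe to topics, process the published messages, and send an acknowledgment once processing is complete.
Once a subscription has been created, Pulsar retains all messages for it, even if the consumer disconnects. A message becomes eligible for deletion only after the consumer acknowledges that it was processed successfully.
Topics
Logically, a topic is a log structure in which every message has an offset. Apache Pulsar tracks offsets with cursors (cursor tracking).
Pulsar supports two basic topic types: persistent topics and non-persistent topics.
{persistent|non-persistent}://tenant/namespace/topic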
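Pulsar also accepts short topic names. A bare name resolves to persistent://public/default/<name>, and tenant/namespace/name gets the persistent:// prefix. The helper below is a hypothetical, self-contained sketch of that expansion; Pulsar's own logic lives in its TopicName class. The helper shows which topic a short name in client or admin code actually refers to.

```java
/**
 * Hypothetical helper that mimics how Pulsar expands short topic names
 * into the {persistent|non-persistent}://tenant/namespace/topic form.
 */
final class TopicNames {

    private TopicNames() {
    }

    /** Expands "topic" and "tenant/namespace/topic" into fully qualified names. */
    static String normalize(String name) {
        if (name.contains("://")) {
            return name; // already fully qualified
        }
        String[] parts = name.split("/");
        if (parts.length == 1) {
            return "persistent://public/default/" + name;
        }
        if (parts.length == 3) {
            return "persistent://" + name;
        }
        throw new IllegalArgumentException("Invalid topic name: " + name);
    }

    /** Splits a topic name into {domain, tenant, namespace, topic}. */
    static String[] parse(String name) {
        String full = normalize(name);
        int sep = full.indexOf("://");
        String domain = full.substring(0, sep);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("Unknown domain: " + domain);
        }
        String[] rest = full.substring(sep + 3).split("/", 3);
        if (rest.length != 3) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic: " + full);
        }
        return new String[] {domain, rest[0], rest[1], rest[2]};
    }
}
```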
- Non-Partitioned topics
$ $PULSAR_HOME/bin/pulsar-admin topics \
    list public/default

$ $PULSAR_HOME/bin/pulsar-admin topics \
    create persistent://public/default/input-seed-avro-topic

$ $PULSAR_HOME/bin/pulsar-admin topics \
    lookup persistent://public/default/input-seed-avro-topic

$ $PULSAR_HOME/bin/pulsar-admin topics \
    delete persistent://public/default/input-seed-avro-topic

$ $PULSAR_HOME/bin/pulsar-admin topics \
    stats persistent://public/default/input-seed-avro-topic

$ curl http://server-101:8080/admin/v2/persistent/public/default/exclamation-input/stats | python -m json.tool
- Partitioned topics
$ $PULSAR_HOME/bin/pulsar-admin topics \
    create-partitioned-topic persistent://public/default/output-seed-avro-topic \
    --partitions 2

$ $PULSAR_HOME/bin/pulsar-admin topics \
    list-partitioned-topics public/default

$ $PULSAR_HOME/bin/pulsar-admin topics \
    get-partitioned-topic-metadata persistent://public/default/output-seed-avro-topic

$ $PULSAR_HOME/bin/pulsar-admin topics \
    delete-partitioned-topic persistent://public/default/output-seed-avro-topic
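Internally, a partitioned topic with N partitions is backed by N ordinary topics named <topic>-partition-0 through <topic>-partition-(N-1). When a message carries a key, the producer hashes the key to pick a partition, so all messages with the same key end up in the same partition. The sketch below models both rules. Two parts of it are assumptions: the class name is hypothetical, and String.hashCode stands in for the real hash. Pulsar's hashing scheme is configurable, for example JavaStringHash or Murmur3_32Hash.

```java
/** Hypothetical model of partition naming and key-based partition selection. */
final class PartitionRouting {

    private PartitionRouting() {
    }

    /** Name of the internal topic that backs one partition. */
    static String partitionName(String partitionedTopic, int index) {
        return partitionedTopic + "-partition-" + index;
    }

    /** Same key -> same partition; the mask keeps the hash non-negative. */
    static int partitionForKey(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Stand-in hash (assumption): String.hashCode instead of Pulsar's hashing scheme
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        String topic = "persistent://public/default/output-seed-avro-topic";
        for (String key : new String[] {"TEM-A-01", "HUM-A-01", "PRS-A-01"}) {
            int partition = partitionForKey(key, 2);
            System.out.println(key + " -> " + partitionName(topic, partition));
        }
    }
}
```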
Messages
Messages are the basic "unit" of Pulsar. In the Java client, a received message is exposed through the org.apache.pulsar.client.api.Message<T> interface:
public interface Message<T> {
    Map<String, String> getProperties();
    boolean hasProperty(String name);
    String getProperty(String name);
    // Raw payload bytes
    byte[] getData();
    // Payload deserialized with the consumer's schema
    T getValue();
    MessageId getMessageId();
    long getPublishTime();
    // Application-defined event time; 0 if not set
    long getEventTime();
    long getSequenceId();
    String getProducerName();
    boolean hasKey();
    String getKey();
    boolean hasBase64EncodedKey();
    byte[] getKeyBytes();
    boolean hasOrderingKey();
    byte[] getOrderingKey();
    String getTopicName();
    Optional<EncryptionContext> getEncryptionCtx();
    // Number of times this message has been redelivered
    int getRedeliveryCount();
    byte[] getSchemaVersion();
    boolean isReplicated();
    String getReplicatedFrom();
}
Producer
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class ProducerExample {

    public void send() throws PulsarClientException {
        final String serviceUrl = "pulsar://server-100:6650";
        // final String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";

        // http://pulsar.apache.org/docs/en/client-libraries-java/#client
        final PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .connectionTimeout(10000, TimeUnit.MILLISECONDS)
                .build();

        final String topic = "persistent://public/default/topic-sensor-temp";
        // http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
        final Producer<byte[]> producer = client.newProducer()
                .producerName("sensor-temp")
                .topic(topic)
                // Compress message payloads with LZ4
                .compressionType(CompressionType.LZ4)
                // Chunking (splitting large messages) cannot be enabled together with batching,
                // so it stays off while batching is enabled below
                // .enableChunking(true)
                // Flush a batch at 1024 bytes, 10 messages, or 10 ms, whichever comes first
                .enableBatching(true)
                .batchingMaxBytes(1024)
                .batchingMaxMessages(10)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                // Block send() instead of failing when 512 messages are already pending
                .blockIfQueueFull(true)
                .maxPendingMessages(512)
                // Fail a send that the broker has not acknowledged within 1 second
                .sendTimeout(1, TimeUnit.SECONDS)
                .create();

        // Send a plain payload
        MessageId mid = producer.send("sensor-temp".getBytes());
        System.out.printf("message with ID %s successfully sent%n", mid);

        // Send a message with a key and custom properties
        mid = producer.newMessage()
                .key("sensor-temp-key")
                .value("sensor-temp-key".getBytes())
                .property("my-key", "my-value")
                .property("my-other-key", "my-other-value")
                .send();
        System.out.printf("message-key with ID %s successfully sent%n", mid);

        producer.close();
        client.close();
    }
}
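Consumer
import com.cloudwise.quickstart.model.SeedEvent;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerExample {

    private static final Logger LOG = LoggerFactory.getLogger(ConsumerExample.class);

    public void consume() throws PulsarClientException {
        final String serviceUrl = "pulsar://server-101:6650";
        // A short name resolves to persistent://public/default/input-seed-avro-topic
        final String topic = "input-seed-avro-topic";

        final PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .enableTcpNoDelay(true)
                .build();

        final Consumer<byte[]> consumer = client
                .newConsumer()
                .consumerName("seed-avro-consumer")
                .subscriptionName("seed-avro-subscription")
                .subscriptionType(SubscriptionType.Exclusive)
                // Start from the oldest available message when the subscription is first created
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .topic(topic)
                .receiverQueueSize(10)
                .subscribe();

        // Decodes the raw byte[] payload into the author's SeedEvent Avro model
        final AvroSchema<SeedEvent> schema = AvroSchema.of(SeedEvent.class);
        while (true) {
            try {
                final Message<byte[]> msg = consumer.receive();
                LOG.info("Received message: [{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",
                        Thread.currentThread().getId(),
                        msg.getTopicName(),
                        msg.getMessageId(),
                        msg.getSequenceId(),
                        msg.getEventTime(),
                        msg.getPublishTime(),
                        msg.getProducerName(),
                        msg.getKey(), schema.decode(msg.getValue()));
                try {
                    consumer.acknowledge(msg);
                } catch (final PulsarClientException e) {
                    // Ask the broker to redeliver the message later
                    consumer.negativeAcknowledge(msg);
                    LOG.error("acknowledge:" + e.getLocalizedMessage(), e);
                }
            } catch (final PulsarClientException e) {
                LOG.error("receive:" + e.getLocalizedMessage(), e);
            }
        }
    }
}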
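Subscriptions
Consumers consume a topic's messages through subscriptions. A subscription is a logical entity that holds a cursor (which tracks the offset), and a topic can have multiple subscriptions. A subscription contains no message data, only metadata and the cursor.
Each subscription stores one cursor, which is the current offset in the log. The subscription stores its cursor in a BookKeeper ledger, so cursor tracking scales the same way topics do.
Subscription types
- Exclusive
Only one consumer can consume messages from the subscription.
- Failover
Only one consumer is active on the subscription at a time, but there can be several standby consumers. If the active consumer fails, a standby consumer takes over. Two consumers are never active at the same time.
- Shared
Several consumers can attach to the subscription at the same time and share the topic's messages.
- Key_Shared
Several consumers can attach to the subscription at the same time, and messages with the same key are always delivered to the same consumer.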
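Ordering guarantee
If ordering matters, use the Exclusive or Failover subscription type. Only one consumer then consumes the topic at a time, so messages are processed in order.
With the Shared subscription type, several consumers can consume the same topic concurrently. Adding consumers dynamically speeds up consumption and reduces the message backlog on the broker.
Key_Shared builds on Shared by guaranteeing that messages with the same key go to the same consumer. Consumption stays concurrent, and order is preserved per key.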
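One way to picture Key_Shared is as a hash space shared out among consumers. The broker maps each message key to a position in that space, and each consumer owns a slice of it. Every message with a given key therefore goes to the same consumer, while different keys spread across consumers. The model below is simplified and hypothetical: it uses String.hashCode and equal slices, whereas Pulsar uses its own key hashing and range-assignment strategy, which can differ between versions. In the Java client, this mode is selected with subscriptionType(SubscriptionType.Key_Shared).

```java
import java.util.Arrays;
import java.util.List;

/**
 * Simplified model of Key_Shared dispatch: the key hash space is split into
 * equal slices, one per consumer, so a given key always maps to the same consumer.
 */
final class KeySharedModel {

    // Assumed size of the key hash space for this sketch
    private static final int HASH_RANGE = 65536;

    private KeySharedModel() {
    }

    static String consumerFor(String key, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("no consumers");
        }
        int slot = (key.hashCode() & Integer.MAX_VALUE) % HASH_RANGE;
        // Ceiling division so every slot falls into some consumer's slice
        int sliceSize = (HASH_RANGE + consumers.size() - 1) / consumers.size();
        return consumers.get(slot / sliceSize);
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer-1", "consumer-2", "consumer-3");
        for (String key : new String[] {"TEM-A-01", "HUM-A-01", "TEM-A-01"}) {
            System.out.println(key + " -> " + consumerFor(key, consumers));
        }
    }
}
```

In this model, adding or removing a consumer changes the slices, so a key can move to a different consumer.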
Multi-topic subscriptions
Pattern:
- persistent://public/default/.*
- persistent://public/default/foo.*
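In the Java client, a pattern subscription is created with ConsumerBuilder.topicsPattern(...). The consumer then receives messages from every topic whose name matches the regular expression. The hypothetical helper below uses plain java.util.regex to check which topic names a pattern such as persistent://public/default/foo.* would cover, so a pattern can be checked before subscribing. The sketch matches against the whole topic name. Also remember that an unescaped . in a regex matches any character.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper: which topics would a pattern subscription pick up? */
final class TopicPatternDemo {

    private TopicPatternDemo() {
    }

    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            // matches() requires the whole topic name to match the pattern
            if (pattern.matcher(topic).matches()) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList(
                "persistent://public/default/foo-1",
                "persistent://public/default/foobar",
                "persistent://public/default/bar");
        // Prints [persistent://public/default/foo-1, persistent://public/default/foobar]
        System.out.println(matching("persistent://public/default/foo.*", topics));
    }
}
```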
Reader
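import java.io.IOException;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;

public class ReaderExample {

    public void read() throws IOException {
        final String serviceUrl = "pulsar://server-101:6650";
        final PulsarClient client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();

        // http://pulsar.apache.org/docs/en/client-libraries-java/#reader
        // A reader chooses its own start position and does not acknowledge messages
        final Reader<byte[]> reader = client.newReader()
                .topic("my-topic")
                .startMessageId(MessageId.earliest) // or MessageId.latest
                .create();

        while (true) {
            final Message<byte[]> message = reader.readNext();
            System.out.println(new String(message.getData()));
        }
    }
}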
Partitioned topics
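Message retention and expiry
If no retention policy is set on a topic, messages are deleted automatically once the cursors of all its subscriptions have moved past them.
If a retention policy is set, acknowledged messages are deleted once they exceed a retention threshold: the topic's message storage size, or how long messages have been kept in the topic.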
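The two rules above can be modeled as follows. Without a retention policy, everything before the slowest subscription's cursor is eligible for deletion. With a policy, acknowledged data is removed once either the time limit or the size limit is exceeded. This is a simplified, hypothetical sketch. Real deletion happens one whole ledger at a time (see Persistent storage above), and limits of -1 (unlimited) are not modeled.

```java
/** Hypothetical, simplified model of Pulsar's retention rules. */
final class RetentionModel {

    private RetentionModel() {
    }

    /**
     * No retention policy: entries with offsets below the returned value
     * have been consumed by every subscription and may be deleted.
     */
    static long deletableBefore(long... cursorOffsets) {
        if (cursorOffsets.length == 0) {
            throw new IllegalArgumentException("expected at least one subscription cursor");
        }
        long slowest = Long.MAX_VALUE;
        for (long offset : cursorOffsets) {
            slowest = Math.min(slowest, offset);
        }
        return slowest;
    }

    /** With a retention policy: acknowledged data goes once either limit is exceeded. */
    static boolean exceedsRetention(long ageMinutes, long retainedSizeMb,
                                    long limitMinutes, long limitSizeMb) {
        return ageMinutes > limitMinutes || retainedSizeMb > limitSizeMb;
    }
}
```

For example, with three subscriptions at offsets 120, 80, and 200, only entries before offset 80 can go. The slowest subscription holds back deletion for everyone.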
conf/broker.conf
# Default message retention time
# Default is 0; changed to 3 days = 60*24*3
defaultRetentionTimeInMinutes=4320

# Default retention size
# Default is 0; changed to 10 GB
defaultRetentionSizeInMB=10240

# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0
retention policy (for a namespace)
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
    get-retention public/default

$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/retention | python -m json.tool

$ $PULSAR_HOME/bin/pulsar-admin namespaces \
    set-retention public/default \
    --size 1024M \
    --time 5m

$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/retention \
    --header "Content-Type:application/json" \
    --data '{
        "retentionTimeInMinutes" : 5,
        "retentionSizeInMB" : 1024
    }'
message expiry / message-ttl
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
    get-message-ttl public/default

$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/messageTTL

$ $PULSAR_HOME/bin/pulsar-admin namespaces \
    set-message-ttl public/default \
    --messageTTL 1800

$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/messageTTL \
    --header "Content-Type:application/json" \
    --data '1800'
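Message TTL applies to unacknowledged messages. Once a message has sat in a subscription's backlog longer than the TTL, Pulsar automatically acknowledges it, so it no longer counts toward that backlog. Below is a minimal, hypothetical sketch of the expiry check for the 1800-second TTL set above. Measuring age from the message's publish time is an assumption of this sketch.

```java
/** Hypothetical sketch of a message-TTL expiry check. */
final class TtlModel {

    private TtlModel() {
    }

    /** A TTL of 0 disables expiry, matching ttlDurationDefaultInSeconds=0. */
    static boolean isExpired(long publishTimeMillis, long nowMillis, int ttlSeconds) {
        if (ttlSeconds <= 0) {
            return false;
        }
        return nowMillis - publishTimeMillis > ttlSeconds * 1000L;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Published 31 minutes ago with a 30-minute (1800 s) TTL: prints true
        System.out.println(isExpired(now - 31 * 60 * 1000L, now, 1800));
    }
}
```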
Pulsar Schema
Pulsar schema enables you to use language-specific types of data when constructing and handling messages, from simple types to more complex application-specific types.
- Type safety (serialization and deserialization)
- Schemas help Pulsar preserve the original meaning of data used in other systems
Schema types
- Primitive type
Producer<String> producer = client.newProducer(Schema.STRING).topic(TOPIC).create();
producer.newMessage().value("Hello Pulsar!").send();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic(TOPIC).subscriptionName(SubscriptionName).subscribe();
Message<String> msg = consumer.receive();
- Complex type
1. KeyValue: a key/value pair.
Schema<KeyValue<Integer, String>> schema = Schema.KeyValue(
        Schema.INT32,
        Schema.STRING,
        // SEPARATED stores the key in the message key and the value in the payload
        KeyValueEncodingType.SEPARATED
);

// Producer
Producer<KeyValue<Integer, String>> producer = client.newProducer(schema)
        .topic(TOPIC)
        .create();
final int key = 100;
final String value = "value-100";
producer.newMessage().value(new KeyValue<>(key, value)).send();

// Consumer
Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(schema)
        .topic(TOPIC).subscriptionName(SubscriptionName).subscribe();
Message<KeyValue<Integer, String>> msg = consumer.receive();
2. Struct: AVRO, JSON, and Protobuf.
Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).topic(TOPIC).create();
producer.newMessage().value(User.builder().userName("pulsar-user").userId(1L).build()).send();

Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class))
        .topic(TOPIC).subscriptionName(SubscriptionName).subscribe();
// receive() returns a Message<User>; getValue() deserializes the payload
User user = consumer.receive().getValue();
How schema works
Producer
Consumer
Manual schema management
Get a schema
$ $PULSAR_HOME/bin/pulsar-admin schemas \
    get persistent://public/default/spirit-avro-topic

$ $PULSAR_HOME/bin/pulsar-admin schemas \
    get persistent://public/default/spirit-avro-topic \
    --version=2
Upload (update) a schema
$ $PULSAR_HOME/bin/pulsar-admin schemas upload \
    persistent://public/default/test-topic \
    --filename $PULSAR_HOME/connectors/json-schema.json
Extract a schema
$ $PULSAR_HOME/bin/pulsar-admin schemas \
    extract persistent://public/default/test-topic \
    --classname com.cloudwise.modal.Packet \
    --jar ~/cloudwise-pulsar-1.0.0-RELEASE.jar \
    --type json

public void schemaInfo() {
    // Both calls derive an Avro SchemaInfo from the SeedEvent class
    System.out.println("AvroSchema:" + AvroSchema.of(SeedEvent.class).getSchemaInfo());
    System.out.println("Schema.AVRO:" + Schema.AVRO(SeedEvent.class).getSchemaInfo());
}
Delete a schema
$ $PULSAR_HOME/bin/pulsar-admin schemas \
    delete persistent://public/default/spirit-avro-topic
Pulsar Functions
Programming model
Enabling Functions
- conf/bookkeeper.conf
extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
- conf/broker.conf
functionsWorkerEnabled=true
- conf/functions_worker.yml
pulsarFunctionsCluster: pulsar-cluster
numFunctionPackageReplicas: 2
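Windows
- windowLengthCount: number of messages in each window
- slidingIntervalCount: number of messages after which the window slides
- windowLengthDurationMs: window length in milliseconds
- slidingIntervalDurationMs: time after which the window slides, in milliseconds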
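Window function
import java.util.Collection;

import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.WindowContext;
import org.apache.pulsar.functions.api.WindowFunction;

public class WordCountWindowFunction implements WindowFunction<String, Void> {

    @Override
    public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
        // Called once per window with all the records that fall into it
        for (Record<String> input : inputs) {
            // Process each record here, e.g. via input.getValue()
        }
        return null;
    }
}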
Running the function
- Time-based sliding window
--user-config '{"windowLengthDurationMs":"60000", "slidingIntervalDurationMs":"1000"}'
- Time-based tumbling window
--user-config '{"windowLengthDurationMs":"60000"}'
- Count-based sliding window
--user-config '{"windowLengthCount":"100", "slidingIntervalCount":"10"}'
- Count-based tumbling window
--user-config '{"windowLengthCount":"100"}'
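The sketch below models how a count window could be evaluated, to make the count-based parameters concrete. Every slidingIntervalCount messages the window fires with (up to) the last windowLengthCount messages. Setting the sliding interval equal to the window length gives a tumbling window. This is a simplified, hypothetical model for illustration only. The Pulsar Functions window runtime also handles time-based windows and event timestamps, which are not modeled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of count-based sliding and tumbling windows. */
final class CountWindowModel {

    private CountWindowModel() {
    }

    static <T> List<List<T>> windows(List<T> events, int windowLength, int slidingInterval) {
        if (windowLength <= 0 || slidingInterval <= 0) {
            throw new IllegalArgumentException("window sizes must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        // The window fires after every slidingInterval events...
        for (int end = slidingInterval; end <= events.size(); end += slidingInterval) {
            // ...and contains at most the last windowLength events seen so far
            int start = Math.max(0, end - windowLength);
            result.add(new ArrayList<>(events.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> events = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(windows(events, 4, 2)); // sliding: [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
        System.out.println(windows(events, 3, 3)); // tumbling: [[1, 2, 3], [4, 5, 6]]
    }
}
```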
Java programming
pom.xml
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-api</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-local-runner</artifactId>
    <version>${pulsar.version}</version>
</dependency>
- WordCount
import java.util.Arrays;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class WordCountFunction implements Function<String, Void> {

    @Override
    public Void process(String input, Context context) throws Exception {
        Arrays.asList(input.split(" ")).forEach(word -> {
            String counterKey = word.toLowerCase();
            // Counters live in the function state store, which needs the
            // stream storage component enabled in bookkeeper.conf above
            context.incrCounter(counterKey, 1);
        });
        return null;
    }
}

$ $PULSAR_HOME/bin/pulsar-admin functions create \
    --broker-service-url pulsar://server-101:6650 \
    --jar target/cloudwise-pulsar-functions-with-dependencies.jar \
    --classname com.cloudwise.quickstart.pulsar.functions.WordCountFunction \
    --tenant public \
    --namespace default \
    --name word-count-function \
    --inputs persistent://public/default/sentences \
    --output persistent://public/default/wordcount
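- Dynamic routing
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

/**
 * The basic idea is to inspect the content of each message and route it to
 * a different destination topic depending on that content.
 */
public class RoutingFunction implements Function<String, String> {

    @Override
    public String process(String input, Context context) throws Exception {
        // getUserConfigValue returns an Optional, so unwrap it before calling toString()
        String regex = context.getUserConfigValue("regex").get().toString();
        String matchedTopic = context.getUserConfigValue("matched-topic").get().toString();
        String unmatchedTopic = context.getUserConfigValue("unmatched-topic").get().toString();

        Pattern pattern = Pattern.compile(regex);
        Matcher matcher = pattern.matcher(input);
        if (matcher.matches()) {
            context.newOutputMessage(matchedTopic, Schema.STRING).value(input).send();
        } else {
            context.newOutputMessage(unmatchedTopic, Schema.STRING).value(input).send();
        }
        // Returning null means nothing is written to the function's default output topic
        return null;
    }
}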
- log-topic
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

public class LoggingFunction implements Function<String, Void> {

    @Override
    public Void process(String s, Context context) throws Exception {
        // Log lines from context.getLogger() are published to the topic given by --log-topic
        Logger log = context.getLogger();
        String functionId = context.getFunctionId();
        if (s.contains("danger")) {
            log.warn("A warning was received by function {}", functionId);
        } else {
            log.info("Function {} received a message\nContent: {}", functionId, s);
        }
        return null;
    }
}

$ $PULSAR_HOME/bin/pulsar-admin functions create \
    --jar cloudwise-pulsar-functions-1.0.0.jar \
    --classname com.cloudwise.quickstart.pulsar.functions.LoggingFunction \
    --log-topic persistent://public/default/logging-function-logs
- user-config
import java.util.Optional;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.slf4j.Logger;

public class UserConfigFunction implements Function<String, Void> {

    @Override
    public Void process(String s, Context context) throws Exception {
        Logger log = context.getLogger();
        // Values passed with --user-config are looked up by key
        Optional<Object> value = context.getUserConfigValue("word-of-the-day");
        if (value.isPresent()) {
            log.info("The word of the day is {}", value.get());
        } else {
            log.warn("No word of the day provided");
        }
        return null;
    }
}

$ $PULSAR_HOME/bin/pulsar-admin functions create \
    --broker-service-url pulsar://server-101:6650 \
    --jar target/cloudwise-pulsar-functions-with-dependencies.jar \
    --classname com.cloudwise.quickstart.pulsar.functions.UserConfigFunction \
    --tenant public \
    --namespace default \
    --name user-config-function \
    --inputs persistent://public/default/userconfig \
    --user-config '{"word-of-the-day":"verdure"}'
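The routing decision in RoutingFunction depends only on the input string and three user-config values. It can therefore be moved into a plain static method and unit-tested without a broker or the local runner. The sketch below does that. The class and method names are hypothetical, and the matching rule is the same one RoutingFunction uses: Matcher.matches() requires the whole input to match.

```java
import java.util.regex.Pattern;

/** Hypothetical extraction of RoutingFunction's routing decision for unit testing. */
final class RouteDecision {

    private RouteDecision() {
    }

    /** Returns the topic the input should be published to. */
    static String targetTopic(String input, String regex, String matchedTopic, String unmatchedTopic) {
        // matches() requires the entire input to match, as in RoutingFunction
        return Pattern.compile(regex).matcher(input).matches() ? matchedTopic : unmatchedTopic;
    }

    public static void main(String[] args) {
        String regex = "sensor-.*";
        System.out.println(targetTopic("sensor-temp", regex, "matched", "unmatched"));  // matched
        System.out.println(targetTopic("other-event", regex, "matched", "unmatched"));  // unmatched
    }
}
```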
Pulsar Connectors
Processing guarantees
- at-most-once
- at-least-once
- effectively-once
Workflow (JDBC sink)
- Add a configuration file.
- Create a schema.
- Upload a schema to a topic.
- Create a JDBC sink.
- Stop a JDBC sink.
- Restart a JDBC sink.
- Update a JDBC sink.
Built-in connectors
Source connector
- Canal
- File
- Flume
- Kafka
- RabbitMQ
Sink connector
- ElasticSearch/Solr
- Flume
- HBase
- HDFS2/HDFS3
- InfluxDB
- JDBC (ClickHouse/MariaDB/PostgreSQL)
- Kafka
- MongoDB
- RabbitMQ
- Redis
ClickHouse Sink
- Create the table
CREATE DATABASE IF NOT EXISTS monitor;
CREATE TABLE IF NOT EXISTS monitor.pulsar_clickhouse_jdbc_sink
(
    id UInt32,
    name String
) ENGINE = TinyLog;

INSERT INTO monitor.pulsar_clickhouse_jdbc_sink (id, name)
VALUES (1, 'tmp');

SELECT *
FROM monitor.pulsar_clickhouse_jdbc_sink;
- Create the configuration
$ vi $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml

{
    "userName": "sysop",
    "password": "123456",
    "jdbcUrl": "jdbc:clickhouse://server-101:8123/monitor",
    "tableName": "pulsar_clickhouse_jdbc_sink"
}
- Create the schema
$ vi $PULSAR_HOME/connectors/json-schema.json

{
    "name": "",
    "schema": {
        "type": "record",
        "name": "SeedEvent",
        "namespace": "com.cloudwise.quickstart.model",
        "fields": [
            {
                "name": "id",
                "type": [
                    "null",
                    "int"
                ]
            },
            {
                "name": "name",
                "type": [
                    "null",
                    "string"
                ]
            }
        ]
    },
    "type": "JSON",
    "properties": {
        "__alwaysAllowNull": "true",
        "__jsr310ConversionEnabled": "false"
    }
}
- Upload the schema
$ $PULSAR_HOME/bin/pulsar-admin schemas upload \
    pulsar-clickhouse-jdbc-sink-topic \
    -f $PULSAR_HOME/connectors/json-schema.json
- Run
$ $PULSAR_HOME/bin/pulsar-admin sinks create \
    --tenant public \
    --namespace default \
    --name pulsar-clickhouse-jdbc-sink \
    --inputs pulsar-clickhouse-jdbc-sink-topic \
    --sink-config-file $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml \
    --archive $PULSAR_HOME/connectors/pulsar-io-jdbc-clickhouse-2.6.2.nar \
    --processing-guarantees EFFECTIVELY_ONCE \
    --parallelism 1
Pulsar Deployment
Directory structure
/opt/pulsar-2.6.2
├── bin
│ ├── bookkeeper
│ ├── function-localrunner
│ ├── proto
│ ├── pulsar
│ ├── pulsar-admin
│ ├── pulsar-admin-common.sh
│ ├── pulsar-client
│ ├── pulsar-daemon
│ ├── pulsar-managed-ledger-admin
│ └── pulsar-perf
├── conf
│ ├── bkenv.sh
│ ├── bookkeeper.conf
│ ├── broker.conf
│ ├── client.conf
│ ├── discovery.conf
│ ├── filesystem_offload_core_site.xml
│ ├── functions-logging
│ ├── functions_worker.yml
│ ├── global_zookeeper.conf
│ ├── log4j2-scripts
│ ├── log4j2.yaml
│ ├── presto
│ ├── proxy.conf
│ ├── pulsar_env.sh
│ ├── pulsar_tools_env.sh
│ ├── schema_example.conf
│ ├── standalone.conf
│ ├── websocket.conf
│ └── zookeeper.conf
├── examples
│ ├── api-examples.jar
│ ├── example-function-config.yaml
│ ├── example-window-function-config.yaml
│ └── python-examples
├── instances
│ ├── deps
│ ├── java-instance.jar
│ └── python-instance
├── lib
│ └── presto
├── LICENSE
├── licenses
├── NOTICE
└── README
Standalone
# Start in the foreground
$ $PULSAR_HOME/bin/pulsar standalone

# Start in the background
$ $PULSAR_HOME/bin/pulsar-daemon start standalone
$ jps | grep -v Jps
1873 PulsarStandaloneStarter

# Stop the background process
$ $PULSAR_HOME/bin/pulsar-daemon stop standalone -force
Cluster
- Deploy a ZooKeeper cluster
- Initialize the cluster metadata
- Deploy a BookKeeper cluster
- Deploy one or more Pulsar brokers
Client
# consumer (--num-messages 0 keeps consuming indefinitely)
$ $PULSAR_HOME/bin/pulsar-client consume \
    persistent://public/default/seed-avro-topic \
    --subscription-name cli-pack-avro-subscription \
    --subscription-type Exclusive \
    --subscription-position Latest \
    --num-messages 0

# producer
$ $PULSAR_HOME/bin/pulsar-client produce \
    persistent://public/default/seed-avro-topic \
    --num-produce 100 \
    --messages "Hello Pulsar" \
    --separator ","
Pulsar Admin
API
- pulsar-admin
- REST API
Source code: apache-pulsar-2.6.2-src/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/*.java
Bookies.java Namespaces.java Tenants.java
BrokerStats.java NonPersistentTopics.java Worker.java
Brokers.java PersistentTopics.java WorkerStats.java
Clusters.java ResourceQuotas.java
Functions.java SchemasResource.java
- Java admin client
import java.util.List;

import com.cloudwise.quickstart.model.SeedEvent;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.AvroSchema;

public class AdminExample {

    public void createNonPartitionedTopic() throws PulsarClientException {
        final String serviceHttpUrl = "http://10.2.2.26:8080";

        final PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl(serviceHttpUrl)
                .build();
        try {
            final String namespace = "public/monitor";

            List<String> topics = admin.topics().getList(namespace);
            topics.forEach(t -> System.err.println("before topic:" + t));

            // Accepted naming forms: a bare name such as "input-3-seed-avro-topic" resolves to
            // persistent://public/default/..., while "public/monitor/input-seed-avro-topic" resolves
            // to persistent://public/monitor/.... getList() returns fully qualified names, so the
            // topic must be in the listed namespace for the contains() check to work.
            final String topic = "persistent://public/monitor/input-5-seed-avro-topic";
            if (!topics.contains(topic)) {
                admin.topics().createNonPartitionedTopic(topic);
                admin.schemas().createSchema(topic,
                        AvroSchema.of(SeedEvent.class).getSchemaInfo());
            }

            topics = admin.topics().getList(namespace);
            topics.forEach(t -> System.err.println("after topic:" + t));

            System.err.println("schema:" + admin.schemas().getSchemaInfo(topic));
        } catch (final PulsarAdminException e) {
            e.printStackTrace();
        }

        admin.close();
    }
}
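The REST endpoints used in this article follow a regular layout, such as /admin/v2/persistent/{tenant}/{namespace}/{topic}/stats and /admin/v2/namespaces/{tenant}/{namespace}/retention. The sketch below builds and fetches these URLs using only JDK classes (Java 8 compatible), which can help when the admin client jar is not on the classpath. The helper names are hypothetical and are not part of any Pulsar API.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Scanner;

/** Hypothetical helpers for calling the Pulsar admin REST API with plain JDK classes. */
final class AdminUrls {

    private AdminUrls() {
    }

    /** e.g. http://server-101:8080/admin/v2/persistent/public/default/my-topic/stats */
    static String topicStats(String baseUrl, String domain, String tenant, String namespace, String topic) {
        return String.format("%s/admin/v2/%s/%s/%s/%s/stats", baseUrl, domain, tenant, namespace, topic);
    }

    /** e.g. .../admin/v2/namespaces/public/default/retention or .../messageTTL */
    static String namespacePolicy(String baseUrl, String tenant, String namespace, String policy) {
        return String.format("%s/admin/v2/namespaces/%s/%s/%s", baseUrl, tenant, namespace, policy);
    }

    /** Performs a GET request and returns the response body as a string. */
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            // "\\A" makes the scanner return the whole stream as one token
            return scanner.hasNext() ? scanner.next() : "";
        } finally {
            conn.disconnect();
        }
    }
}
```

For example, AdminUrls.get(AdminUrls.namespacePolicy("http://server-101:8080", "public", "default", "retention")) returns the same JSON as the curl retention query shown earlier.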
Manage Pulsar
- Clusters
$ $PULSAR_HOME/bin/pulsar-admin clusters
- Tenants
$ $PULSAR_HOME/bin/pulsar-admin tenants
- Brokers
$ $PULSAR_HOME/bin/pulsar-admin brokers
- Namespaces
$ $PULSAR_HOME/bin/pulsar-admin namespaces
- Permissions
- Persistent topics
- Non-Persistent topics
- Partitioned topics
- Non-Partitioned topics
$ $PULSAR_HOME/bin/pulsar-admin topics
- Schemas
$ $PULSAR_HOME/bin/pulsar-admin schemas
- Functions
$ $PULSAR_HOME/bin/pulsar-admin functions
Pulsar Manager
http://pulsar.apache.org/docs/zh-CN/administration-pulsar-manager/
https://github.com/apache/pulsar-manager
WebUI
http://localhost:7750/ui/index.html
username/password: admin/123456
- Environments
- Management
- Clusters
- Tenants
- Namespaces
- Topics
- Tokens
Pulsar Flink
https://github.com/streamnative/pulsar-flink
https://dl.bintray.com/streamnative/maven/io/streamnative/connectors/
pom.xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <flink.version>1.11.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<repositories>
    <repository>
        <id>central</id>
        <layout>default</layout>
        <url>https://repo1.maven.org/maven2</url>
    </repository>
    <repository>
        <id>bintray</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/streamnative/maven</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- state backend -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- pulsar -->
    <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>pulsar-flink-${scala.binary.version}-${flink.version}</artifactId>
        <version>2.5.4.1</version>
    </dependency>

    <!-- format -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>
FlinkPulsarSink
-
public class PulsarSinkJob {
-
-
private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkJob.class);
-
-
public static SourceFunction<SeedEvent> getSeedSource() {
-
final int interval = 5000;
-
return new PeriodicEventSource<>(
-
Integer.MAX_VALUE, interval, new PeriodicEventSource.Creator<SeedEvent>() {
-
-
private static final long serialVersionUID = 1L;
-
-
-
public Collection<SeedEvent> build(long i) {
-
return Arrays.stream(new String[]{"TEM-A-01", "HUM-A-01", "PRS-A-01"})
-
.map(code -> {
-
final SeedEvent event = new SeedEvent(
-
Instant.now().toEpochMilli(), code, Long.toString(i));
-
LOG.info("创建消息:[{}] {}", Thread.currentThread().getId(), event);
-
return event;
-
})
-
.collect(Collectors.toList());
-
}
-
-
-
public Class<SeedEvent> clazz() {
-
return SeedEvent.class;
-
}
-
-
});
-
}
-
-
public static FlinkPulsarSink<SeedEvent> getPulsarSink(ParameterTool params) {
-
// String adminUrl = "http://server-101:8080,server-102:8080,server-103:8080";
-
final String serviceUrl = params.get("serviceUrl", "pulsar://10.2.2.26:6650");
-
final String adminUrl = params.get("adminUrl", "http://10.2.2.26:8080");
-
-
final String outputTopic = params.get("topic", "output-seed-avro-topic");
-
-
final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
-
final String authParams = params.get("authParams");
-
-
final Properties props = new Properties();
-
props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true");
-
props.setProperty(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000");
-
-
final ClientConfigurationData clientConf = new ClientConfigurationData();
-
clientConf.setServiceUrl(serviceUrl);
-
clientConf.setConnectionTimeoutMs(6000);
-
clientConf.setUseTcpNoDelay(true);
-
if (!StringUtils.isNullOrWhitespaceOnly(authParams)) {
-
clientConf.setUseTls(true);
-
clientConf.setAuthPluginClassName(authPlugin);
-
clientConf.setAuthParams(authParams);
-
}
-
-
final TopicKeyExtractor<SeedEvent> topicKeyExtractor = new TopicKeyExtractor<SeedEvent>() {
-
-
private static final long serialVersionUID = 1L;
-
-
-
public byte[] serializeKey(SeedEvent element) {
-
LOG.info("serializeKey:[{}] {}", Thread.currentThread().getId(), element);
-
return element.getCode().getBytes();
-
}
-
-
-
public String getTopic(SeedEvent element) {
-
return null;
-
}
-
};
-
-
final FlinkPulsarSink<SeedEvent> sink = new FlinkPulsarSink<>(
-
adminUrl, Optional.of(outputTopic), clientConf, props, topicKeyExtractor, SeedEvent.class);
-
-
return sink;
-
}
-
-
-
public static void main(String[] args) {
-
final ParameterTool params = ParameterTool.fromArgs(args);
-
-
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
-
env.setStateBackend(new RocksDBStateBackend(new FsStateBackend("file:///tmp/checkpoint/")));
-
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
-
// Job取消和故障时会保留Checkpoint数据, 以便根据实际需要恢复到指定的Checkpoint
-
env.getCheckpointConfig().enableExternalizedCheckpoints(
-
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
-
// 确保Checkpoint之间有至少500ms的间隔(Checkpoint最小间隔)
-
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
-
// 检查点必须在一分钟内完成, 否则被丢弃(Checkpoint的超时时间)
-
env.getCheckpointConfig().setCheckpointTimeout(60000);
-
// 同一时间只允许进行一个检查点
-
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-
env.getConfig().setGlobalJobParameters(params);
-
-
// DataStream<SeedEvent> stream = env.fromCollection(getSeedEvents()).name("Collection");
-
final DataStream<SeedEvent> stream = env.addSource(getSeedSource()).name("SourceFunction");
-
-
        final DataStream<SeedEvent> result = stream
            .keyBy(new KeySelector<SeedEvent, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public String getKey(SeedEvent value) throws Exception {
                    // Partition the stream by event code
                    return value.getCode();
                }
            })
            .process(new KeyedProcessFunction<String, SeedEvent, SeedEvent>() {

                private static final long serialVersionUID = 1L;

                private Map<String, String> infos;
                // Keyed state: holds the last element seen for the current key
                private transient ListState<String> state;

                @Override
                public void open(Configuration parameters) throws Exception {
                    LOG.info("open...");
                    this.state = getRuntimeContext().getListState(
                            new ListStateDescriptor<>("state", String.class));

                    this.infos = new HashMap<>();
                    this.infos.put("open", LocalDateTime.now().toString());
                }

                @Override
                public void close() throws Exception {
                    LOG.info("close...");
                }

                @Override
                public void processElement(SeedEvent value,
                        KeyedProcessFunction<String, SeedEvent, SeedEvent>.Context ctx, Collector<SeedEvent> out)
                        throws Exception {
                    LOG.info("processElement...");

                    // Read what the previous element with this key left in state
                    final StringBuilder buffer = new StringBuilder();
                    this.state.get().forEach(buffer::append);
                    LOG.info("CurrentKey:{} Input:{} State:{} Infos:{}",
                            ctx.getCurrentKey(), value, buffer, this.infos);

                    value.setPayload("[Prev]" + value.getPayload());

                    // Replace the stored state with the current element
                    this.state.clear();
                    this.state.add(value.toString());

                    out.collect(value);
                }
            })
            .setParallelism(1);

        result
            .print()
            .setParallelism(1);

        result
            .addSink(getPulsarSink(params))
            .name("FlinkPulsarSink")
            .setParallelism(2);

        LOG.info("ExecutionPlan:{}", env.getExecutionPlan());

        try {
            env.execute("PulsarSinkJob");
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }

}
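The per-key logic in `processElement` above can be modelled without Flink to see what the `State:` field in the log actually contains. The following is a minimal plain-Java sketch, not Flink's API: a `HashMap` keyed by event code stands in for the keyed `ListState`, and the hypothetical `Event` class stands in for `SeedEvent` (only `code` and `payload` are modelled). It shows that each element sees the previous element with the same key, while elements with other keys see nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the KeyedProcessFunction in PulsarSinkJob (no Flink dependency).
public class KeyedStateSketch {

    // Hypothetical stand-in for SeedEvent: only the key (code) and payload are modelled.
    static final class Event {
        final String code;
        String payload;

        Event(String code, String payload) {
            this.code = code;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return code + ":" + payload;
        }
    }

    // One list per key, playing the role of Flink's keyed ListState<String>.
    private final Map<String, List<String>> stateByKey = new HashMap<>();

    // Mirrors processElement: read this key's state, prefix the payload,
    // replace the state with the current element, then emit it.
    // Returns the state visible before the update (what the job logs as "State:").
    String process(Event value, List<Event> out) {
        List<String> state = stateByKey.computeIfAbsent(value.code, k -> new ArrayList<>());
        StringBuilder previous = new StringBuilder();
        state.forEach(previous::append);

        value.payload = "[Prev]" + value.payload;

        state.clear();
        state.add(value.toString());
        out.add(value);
        return previous.toString();
    }

    public static void main(String[] args) {
        KeyedStateSketch fn = new KeyedStateSketch();
        List<Event> out = new ArrayList<>();
        System.out.println("A1 saw: '" + fn.process(new Event("A", "1"), out) + "'"); // A1 saw: ''
        System.out.println("B2 saw: '" + fn.process(new Event("B", "2"), out) + "'"); // B2 saw: ''
        System.out.println("A3 saw: '" + fn.process(new Event("A", "3"), out) + "'"); // A3 saw: 'A:[Prev]1'
    }
}
```

In the real job Flink scopes `state` to `ctx.getCurrentKey()` automatically and persists it in the RocksDB backend through checkpoints; the map here only imitates the per-key isolation.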
FlinkPulsarSource

public class PulsarSourceJob {

    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceJob.class);

    public static FlinkPulsarSource<SeedEvent> getPulsarSource(ParameterTool params) {
        // String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";
        // String adminUrl = "http://server-101:8080,server-102:8080,server-103:8080";
        // final String serviceUrl = params.get("serviceUrl", "pulsar://server-101:6650");
        // final String adminUrl = params.get("adminUrl", "http://server-101:8080");
        final String serviceUrl = params.get("serviceUrl", "pulsar://10.2.2.26:6650");
        final String adminUrl = params.get("adminUrl", "http://10.2.2.26:8080");

        // Single topic:
        // final String inputTopic = params.get("topic", "input-1-seed-avro-topic");
        // final String subscription = params.get("subscription", "seed-subscription");

        // Multiple topics:
        final String inputTopics = params.get("topic", "persistent://public/yang11/zlp.gjsjbz.gjbzcd3");
        // final String inputTopics = params.get("topic", "public/monitor/input-0-seed-avro-topic");
        final String subscription = params.get("subscription", "mutil-seed-subscription");

        // Topic pattern:
        // final String inputTopicPatten = params.get("topicPatten", "input-1-seed-avro-topic");
        // final String subscription = params.get("subscription", "patten-seed-subscription");

        // Token authentication, applied only when authParams is supplied
        final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
        final String authParams = params.get("authParams");

        final Properties props = new Properties();
        // http://pulsar.apache.org/docs/en/client-libraries-java/#reader
        props.setProperty(PulsarOptions.PULSAR_READER_OPTION_KEY_PREFIX + "receiverQueueSize", "2000");

        // props.setProperty(PulsarOptions.TOPIC_SINGLE_OPTION_KEY, inputTopic);
        props.setProperty(PulsarOptions.TOPIC_MULTI_OPTION_KEY, inputTopics);
        // props.setProperty(PulsarOptions.TOPIC_PATTERN_OPTION_KEY, inputTopicPatten);

        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getPartitionDiscoveryIntervalInMillis
        // Interval (ms) for automatic topic discovery; the default is -1
        props.setProperty(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000");
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getClientCacheSize
        props.setProperty(PulsarOptions.CLIENT_CACHE_SIZE_OPTION_KEY, "5");
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.flushOnCheckpoint
        props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true");
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.failOnWrite
        props.setProperty(PulsarOptions.FAIL_ON_WRITE_OPTION_KEY, "false");
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getPollTimeoutMs
        props.setProperty(PulsarOptions.POLL_TIMEOUT_MS_OPTION_KEY, "120000");
        // org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher
        props.setProperty(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY, subscription);
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getCommitMaxRetries
        props.setProperty(PulsarOptions.COMMIT_MAX_RETRIES, "3");
        // org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher.PulsarFetcher
        props.setProperty(PulsarOptions.FAIL_ON_DATA_LOSS_OPTION_KEY, "false");

        final ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(serviceUrl);
        clientConf.setConnectionTimeoutMs(6000);
        if (!StringUtils.isNullOrWhitespaceOnly(authParams)) {
            clientConf.setUseTls(true);
            clientConf.setAuthPluginClassName(authPlugin);
            clientConf.setAuthParams(authParams);
        }

        // Simpler alternative: wrap the Avro schema directly
        // PulsarDeserializationSchema<SeedEvent> deserializer =
        //         new PulsarDeserializationSchemaWrapper<>(AvroDeser.of(SeedEvent.class));
        // Custom deserializer that also logs the message metadata
        final PulsarDeserializationSchema<SeedEvent> deserializer = new PulsarDeserializationSchema<SeedEvent>() {

            private static final long serialVersionUID = 1L;

            private final DeserializationSchema<SeedEvent> schema = AvroDeser.of(SeedEvent.class);

            public void open(DeserializationSchema.InitializationContext context) throws Exception {
                this.schema.open(context);
            }

            @Override
            public TypeInformation<SeedEvent> getProducedType() {
                return this.schema.getProducedType();
            }

            @Override
            public boolean isEndOfStream(SeedEvent nextElement) {
                return this.schema.isEndOfStream(nextElement);
            }

            @Override
            public SeedEvent deserialize(Message message) throws IOException {
                // Debug output: raw payload rendered as text
                LOG.info("{}", new String(message.getData(), java.nio.charset.StandardCharsets.UTF_8));
                // Decode the Avro payload with the wrapped schema instead of returning an empty placeholder
                final SeedEvent value = this.schema.deserialize(message.getData());
                LOG.info("Received message:[{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",
                        Thread.currentThread().getId(),
                        message.getTopicName(),
                        message.getMessageId(),
                        message.getSequenceId(),
                        message.getEventTime(),
                        message.getPublishTime(),
                        message.getProducerName(),
                        message.getKey(), value);
                return value;
            }
        };

        final FlinkPulsarSource<SeedEvent> source = new FlinkPulsarSource<>(
                adminUrl, clientConf, deserializer, props);
        source.setStartFromEarliest();
        // source.setStartFromSubscription(subscription);
        // source.setStartFromLatest();

        return source;
    }

    public static void main(String[] args) {
        final ParameterTool params = ParameterTool.fromArgs(args);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setStateBackend(new RocksDBStateBackend(new FsStateBackend("file:///tmp/checkpoint/")));
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        // Checkpointing must be enabled for the job to resume from where it last stopped consuming;
        // otherwise consumption starts from the beginning again
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);

        final DataStream<String> stream = env
            .addSource(getPulsarSource(params))
            .name("FlinkPulsarSource")
            .uid("PulsarSource")
            .setParallelism(1)
            .map(new MapFunction<SeedEvent, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public String map(SeedEvent value) throws Exception {
                    return "[SourceJob]" + value;
                }
            });

        stream
            .print()
            .name("[Print]")
            .uid("PrintSink")
            .setParallelism(1);

        try {
            env.execute("PulsarSourceJob");
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }

}
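The source above subscribes to `persistent://public/yang11/zlp.gjsjbz.gjbzcd3`, while the commented-out alternative uses the short form `public/monitor/input-0-seed-avro-topic`. Pulsar expands short names: a bare `my-topic` becomes `persistent://public/default/my-topic`, and `tenant/namespace/topic` gets the `persistent://` prefix. The plain-Java sketch below reproduces that expansion so you can check what a `topic` argument resolves to. `TopicNameSketch`, `Parts` and `normalizeAll` are hypothetical helpers, not part of the Pulsar or Flink API, and splitting on commas is an assumption about how a multi-topic value is written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that expands Pulsar topic short names into fully qualified names.
public class TopicNameSketch {

    // The four parts of a fully qualified topic name.
    static final class Parts {
        final String domain;
        final String tenant;
        final String namespace;
        final String localName;

        Parts(String domain, String tenant, String namespace, String localName) {
            this.domain = domain;
            this.tenant = tenant;
            this.namespace = namespace;
            this.localName = localName;
        }

        String fullName() {
            return domain + "://" + tenant + "/" + namespace + "/" + localName;
        }
    }

    // Accepts "{persistent|non-persistent}://tenant/namespace/topic",
    // "tenant/namespace/topic" (persistent by default) and "topic"
    // (persistent://public/default/topic). Legacy names with a cluster segment are rejected.
    static Parts parse(String name) {
        String rest = name.trim();
        String domain = "persistent";
        int sep = rest.indexOf("://");
        if (sep >= 0) {
            domain = rest.substring(0, sep);
            rest = rest.substring(sep + 3);
            if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
                throw new IllegalArgumentException("Unknown domain in: " + name);
            }
        }
        String[] segs = rest.split("/");
        for (String s : segs) {
            if (s.isEmpty()) {
                throw new IllegalArgumentException("Empty segment in: " + name);
            }
        }
        if (segs.length == 1 && sep < 0) {
            return new Parts(domain, "public", "default", segs[0]);
        }
        if (segs.length == 3) {
            return new Parts(domain, segs[0], segs[1], segs[2]);
        }
        throw new IllegalArgumentException("Unsupported topic name: " + name);
    }

    // Splits a comma-separated topic list (assumed format) and normalizes each entry.
    static List<String> normalizeAll(String topics) {
        List<String> result = new ArrayList<>();
        for (String t : topics.split(",")) {
            if (!t.trim().isEmpty()) {
                result.add(parse(t).fullName());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Parts p = parse("persistent://public/yang11/zlp.gjsjbz.gjbzcd3");
        System.out.println(p.tenant + " | " + p.namespace + " | " + p.localName);
        // public | yang11 | zlp.gjsjbz.gjbzcd3
        System.out.println(normalizeAll("public/monitor/input-0-seed-avro-topic,my-topic"));
        // [persistent://public/monitor/input-0-seed-avro-topic, persistent://public/default/my-topic]
    }
}
```

Checking names this way before submitting the job makes it obvious which tenant and namespace (and therefore which permissions and policies) the source will actually read from.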
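Closing Remarks
In recent years, as AIOps has developed rapidly, demand for IT tools, platform capabilities, solutions, AI scenarios, and usable datasets has surged across industries. In response, Cloudwise (云智慧) launched its AIOps community in August 2021, aiming to raise an open-source banner and build an active community of users and developers for customers, users, researchers, and developers across industries, so that together they can tackle industry problems and advance the technology in this field.
The community has since open-sourced the data visualization orchestration platform FlyFish, the IT operations management platform OMP, the cloud service management platform Moore (摩尔平台), the Hours algorithm, and other products.
Visualization orchestration platform FlyFish:
Project introduction: https://www.cloudwise.ai/flyFish.html
GitHub: https://github.com/CloudWise-OpenSource/FlyFish
Gitee: https://gitee.com/CloudWise/fly-fish
Industry case study: https://www.bilibili.com/video/BV1z44y1n77Y/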
This article was reposted from 学新通技术网.