
Essential Knowledge | Understanding Apache Pulsar, the High-Performance Messaging Platform

CloudWise AIOps Community

Contents

  • Introduction to Pulsar
  • Key features of Pulsar
  • Pulsar vs Kafka
  • Architecture
  • Messaging model
  • Pulsar Schema
  • Pulsar Functions
  • Pulsar Connectors
  • Pulsar Deployment
  • Pulsar Admin
  • Pulsar Manager
  • Pulsar Flink
  • Closing remarks

What is Pulsar?

Apache Pulsar is a top-level project of the Apache Software Foundation and a next-generation cloud-native distributed messaging and streaming platform. It combines messaging, storage, and lightweight functional computation in one system, uses an architecture that separates compute from storage, and supports multi-tenancy, persistent storage, and cross-datacenter geo-replication, offering strongly consistent, high-throughput, low-latency, and highly scalable stream data storage.

Key Features of Pulsar

  • A single Pulsar instance natively supports multiple clusters, with seamless replication of messages between clusters across datacenters.
  • Very low publish latency and end-to-end latency.
  • Seamless scalability to over a million topics.
  • A simple client API with bindings for Java, Go, Python, and C++.
  • Multiple subscription types for topics (exclusive, shared, and failover).
  • Guaranteed message delivery with persistent message storage provided by Apache BookKeeper.
  • Stream-native data processing with Pulsar Functions, a lightweight serverless compute framework.
  • Pulsar IO, a serverless connector framework built on Pulsar Functions, which makes it easy to move data into and out of Apache Pulsar.
  • Tiered storage, which offloads data from hot storage to cool/long-term storage (such as S3 or GCS) as it ages.

Pulsar vs Kafka

The links below point to a detailed Pulsar vs. Kafka comparison report, which you can download and read:

https://streamnative.io/en/blog/tech/2020-07-08-pulsar-vs-kafka-part-1

https://streamnative.io/zh/blog/tech/2020-07-22-pulsar-vs-kafka-part-2

  • Performance and availability

Benchmark (StreamNative)

Data sources:

https://mp.weixin.qq.com/s/UZJTOEpzX8foUJv9XMJxOw

https://streamnative.io/en/blog/tech/2020-11-09-benchmark-pulsar-kafka-performance

https://streamnative.io/whitepaper/benchmark-pulsar-vs-kafka

  • Throughput


With the same durability guarantees as Kafka, Pulsar achieves 605 MB/s publish and end-to-end throughput (the same as Kafka) and 3.5 GB/s catch-up read throughput (3.5 times higher than Kafka). Pulsar's throughput is unaffected by increasing the partition count or changing the durability level, whereas Kafka's throughput is severely affected by changes to either.

  • Latency


Across the test scenarios (varying numbers of subscriptions, numbers of topics, and durability guarantees), Pulsar's latency is significantly lower than Kafka's. Pulsar's P99 latency stays between 5 and 15 milliseconds, while Kafka's P99 latency can reach several seconds and is heavily affected by the number of topics, the number of subscriptions, and the durability guarantee.

  • Functionality

  1. Multi-language clients (C/C++, Python, Java, Go, ...)
  2. Management tooling (Pulsar Manager vs Kafka Manager)
  3. Built-in stream processing (Pulsar Functions vs Kafka Streams)
  4. Rich integrations (Pulsar Connectors)
  5. Exactly-once processing
  6. Log compaction
  7. Multi-tenancy (Pulsar)
  8. Security management (Pulsar)

Architecture

Pulsar uses a software architecture that separates storage from compute. In the messaging space, Pulsar was the first open-source project to bring this cloud-native, compute-storage separated architecture into production. Because the broker layer stores no data, this architecture gives users higher availability, more flexible scaling and management, and avoids data rebalancing and catch-up reads.

In Apache Pulsar's layered architecture, every node in the serving layer (brokers) and in the storage layer (BookKeeper) is a peer. Brokers only serve messages and store no data, which enables instant node scaling and seamless failure recovery for both layers.


Persistent storage

Pulsar uses BookKeeper, a distributed log storage system, as its storage component; the underlying storage model is the log.

Pulsar stores all unacknowledged (i.e., unprocessed) messages on multiple "bookie" servers in BookKeeper.

BookKeeper achieves data consistency through quorum votes. Unlike a master/slave design, every BookKeeper node is a peer, and each piece of data is written concurrently to a configured number of storage nodes.


A topic is in fact a stream of ledgers, and a ledger is itself a log: a series of sub-logs (ledgers) makes up a parent log (the topic).

Ledgers are appended to a topic, and entries (a message or a batch of messages) are appended to ledgers. Once closed, a ledger is immutable. The ledger is also the smallest unit of deletion: individual entries cannot be deleted, only whole ledgers.

Ledgers are in turn broken into fragments; the fragment is the smallest unit of distribution in a BookKeeper cluster.

Each ledger (made up of one or more fragments) can be replicated across multiple BookKeeper nodes (bookies) for fault tolerance and better read performance, and each fragment is replicated on a different set of bookies (given enough bookies).

conf/bookkeeper.conf

#############################################################################
## Server parameters
#############################################################################
# Directories BookKeeper outputs its write ahead log.
# Could define multi directories to store write head logs, separated by ','.
journalDirectories=/data/appData/pulsar/bookkeeper/journal

#############################################################################
## Ledger storage settings
#############################################################################
# Directory Bookkeeper outputs ledger snapshots
# could define multi directories to store snapshots, separated by ','
ledgerDirectories=/data/appData/pulsar/bookkeeper/ledgers

conf/broker.conf

### --- Managed Ledger --- ###

# Number of bookies to use when creating a ledger
managedLedgerDefaultEnsembleSize=2

# Number of copies to store for each message
managedLedgerDefaultWriteQuorum=2

# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=2

Metadata storage

Both Pulsar and BookKeeper use Apache ZooKeeper to store metadata and monitor node health.

$ $PULSAR_HOME/bin/pulsar zookeeper-shell
> ls /
[admin, bookies, counters, ledgers, loadbalance, managed-ledgers, namespace, pulsar, schemas, stream, zookeeper]

Messaging Model

Pulsar follows the publish-subscribe (pub-sub) pattern: producers publish messages to topics, and consumers subscribe to topics, process the published messages, and send an acknowledgement when processing is complete.

Once a subscription is created, Pulsar retains all messages even if the consumer disconnects. A message is deleted only after a consumer acknowledges that it has been successfully processed.

Topic

Logically, a topic is a log structure, and every message has an offset within that log. Apache Pulsar uses cursors to track these offsets (cursor tracking).
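Cursors are also what allow a consumer to be repositioned within that log. A minimal sketch using the Java client's seek API (topic and subscription names are placeholders):

Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/my-topic")
        .subscriptionName("my-subscription")
        .subscribe();

// Move this subscription's cursor back to the earliest retained message...
consumer.seek(MessageId.earliest);
// ...or to the position of a publish timestamp (here: roughly one hour ago).
consumer.seek(System.currentTimeMillis() - 3600_000L);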

Pulsar supports two basic topic types: persistent topics and non-persistent topics. Topic names follow this pattern (for example, persistent://public/default/my-topic):

{persistent|non-persistent}://tenant/namespace/topic
  • Non-Partitioned topics
$ $PULSAR_HOME/bin/pulsar-admin topics \
  list public/default

$ $PULSAR_HOME/bin/pulsar-admin topics \
  create persistent://public/default/input-seed-avro-topic

$ $PULSAR_HOME/bin/pulsar-admin topics \
  lookup persistent://public/default/input-seed-avro-topic

$ $PULSAR_HOME/bin/pulsar-admin topics \
  delete persistent://public/default/input-seed-avro-topic

$ $PULSAR_HOME/bin/pulsar-admin topics \
  stats persistent://public/default/input-seed-avro-topic

$ curl http://server-101:8080/admin/v2/persistent/public/default/exclamation-input/stats | python -m json.tool

Partitioned topics

$ $PULSAR_HOME/bin/pulsar-admin topics \
  create-partitioned-topic persistent://public/default/output-seed-avro-topic \
  --partitions 2

$ $PULSAR_HOME/bin/pulsar-admin topics \
  list-partitioned-topics public/default

$ $PULSAR_HOME/bin/pulsar-admin topics \
  get-partitioned-topic-metadata persistent://public/default/output-seed-avro-topic

$ $PULSAR_HOME/bin/pulsar-admin topics \
  delete-partitioned-topic persistent://public/default/output-seed-avro-topic

Message

Messages are the basic "unit" of Pulsar.

public interface Message<T> {

    Map<String, String> getProperties();
    boolean hasProperty(String var1);
    String getProperty(String var1);
    byte[] getData();
    T getValue();
    MessageId getMessageId();
    long getPublishTime();
    long getEventTime();
    long getSequenceId();
    String getProducerName();
    boolean hasKey();
    String getKey();
    boolean hasBase64EncodedKey();
    byte[] getKeyBytes();
    boolean hasOrderingKey();
    byte[] getOrderingKey();
    String getTopicName();
    Optional<EncryptionContext> getEncryptionCtx();
    int getRedeliveryCount();
    byte[] getSchemaVersion();
    boolean isReplicated();
    String getReplicatedFrom();
}

Producer

public void send() throws PulsarClientException {
    final String serviceUrl = "pulsar://server-100:6650";
    // final String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";

    // http://pulsar.apache.org/docs/en/client-libraries-java/#client
    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .connectionTimeout(10000, TimeUnit.MILLISECONDS)
            .build();

    final String topic = "persistent://public/default/topic-sensor-temp";
    // http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
    final Producer<byte[]> producer = client.newProducer()
            .producerName("sensor-temp")
            .topic(topic)

            .compressionType(CompressionType.LZ4)

            // Note: chunking and batching cannot be enabled on the same producer;
            // enable one or the other.
            // .enableChunking(true)

            .enableBatching(true)
            .batchingMaxBytes(1024)
            .batchingMaxMessages(10)
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)

            .blockIfQueueFull(true)
            .maxPendingMessages(512)

            .sendTimeout(1, TimeUnit.SECONDS)
            .create();

    MessageId mid = producer.send("sensor-temp".getBytes());
    System.out.printf("\nmessage with ID %s successfully sent", mid);

    mid = producer.newMessage()
            .key("sensor-temp-key")
            .value("sensor-temp-key".getBytes())
            .property("my-key", "my-value")
            .property("my-other-key", "my-other-value")
            .send();
    System.out.printf("message-key with ID %s successfully sent", mid);

    producer.close();
    client.close();
}

Consumer

public void consume() throws PulsarClientException {
    final String serviceUrl = "pulsar://server-101:6650";
    final String topic = "input-seed-avro-topic";

    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .enableTcpNoDelay(true)
            .build();

    final Consumer<byte[]> consumer = client
            .newConsumer()
            .consumerName("seed-avro-consumer")
            .subscriptionName("seed-avro-subscription")
            .subscriptionType(SubscriptionType.Exclusive)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .topic(topic)
            .receiverQueueSize(10)
            .subscribe();

    final AvroSchema<SeedEvent> schema = AvroSchema.of(SeedEvent.class);
    while (true) {
        try {
            final Message<byte[]> msg = consumer.receive();
            LOG.info("received: [{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",
                    Thread.currentThread().getId(),
                    msg.getTopicName(),
                    msg.getMessageId(),
                    msg.getSequenceId(),
                    msg.getEventTime(),
                    msg.getPublishTime(),
                    msg.getProducerName(),
                    msg.getKey(), schema.decode(msg.getValue()));
            try {
                consumer.acknowledge(msg);
            } catch (final PulsarClientException e) {
                consumer.negativeAcknowledge(msg);
                LOG.error("acknowledge:" + e.getLocalizedMessage(), e);
            }
        } catch (final PulsarClientException e) {
            LOG.error("receive:" + e.getLocalizedMessage(), e);
        }
    }
}

Subscriptions

Consumers consume messages from a topic through subscriptions. A subscription is the logical entity that owns a cursor (tracking the offset), and a topic can have multiple subscriptions. A subscription contains no message data, only metadata and the cursor.

Each subscription stores a cursor, which is the current offset in the log. Subscriptions store their cursors in BookKeeper ledgers, which lets cursor tracking scale just like topics do.
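Because a subscription is just a durable, named cursor, several independent subscriptions can coexist on one topic. A minimal sketch with the Java client (topic and subscription names are placeholders):

// Each subscription keeps its own cursor, so the two consumers below
// progress through the same topic independently; acknowledging on one
// subscription does not affect the other.
Consumer<byte[]> audit = client.newConsumer()
        .topic("persistent://public/default/orders")
        .subscriptionName("audit-subscription")
        .subscribe();

Consumer<byte[]> billing = client.newConsumer()
        .topic("persistent://public/default/orders")
        .subscriptionName("billing-subscription")
        .subscribe();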

Subscription types (subscription-type)

  • Exclusive

Only a single consumer may attach to the subscription and consume messages.

  • Failover

Only one consumer is active on the subscription at a time, with one or more standby consumers. If the active consumer fails, a standby takes over; there are never two active consumers at once.

  • Shared

Multiple consumers can attach to the subscription at the same time and share the topic's messages among them.

  • Key_Shared

Multiple consumers can attach to the subscription, and messages are distributed across consumers by key: messages with the same key are always delivered to the same consumer.

Ordering guarantees

If ordering matters, use the Exclusive or Failover subscription type, so that a single consumer consumes the topic and ordering is preserved.

With the Shared subscription type, multiple consumers can consume the same topic concurrently. Adding consumers dynamically speeds up consumption of the topic and reduces the message backlog on the server.

Key_Shared guarantees that, within a shared subscription, messages with the same key are also sent to the same consumer, preserving per-key ordering while still allowing concurrency.
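A minimal sketch of Key_Shared with the Java client (topic, subscription, and key names are placeholders): the producer keys each message, and all consumers join the same subscription with the Key_Shared type:

// Producer side: key each message so it can be routed by key.
producer.newMessage()
        .key("sensor-42")
        .value("temperature=21.5".getBytes())
        .send();

// Consumer side: every consumer uses the same subscription name and the
// Key_Shared type; messages with the same key always reach the same consumer.
Consumer<byte[]> consumer = client.newConsumer()
        .topic("persistent://public/default/sensor-events")
        .subscriptionName("sensor-key-shared-subscription")
        .subscriptionType(SubscriptionType.Key_Shared)
        .subscribe();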

Multi-topic subscriptions

Pattern:

  • persistent://public/default/.*
  • persistent://public/default/foo.*
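With the Java client, such a pattern (or an explicit list of topics) is passed at subscribe time. A minimal sketch (subscription name assumed):

import java.util.regex.Pattern;

// Subscribe to every topic in public/default whose name starts with "foo".
Consumer<byte[]> consumer = client.newConsumer()
        .topicsPattern(Pattern.compile("persistent://public/default/foo.*"))
        .subscriptionName("multi-topic-subscription")
        .subscribe();

// Alternatively, list the topics explicitly:
// client.newConsumer().topics(Arrays.asList("topic-1", "topic-2"))...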

Reader

public void read() throws IOException {
    final String serviceUrl = "pulsar://server-101:6650";
    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .build();

    // http://pulsar.apache.org/docs/en/client-libraries-java/#reader
    final Reader<byte[]> reader = client.newReader()
            .topic("my-topic")
            .startMessageId(MessageId.earliest) // or MessageId.latest
            .create();

    while (true) {
        final Message<byte[]> message = reader.readNext();
        System.out.println(new String(message.getData()));
    }
}


Partitioned topics


Message retention and expiry

If a topic has no retention policy, messages before a given offset are automatically deleted once the cursors of all the topic's subscriptions have successfully consumed past that offset.

If a topic has a retention policy, acknowledged messages are deleted once they exceed the policy's thresholds (the topic's retained message size or message retention time).


conf/broker.conf

# Default message retention time
# default 0; here set to 3 days = 60*24*3
defaultRetentionTimeInMinutes=4320

# Default retention size
# default 0; here set to 10 GB
defaultRetentionSizeInMB=10240

# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0

retention policy (for a namespace)

$ $PULSAR_HOME/bin/pulsar-admin namespaces \
  get-retention public/default

$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/retention | python -m json.tool

$ $PULSAR_HOME/bin/pulsar-admin namespaces \
  set-retention public/default \
  --size 1024M \
  --time 5m

$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/retention \
  --header "Content-Type:application/json" \
  --data '{
    "retentionTimeInMinutes" : 5,
    "retentionSizeInMB" : 1024
  }'

message expiry / message-ttl

$ $PULSAR_HOME/bin/pulsar-admin namespaces \
  get-message-ttl public/default

$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/messageTTL

$ $PULSAR_HOME/bin/pulsar-admin namespaces \
  set-message-ttl public/default \
  --messageTTL 1800

$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/messageTTL \
  --header "Content-Type:application/json" \
  --data '1800'

Pulsar Schema

Pulsar schema enables you to use language-specific types of data when constructing and handling messages, from simple types to more complex application-specific types.

  • Type safety (serialization and deserialization)
  • Schemas help Pulsar preserve the meaning the data had in other systems

Schema types

  • Primitive type
Producer<String> producer = client.newProducer(Schema.STRING).create();
producer.newMessage().value("Hello Pulsar!").send();

Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
consumer.receive();
  • Complex type

1. keyvalue: key/value pairs.

Schema<KeyValue<Integer, String>> schema = Schema.KeyValue(
        Schema.INT32,
        Schema.STRING,
        KeyValueEncodingType.SEPARATED
);

// Producer
Producer<KeyValue<Integer, String>> producer = client.newProducer(schema)
        .topic(TOPIC)
        .create();
final int key = 100;
final String value = "value-100";
producer.newMessage().value(new KeyValue<>(key, value)).send();

// Consumer
Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(schema)
        .topic(TOPIC).subscriptionName(SubscriptionName).subscribe();
Message<KeyValue<Integer, String>> msg = consumer.receive();

2. struct: AVRO, JSON, and Protobuf.

Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage().value(User.builder().userName("pulsar-user").userId(1L).build()).send();

Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();
Message<User> message = consumer.receive();
User user = message.getValue();
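The User type is not defined in the original snippet; a minimal sketch of a POJO that would satisfy the builder calls above (the Lombok annotations and field names are assumptions made for brevity):

import lombok.Builder;
import lombok.Data;

@Data
@Builder
public class User {
    private String userName; // hypothetical field backing userName(...)
    private long userId;     // hypothetical field backing userId(...)
}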

How schemas work

Producer


Consumer


Manual schema management

Get a schema

$ $PULSAR_HOME/bin/pulsar-admin schemas \
  get persistent://public/default/spirit-avro-topic

$ $PULSAR_HOME/bin/pulsar-admin schemas \
  get persistent://public/default/spirit-avro-topic \
  --version=2

Update (upload) a schema

$ $PULSAR_HOME/bin/pulsar-admin schemas upload \
  persistent://public/default/test-topic \
  --filename $PULSAR_HOME/connectors/json-schema.json

Extract a schema

$ $PULSAR_HOME/bin/pulsar-admin schemas \
  extract persistent://public/default/test-topic \
  --classname com.cloudwise.modal.Packet \
  --jar ~/cloudwise-pulsar-1.0.0-RELEASE.jar \
  --type json

public void schemaInfo() {
    System.out.println("AvroSchema:" + AvroSchema.of(SeedEvent.class).getSchemaInfo());
    System.out.println("Schema.AVRO:" + Schema.AVRO(SeedEvent.class).getSchemaInfo());
}

Delete a schema

$ $PULSAR_HOME/bin/pulsar-admin schemas \
  delete persistent://public/default/spirit-avro-topic

Pulsar Functions

Programming model


Enabling Functions

  • conf/bookkeeper.conf
extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
  • conf/broker.conf
functionsWorkerEnabled=true
  • conf/functions_worker.yml
pulsarFunctionsCluster: pulsar-cluster
numFunctionPackageReplicas: 2

Windowing

  • windowLengthCount: the number of messages per window
  • slidingIntervalCount: the number of messages by which the window slides
  • windowLengthDurationMs: the window length in milliseconds
  • slidingIntervalDurationMs: the interval by which the window slides, in milliseconds

Window functions

public class WordCountWindowFunction implements org.apache.pulsar.functions.api.WindowFunction<String, Void> {

    @Override
    public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
        for (Record<String> input : inputs) {
            // process each record in the current window
        }
        return null;
    }

}

Running window functions

  • Time-based sliding window

--user-config '{"windowLengthDurationMs":"60000", "slidingIntervalDurationMs":"1000"}'

  • Time-based tumbling window

--user-config '{"windowLengthDurationMs":"60000"}'

  • Count-based sliding window

--user-config '{"windowLengthCount":"100", "slidingIntervalCount":"10"}'

  • Count-based tumbling window

--user-config '{"windowLengthCount":"100"}'
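A window function is created like any other function, with the window passed through --user-config. A sketch of a create command for the WordCountWindowFunction above (jar path and topic names are assumptions):

$ $PULSAR_HOME/bin/pulsar-admin functions create \
  --jar target/cloudwise-pulsar-functions-with-dependencies.jar \
  --classname com.cloudwise.quickstart.pulsar.functions.WordCountWindowFunction \
  --inputs persistent://public/default/sentences \
  --output persistent://public/default/wordcount-window \
  --name word-count-window-function \
  --user-config '{"windowLengthCount":"100", "slidingIntervalCount":"10"}'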

Java programming

pom.xml

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-api</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-functions-local-runner</artifactId>
    <version>${pulsar.version}</version>
</dependency>
  1. WordCount
public class WordCountFunction implements org.apache.pulsar.functions.api.Function<String, Void> {

    @Override
    public Void process(String input, Context context) throws Exception {
        Arrays.asList(input.split(" ")).forEach(word -> {
            String counterKey = word.toLowerCase();
            if (context.getCounter(counterKey) == 0) {
                context.putState(counterKey, ByteBuffer.wrap(ByteUtils.from(100)));
            }
            context.incrCounter(counterKey, 1);
        });
        return null;
    }

}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
  --broker-service-url pulsar://server-101:6650 \
  --jar target/cloudwise-pulsar-functions-with-dependencies.jar \
  --classname com.cloudwise.quickstart.pulsar.functions.WordCountFunction \
  --tenant public \
  --namespace default \
  --name word-count-function \
  --inputs persistent://public/default/sentences \
  --output persistent://public/default/wordcount
  2. Dynamic routing

/**
 * Basic idea: inspect the content of each message and route it to a
 * different destination topic based on what it contains.
 */
public class RoutingFunction implements org.apache.pulsar.functions.api.Function<String, String> {

    @Override
    public String process(String input, Context context) throws Exception {
        String regex = context.getUserConfigValue("regex").toString();
        String matchedTopic = context.getUserConfigValue("matched-topic").toString();
        String unmatchedTopic = context.getUserConfigValue("unmatched-topic").toString();

        Pattern pattern = Pattern.compile(regex);
        Matcher matcher = pattern.matcher(input);
        if (matcher.matches()) {
            context.newOutputMessage(matchedTopic, Schema.STRING).value(input).send();
        } else {
            context.newOutputMessage(unmatchedTopic, Schema.STRING).value(input).send();
        }
        return null;
    }

}
  3. log-topic

public class LoggingFunction implements org.apache.pulsar.functions.api.Function<String, Void> {

    @Override
    public Void process(String s, Context context) throws Exception {
        Logger LOG = context.getLogger();
        String messageId = context.getFunctionId();
        if (s.contains("danger")) {
            LOG.warn("A warning was received in message {}", messageId);
        } else {
            LOG.info("Message {} received\nContent: {}", messageId, s);
        }
        return null;
    }

}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
  --jar cloudwise-pulsar-functions-1.0.0.jar \
  --classname com.cloudwise.quickstart.pulsar.functions.LoggingFunction \
  --log-topic persistent://public/default/logging-function-logs
  4. user-config

public class UserConfigFunction implements org.apache.pulsar.functions.api.Function<String, Void> {

    @Override
    public Void process(String s, Context context) throws Exception {
        Logger log = context.getLogger();
        Optional<Object> value = context.getUserConfigValue("word-of-the-day");
        if (value.isPresent()) {
            log.info("The word of the day is {}", value);
        } else {
            log.warn("No word of the day provided");
        }
        return null;
    }

}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
  --broker-service-url pulsar://server-101:6650 \
  --jar target/cloudwise-pulsar-functions-with-dependencies.jar \
  --classname com.cloudwise.quickstart.pulsar.functions.UserConfigFunction \
  --tenant public \
  --namespace default \
  --name user-config-function \
  --inputs persistent://public/default/userconfig \
  --user-config '{"word-of-the-day":"verdure"}'

Pulsar Connectors


Processing guarantees

  • at-most-once
  • at-least-once
  • effectively-once
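The guarantee is selected per connector instance at creation time via the --processing-guarantees flag (ATMOST_ONCE, ATLEAST_ONCE, or EFFECTIVELY_ONCE); the ClickHouse sink example later in this section uses EFFECTIVELY_ONCE. A sketch with placeholder names:

$ $PULSAR_HOME/bin/pulsar-admin sinks create \
  --name my-sink \
  --inputs my-input-topic \
  --archive $PULSAR_HOME/connectors/my-sink.nar \
  --processing-guarantees EFFECTIVELY_ONCE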

Workflow (JDBC sink)

  1. Add a configuration file.
  2. Create a schema.
  3. Upload the schema to a topic.
  4. Create the JDBC sink.
  5. Stop the JDBC sink.
  6. Restart the JDBC sink.
  7. Update the JDBC sink.

Built-in connectors

Source connector

  • Canal
  • File
  • Flume
  • Kafka
  • RabbitMQ

Sink connector

  • ElasticSearch/Solr
  • Flume
  • HBase
  • HDFS2/HDFS3
  • InfluxDB
  • JDBC ClickHouse/MariaDB/PostgreSQL
  • Kafka
  • MongoDB
  • RabbitMQ
  • Redis

ClickHouse Sink

  1. Create the table

CREATE DATABASE IF NOT EXISTS monitor;
CREATE TABLE IF NOT EXISTS monitor.pulsar_clickhouse_jdbc_sink
(
    id UInt32,
    name String
) ENGINE = TinyLog;

INSERT INTO monitor.pulsar_clickhouse_jdbc_sink (id, name)
VALUES (1, 'tmp');

SELECT *
FROM monitor.pulsar_clickhouse_jdbc_sink;
  2. Create the sink configuration

$ vi $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml

{
    "userName": "sysop",
    "password": "123456",
    "jdbcUrl": "jdbc:clickhouse://server-101:8123/monitor",
    "tableName": "pulsar_clickhouse_jdbc_sink"
}
  3. Create the schema

$ vi $PULSAR_HOME/connectors/json-schema.json

{
    "name": "",
    "schema": {
        "type": "record",
        "name": "SeedEvent",
        "namespace": "com.cloudwise.quickstart.model",
        "fields": [
            {
                "name": "id",
                "type": ["null", "int"]
            },
            {
                "name": "name",
                "type": ["null", "string"]
            }
        ]
    },
    "type": "JSON",
    "properties": {
        "__alwaysAllowNull": "true",
        "__jsr310ConversionEnabled": "false"
    }
}
  4. Upload the schema

$ $PULSAR_HOME/bin/pulsar-admin schemas upload \
  pulsar-clickhouse-jdbc-sink-topic \
  -f $PULSAR_HOME/connectors/json-schema.json
  5. Run the sink

$ $PULSAR_HOME/bin/pulsar-admin sinks create \
  --tenant public \
  --namespace default \
  --name pulsar-clickhouse-jdbc-sink \
  --inputs pulsar-clickhouse-jdbc-sink-topic \
  --sink-config-file $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml \
  --archive $PULSAR_HOME/connectors/pulsar-io-jdbc-clickhouse-2.6.2.nar \
  --processing-guarantees EFFECTIVELY_ONCE \
  --parallelism 1

Pulsar Deployment

Directory layout

/opt/pulsar-2.6.2
├── bin
│   ├── bookkeeper
│   ├── function-localrunner
│   ├── proto
│   ├── pulsar
│   ├── pulsar-admin
│   ├── pulsar-admin-common.sh
│   ├── pulsar-client
│   ├── pulsar-daemon
│   ├── pulsar-managed-ledger-admin
│   └── pulsar-perf
├── conf
│   ├── bkenv.sh
│   ├── bookkeeper.conf
│   ├── broker.conf
│   ├── client.conf
│   ├── discovery.conf
│   ├── filesystem_offload_core_site.xml
│   ├── functions-logging
│   ├── functions_worker.yml
│   ├── global_zookeeper.conf
│   ├── log4j2-scripts
│   ├── log4j2.yaml
│   ├── presto
│   ├── proxy.conf
│   ├── pulsar_env.sh
│   ├── pulsar_tools_env.sh
│   ├── schema_example.conf
│   ├── standalone.conf
│   ├── websocket.conf
│   └── zookeeper.conf
├── examples
│   ├── api-examples.jar
│   ├── example-function-config.yaml
│   ├── example-window-function-config.yaml
│   └── python-examples
├── instances
│   ├── deps
│   ├── java-instance.jar
│   └── python-instance
├── lib
│   └── presto
├── LICENSE
├── licenses
├── NOTICE
└── README

Standalone

# Start in the foreground
$ $PULSAR_HOME/bin/pulsar standalone

# Start as a background daemon
$ $PULSAR_HOME/bin/pulsar-daemon start standalone
$ jps | grep -v Jps
1873 PulsarStandaloneStarter

# Stop the background daemon
$ $PULSAR_HOME/bin/pulsar-daemon stop standalone -force

Cluster

  1. Deploy a ZooKeeper cluster.
  2. Initialize the cluster metadata (sketched below).
  3. Deploy a BookKeeper cluster.
  4. Deploy one or more Pulsar brokers.
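Step 2 is done once with the pulsar CLI; a sketch, with the cluster name and hostnames assumed to match the examples used elsewhere in this article:

$ $PULSAR_HOME/bin/pulsar initialize-cluster-metadata \
  --cluster pulsar-cluster \
  --zookeeper server-101:2181 \
  --configuration-store server-101:2181 \
  --web-service-url http://server-101:8080 \
  --broker-service-url pulsar://server-101:6650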

Client

# consumer
$ $PULSAR_HOME/bin/pulsar-client consume \
  persistent://public/default/seed-avro-topic \
  --subscription-name cli-pack-avro-subscription \
  --subscription-type Exclusive \
  --subscription-position Latest \
  --num-messages 0

# producer
$ $PULSAR_HOME/bin/pulsar-client produce \
  persistent://public/default/seed-avro-topic \
  --num-produce 100 \
  --messages "Hello Pulsar" \
  --separator ","

Pulsar Admin

API

  • pulsar-admin
  • REST API

Source: apache-pulsar-2.6.2-src/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/*.java

Bookies.java, BrokerStats.java, Brokers.java, Clusters.java, Functions.java, Namespaces.java, NonPersistentTopics.java, PersistentTopics.java, ResourceQuotas.java, SchemasResource.java, Tenants.java, Worker.java, WorkerStats.java
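Each pulsar-admin subcommand maps onto one of these REST resources, so anything the CLI does can also be done with curl. For example, listing the active brokers of a cluster (the cluster name pulsar-cluster is assumed from the functions-worker configuration shown earlier):

$ curl http://server-101:8080/admin/v2/brokers/pulsar-cluster | python -m json.tool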

  • Java admin client
public void createNonPartitionedTopic() throws PulsarClientException {
    final String serviceHttpUrl = "http://10.2.2.26:8080";

    final PulsarAdmin admin = PulsarAdmin.builder()
            .serviceHttpUrl(serviceHttpUrl)
            .build();
    try {
        final String namespace = "public/monitor";

        List<String> topics = admin.topics().getList(namespace);
        topics.forEach(t -> System.err.println("before topic:" + t));

        // The following forms are equivalent:
        // final String topic = "input-3-seed-avro-topic";
        // final String topic = "public/monitor/input-seed-avro-topic";
        final String topic = "persistent://public/default/input-5-seed-avro-topic";
        if (topics.indexOf(topic) == -1) {
            admin.topics().createNonPartitionedTopic(topic);
            admin.schemas().createSchema(topic,
                    AvroSchema.of(SeedEvent.class).getSchemaInfo());
        }

        topics = admin.topics().getList(namespace);
        topics.forEach(t -> System.err.println("after topic:" + t));

        System.err.println("schema:" + admin.schemas().getSchemaInfo(topic));
    } catch (final PulsarAdminException e) {
        e.printStackTrace();
    }

    admin.close();
}

Manage Pulsar

  • Clusters 
$ $PULSAR_HOME/bin/pulsar-admin clusters
  • Tenants
$ $PULSAR_HOME/bin/pulsar-admin tenants
  • Brokers
$ $PULSAR_HOME/bin/pulsar-admin brokers
  • Namespaces
$ $PULSAR_HOME/bin/pulsar-admin namespaces
  • Permissions
  • Persistent topics
  • Non-Persistent topics
  • Partitioned topics
  • Non-Partitioned topics
$ $PULSAR_HOME/bin/pulsar-admin topics
  • Schemas
$ $PULSAR_HOME/bin/pulsar-admin schemas
  • Functions 
$ $PULSAR_HOME/bin/pulsar-admin functions

Pulsar Manager

http://pulsar.apache.org/docs/zh-CN/administration-pulsar-manager/

https://github.com/apache/pulsar-manager

WebUI

http://localhost:7750/ui/index.html

username/password: admin/123456

  • Environments
  • Management
  • Clusters
  • Tenants
  • Namespaces
  • Topics
  • Tokens

Pulsar Flink

https://github.com/streamnative/pulsar-flink

https://dl.bintray.com/streamnative/maven/io/streamnative/connectors/

pom.xml 

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <flink.version>1.11.2</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<repositories>
    <repository>
        <id>central</id>
        <layout>default</layout>
        <url>https://repo1.maven.org/maven2</url>
    </repository>
    <repository>
        <id>bintray</id>
        <name>bintray</name>
        <url>https://dl.bintray.com/streamnative/maven</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- state backend -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- pulsar -->
    <dependency>
        <groupId>io.streamnative.connectors</groupId>
        <artifactId>pulsar-flink-${scala.binary.version}-${flink.version}</artifactId>
        <version>2.5.4.1</version>
    </dependency>

    <!-- formats -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
</dependencies>

FlinkPulsarSink

public class PulsarSinkJob {

    private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkJob.class);

    public static SourceFunction<SeedEvent> getSeedSource() {
        final int interval = 5000;
        return new PeriodicEventSource<>(
                Integer.MAX_VALUE, interval, new PeriodicEventSource.Creator<SeedEvent>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Collection<SeedEvent> build(long i) {
                return Arrays.stream(new String[]{"TEM-A-01", "HUM-A-01", "PRS-A-01"})
                        .map(code -> {
                            final SeedEvent event = new SeedEvent(
                                    Instant.now().toEpochMilli(), code, Long.toString(i));
                            LOG.info("created event: [{}] {}", Thread.currentThread().getId(), event);
                            return event;
                        })
                        .collect(Collectors.toList());
            }

            @Override
            public Class<SeedEvent> clazz() {
                return SeedEvent.class;
            }

        });
    }

    public static FlinkPulsarSink<SeedEvent> getPulsarSink(ParameterTool params) {
        // String adminUrl = "http://server-101:8080,server-102:8080,server-103:8080";
        final String serviceUrl = params.get("serviceUrl", "pulsar://10.2.2.26:6650");
        final String adminUrl = params.get("adminUrl", "http://10.2.2.26:8080");

        final String outputTopic = params.get("topic", "output-seed-avro-topic");

        final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
        final String authParams = params.get("authParams");

        final Properties props = new Properties();
        props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true");
        props.setProperty(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000");

        final ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(serviceUrl);
        clientConf.setConnectionTimeoutMs(6000);
        clientConf.setUseTcpNoDelay(true);
        if (!StringUtils.isNullOrWhitespaceOnly(authParams)) {
            clientConf.setUseTls(true);
            clientConf.setAuthPluginClassName(authPlugin);
            clientConf.setAuthParams(authParams);
        }

        final TopicKeyExtractor<SeedEvent> topicKeyExtractor = new TopicKeyExtractor<SeedEvent>() {

            private static final long serialVersionUID = 1L;

            @Override
            public byte[] serializeKey(SeedEvent element) {
                LOG.info("serializeKey: [{}] {}", Thread.currentThread().getId(), element);
                return element.getCode().getBytes();
            }

            @Override
            public String getTopic(SeedEvent element) {
                return null;
            }
        };

        final FlinkPulsarSink<SeedEvent> sink = new FlinkPulsarSink<>(
                adminUrl, Optional.of(outputTopic), clientConf, props, topicKeyExtractor, SeedEvent.class);

        return sink;
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        final ParameterTool params = ParameterTool.fromArgs(args);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.setStateBackend(new RocksDBStateBackend(new FsStateBackend("file:///tmp/checkpoint/")));
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // Retain checkpoint data on job cancellation and failure, so the job
        // can be restored from a chosen checkpoint as needed.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Ensure at least 500 ms between checkpoints (minimum pause).
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // A checkpoint must complete within one minute or it is discarded (checkpoint timeout).
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getConfig().setGlobalJobParameters(params);

        // DataStream<SeedEvent> stream = env.fromCollection(getSeedEvents()).name("Collection");
        final DataStream<SeedEvent> stream = env.addSource(getSeedSource()).name("SourceFunction");

        final DataStream<SeedEvent> result = stream
                .keyBy(new KeySelector<SeedEvent, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public String getKey(SeedEvent value) throws Exception {
                        return value.getCode();
                    }

                })
                .process(new KeyedProcessFunction<String, SeedEvent, SeedEvent>() {

                    private static final long serialVersionUID = 1L;

                    private Map<String, String> infos;
                    private transient ListState<String> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        LOG.info("open...");
                        this.state = getRuntimeContext().getListState(
                                new ListStateDescriptor<>("state", String.class));

                        this.infos = new HashMap<>();
                        this.infos.put("open", LocalDateTime.now().toString());
                    }

                    @Override
                    public void close() throws Exception {
                        LOG.info("close...");
                    }

                    @Override
                    public void processElement(SeedEvent value,
                            KeyedProcessFunction<String, SeedEvent, SeedEvent>.Context ctx, Collector<SeedEvent> out)
                            throws Exception {
                        LOG.info("processElement...");

                        final StringBuffer buffer = new StringBuffer();
                        this.state.get().forEach(t -> buffer.append(t));
                        LOG.info("CurrentKey:{} Input:{} State:{} Infos:{}",
                                ctx.getCurrentKey(), value, buffer, this.infos);

                        value.setPayload("[Prev]" + value.getPayload());

                        this.state.clear();
                        this.state.add(value.toString());

                        out.collect(value);
                    }

                })
                .setParallelism(1);

        result
                .print()
                .setParallelism(1);

        result
                .addSink(getPulsarSink(params))
                .name("FlinkPulsarSink")
                .setParallelism(2);

        LOG.info("ExecutionPlan:{}", env.getExecutionPlan());

        try {
            env.execute("PulsarSinkJob");
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }

}

FlinkPulsarSource

public class PulsarSourceJob {

    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceJob.class);

    public static FlinkPulsarSource<SeedEvent> getPulsarSource(ParameterTool params) {
        // String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";
        // String adminUrl = "http://server-101:8080,server-102:8080,server-103:8080";
        // final String serviceUrl = params.get("serviceUrl", "pulsar://server-101:6650");
        // final String adminUrl = params.get("adminUrl", "http://server-101:8080");
        final String serviceUrl = params.get("serviceUrl", "pulsar://10.2.2.26:6650");
        final String adminUrl = params.get("adminUrl", "http://10.2.2.26:8080");

        // final String inputTopic = params.get("topic", "input-1-seed-avro-topic");
        // final String subscription = params.get("subscription", "seed-subscription");

        final String inputTopics = params.get("topic", "persistent://public/yang11/zlp.gjsjbz.gjbzcd3");
        // final String inputTopics = params.get("topic", "public/monitor/input-0-seed-avro-topic");
        final String subscription = params.get("subscription", "mutil-seed-subscription");

        // final String inputTopicPatten = params.get("topicPatten", "input-1-seed-avro-topic");
        // final String subscription = params.get("subscription", "patten-seed-subscription");

        final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
        final String authParams = params.get("authParams");

        final Properties props = new Properties();
        // http://pulsar.apache.org/docs/en/client-libraries-java/#reader
        props.setProperty(PulsarOptions.PULSAR_READER_OPTION_KEY_PREFIX + "receiverQueueSize", "2000");

        // props.setProperty(PulsarOptions.TOPIC_SINGLE_OPTION_KEY, inputTopic);
        props.setProperty(PulsarOptions.TOPIC_MULTI_OPTION_KEY, inputTopics);
        // props.setProperty(PulsarOptions.TOPIC_PATTERN_OPTION_KEY, inputTopicPatten);

        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getPartitionDiscoveryIntervalInMillis
        // Topic discovery interval in milliseconds; default -1 (disabled).
        props.setProperty(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000");
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getClientCacheSize
        props.setProperty(PulsarOptions.CLIENT_CACHE_SIZE_OPTION_KEY, "5");
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.flushOnCheckpoint
        props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true");
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.failOnWrite
        props.setProperty(PulsarOptions.FAIL_ON_WRITE_OPTION_KEY, "false");
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getPollTimeoutMs
        props.setProperty(PulsarOptions.POLL_TIMEOUT_MS_OPTION_KEY, "120000");
        // org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher
        props.setProperty(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY, subscription);
        // org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getCommitMaxRetries
        props.setProperty(PulsarOptions.COMMIT_MAX_RETRIES, "3");
        // org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher.PulsarFetcher
        props.setProperty(PulsarOptions.FAIL_ON_DATA_LOSS_OPTION_KEY, "false");

        final ClientConfigurationData clientConf = new ClientConfigurationData();
        clientConf.setServiceUrl(serviceUrl);
        clientConf.setConnectionTimeoutMs(6000);
        if (!StringUtils.isNullOrWhitespaceOnly(authParams)) {
            clientConf.setUseTls(true);
            clientConf.setAuthPluginClassName(authPlugin);
            clientConf.setAuthParams(authParams);
        }

        PulsarDeserializationSchema<SeedEvent> deserializer = null;
        deserializer = new PulsarDeserializationSchemaWrapper<>(AvroDeser.of(SeedEvent.class));
        deserializer = new PulsarDeserializationSchema<SeedEvent>() {

            private static final long serialVersionUID = 1L;

            private final DeserializationSchema<SeedEvent> schema = AvroDeser.of(SeedEvent.class);

            public void open(DeserializationSchema.InitializationContext context) throws Exception {
                this.schema.open(context);
            }

            @Override
            public TypeInformation<SeedEvent> getProducedType() {
                return this.schema.getProducedType();
            }

            @Override
            public boolean isEndOfStream(SeedEvent nextElement) {
                return this.schema.isEndOfStream(nextElement);
            }

            @Override
            public SeedEvent deserialize(@SuppressWarnings("rawtypes") Message message) throws IOException {
                LOG.info("{}", new String(message.getData()));
                final SeedEvent value = new SeedEvent();
                // final SeedEvent value = this.schema.deserialize(message.getData());
                LOG.info("received: [{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",
                        Thread.currentThread().getId(),
                        message.getTopicName(),
                        message.getMessageId(),
                        message.getSequenceId(),
                        message.getEventTime(),
                        message.getPublishTime(),
                        message.getProducerName(),
                        message.getKey(), value);
                return value;
            }

        };

        final FlinkPulsarSource<SeedEvent> source = new FlinkPulsarSource<>(
                adminUrl, clientConf, deserializer, props);
        source.setStartFromEarliest();
        // source.setStartFromSubscription(subscription);
        // source.setStartFromLatest();

        return source;
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        final ParameterTool params = ParameterTool.fromArgs(args);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setStateBackend(new RocksDBStateBackend(new FsStateBackend("file:///tmp/checkpoint/")));
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        // Checkpointing must be enabled to resume from the last unconsumed position;
        // otherwise consumption restarts from the beginning.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);

        final DataStream<String> stream = env
                .addSource(getPulsarSource(params))
                .name("FlinkPulsarSource")
                .uid("PulsarSource")
                .setParallelism(1)
                .map(new MapFunction<SeedEvent, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public String map(SeedEvent value) throws Exception {
                        return "[SourceJob]" + value;
                    }
                });

        stream
                .print()
                .name("[Print]")
                .uid("PrintSink")
                .setParallelism(1);

        try {
            env.execute("PulsarSourceJob");
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }

}

Closing Remarks

In recent years, against the backdrop of rapid growth in the AIOps field, demand for IT tools, platform capabilities, solutions, AI scenarios, and usable datasets has surged across industries. In response, CloudWise launched the AIOps Community in August 2021, aiming to raise an open-source banner and build an active user and developer community for customers, users, researchers, and developers across industries, to jointly contribute to and solve industry problems and advance technology in this field.

The community has since open-sourced the data-visualization orchestration platform FlyFish, the operations management platform OMP, the cloud service management platform Moore, the Hours algorithm, and other products.

Data-visualization orchestration platform FlyFish:

Project page: https://www.cloudwise.ai/flyFish.html

GitHub: https://github.com/CloudWise-OpenSource/FlyFish

Gitee: https://gitee.com/CloudWise/fly-fish

Industry case studies: https://www.bilibili.com/video/BV1z44y1n77Y/

