Topic 和订阅

更新时间:2025-01-24 09:06:05下载pdf

涂鸦物联网消息队列 是基于开源消息组件 Apache Pulsar 并适配涂鸦业务场景而深度定制的消息中间件,对于 Pulsar 的许多特性都有保留。
Topic 是涂鸦物联网消息队列下生产消费的主体,订阅等同于 Topic 下的一个消费组,同一个订阅下,一条消息只会被一个消费者消费确认一次。云项目下开启 消息订阅 时,系统会自动创建 Topic 和一个默认的订阅。

Pulsar 架构

Pulsar 集群至少由三部分组成:

  • 元数据存储和配置中心 ZK
  • 消息持久化数据存储 Bookie
  • 无状态代理组件 Broker
Topic 和订阅

Topic 和订阅基本格式

Topic 的基本格式

{persistent|non-persistent}://tenant/namespace/topic
持久化主题|非持久化主题://租户/命名空间/主题
  • 消息订阅 的 Topic 都是 持久化 Topic,即消息都会持久化到磁盘,直到被 ACK 确认或者过期。
  • tenant:默认为当前云项目的 accessId/clientId
  • namespace:默认为 out
  • topic:生产通道 Topic 是 event,也是分区 Topic,11 个分区。测试通道 Topic 是 event-test,为非分区 Topic。

订阅基本格式

tenant-suffix
  • tenant:默认为当前云项目的 accessId/clientId
  • suffix:订阅名的后缀,默认情况下是 sub

Topic Partitions

Pulsar Topic 分为分区 Topic 和非分区 Topic:

  • 分区 Topic:指一个 Topic 拥有多个分区。生产者推送消息时,根据业务 ID 如设备 ID 做 Hash,分发到不同的 Topic 分区,即相同的业务 ID 产生的消息会推送到相同的分区。
  • 非分区 Topic:也可以称为单分区 Topic,即只有一个分区的 Topic。一般用在需要消息全局有序的生产消费场景下,容易出现消费瓶颈。

Pulsar Topic 订阅

订阅模式

Pulsar 支持四种订阅模式:ExclusiveFailoverShared、和 Key_Shared

Topic 和订阅
  • Exclusive:同一个订阅下,只允许单个消费者连接到该订阅。如果多个消费者使用同一个订阅来订阅 Topic,则会发生错误。请注意,如果 Topic 是分区的,则所有分区都将由允许连接到订阅的单个消费者消费。

    适用于全局有序的消息,但如消费能力有限,不推荐此订阅方式。

  • Failover:在故障转移类型中,多个消费者可以添加到同一个订阅。但同一时间,对于非分区 Topic 或分区 Topic 的每个分区,只有一个主消费者能接收消息。当主消费者断开连接时,所有未确认和后续新消息将被传递给下一个消费者。
    对于分区 Topic,Broker 代理根据优先级和消费者名称的字典顺序对消费者进行排序。

    • 适用于局部有序消费的场景。
    • 涂鸦的物联网消息队列的使用场景都是同一个业务 ID 下消息有序,因此推荐 Failover 订阅模式。
    • 推荐搭配累积确认 acknowledgeCumulative 一起使用。acknowledgeCumulative 是一种批量确认机制,它意味着该消费者同时确认了该消息之前的所有未确认消息。

    消费者大于分区情况

    Topic 和订阅

    消费者小于分区情况

    Topic 和订阅
  • Shared:在该模式下,多个消费者可以添加到同一个订阅。消息以循环方式在消费者之间传递,任何给定的消息都只传递给一个消费者。当消费者断​​开连接时,所有发送给它但未确认的消息将重新安排发送给剩余的消费者。

    Shared 订阅模式不保证消息排序且不支持累积确认。

    ![Shared.png](https://images.tuyacn.com/content-platform/hestia/17367592783f589cf74e5.png"图片来自 Pulsar 官方文档")

  • Key_Shared:多个消费者可以添加到同一个订阅。消息在消费者之间分布传递,具有相同 Key 或相同排序 Key 的消息仅传递给一个消费者。无论消息重新传递多少次,它都会传递给同一个消费者。

    • Key_Shared 订阅类型保证同一个 Key 在任何给定时间都由单个消费者处理。
    • 当新消费者连接时,一些 Key 会从现有消费者到新消费者。与此同时,Broker 将记录当前消息读取位置比如 X 并将其与新消费者关联,只有当所有消息都已 ack 到当前读取位置 X 时,Broker 才会开始向新消费者发送消息。
    • 消费者可以通过开启 allowOutOfOrderDelivery 来禁用有序性,从而保证新消费者能立即消费消息。

    Topic 和订阅

    • Key_Shared 模式下不能使用累积确认。
    • 新消费者连接时,可能短期内无法收到消息,直到新消费者连接前下发的消息都被 ack。
    • Key_Shared 的实现机制较为复杂,可能会出现稳定性问题,不建议使用。消息订阅 的 Topic 都是多分区 Topic,推荐使用 Failover 模式。

订阅消费优化建议

消息订阅 生产通道都是分区 Topic,默认分区是 11。

  • 建议的订阅模式是 Failover,保证在单个分区里消息消费有序。
  • 基于 Failover 订阅模式,一个分区一个消费者即可,即最多启动 11 个消费者即可以达到最大消费速度。
  • 基于 Failover 订阅模式,推荐使用 acknowledgeCumulative 来确认消息,减少异常场景某些消息未 ACK 导致消费堆积。

订阅消费常见问题

为什么消费者有消费,但是监控显示堆积?

少量的消息堆积是正常现象,消息在生产者 -> Pulsar -> 消费者过程中,可能有少量的消息未确认,这部分未到消费者端或者消费者端未确认的消息会被统计为堆积。通常消息生产速度均匀的情况下,可能是毫秒到几秒的消息堆积。
大量的消息堆积可能是消费速度跟不上,在生产环境默认情况下,启动 11 个 Pulsar 消费者 可以达到最大消费速率。如果消费者已经超过 11 个,这时候可以通过添加日志,监控工具等观察本地业务处理逻辑耗时。发现瓶颈,修改消费代码。
如果消费速度和生产速度持平,但还是有堆积,可以尝试重启本地消费服务,或检查消费逻辑,是否有场景可能导致一些消息无法被 ACK。

我想用广播消费的方式,让所有的消费者都收到同一份消息,应该怎么做?

Pulsar 不支持广播类型的订阅消费模式,即无法用同一个订阅,让所有的消费者都收到相同的消息。如果需要启动多个消费者,且多个消费者都收到相同的消息。建议开发者单个订阅接收消息,消费者收到消息后,代码里自行实现分发逻辑。

启动消费者消费消息时,出现 “401/Failed to authenticate” 错误,应该怎么处理?

  • 请检查本地消费服务的 Pulsar 服务地址配置是否正确,消息订阅 目前对外开放了四个数据中心,请确认选择了正确的 Pulsar 服务地址。
  • 请打开 IoT 平台的当前云项目,检查 消息订阅 下的数据中心是否正确,请确认切换到了您要开通 Pulsar 服务的数据中心,并开启了 消息订阅
  • 请检查云项目是否开通了 IP 白名单 功能。如果开通了 IP 白名单,请确保消费者的 IP 已经被添加到对应数据中心。如果未开通,请确保消费者的 IP 和数据中心一致。