# Pulsar客户端如何控制内存使用

# 摘要

本文围绕一个常见的使用场景深入分析在高吞吐场景下,使用Pulsar客户端收发消息可能会遇到的若干问题。并以此为切入点,梳理一下Pulsar客户端在内存控制上所做的优化改进。

# 使用场景

假设这样一个常见的场景,一个搜索类业务需要记录用户搜索请求,以便后续分析搜索热点,以及有针对性的优化搜索效果等。于是,我们有下面这段逻辑,简化后如下:

PulsarClient pulsarClient = PulsarClient.builder()  
        .serviceUrl("pulsar://localhost:6650")  
        .build();  
Producer<byte[]> producer = pulsarClient.newProducer()  
        .topic("search-activities")  
        .create();
try {  
    MessageId messageId = producer.send(/* message payload here */);  
    log.debug("Search activity messageId={}", messageId);  
} catch (Exception e) {  
    log.error("Failed to record search activity", e);  
}

注意pulsarClientproducer均支持复用,并且推荐这么做,这里只是为了演示写到了一起。

producer.send是阻塞方式发送消息,换句话说就是线程会卡在这里等待发送结果返回。在现实中可以根据消息在实际业务中的需要选择阻塞非阻塞两种方式,例如这里我们的业务是在用户发起一次搜索请求时记录搜索请求和上下文信息,业务上对搜索请求事件并无强依赖,因此这里使用阻塞方式发消息就不太适合了,从性能上考虑会加长整体的搜索延迟,从稳定性上考虑会增加搜索执行过程中的不确定性,总的来说,要区分支线流程和主线流程,不应该把支线流程全部嵌套在主线流程中。

于是,我们可以优化一下,调整为非阻塞方式,将记录搜索事件放到其它线程中完成:

producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {  
    if (ex != null) {  
        log.error("Failed to record search activity", ex);  
    } else {  
        log.debug("Search activity messageId={}", msgId);  
    }
});

在现实中,若用户搜索的TPS较高,例如在单实例上可以超过1000QPS(高和低都是相对而言的,这里只是举个例子)。若恰好记录的搜索事件内容较多(例如包含了搜索请求的完整上下文和搜索结果等),序列化之后大小能达到100KB甚至1MB,那么上面代码在运行时你可以会遇到MemoryBufferIsFullError异常:

org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full
	at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:972)
	at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:452)
	at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:343)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:102)
	at org.apache.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:68)

另外若服务本身与Pulsar的broker之间出现了网络波动,或者Pulsar服务内部组件之间出现网络波动,导致整体producer写入延迟升高,亦或是短时间出现大量写入,你还可能会遇到ProducerQueueIsFullError异常:

org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
	at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:965)
	at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:452)
	at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:343)
	at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:102)
	at org.apache.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:68)

# Producer的内存控制

下面我们对上面两种异常产生的原因作一下分析,我们先来看一下构建Producer时,ProducerBuilder (opens new window)中与内存使用有关的配置项:

/*
 * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
 */
ProducerBuilder<T> maxPendingMessages(int maxPendingMessages);

/*
 * Set the number of max pending messages across all partitions.
 */
ProducerBuilder<T> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);

maxPendingMessages用来控制producer内部队列中正在发送还没有接收到broker确认的消息数量,若队列大小超出了这个限制,默认的行为就是抛出ProducerQueueIsFullError异常,你可以通过修改另外一个配置blockIfQueueFull=true调整为阻塞等待队列中空出新的空间,这里还有另外需要注意的地方在下面会细说。

maxPendingMessages这个配置实际上是直接传递给底层各个分区的内部producer的,对于多分区的topic,实际处于pending状态的最大消息数量是maxPendingMessages乘以topic分区数量。由于maxPendingMessages结合可变的topic分区数量使得最终的pending消息数量变得不可控,因此还有另外一个优先级更高的配置maxPendingMessagesAcrossPartitions用来控制整个topic所有分区的总的一个pending消息数量,最终到各个分区内部producer取maxPendingMessagesmaxPendingMessagesAcrossPartitions / partitions的较小值。

然而,在现实应用场景中,不同业务的消息大小差异很大,单纯基于消息数量控制内存使用,开发者需要预估平均消息大小,这几乎不可能做到,因为消息的实际大小很可能会随着业务的变化而发生变化,因此在PIP-74 (opens new window)中,在构建PulsarClient时,ClientBuilder (opens new window)提供了一个面向整个client实例统一的内存限制配置:

/*
 * Configure a limit on the amount of memory that will be allocated by this client instance.
 *
 * Setting this to 0 will disable the limit.
 */
ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit);

当客户端所有producer中所有pending的消息大小总和超过这个限制时,默认则会抛出MemoryBufferIsFullError异常,若同时配置了blockIfQueueFull=true,则当前线程会阻塞等待前面pending的消息发送完成。

前面提到关于blockIfQueueFull配置的使用有一个细节需要注意,这个配置是为了限制客户端producer内存使用的同时,让开发者简化处理队列或者内存buffer满了的情况可以继续发送消息,例如在一个后台定时任务的场景中批量发送消息。然而这里需要强调的是blockIfQueueFull一旦配置为true,不论是应用发送消息调用的是阻塞的Producer.send方法还是非阻塞的Producer.sendAsync方法都会出现阻塞等待,”卡“住当前线程,那么对于我们上面的业务来说这是不可接受的,若由于支线流程(特殊情况容忍丢失的用户搜索事件)异常抖动,阻塞了主线流程(搜索主线程)就得不偿失了。

// 注意:若producer发送队列满或者内存buffer满,当前线程将卡在sendAsync方法调用
producer.sendAsync(/* message payload here */).whenComplete((msgId, ex) -> {  
    if (ex != null) {  
        log.error("Failed to record search activity", ex);  
    } else {  
        log.debug("Search activity messageId={}", msgId);  
    }
});

PIP-120 (opens new window)2.10.0以及之后版本的客户端中,默认启用了memoryLimit配置,其默认值为64MB,同时默认禁用了maxPendingMessagesmaxPendingMessagesAcrossPartitions配置(默认值修改为0),另外将maxPendingMessagesAcrossPartitions配置标记为了Deprecated,因为使用这个配置最终目的就是控制客户端producer的内存使用,现在已经有memoryLimit这个更加直接的配置可以替代了。

# Consumer的内存控制

上面说的全部都是围绕着Producer侧的内存使用来讲的,其实在PIP-74中也提到了Pulsar客户端consumer侧的内存使用,只不过在实现中是分阶段进行的。

我们先来看一下Pulsar客户端的API早期在构造一个Consumer时,ConsumerBuilder (opens new window)提供的与内存使用有关的选项:

/*
 * Sets the size of the consumer receive queue.
 *
 * (default: 1000)
 */
ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);

/*
 * Sets the max total receiver queue size across partitions.
 *
 * (default: 50000)
 */
ConsumerBuilder<T> maxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions);

Pulsar客户端通过预接收队列临时存放broker推送过来的消息,以便应用程序调用Consumer#receive或者Consumer#receiveAsync方法时直接从内存中返回消息,这是出于消费吞吐的考虑,本质上是一种以空间换取时间的策略。上面两个选项是给这个”空间“设置一个数量上的上限,注意这里仅是数量上的上限,实际的内存空间使用还要取决于平均消息大小。receiverQueueSize控制每个分区consumer的接收队列大小,maxTotalReceiverQueueSizeAcrossPartitions来控制所有分区consumer和parent consumer的接收队列总大小。

前面提到receiverQueueSizemaxTotalReceiverQueueSizeAcrossPartitions参数是以数量的形式间接的控制Consumer预接收队列的内存使用,在PIP-74中提出了整个client级别的memoryLimit,同时提出了一个新的控制Consumer内存使用的方案,就是autoScaledReceiverQueueSizeEnabled:

/*
 * If this is enabled, the consumer receiver queue size is initialized as a very small value, 1 by default,  
 * and will double itself until it reaches either the value set by {@link #receiverQueueSize(int)} or the client  
 * memory limit set by {@link ClientBuilder#memoryLimit(long, SizeUnit)}.
 */
ConsumerBuilder<T> autoScaledReceiverQueueSizeEnabled(boolean enabled);

当启用了这个特性之后,receiverQueueSize会从1开始呈2的指数倍增长,直至达到receiverQueueSize的限制或达到client的memoryLimit限制,其目标是在有限制的内存使用下,达到最大的吞吐效率。

# 番外

除了上面说的Producer和Consumer在生产和消费过程中的内存使用之外,还有一个容易被忽视的点是创建Consumer时ackTimeoutackTimeoutTickTime的配置如果不匹配,会消耗较多堆内内存。

/*
 * Sets the timeout for unacknowledged messages, truncated to the nearest millisecond. The timeout must be greater than 1 second.
 */
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);

/**
 * Define the granularity of the ack-timeout redelivery.
 *
 * <p>By default, the tick time is set to 1 second. Using a higher tick time  
 * reduces the memory overhead to track messages when the ack-timeout is set to  
 * bigger values (e.g., 1 hour).
 */
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);

若Consumer配置了ackTimeout并且配置了较大的时间窗口(例如1小时或者更长)时,应适当的调大ackTimeoutTickTime,这是因为Consumer内部使用了一个简单时间轮的算法对消息的处理时间计时,若ackTimeout时间窗口很大,ackTimeoutTickTime仍然使用其默认值1s,时间轮本身将会占用大量堆内存空间。具体细节可参考客户端源码UnAckedMessageTracker.java

# 总结

  1. 使用sendAsync非阻塞方法要注意其不能保证消息一定发送成功,特别是开启了blockIfQueueFull之后,它会在特定情况下演变成阻塞方法。
  2. 对于同时使用到了Producer和Consumer的应用,推荐创建两个client,分别用来创建Producer和Consumer,做读写分离,避免由于共用memoryLimit导致相互影响。

# 参考链接

  • https://github.com/apache/pulsar/wiki/PIP-74%3A-Pulsar-client-memory-limits