message filter for Apache Pulsar, both support server-side and client-side.
pulsar-msg-filter-plugin
是一个基于PIP 105: Support pluggable entry filter in Dispatcher
为 Apache Pulsar 实现的 服务端 消息过滤插件。
pulsar-msg-filter-interceptor
是一个基于 Pulsar ConsumerInterceptor
实现的 客户端 消息过滤拦截器。
- 高性能、小巧
- 支持常见条件表达式,几乎满足各种业务过滤场景
[server-side] pulsar-msg-filter-plugin 使用说明
-
下载pulsar-msg-filter-plugin-VERSION.nar插件并保存至指定目录,如/app/conf/plugin
-
修改pulsar broker.conf配置(version >= 2.10),插件名称
pulsar-msg-filter
# Class name of Pluggable entry filter that can decide whether the entry needs to be filtered # You can use this class to decide which entries can be sent to consumers. # Multiple classes need to be separated by commas. entryFilterNames=pulsar-msg-filter # The directory for all the entry filter implementations entryFiltersDirectory=/app/plugin # Location of unpacked NAR file narExtractionDirectory=/app/nar
-
重启broker,查看日志,如果看到如下日志:
Successfully loaded entry filter for name
`pulsar-msg-filter`则说明配置成功
-
验证(option)
-
发送方构建Producer实例时关闭
batch
操作 .enableBatching(false)Producer<String> producer = client.newProducer(Schema.STRING) .topic("test-topic-1") .enableBatching(false) .create(); producer.newMessage() .property("k1","7") .property("k2", "vvvv") .property("k3", "true") .value("hi, this msg from `pulsar-msg-filter-plugin`") .send();
-
消费方使用admin配置订阅消费组过滤表达式,其key固定为 pulsar-msg-filter-expression
pulsar-admin topics update-subscription-properties --property --property pulsar-msg-filter-expression="double(k1)<6 || (k2=='vvvv' && k3=='true')" --subscription 订阅组名称 主题 pulsar-admin topics get-subscription-properties --subscription 订阅组名称 主题
Consumer consumer = client.newConsumer() .subscriptionName("订阅组名称") .topic("主题") .subscribe();
-
[client-side] pulsar-msg-filter-interceptor 使用说明
-
添加 pulsar-msg-filter-interceptor 依赖
<dependency> <groupId>io.github.yangl</groupId> <artifactId>pulsar-msg-filter-interceptor</artifactId> <version>VERSION</version> </dependency>
-
创建Consumer实例时配置 MsgFilterConsumerInterceptor 过滤器
Consumer<String> consumer = client.newConsumer(Schema.STRING) .subscriptionName("订阅组名称") .topic("主题") .intercept(MsgFilterConsumerInterceptor.<String>builder().build()) .subscribe();
.intercept(MsgFilterConsumerInterceptor.<String>builder().webServiceUrl(YOUR_HTTP_SERVICE_URL).build())
注意事项
- 由于pulsar message header的key&value全部为
String
类型,在使用表达式的时候注意将其类型转换至目标类型 - AviatorScript的
false
判断个人建议直接使用字符串的==
true/false
比较,AviatorScript只有nil false
为false,其他全部为true - 过滤引擎使用AviatorScript (感谢晓丹),其内置函数详见其 函数库列表
pulsar-msg-filter
is licensed under the AGPLv3 License.