前言
对于pulsar的特性以及优异,这里不多讲解,直接上干货,主要讲一下Pulsar的docker部署,生产者/消费者几种
不同模式,以及Topic的使用规则
复制代码
Docker部署pulsar
docker run -it -p 80:80 -p 8080:8080 -p 6650:6650 -d apachepulsar/pulsar-standalone
复制代码
部署问题
因为我用的是腾讯云最基础的服务器,在执行docker命令后,发现Pulsar会启动失败或启动不久便停止,查看日志发现是内存顶不住
复制代码
查看官网Pulsar默认启动是2g,因此把启动配置修改成机器支持的即可;
docker exec -it pulsar-test sh
cd /pulsar/conf/
vim conf/pulsar_env.sh; 之后重启pulsar即可
复制代码
连接Pulsar
/**
* pulsar 连接bean
*/
@Bean
public PulsarClient getPulsarClient() throws PulsarClientException {
return PulsarClient.builder()
.serviceUrl("pulsar://Ip地址:6650")
.build();
}
复制代码
基础概念了解
Produce 消息的源头,也是消息的发布者,负责将消息发送到 topic。
Consumer 消息的消费者,负责从 topic 订阅并消费消息。
Topic 消息数据的载体,在 Pulsar 中 Topic 可以指定分为多个 partition,如果不设置默认只有一个 partition
(这个指定多个partition,我会在文中后面示例演示,可以留意下)
Brkber 一个无状态组件,主要负责接收 Producer 发送过来的消息,并交付给 Consumer,可以理解成送快递的小哥
复制代码
Produce详解
创建方式
简单方法创建
Producer<String> stringProducer = client.newProducer(Schema.STRING)
.topic("my-topic")
.create();
stringProducer.send("My message")
复制代码
loadConf自定义配置创建
config里面可以填一些自定义配置,如sendTimeoutMs 消息发送超时(毫秒)。如果在sendTimeout过期之前服务器未确认消息,则会发生错误,其他有趣的可以看下官网
复制代码
Pulsar官网
/**
* 使用loadConf创建Produce
*/
@Test
public void testProducer() throws Exception {
Map<String, Object> config1 = new HashMap<>();
config1.put("producerName", "produce-demo1");
config1.put("topicName", "topic1");
Producer producer1 = client
.newProducer()
.loadConf(config1)
.create();
producer1.send(("test1 --- " + new Date()).getBytes());
}
复制代码
发送模式
同步发送
同步发送消息是Producer发送消息以后要等到broker的确认以后才会认为消息发送成功,如果没有收到确认就认为消息发送失败
复制代码
/**
* 测试同步发送
*/
@Test
public void testProducer22() throws Exception {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("my-topic")
.producerName("produce-demo1")
.create();
MessageId messageId = stringProducer.send("My message" + "发送消息时间" + new Date());
System.out.println("消息同步发送---");
System.in.read();
}
复制代码
异步发送
异步发送消息是 Producer 发送消息,将消息放到阻塞队列中并立即返回。不需要等待 broker 的确认
复制代码
/**
* 测试异步发送
*/
@Test
public void testProducer222() throws Exception {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("my-topic")
.producerName("produce-demo1")
.create();
CompletableFuture<MessageId> messageIdCompletableFuture = stringProducer.sendAsync(
"异步发送的消息");
System.in.read();
}
复制代码
访问方式/发送方式
Share模式(默认情况)
默认情况下多个生产者可以发布消息到同一个Topic,指定发送模式.accessMode(ProducerAccessMode.Shared)方法
复制代码
/**
* shard模式 默认情况下多个生产者可以发布消息到同一个 Topic
*/
@Test
public void testProducer222() throws IOException {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.accessMode(ProducerAccessMode.Shared)
.topic("访问模式-shared")
.producerName("produce-demo1")
.create();
stringProducer.send("My message 1 " + "发送消息时间" + new Date());
Producer<String> stringProducer2 = client
.newProducer(Schema.STRING)
.accessMode(ProducerAccessMode.Shared)
.topic("访问模式-shared")
// Producer with name 'produce-demo1' is already connected to topic
//注意生产者名称不能重复
.producerName("produce-demo2")
.create();
stringProducer2.send("My message 2 " + "发送消息时间" + new Date());
System.in.read();
}
复制代码
请注意:
这里我特意标注了生产者名称不能重复,否则对于Pulsar来说,发送消息会报错,如下图,已经有一个produce-
demo1的生产者了,再来一个就会报错Producer with name 'produce-demo1' is already connected to topic
因此如果我们是集群部署的话,尤其注意每一个节点生产者的命名
当然对于消费者也是同样的规则,不允许名称重复(在下文我也会演示到)
复制代码
/**
* 演示生产者名称重复,发送报错
*/
@Test
public void testProducer1() throws IOException {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("访问模式-shared")
.producerName("produce-demo1")
.create();
stringProducer.send("My message 1 " + "发送消息时间" + new Date());
System.in.read();
}
/**
* 演示生产者名称重复,发送报错
*/
@Test
public void testProducer11() throws IOException {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("访问模式-shared")
.producerName("produce-demo1")
.create();
stringProducer.send("My message 1 " + "发送消息时间" + new Date());
System.in.read();
}
复制代码
Exclusive
要求生产者以独占模式访问 Topic,在此模式下如果 Topic已经有了生产者,那么其他生产者在连接就会失败报错。
复制代码
/**
* Exclusive 要求生产者以独占模式访问 Topic,在此模式下 如果 Topic 已经有了生产者,那么其他生产者在连接就会失败报错。
* <p>
* "Topic has an existing exclusive producer: standalone-0-12
*/
@Test
public void testProducer6() throws IOException {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("访问模式-Exclusive")
//设置访问模式 默认shared
.accessMode(ProducerAccessMode.Exclusive)
.producerName("produce-demo1")
.create();
stringProducer.send("My message 1 " + "发送消息时间" + new Date());
Producer<String> stringProducer2 = client
.newProducer(Schema.STRING)
.topic("访问模式-Exclusive")
//设置访问模式 默认shared
.accessMode(ProducerAccessMode.Exclusive)
// Producer with name 'produce-demo1' is already connected to topic
//注意生产者名称不能重复
.producerName("produce-demo2")
.create();
stringProducer2.send("My message 2 " + "发送消息时间" + new Date());
System.in.read();
}
复制代码
WaitForExclusive
如果主题已经连接了生产者,则将当前生产者挂起,直到生产者获得了Exclusive 访问权限。
该怎么来理解这句话,打个不恰当比喻,类似于Java中的独占锁Sycronized一样,你没有获取到锁,没有获取到权限,就不能发消息,
对比Exclusive报错来说,WaitForExclusive是不会报错的,只会是挂起,
来看下面的demo感受下
1 我们先开启一个线程A向 访问模式-WaitForExclusive topic发送一条消息,My message 1 ***
复制代码
/**
* WaitForExclusive
* <p>
* 如果主题已经连接了生产者,则将当前生产者挂起,直到生产者获得了 Exclusive 访问权限。
* <p>
* 也就是存在相同的生产者,不会报错,当然也不会发送消息, 获取到独占后,会将未获取到独占时的消息进行发送!!!
*/
@Test
public void testProducer2() throws Exception {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("访问模式-WaitForExclusive")
//设置访问模式 默认shared
.accessMode(ProducerAccessMode.WaitForExclusive)
.producerName("produce-demo1")
.create();
stringProducer.send("My message 1 " + "发送消息时间" + new Date());
System.in.read();
}
复制代码
2 然后再开启另一个线程B向 访问模式-WaitForExclusive topic发送10条消息,My message 2 ***
复制代码
/**
* WaitForExclusive
*/
@Test
public void testProducer22() throws Exception {
Producer<String> stringProducer = client
.newProducer(Schema.STRING)
.topic("访问模式-WaitForExclusive")
//设置访问模式 默认shared
.accessMode(ProducerAccessMode.WaitForExclusive)
.producerName("produce-demo1")
.create();
//假设有10条消息在未获取 独占前,均未被发送,模拟来看一下,获取独占后, 这10条消息会进行发送吗 ? 会
for (int i = 0; i < 10; i++) {
stringProducer.send("My message 2 " + "发送消息时间" + new Date());
}
System.in.read();
}
复制代码
3 然后写个简单的消费者看一下消费情况
复制代码
@Test
public void testConsumer2() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("访问模式-WaitForExclusive")
.subscriptionName("my-subscription")
.messageListener(myMessageListener)
.subscribe()
System.in.read();
}
复制代码
4 会看到消费者只消费到了 线程A发送的消息,线程B的消息未被消费,因为此时topic的独占权还在线程池A上
复制代码
5 手动杀死线程A,然后看消费者情况,会看到开始消费出My message 2 *** 也就是线程B的消息,
因为此时线程A被杀死,线程B得到了独占权,线程B将消息发送出去
复制代码
Consumer详解
创建方式
简单方法创建
可以看到写了一个while true去获取消息,对于线城是阻塞不友好的,因此我一般用第二种,监听器方法
复制代码
/**
* 创建消费者
*/
@Test
public void testConsumer22() throws Exception{
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscribe();
while (true) {
// Wait for a message
Message msg = consumer.receive();
try {
// Do something with the message
System.out.println("Message received: " + new String(msg.getData()));
// Acknowledge the message so that it can be deleted by the message broker
consumer.acknowledge(msg);
} catch (Exception e) {
// Message failed to process, redeliver later
consumer.negativeAcknowledge(msg);
}
}
}
复制代码
监听器方法创建
/**
* 接收消息:异步 不阻塞主线程
*/
@Test
public void testConsumer2() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.messageListener(myMessageListener)
.subscribe();
System.out.println("监听器方式,不阻塞线程");
System.in.read();
}
复制代码
loadConf自定义配置创建
更多自定义的配置可以看下官网文件
复制代码
/**
* loadConf创建消费者
*/
@Test
public void testConsumer222() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Map<String, Object> config1 = new HashMap<>();
config1.put("subscriptionName", "consumer-demo1");
config1.put("topicNames", Arrays.asList(new String[]{"my-topic"}));
Consumer consumer = client
.newConsumer()
.loadConf(config1)
.messageListener(myMessageListener)
.subscribe();
System.out.println("loadConf方式");
System.in.read();
}
复制代码
多主题订阅
多主题订阅主要是指一个消费者,可以订阅多个topic,这里我只演示其中两个
复制代码
传入List数组的多主题订阅
/**
* Multi-topic subscriptions
* 多主题订阅
* 多topic 订阅list设置的topic1 topic2
*/
@Test
public void testConsumer3() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
ConsumerBuilder consumerBuilder = client.newConsumer()
.subscriptionName("consumer-3");
List<String> topics = Arrays.asList(
"topic1",
"topic2"
);
Consumer multiTopicConsumer = consumerBuilder
.topics(topics)
.messageListener(myMessageListener)
.subscribe();
System.in.read();
}
复制代码
正则表达式多主题订阅
简单点就是正则表达式匹配,根据业务需要自行设置表达式,这里不多演示
复制代码
/**
* Multi-topic subscriptions
* 多主题订阅
* 正则表达式,订阅所有以1结束的topic
*
*/
@Test
public void testConsumer222() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
ConsumerBuilder consumerBuilder = client.newConsumer()
.subscriptionName("consumer-1");
Pattern allTopicsInNamespace = Pattern.compile("public/default/.*1");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.messageListener(myMessageListener)
.subscribe();
System.in.read();
}
复制代码
消费模式
Exclusive(默认)
这里需要注意的是同一topic主题上只能有一个具有相同订阅名称的使用者 默认,也就是说 如果后端是集群部署的话,请注意默认情况下subscriptionName的命名情况,否则会报错
复制代码
/**
* Exclusive 模式 也是默认的
* 同一主题上只能有一个具有相同订阅名称的使用者 默认
* 否则会启动报错
*/
@Test
public void testConsumerExclusive() throws IOException {
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("Message received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription2")
.subscriptionType(SubscriptionType.Exclusive)
.messageListener(myMessageListener)
.subscribe();
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription2")
.subscriptionType(SubscriptionType.Exclusive)
.messageListener(myMessageListener)
.subscribe();
System.in.read();
}
复制代码
Failover
这个主要是失败转移,对比Exclusive模式,同一主题上可以有具有相同订阅名称的使用者,也就是subscriptionName可以重复,一个节点挂掉了 剩余消息转移到另一个节点继续消费;
这块的业务场景挺不错的,假设我们后台有两台集群部署机器A,B,并且subscriptionName相同,
正常情况下,其他模块往队列仍了一条消息,但是只希望被其中一台机器消费, 一条消息被消费一次,而不是A,B两机器都消费对吧,正常的幂等性操作
现在开始模拟,假设其他模块发送了10条消息,然后只被其中一台消费
复制代码
@Test
public void testProduce2() throws PulsarClientException {
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.enableBatching(false)
.create();
// 3 messages with "key-1", 3 messages with "key-2", 2 messages with "key-3" and 2 messages with "key-4"
// 这里的key可以类似于 投递到 不同broker的一个标识
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();
}
@Test
public void testConsumerFailover() throws IOException {
MessageListener myMessageListener1 = (consumer, msg) -> {
try {
// a++;
// if (a > 4) {
// System.out.println("模拟节点1故障");
//关闭节点1
// consumer.close();
// throw new RuntimeException("模拟某时刻节点1故障,转移至节点2消费");
// }
System.out.println("Message1 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.messageListener(myMessageListener1)
.subscribe();
MessageListener myMessageListener2 = (consumer2, msg) -> {
try {
System.out.println("Message2 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.messageListener(myMessageListener2)
.subscribe();
System.in.read();
}
复制代码
再来看看失败转移,假设其中一台机器宕机,然后我希望剩下机器B,继续消费未消费完的消息,可以看到一台机器模拟宕机后,另一台机器继续消费,也就是失败转移
复制代码
/**
* Failover故障转移 .subscriptionName("my-subscription") 可重复
* 一个节点挂掉了 剩余消息转移到另一个节点继续消费
* 注意这些消费模式 都是和subscriptionName("my-subscription") 订阅者名称相关
*/
int a = 0;
@Test
public void testConsumerFailover() throws IOException {
MessageListener myMessageListener1 = (consumer, msg) -> {
try {
a++;
if (a > 4) {
System.out.println("模拟节点1故障");
//关闭节点1
consumer.close();
throw new RuntimeException("模拟某时刻节点1故障,转移至节点2消费");
}
System.out.println("Message1 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
//consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.messageListener(myMessageListener1)
.subscribe();
MessageListener myMessageListener2 = (consumer2, msg) -> {
try {
System.out.println("Message2 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.messageListener(myMessageListener2)
.subscribe();
System.in.read();
}
复制代码
Shared
多个使用者将能够使用相同的订阅名称,并且消息将根据连接的使用者之间的循环旋转进行分派。 在这种模式下,消费顺序不能保证
复制代码
/**
* Shared模式
* 多个使用者将能够使用相同的订阅名称,并且消息将根据连接的使用者之间的循环旋转进行分派。 在这种模式下,消费顺序不能保证。
* 也就是消费者 1 消费者2 总共消费10条
* 注意都是从 .subscriptionName("my-subscription") 视角
*/
@Test
public void testShared() throws IOException {
MessageListener myMessageListener1 = (consumer, msg) -> {
try {
System.out.println("Message1 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.messageListener(myMessageListener1)
.subscribe();
MessageListener myMessageListener2 = (consumer2, msg) -> {
try {
System.out.println("Message2 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.messageListener(myMessageListener2)
.subscribe();
System.in.read();
}
复制代码
Key_Shared模式
这个简单来理解,发送消息的时候,给这批消息指定一个key,那么消息被消费的时候,相同key的这批消息,只能被同一个节点消费
如下示例我发送消息时,指定下key,然后写消费者看下消费情况,会看到key相同的消息被同一节点消费
复制代码
@Test
public void testProduce2() throws PulsarClientException {
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("my-topic")
.enableBatching(false)
.create();
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();
}
/**
* Key_Shared模式
* 多个使用者将能够使用相同的订阅名称,并且消息将根据连接的使用者之间的循环旋转进行分派。 在这种模式下,消费顺序不能保证。
* 也就是消费者 1 消费者2 总共消费10条
* 注意都是从 .subscriptionName("my-subscription") 视角
* <p>
* 具有相同密钥的消息仅按顺序传递给一个消费者。消息在不同消费者之间的可能分布(默认情况下,我们事先不知道哪些密钥将被分配给消费者,但一个密钥只会同时被分配给消费者
* ("key-1", "message-1-1")
* ("key-1", "message-1-2")
* ("key-1", "message-1-3")
* ("key-3", "message-3-1")
* ("key-3", "message-3-2")
* <p>
* <p>
* ("key-2", "message-2-1")
* ("key-2", "message-2-2")
* ("key-2", "message-2-3")
* ("key-4", "message-4-1")
* ("key-4", "message-4-2")
*/
@Test
public void testKeyShared() throws IOException {
MessageListener myMessageListener1 = (consumer, msg) -> {
try {
System.out.println("Message1 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.messageListener(myMessageListener1)
.subscribe();
MessageListener myMessageListener2 = (consumer2, msg) -> {
try {
System.out.println("Message2 received: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
Consumer consumer2 = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.messageListener(myMessageListener2)
.subscribe();
System.in.read();
}
复制代码
模式对比
Exclusive只支持同一topic只能有一个同名订阅者,对于目前大多集群架构,需要每个节点命名subscriptionName不同操作下,
集群中的每个节点都能收到topic消息,对于特殊场景如 前端websocket连接后台集群这类场景,还是蛮实用
Failover:可以保证在集群中消息只被消费一次,幂等性嘛简单点,正常情况下只被其中一台机器消费,也就是固定一台机器,这种就很纱布了
Shared: 可以保证在集群中消息只被消费一次,也是保证了幂等性,而且消息被集群平均消费了,压力down down
Key_Shared 我再想想
复制代码
Topic
Pulsar对topic的命名有如下规则,
{persistent|non-persistent}://tenant/namespace/topic
复制代码
-
persistent / non-persistent 表示主题的类型,主题分为持久化和非持久化主题,默认是持久化的类型。持久化的主题会将消息保存到磁盘上,而非持久化的主题就不会将消息保存到磁盘。
-
tenant Pulsar 中主题的租户,租户对于 Pulsar中的多租户至关重要,并且分布在集群中。
-
namespace 将相关联的 Topic 作为一个组来管理,是管理 Topic 的基本单元。每个租户可以有一个或多个命名空间。文章来源:https://uudwc.com/A/MxRjn
在上面的示例,我们都没有去关注persistent,tenant,namespace的玩法,因为你不去特殊设置的话,pulsar都有默认的
复制代码
我们可以尝试往persistent://sample/namespace_test4/topic-haha1直接发一条消息,你会发现发送报错Policies not found for sample/namespace_test4 namespace
复制代码
/**
* 报错
* 向租户sample 命名空间 namespace_test4 topic topic-haha1 发送消息
* 注意namespace需手动先创建好,否则会报错 olicies not found for sample/namespace_test4 namespace
*/
@Test
public void testProduce322() throws Exception {
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("persistent://sample/namespace_test4/topic-haha1")
.enableBatching(false)
.create();
producer.send("向租户sample 命名空间 namespace_test2 topic topic-haha1 发送消息");
System.in.read();
}
复制代码
文章来源地址https://uudwc.com/A/MxRjn
这里则表示我们需要先创建namespace之后,搞好对应的namespace tenant这些之后才行
那么如何动态去创建namespace,管理tenanat,以及包括我们刚才搞了那么多的生产者消费者测试出来,
能不能有一个UI界面让我一目了然,一手掌握Pulsar呢?
这里我即将介绍Pulsar的一款UI工具Pulsar admin
敬请期待,持续更新