-
Notifications
You must be signed in to change notification settings - Fork 1.1k
2.0.7表模式下如何订阅实时数据 #17414
Copy link
Copy link
Open
Description
`
// 创建 topics
try (final ISubscriptionTableSession session = new SubscriptionTableSessionBuilder()
.host(host)
.port(port)
.username(user)
.password(password)
.build())
{
final Properties config = new Properties();
config.put(TopicConstant.DATABASE_KEY, "酸奶");
config.put(TopicConstant.TABLE_KEY, "金山_产线_灌装_老酸奶灌装机");
config.put(TopicConstant.START_TIME_KEY, "now");
config.put(TopicConstant.STRICT_KEY, "true");
config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE);
session.createTopic("test_topic", config);
log.info("主题 {} 创建成功", "test_topic");
}
ThreadUtil.execute(() -> {
// 创建消费者
// push 模式
final ISubscriptionTablePushConsumer consumer = new SubscriptionTablePushConsumerBuilder()
.consumerId("c1").consumerGroupId("cg1")
.host(host).port(port)
.username(user).password("root")
.consumeListener(message -> {
System.out.println( message );
return ConsumeResult.SUCCESS;
})
.build();
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
`
以上代码中 我拿不到 实时数据。我的实时数据是通过iotdb 自带的 mqtt 写进去的。每次写进去 执行select 查询的时候已经查到新数据了。但是 这个订阅始终收不到数据。
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels