Kafka adaptor cannot handle non-partitioned topics #38

@casuallc

Description

Reproduce

  • create a non-partitioned topic
  • send a message to this topic

error
image

probable reason
PulsarKafkaConsumer -> poll

public ConsumerRecords<K, V> poll(long timeoutMillis) {
        try {
            QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
            }

            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();

            int numberOfRecords = 0;

            while (item != null) {
                TopicName topicName = TopicName.get(item.consumer.getTopic());
                String topic = topicName.getPartitionedTopicName();
                int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
                Message<byte[]> msg = item.message;
                MessageId msgId = msg.getMessageId();
                if (msgId instanceof TopicMessageIdImpl) {
                    msgId = ((TopicMessageIdImpl) msgId).getInnerMessageId();
                }
                long offset = MessageIdUtils.getOffset(msgId);

                TopicPartition tp = new TopicPartition(topic, partition);
                if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                    log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                    resetOffsets(tp);
                }

                // .. other code

            // If no interceptor is provided, the interceptors list will be an empty list and the original ConsumerRecords will be returned.
            return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
This line cannot distinguish a partitioned topic from a non-partitioned one: a non-partitioned topic and partition 0 of a partitioned topic are both assigned partition 0.
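
To make the suspected collision concrete, here is a minimal, self-contained sketch that mimics the mapping done in poll() without depending on Pulsar or Kafka. The class PartitionMappingSketch and its helpers partitionIndex, baseName and toKafkaKey are hypothetical stand-ins, not part of either library; the sketch assumes only Pulsar's "-partition-N" naming convention for the partitions of a partitioned topic. It shows the mapping only and does not by itself confirm that this is the cause of the error above.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical stand-in for the topic -> (topic, partition) mapping in poll().
public class PartitionMappingSketch {
    // Assumption: partitions of a partitioned topic are named "<topic>-partition-<N>".
    private static final String PARTITION_SUFFIX = "-partition-";

    // Returns the partition index encoded in the name, or -1 if there is none.
    static int partitionIndex(String topic) {
        int pos = topic.lastIndexOf(PARTITION_SUFFIX);
        if (pos < 0) {
            return -1;
        }
        try {
            return Integer.parseInt(topic.substring(pos + PARTITION_SUFFIX.length()));
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    // Strips the partition suffix, if present.
    static String baseName(String topic) {
        return partitionIndex(topic) >= 0
                ? topic.substring(0, topic.lastIndexOf(PARTITION_SUFFIX))
                : topic;
    }

    // Mirrors the ternary from the issue: non-partitioned topics fall back to 0.
    static Map.Entry<String, Integer> toKafkaKey(String pulsarTopic) {
        int idx = partitionIndex(pulsarTopic);
        int partition = idx >= 0 ? idx : 0;
        return new SimpleImmutableEntry<>(baseName(pulsarTopic), partition);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> nonPartitioned = toKafkaKey("my-topic");
        Map.Entry<String, Integer> partitionZero = toKafkaKey("my-topic-partition-0");
        // Both names collapse to the same (topic, partition) key.
        System.out.println(nonPartitioned.equals(partitionZero)); // prints "true"
    }
}
```

With this mapping, code that keys its state on the resulting (topic, partition) pair, such as lastReceivedOffset or unpolledPartitions in the excerpt above, has no way to tell the two kinds of topic apart.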
