
Kafka reports "The group member needs to have a valid member id before actually entering a consumer group"

factcode 2023. 2. 17. 21:40

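I am using Kafka to consume messages in Java. I want to test by running the same app several times on my local box. When I start the first instance, it consumes messages from the topic. When I start a second instance, I get the following: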

Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group

and it does not receive any messages from the topic. Starting more instances gives the same problem.

The Kafka settings I am using are:

spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.use.type.headers: false
    listener:
      missing-topics-fatal: false

I have two topics:

@Configuration
public class KafkaTopics {
    @Bean("alertsTopic")
    public NewTopic alertsTopic() {

        return TopicBuilder.name("XXX.alerts")
            .compact()
            .build();
    }

    @Bean("serversTopic")
    public NewTopic serversTopic() {

        return TopicBuilder.name("XXX.servers")
            .compact()
            .build();
    }

}

And two listeners, each in a separate class file:

@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
    properties = {
        "spring.json.key.default.type=java.lang.String",
        "spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.ServerInfo"
    })
public void registerServer(
    @Payload(required = false) ServerInfo serverInfo
) 

@KafkaListener(topics = ALERTS_KAFKA_TOPIC,
    id = "#{T(java.util.UUID).randomUUID().toString()}",
    properties = {
        "spring.json.key.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafkaKey",
        "spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafka"
    })
public void processAlert(
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) AlertOnKafkaKey key,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
    @Header(KafkaHeaders.OFFSET) long offset,
    @Payload(required = false) AlertOnKafka alert)

From my analysis, this is normal behavior, so you can filter the message out by changing the log level.

The reason is that when the server detects that the client supports member.id, it returns this error to the client. This is described in KIP-394.

The client then reconnects to the server using the generated member ID.
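
To make the retry concrete, here is a minimal sketch of that two-step handshake in plain Java. It is a toy model, not Kafka's actual implementation: `Coordinator`, `JoinResponse`, and `joinWithRetry` are hypothetical names made up for illustration, and the member ID format (client ID plus a UUID) is an assumption.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Toy model of the join handshake described in KIP-394.
// All names here are hypothetical; this is not Kafka's real code.
class JoinHandshakeSketch {

    // What the client sees after one join attempt.
    static final class JoinResponse {
        final boolean memberIdRequired;
        final String memberId;

        JoinResponse(boolean memberIdRequired, String memberId) {
            this.memberIdRequired = memberIdRequired;
            this.memberId = memberId;
        }
    }

    // Minimal stand-in for the group coordinator on the broker.
    static final class Coordinator {
        private final Set<String> pendingMemberIds = new HashSet<>();
        private final Set<String> members = new HashSet<>();

        JoinResponse joinGroup(String clientId, String memberId) {
            if (memberId.isEmpty()) {
                // First attempt without an ID: generate one, remember it,
                // and ask the client to retry (the "error" in the logs).
                String generated = clientId + "-" + UUID.randomUUID();
                pendingMemberIds.add(generated);
                return new JoinResponse(true, generated);
            }
            if (pendingMemberIds.remove(memberId) || members.contains(memberId)) {
                // Retry with a known ID: the member is admitted.
                members.add(memberId);
                return new JoinResponse(false, memberId);
            }
            throw new IllegalStateException("unknown member id: " + memberId);
        }

        int memberCount() {
            return members.size();
        }
    }

    // Client side: join, and if an ID is required, retry with the returned ID.
    // Returns the number of attempts it took to get in.
    static int joinWithRetry(Coordinator coordinator, String clientId) {
        String memberId = "";
        int attempts = 0;
        while (true) {
            attempts++;
            JoinResponse response = coordinator.joinGroup(clientId, memberId);
            if (!response.memberIdRequired) {
                return attempts;
            }
            memberId = response.memberId;
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        int attempts = joinWithRetry(coordinator, "consumer-listener1-1");
        System.out.println("joined after " + attempts + " attempts");
    }
}
```

Running `main` prints `joined after 2 attempts`. In this model the first attempt corresponds to the logged MemberIdRequiredException and the second is the successful join.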

If @Archimedes Trajano's answer does not work for you (as in my case), this happens when Kafka cannot pick up the consumer group ID.

If you have a single consumer group, you can specify it in the properties file like this:

spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      group-id: insert-your-consumer-group-id
      ... rest of your properties ...

Or, if you have multiple consumers, you can specify a groupId for each one:

@KafkaListener(topics="topic-1", groupId="group-1")
public void registerServer(@Payload(required = false) ServerInfo serverInfo) 

@KafkaListener(topics="topic-2",groupId="group-2")
public void processAlert(@Payload(required = false) AlertOnKafka alert)

docs: https://docs.spring.io/spring-kafka/reference/html/#information-properties
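
As a rough illustration of where the group ID comes from, here is a plain-Java sketch of the precedence as I understand it from the Spring Kafka docs: an explicit `groupId` on the listener wins, then the listener `id` (unless `idIsGroup` is false), then the `group.id` configured on the consumer factory. `resolveGroupId` is a hypothetical helper written for this example, not a Spring Kafka API.

```java
import java.util.UUID;

// Toy model of how the effective group.id is chosen for a @KafkaListener.
// resolveGroupId is a hypothetical helper, not part of Spring Kafka.
class GroupIdResolutionSketch {

    static String resolveGroupId(String groupId, String id, boolean idIsGroup,
                                 String factoryGroupId) {
        // 1. An explicit groupId attribute on the listener takes priority.
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;
        }
        // 2. Otherwise the listener id doubles as the group id,
        //    unless idIsGroup has been switched off.
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;
        }
        // 3. Fall back to the consumer factory setting (may be null).
        return factoryGroupId;
    }

    public static void main(String[] args) {
        String randomId = UUID.randomUUID().toString();

        // Listener with only a random id: the group changes on every start.
        System.out.println(resolveGroupId(null, randomId, true, null));

        // Listener with an explicit groupId.
        System.out.println(resolveGroupId("group-1", randomId, true, null));

        // Random id, but idIsGroup = false: the factory group.id is used.
        System.out.println(resolveGroupId(null, randomId, false, "my-app"));
    }
}
```

If this reading is right, listeners that only set a random UUID as `id` (as in the question) would each end up in their own group on every start, so setting `groupId` explicitly is what makes several instances share one group.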

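I had the same problem: the consumer could not subscribe to a particular topic.

I thought the member.id might be similar to the client-id (I am using Camel here), which is probably related to the consumer group in some way.

What solved it for me was to leave the client ID unchanged this time and keep the consumer group of the consuming service as it was.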

Hmm... in my case, the consumers do join the group with a generated ID. Here are my test and its output, for reference:

@Test
void testSyncSend() throws ExecutionException, InterruptedException {
    int id = (int) (System.currentTimeMillis() / 1000);
    SendResult result = producer.syncSend(id);
    logger.info("[testSyncSend] id:{}, result:{}", id, result);
    // Block forever so the test JVM stays up and the listener threads keep running.
    new CountDownLatch(1).await();
}

2021-04-20 14:32:20.980  INFO 6672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
2021-04-20 14:32:20.981  INFO 6672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0f4c73d92
2021-04-20 14:32:20.981  INFO 6672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1618900340980
2021-04-20 14:32:21.125  INFO 6672 --- [listener1-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-listener1-1, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125  INFO 6672 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125  INFO 6672 --- [listener1-3-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-listener1-4, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125  INFO 6672 --- [listener1-2-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-listener1-3, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125  INFO 6672 --- [listener1-1-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-listener1-2, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.127  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.127  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.127  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.127  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.130  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.130  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.130  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.130  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.147  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.147  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.147  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.147  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.148  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.148  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.148  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.148  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.317  INFO 6672 --- [           main] c.z.s.cbaConnector.KafkaProducerTest     : [testSyncSend] id:1618900340, result:SendResult [producerRecord=ProducerRecord(topic=test1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 122, 104, 105, 108, 105, 46, 115, 109, 115, 109, 111, 100, 117, 108, 101, 46, 101, 110, 116, 105, 116, 121, 46, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=Message(id=1618900340), timestamp=null), recordMetadata=test1-1@0]
2021-04-20 14:32:23.770  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Finished assignment for group at generation 2: {consumer-listener1-4-9a383250-e84d-413a-b012-85405abdcf7f=Assignment(partitions=[test1-8, test1-9]), consumer-listener1-2-3d26d9ef-b973-4d19-a930-5ba77d938680=Assignment(partitions=[test1-3, test1-4, test1-5]), consumer-listener1-1-10b6895e-264e-45bd-ba90-c71ea12b21e5=Assignment(partitions=[test1-0, test1-1, test1-2]), consumer-listener1-3-54ce965a-87cd-4e28-b0e9-b0f2c9f69423=Assignment(partitions=[test1-6, test1-7])}
2021-04-20 14:32:23.775  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.775  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.775  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.776  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.779  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Adding newly assigned partitions: test1-5, test1-4, test1-3
2021-04-20 14:32:23.779  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Adding newly assigned partitions: test1-6, test1-7
2021-04-20 14:32:23.779  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Adding newly assigned partitions: test1-0, test1-2, test1-1
2021-04-20 14:32:23.779  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Adding newly assigned partitions: test1-9, test1-8
2021-04-20 14:32:23.789  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Found no committed offset for partition test1-8
2021-04-20 14:32:23.789  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Found no committed offset for partition test1-6
2021-04-20 14:32:23.789  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Found no committed offset for partition test1-2
2021-04-20 14:32:23.789  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Found no committed offset for partition test1-1
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-1, groupId=listener1] Setting offset for partition test1-0 to the committed offset FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-5 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-4, groupId=listener1] Setting offset for partition test1-9 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-3, groupId=listener1] Setting offset for partition test1-7 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-4 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791  INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-3 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.792  INFO 6672 --- [listener1-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : listener1: partitions assigned: [test1-5, test1-4, test1-3]
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-listener1-4, groupId=listener1] Resetting offset for partition test1-8 to offset 0.
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-listener1-3, groupId=listener1] Resetting offset for partition test1-6 to offset 0.
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-listener1-1, groupId=listener1] Resetting offset for partition test1-2 to offset 0.
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-listener1-1, groupId=listener1] Resetting offset for partition test1-1 to offset 0.
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : listener1: partitions assigned: [test1-9, test1-8]
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : listener1: partitions assigned: [test1-6, test1-7]
2021-04-20 14:32:23.804  INFO 6672 --- [listener1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : listener1: partitions assigned: [test1-0, test1-2, test1-1]
2021-04-20 14:32:23.817  INFO 6672 --- [listener1-0-C-1] c.z.s.cbaConnector.KafkaConsumer         : [KafakaConsumer][consume] thread:21 received message:{"id":1618900340}

Source URL: https://stackoverflow.com/questions/63946696/kafka-is-giving-the-group-member-needs-to-have-a-valid-member-id-before-actual
