Skip to content

Commit 02bd342

Browse files
KAFKA-19449: Don't consider partitions unreleased when pending revocation by the current member (#20781)
During reconciliation, we transition members to UNRELEASED_PARTITIONS when the target assignment contains a partition that is owned by another member. Partitions are considered owned by another member if they are in a member's assignment or partitions pending revocation. Ownership is only updated after reconciliation has finished. Fix a bug where we consider a partition owned by another member when it is actually in the current member's list of partitions pending revocation. In some cases, this skips a transition to UNRELEASED_PARTITIONS and allows the current assignment to converge to the target assignment one heartbeat sooner. Reviewers: PoAn Yang <payang@apache.org>, David Jacot <djacot@confluent.io>
1 parent 69e1c91 commit 02bd342

File tree

2 files changed

+80
-1
lines changed

2 files changed

+80
-1
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,11 @@ private ConsumerGroupMember computeNextAssignment(
394394
Set<Integer> partitionsPendingAssignment = new HashSet<>(target);
395395
partitionsPendingAssignment.removeAll(assignedPartitions);
396396
hasUnreleasedPartitions = partitionsPendingAssignment.removeIf(partitionId ->
397-
currentPartitionEpoch.apply(topicId, partitionId) != -1
397+
currentPartitionEpoch.apply(topicId, partitionId) != -1 &&
398+
// Don't consider a partition unreleased if it is owned by the current member
399+
// because it is pending revocation. This is safe to do since only a single member
400+
// can own a partition at a time.
401+
!member.partitionsPendingRevocation().getOrDefault(topicId, Set.of()).contains(partitionId)
398402
) || hasUnreleasedPartitions;
399403

400404
if (!assignedPartitions.isEmpty()) {

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,81 @@ public void testUnrevokedPartitionsToUnreleasedPartitions() {
498498
);
499499
}
500500

501+
@Test
502+
public void testUnrevokedPartitionsToStableWithReturnedPartitionsPendingRevocation() {
503+
String topic1 = "topic1";
504+
String topic2 = "topic2";
505+
Uuid topicId1 = Uuid.randomUuid();
506+
Uuid topicId2 = Uuid.randomUuid();
507+
508+
CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
509+
.addTopic(topicId1, topic1, 10)
510+
.addTopic(topicId2, topic2, 10)
511+
.buildCoordinatorMetadataImage();
512+
513+
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
514+
.setState(MemberState.UNREVOKED_PARTITIONS)
515+
.setMemberEpoch(10)
516+
.setPreviousMemberEpoch(10)
517+
.setSubscribedTopicNames(List.of(topic1, topic2))
518+
.setAssignedPartitions(mkAssignment(
519+
mkTopicAssignment(topicId1, 2, 3),
520+
mkTopicAssignment(topicId2, 5, 6)))
521+
.setPartitionsPendingRevocation(mkAssignment(
522+
// Partition 4 is pending revocation by the member but is back in the latest target
523+
// assignment.
524+
mkTopicAssignment(topicId1, 4)))
525+
.build();
526+
527+
ConsumerGroupMember updatedMember = new CurrentAssignmentBuilder(member)
528+
.withMetadataImage(metadataImage)
529+
.withTargetAssignment(12, new Assignment(mkAssignment(
530+
mkTopicAssignment(topicId1, 2, 3, 4),
531+
mkTopicAssignment(topicId2, 5, 6, 7))))
532+
.withCurrentPartitionEpoch((topicId, partitionId) -> {
533+
if (topicId.equals(topicId1)) {
534+
// Partitions 2 and 3 are in the member's current assignment.
535+
// Partition 4 is pending revocation by the member.
536+
switch (partitionId) {
537+
case 2:
538+
case 3:
539+
case 4:
540+
return 10;
541+
}
542+
} else if (topicId.equals(topicId2)) {
543+
// Partitions 5 and 6 are in the member's current assignment.
544+
switch (partitionId) {
545+
case 5:
546+
case 6:
547+
return 10;
548+
}
549+
}
550+
return -1;
551+
})
552+
.withOwnedTopicPartitions(Arrays.asList(
553+
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
554+
.setTopicId(topicId1)
555+
.setPartitions(Arrays.asList(2, 3)),
556+
new ConsumerGroupHeartbeatRequestData.TopicPartitions()
557+
.setTopicId(topicId2)
558+
.setPartitions(Arrays.asList(5, 6))))
559+
.build();
560+
561+
assertEquals(
562+
new ConsumerGroupMember.Builder("member")
563+
.setState(MemberState.STABLE)
564+
.setMemberEpoch(12)
565+
.setPreviousMemberEpoch(10)
566+
.setSubscribedTopicNames(List.of(topic1, topic2))
567+
.setAssignedPartitions(mkAssignment(
568+
mkTopicAssignment(topicId1, 2, 3, 4),
569+
mkTopicAssignment(topicId2, 5, 6, 7)))
570+
.setPartitionsPendingRevocation(Map.of())
571+
.build(),
572+
updatedMember
573+
);
574+
}
575+
501576
@Test
502577
public void testUnreleasedPartitionsToStable() {
503578
String topic1 = "topic1";

0 commit comments

Comments
 (0)