Skip to content

Commit 1293658

Browse files
KAFKA-19163: Avoid deleting groups with pending transactional offsets (#19496)
When a group has pending transactional offsets but no committed offsets, we can accidentally delete it while cleaning up expired offsets. Add a check to avoid this case. Reviewers: David Jacot <djacot@confluent.io>
1 parent 86baac1 commit 1293658

File tree

2 files changed

+39
-2
lines changed

2 files changed

+39
-2
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,13 +1000,14 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
10001000
* @param groupId The group id.
10011001
* @param records The list of records to populate with offset commit tombstone records.
10021002
*
1003-
* @return True if no offsets exist or if all offsets expired, false otherwise.
1003+
* @return True if no offsets exist after expiry and no pending transactional offsets exist,
1004+
* false otherwise.
10041005
*/
10051006
public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> records) {
10061007
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic =
10071008
offsets.offsetsByGroup.get(groupId);
10081009
if (offsetsByTopic == null) {
1009-
return true;
1010+
return !openTransactions.contains(groupId);
10101011
}
10111012

10121013
// We expect the group to exist.

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2662,6 +2662,42 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() {
26622662
assertEquals(List.of(), records);
26632663
}
26642664

2665+
@Test
2666+
public void testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly() {
2667+
GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class);
2668+
Group group = mock(Group.class);
2669+
2670+
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
2671+
.withGroupMetadataManager(groupMetadataManager)
2672+
.withOffsetsRetentionMinutes(1)
2673+
.build();
2674+
2675+
long commitTimestamp = context.time.milliseconds();
2676+
2677+
context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
2678+
context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500);
2679+
2680+
context.time.sleep(Duration.ofMinutes(1).toMillis());
2681+
2682+
when(groupMetadataManager.group("group-id")).thenReturn(group);
2683+
when(group.offsetExpirationCondition()).thenReturn(Optional.of(
2684+
new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
2685+
when(group.isSubscribedToTopic("foo")).thenReturn(false);
2686+
2687+
// foo-0 is expired, but the group is not deleted beacuse it has pending transactional offset commits.
2688+
List<CoordinatorRecord> expectedRecords = List.of(
2689+
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0)
2690+
);
2691+
List<CoordinatorRecord> records = new ArrayList<>();
2692+
assertFalse(context.cleanupExpiredOffsets("group-id", records));
2693+
assertEquals(expectedRecords, records);
2694+
2695+
// No offsets are expired, and the group is still not deleted because it has pending transactional offset commits.
2696+
records = new ArrayList<>();
2697+
assertFalse(context.cleanupExpiredOffsets("group-id", records));
2698+
assertEquals(List.of(), records);
2699+
}
2700+
26652701
private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(
26662702
int partition,
26672703
long offset,

0 commit comments

Comments
 (0)