Commit 017da21

KAFKA-17710; Rework uniform heterogeneous assignor to improve perf (#17385)
Rework the uniform heterogeneous assignor to improve performance, while preserving the high-level ideas and structure from the existing implementation:
 * The assignor works in 3 stages: importing the previous assignment for stickiness, assigning unassigned partitions and iteratively reassigning partitions to improve balance.
 * Unassigned partitions are assigned to the subscribers with the least number of partitions. This maximizes balance within a single topic.
 * During the iterative rebalancing phase, partitions are reassigned to their previous owner if it improves balance (stickiness restoration).
 * During the iterative rebalancing phase, partitions are reassigned to the subscriber with the least number of partitions to improve balance.

A non-exhaustive list of changes is:
 * The assignment of unassigned partitions and iterative reassignment stages now work through partitions topic by topic. Previously, partitions from topics with the same number of partitions per subscriber would be interleaved. Since we iterate topic by topic, we can reuse data about topic subscribers.
 * Instead of maintaining TreeSets to find the least loaded subscribers, we sort an ArrayList of subscribers once per topic and start filling up subscribers, least loaded first. In testing, this approach was found to be faster than maintaining PriorityQueues.
 * Implement stickiness restoration by creating a mapping of partitions to previous owner and checking against that mapping, instead of tracking partition movements during iterative reassignment.
 * Track member partition counts using a plain int array, to avoid overhead from boxing and HashMap lookups. Member partition counts are accessed very frequently and this needs to be fast. As a consequence, we have to number members 0 to M - 1.
 * Bound the iterative reassignment stage to a fixed number of iterations. Under some uncommon subscription patterns, the iterative reassignment stage converges slowly. In these cases, the iterative reassignment stage terminates without producing an optimally balanced assignment anyway (see javadoc for balanceTopics).
 * Re-use Maps from the previous assignment where possible, i.e. introduce a copy-on-write mechanism while computing the new assignment.

Reviewers: David Jacot <djacot@confluent.io>
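
To make the "sort once per topic, then fill least loaded first" idea concrete, here is a minimal sketch. It is not the code from this commit: the class name `LeastLoadedFillSketch`, the method `assignTopic`, and its parameters are hypothetical, and it only assumes what the message states, namely that members are numbered 0 to M - 1 and that their partition counts live in a plain `int` array.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch, not the assignor from this commit.
final class LeastLoadedFillSketch {

    /**
     * Hands out the unassigned partitions of a single topic to its subscribers,
     * least loaded first.
     *
     * @param memberPartitionCounts partition count per member, indexed by member
     *                              number 0..M-1; updated in place
     * @param subscribers           member numbers subscribed to the topic
     * @param numUnassigned         how many partitions of the topic need an owner
     * @return owners[i] is the member number that receives the i-th unassigned partition
     */
    static int[] assignTopic(int[] memberPartitionCounts, List<Integer> subscribers, int numUnassigned) {
        if (subscribers.isEmpty()) {
            throw new IllegalArgumentException("topic has no subscribers");
        }
        // Sort once per topic instead of maintaining a TreeSet or PriorityQueue.
        List<Integer> sorted = new ArrayList<>(subscribers);
        sorted.sort(Comparator.comparingInt(m -> memberPartitionCounts[m]));

        int[] owners = new int[numUnassigned];
        int next = 0;   // index of the next partition to hand out
        int prefix = 1; // number of leading members tied at the minimum count
        while (next < numUnassigned) {
            int level = memberPartitionCounts[sorted.get(0)];
            // Grow the prefix to include every member that has reached the minimum level.
            while (prefix < sorted.size() && memberPartitionCounts[sorted.get(prefix)] == level) {
                prefix++;
            }
            // Give one partition to each member in the prefix, raising them together.
            for (int i = 0; i < prefix && next < numUnassigned; i++) {
                int member = sorted.get(i);
                owners[next++] = member;
                memberPartitionCounts[member]++;
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        int[] counts = {3, 0, 1};
        int[] owners = assignTopic(counts, Arrays.asList(0, 1, 2), 4);
        System.out.println("owners = " + Arrays.toString(owners));
        System.out.println("counts = " + Arrays.toString(counts));
    }
}
```

Because the list is sorted once and the members at the minimum level are raised together, no re-sorting or heap maintenance is needed while partitions are handed out. Reading and incrementing counts is a direct array access with no boxing.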
1 parent c20c244 commit 017da21

4 files changed: +867 -676 lines changed

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Provides helper methods for assignors.
+ */
+public final class AssignorHelpers {
+    private static final Class<?> UNMODIFIABLE_MAP_CLASS = Collections.unmodifiableMap(new HashMap<>()).getClass();
+    private static final Class<?> EMPTY_MAP_CLASS = Collections.emptyMap().getClass();
+
+    /**
+     * @return True if the provided map is an UnmodifiableMap or EmptyMap. Those classes are not
+     * public hence we cannot use the `instanceof` operator.
+     */
+    public static boolean isImmutableMap(Map<?, ?> map) {
+        return UNMODIFIABLE_MAP_CLASS.isInstance(map) || EMPTY_MAP_CLASS.isInstance(map);
+    }
+
+    /**
+     * Deep copies a member assignment map.
+     * @param map The assignment to copy.
+     * @return A deep copy of the assignment.
+     */
+    public static Map<Uuid, Set<Integer>> deepCopyAssignment(Map<Uuid, Set<Integer>> map) {
+        Map<Uuid, Set<Integer>> copy = new HashMap<>(map.size());
+        for (Map.Entry<Uuid, Set<Integer>> entry : map.entrySet()) {
+            copy.put(entry.getKey(), new HashSet<>(entry.getValue()));
+        }
+        return copy;
+    }
+}
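
The two helpers above look like the building blocks of the copy-on-write mechanism mentioned in the commit message, although this file does not show how the assignor calls them. The following standalone sketch shows one plausible way to combine them. The class `CopyOnWriteSketch` and the method `mutableCopyIfNeeded` are hypothetical, and a generic key type stands in for Kafka's `Uuid` so that the example compiles without Kafka on the classpath.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of copy-on-write around the helpers in this file.
final class CopyOnWriteSketch {
    private static final Class<?> UNMODIFIABLE_MAP_CLASS =
        Collections.unmodifiableMap(new HashMap<>()).getClass();
    private static final Class<?> EMPTY_MAP_CLASS = Collections.emptyMap().getClass();

    // Same check as AssignorHelpers.isImmutableMap, repeated here to stay self-contained.
    static boolean isImmutableMap(Map<?, ?> map) {
        return UNMODIFIABLE_MAP_CLASS.isInstance(map) || EMPTY_MAP_CLASS.isInstance(map);
    }

    /**
     * Returns a map that is safe to mutate: the input itself when it is already
     * mutable (no copy is made), otherwise a deep copy of it.
     */
    static <K> Map<K, Set<Integer>> mutableCopyIfNeeded(Map<K, Set<Integer>> assignment) {
        if (!isImmutableMap(assignment)) {
            return assignment;
        }
        Map<K, Set<Integer>> copy = new HashMap<>(assignment.size());
        for (Map.Entry<K, Set<Integer>> entry : assignment.entrySet()) {
            copy.put(entry.getKey(), new HashSet<>(entry.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> previous = new HashMap<>();
        previous.put("topic-a", new HashSet<>(Arrays.asList(0, 1)));
        Map<String, Set<Integer>> frozen = Collections.unmodifiableMap(previous);

        // The copy happens only at the first write to a read-only assignment.
        Map<String, Set<Integer>> working = mutableCopyIfNeeded(frozen);
        working.get("topic-a").add(2);

        System.out.println("previous = " + previous);
        System.out.println("working  = " + working);
    }
}
```

Members whose assignment never changes can keep sharing the previous, read-only map, so the deep copy is paid only for assignments that are actually modified. Note that the class check recognizes only the `Collections.unmodifiableMap` and `Collections.emptyMap` wrappers. Other immutable maps, such as those returned by `Map.of`, are not detected.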
