2222import org .apache .kafka .coordinator .group .api .assignor .MemberAssignment ;
2323import org .apache .kafka .coordinator .group .api .assignor .PartitionAssignor ;
2424import org .apache .kafka .coordinator .group .api .assignor .SubscribedTopicDescriber ;
25+ import org .apache .kafka .coordinator .group .api .assignor .SubscriptionType ;
2526import org .apache .kafka .coordinator .group .assignor .UniformAssignor ;
2627import org .apache .kafka .coordinator .group .modern .Assignment ;
2728import org .apache .kafka .coordinator .group .modern .SubscribedTopicDescriberImpl ;
@@ -71,6 +72,9 @@ public class TargetAssignmentBuilderBenchmark {
7172 @ Param ({"10" , "100" , "1000" })
7273 private int topicCount ;
7374
75+ @ Param ({"HOMOGENEOUS" , "HETEROGENEOUS" })
76+ private SubscriptionType subscriptionType ;
77+
7478 private static final String GROUP_ID = "benchmark-group" ;
7579
7680 private static final int GROUP_EPOCH = 0 ;
@@ -79,6 +83,9 @@ public class TargetAssignmentBuilderBenchmark {
7983
8084 private TargetAssignmentBuilder <ConsumerGroupMember > targetAssignmentBuilder ;
8185
86+ /** The number of homogeneous subgroups to create for the heterogeneous subscription case. */
87+ private static final int MAX_BUCKET_COUNT = 5 ;
88+
8289 private GroupSpec groupSpec ;
8390
8491 private Map <Uuid , Map <Integer , String >> invertedTargetAssignment ;
@@ -109,7 +116,7 @@ public void setup() {
109116 targetAssignmentBuilder = new TargetAssignmentBuilder <ConsumerGroupMember >(GROUP_ID , GROUP_EPOCH , partitionAssignor )
110117 .withMembers (members )
111118 .withSubscriptionMetadata (subscriptionMetadata )
112- .withSubscriptionType (HOMOGENEOUS )
119+ .withSubscriptionType (subscriptionType )
113120 .withTargetAssignment (existingTargetAssignment )
114121 .withInvertedTargetAssignment (invertedTargetAssignment )
115122 .withTopicsImage (topicsImage )
@@ -136,7 +143,7 @@ private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInve
136143 ) {
137144 this .groupSpec = AssignorBenchmarkUtils .createGroupSpec (
138145 members ,
139- HOMOGENEOUS ,
146+ subscriptionType ,
140147 topicsImage
141148 );
142149
@@ -158,12 +165,22 @@ private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInve
158165 }
159166
160167 private Map <String , ConsumerGroupMember > createMembers () {
161- return AssignorBenchmarkUtils .createHomogeneousMembers (
162- memberCount - 1 ,
163- this ::memberId ,
164- this ::rackId ,
165- allTopicNames
166- );
168+ if (subscriptionType == HOMOGENEOUS ) {
169+ return AssignorBenchmarkUtils .createHomogeneousMembers (
170+ memberCount - 1 ,
171+ this ::memberId ,
172+ this ::rackId ,
173+ allTopicNames
174+ );
175+ } else {
176+ return AssignorBenchmarkUtils .createHeterogeneousBucketedMembers (
177+ memberCount - 1 ,
178+ MAX_BUCKET_COUNT ,
179+ this ::memberId ,
180+ this ::rackId ,
181+ allTopicNames
182+ );
183+ }
167184 }
168185
169186 private String memberId (int memberIndex ) {
0 commit comments