package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.class */
public class HighAvailabilityTaskAssignor implements LegacyTaskAssignor {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) HighAvailabilityTaskAssignor.class);
    public static final int DEFAULT_HIGH_AVAILABILITY_TRAFFIC_COST = 10;
    public static final int DEFAULT_HIGH_AVAILABILITY_NON_OVERLAP_COST = 1;

    @Override // org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor
    public boolean assign(Map<ProcessId, ClientState> map, Set<TaskId> set, Set<TaskId> set2, RackAwareTaskAssignor rackAwareTaskAssignor, AssignmentConfigs assignmentConfigs) {
        TreeSet treeSet = new TreeSet(set2);
        TreeMap<ProcessId, ClientState> treeMap = new TreeMap<>(map);
        assignActiveStatefulTasks(treeMap, treeSet, rackAwareTaskAssignor, assignmentConfigs);
        assignStandbyReplicaTasks(treeMap, set, treeSet, rackAwareTaskAssignor, assignmentConfigs);
        AtomicInteger atomicInteger = new AtomicInteger(assignmentConfigs.maxWarmupReplicas());
        Map<TaskId, SortedSet<ProcessId>> tasksToCaughtUpClients = tasksToCaughtUpClients(treeSet, treeMap, assignmentConfigs.acceptableRecoveryLag());
        Map<TaskId, SortedSet<ProcessId>> tasksToClientByLag = tasksToClientByLag(treeSet, treeMap);
        TreeMap treeMap2 = new TreeMap();
        int assignActiveTaskMovements = TaskMovement.assignActiveTaskMovements(tasksToCaughtUpClients, tasksToClientByLag, treeMap, treeMap2, atomicInteger);
        int assignStandbyTaskMovements = TaskMovement.assignStandbyTaskMovements(tasksToCaughtUpClients, tasksToClientByLag, treeMap, atomicInteger, treeMap2);
        assignStatelessActiveTasks(treeMap, Utils.diff(TreeSet::new, set, treeSet), rackAwareTaskAssignor);
        boolean z = assignActiveTaskMovements + assignStandbyTaskMovements > 0;
        log.info("Decided on assignment: " + treeMap + " with" + (z ? "" : " no") + " followup probing rebalance.");
        return z;
    }

    private static void assignActiveStatefulTasks(SortedMap<ProcessId, ClientState> sortedMap, SortedSet<TaskId> sortedSet, RackAwareTaskAssignor rackAwareTaskAssignor, AssignmentConfigs assignmentConfigs) {
        Iterator<ClientState> it = null;
        for (TaskId taskId : sortedSet) {
            if (it == null || !it.hasNext()) {
                it = sortedMap.values().iterator();
            }
            it.next().assignActive(taskId);
        }
        balanceTasksOverThreads(sortedMap, (v0) -> {
            return v0.activeTasks();
        }, (v0, v1) -> {
            v0.unassignActive(v1);
        }, (v0, v1) -> {
            v0.assignActive(v1);
        }, (clientState, clientState2) -> {
            return true;
        });
        if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
            rackAwareTaskAssignor.optimizeActiveTasks(sortedSet, sortedMap, assignmentConfigs.rackAwareTrafficCost().orElse(10), assignmentConfigs.rackAwareNonOverlapCost().orElse(1));
        }
    }

    private void assignStandbyReplicaTasks(TreeMap<ProcessId, ClientState> treeMap, Set<TaskId> set, Set<TaskId> set2, RackAwareTaskAssignor rackAwareTaskAssignor, AssignmentConfigs assignmentConfigs) {
        if (assignmentConfigs.numStandbyReplicas() == 0) {
            return;
        }
        StandbyTaskAssignor create = StandbyTaskAssignorFactory.create(assignmentConfigs, null);
        create.assign(treeMap, set, set2, assignmentConfigs);
        Function function = (v0) -> {
            return v0.standbyTasks();
        };
        BiConsumer biConsumer = (v0, v1) -> {
            v0.unassignStandby(v1);
        };
        BiConsumer biConsumer2 = (v0, v1) -> {
            v0.assignStandby(v1);
        };
        create.getClass();
        balanceTasksOverThreads(treeMap, function, biConsumer, biConsumer2, create::isAllowedTaskMovement);
        if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
            int orElse = assignmentConfigs.rackAwareTrafficCost().orElse(10);
            int orElse2 = assignmentConfigs.rackAwareNonOverlapCost().orElse(1);
            create.getClass();
            rackAwareTaskAssignor.optimizeStandbyTasks(treeMap, orElse, orElse2, create::isAllowedTaskMovement);
        }
    }

    private static void balanceTasksOverThreads(SortedMap<ProcessId, ClientState> sortedMap, Function<ClientState, Set<TaskId>> function, BiConsumer<ClientState, TaskId> biConsumer, BiConsumer<ClientState, TaskId> biConsumer2, BiPredicate<ClientState, ClientState> biPredicate) {
        boolean z = true;
        while (z) {
            z = false;
            for (Map.Entry<ProcessId, ClientState> entry : sortedMap.entrySet()) {
                ProcessId key = entry.getKey();
                ClientState value = entry.getValue();
                for (Map.Entry<ProcessId, ClientState> entry2 : sortedMap.entrySet()) {
                    ProcessId key2 = entry2.getKey();
                    ClientState value2 = entry2.getValue();
                    if (!key.equals(key2)) {
                        Iterator it = new TreeSet(function.apply(value)).iterator();
                        while (shouldMoveATask(value, value2) && it.hasNext()) {
                            TaskId taskId = (TaskId) it.next();
                            if (!value2.hasAssignedTask(taskId) && biPredicate.test(value, value2)) {
                                biConsumer.accept(value, taskId);
                                biConsumer2.accept(value2, taskId);
                                z = true;
                            }
                        }
                    }
                }
            }
        }
    }

    private static boolean shouldMoveATask(ClientState clientState, ClientState clientState2) {
        double assignedTaskLoad = clientState.assignedTaskLoad() - clientState2.assignedTaskLoad();
        if (assignedTaskLoad <= 0.0d) {
            return false;
        }
        double assignedTaskCount = ((clientState.assignedTaskCount() - 1.0d) / clientState.capacity()) - ((clientState2.assignedTaskCount() + 1.0d) / clientState2.capacity());
        return assignedTaskCount >= 0.0d && assignedTaskCount < assignedTaskLoad;
    }

    private static void assignStatelessActiveTasks(TreeMap<ProcessId, ClientState> treeMap, Iterable<TaskId> iterable, RackAwareTaskAssignor rackAwareTaskAssignor) {
        ConstrainedPrioritySet constrainedPrioritySet = new ConstrainedPrioritySet((processId, taskId) -> {
            return true;
        }, processId2 -> {
            return Double.valueOf(((ClientState) treeMap.get(processId2)).activeTaskLoad());
        });
        constrainedPrioritySet.offerAll(treeMap.keySet());
        TreeSet treeSet = new TreeSet();
        for (TaskId taskId2 : iterable) {
            treeSet.add(taskId2);
            ProcessId poll = constrainedPrioritySet.poll(taskId2);
            treeMap.get(poll).assignActive(taskId2);
            constrainedPrioritySet.offer(poll);
        }
        if (rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
            rackAwareTaskAssignor.optimizeActiveTasks(treeSet, treeMap, 1, 0);
        }
    }

    private static Map<TaskId, SortedSet<ProcessId>> tasksToCaughtUpClients(Set<TaskId> set, Map<ProcessId, ClientState> map, long j) {
        HashMap hashMap = new HashMap();
        for (TaskId taskId : set) {
            TreeSet treeSet = new TreeSet();
            for (Map.Entry<ProcessId, ClientState> entry : map.entrySet()) {
                ProcessId key = entry.getKey();
                long lagFor = entry.getValue().lagFor(taskId);
                if (activeRunning(lagFor) || unbounded(j) || acceptable(j, lagFor)) {
                    treeSet.add(key);
                }
            }
            hashMap.put(taskId, treeSet);
        }
        return hashMap;
    }

    private static Map<TaskId, SortedSet<ProcessId>> tasksToClientByLag(Set<TaskId> set, Map<ProcessId, ClientState> map) {
        HashMap hashMap = new HashMap();
        for (TaskId taskId : set) {
            TreeSet treeSet = new TreeSet(Comparator.comparingLong(processId -> {
                return ((ClientState) map.get(processId)).lagFor(taskId);
            }).thenComparing(processId2 -> {
                return processId2;
            }));
            treeSet.addAll(map.keySet());
            hashMap.put(taskId, treeSet);
        }
        return hashMap;
    }

    private static boolean unbounded(long j) {
        return j == NetworkClientDelegate.PollResult.WAIT_FOREVER;
    }

    private static boolean acceptable(long j, long j2) {
        return j2 >= 0 && j2 <= j;
    }

    private static boolean activeRunning(long j) {
        return j == -2;
    }
}
