package org.apache.kafka.streams;

import java.lang.Thread;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.1.2.jar:org/apache/kafka/streams/KafkaStreams.class */
public class KafkaStreams implements AutoCloseable {
    private static final String JMX_PREFIX = "kafka.streams";
    private static final Set<Class<? extends Throwable>> EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS = new HashSet(Arrays.asList(IllegalStateException.class, IllegalArgumentException.class));
    private final Time time;
    private final Logger log;
    private final String clientId;
    private final Metrics metrics;
    private final StreamsConfig config;
    protected final List<StreamThread> threads;
    protected final StateDirectory stateDirectory;
    private final StreamsMetadataState streamsMetadataState;
    private final ScheduledExecutorService stateDirCleaner;
    private final ScheduledExecutorService rocksDBMetricsRecordingService;
    private final Admin adminClient;
    private final StreamsMetricsImpl streamsMetrics;
    private final long totalCacheSize;
    private final StreamStateListener streamStateListener;
    private final StateRestoreListener delegatingStateRestoreListener;
    private final Map<Long, StreamThread.State> threadState;
    private final UUID processId;
    private final KafkaClientSupplier clientSupplier;
    protected final TopologyMetadata topologyMetadata;
    private final QueryableStoreProvider queryableStoreProvider;
    GlobalStreamThread globalStreamThread;
    private StateListener stateListener;
    private StateRestoreListener globalStateRestoreListener;
    private boolean oldHandler;
    private Consumer<Throwable> streamsUncaughtExceptionHandler;
    private final Object changeThreadCount;
    private final Object stateLock;
    protected volatile State state;

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.1.2.jar:org/apache/kafka/streams/KafkaStreams$DelegatingStateRestoreListener.class */
    final class DelegatingStateRestoreListener implements StateRestoreListener {
        DelegatingStateRestoreListener() {
        }

        private void throwOnFatalException(Exception exc, TopicPartition topicPartition, String str) {
            throw new StreamsException(String.format("Fatal user code error in store restore listener for store %s, partition %s.", str, topicPartition), exc);
        }

        @Override // org.apache.kafka.streams.processor.StateRestoreListener
        public void onRestoreStart(TopicPartition topicPartition, String str, long j, long j2) {
            if (KafkaStreams.this.globalStateRestoreListener != null) {
                try {
                    KafkaStreams.this.globalStateRestoreListener.onRestoreStart(topicPartition, str, j, j2);
                } catch (Exception e) {
                    throwOnFatalException(e, topicPartition, str);
                }
            }
        }

        @Override // org.apache.kafka.streams.processor.StateRestoreListener
        public void onBatchRestored(TopicPartition topicPartition, String str, long j, long j2) {
            if (KafkaStreams.this.globalStateRestoreListener != null) {
                try {
                    KafkaStreams.this.globalStateRestoreListener.onBatchRestored(topicPartition, str, j, j2);
                } catch (Exception e) {
                    throwOnFatalException(e, topicPartition, str);
                }
            }
        }

        @Override // org.apache.kafka.streams.processor.StateRestoreListener
        public void onRestoreEnd(TopicPartition topicPartition, String str, long j) {
            if (KafkaStreams.this.globalStateRestoreListener != null) {
                try {
                    KafkaStreams.this.globalStateRestoreListener.onRestoreEnd(topicPartition, str, j);
                } catch (Exception e) {
                    throwOnFatalException(e, topicPartition, str);
                }
            }
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.1.2.jar:org/apache/kafka/streams/KafkaStreams$State.class */
    public enum State {
        CREATED(1, 3),
        REBALANCING(2, 3, 5),
        RUNNING(1, 2, 3, 5),
        PENDING_SHUTDOWN(4),
        NOT_RUNNING(new Integer[0]),
        PENDING_ERROR(6),
        ERROR(new Integer[0]);

        private final Set<Integer> validTransitions = new HashSet();

        State(Integer... numArr) {
            this.validTransitions.addAll(Arrays.asList(numArr));
        }

        public boolean hasNotStarted() {
            return equals(CREATED);
        }

        public boolean isRunningOrRebalancing() {
            return equals(RUNNING) || equals(REBALANCING);
        }

        public boolean isShuttingDown() {
            return equals(PENDING_SHUTDOWN) || equals(PENDING_ERROR);
        }

        public boolean hasCompletedShutdown() {
            return equals(NOT_RUNNING) || equals(ERROR);
        }

        public boolean hasStartedOrFinishedShuttingDown() {
            return isShuttingDown() || hasCompletedShutdown();
        }

        public boolean isValidTransition(State state) {
            return this.validTransitions.contains(Integer.valueOf(state.ordinal()));
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.1.2.jar:org/apache/kafka/streams/KafkaStreams$StateListener.class */
    public interface StateListener {
        void onChange(State state, State state2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.1.2.jar:org/apache/kafka/streams/KafkaStreams$StreamStateListener.class */
    public final class StreamStateListener implements StreamThread.StateListener {
        private final Map<Long, StreamThread.State> threadState;
        private GlobalStreamThread.State globalThreadState;
        private final Object threadStatesLock = new Object();

        StreamStateListener(Map<Long, StreamThread.State> map, GlobalStreamThread.State state) {
            this.threadState = map;
            this.globalThreadState = state;
        }

        private void maybeSetRunning() {
            for (StreamThread.State state : this.threadState.values()) {
                if (state != StreamThread.State.RUNNING && state != StreamThread.State.DEAD) {
                    return;
                }
            }
            if (this.globalThreadState == null || this.globalThreadState == GlobalStreamThread.State.RUNNING) {
                KafkaStreams.this.setState(State.RUNNING);
            }
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.StateListener
        public synchronized void onChange(Thread thread, ThreadStateTransitionValidator threadStateTransitionValidator, ThreadStateTransitionValidator threadStateTransitionValidator2) {
            synchronized (this.threadStatesLock) {
                if (thread instanceof StreamThread) {
                    StreamThread.State state = (StreamThread.State) threadStateTransitionValidator;
                    this.threadState.put(Long.valueOf(thread.getId()), state);
                    if (state == StreamThread.State.PARTITIONS_REVOKED || state == StreamThread.State.PARTITIONS_ASSIGNED) {
                        KafkaStreams.this.setState(State.REBALANCING);
                    } else if (state == StreamThread.State.RUNNING) {
                        maybeSetRunning();
                    }
                } else if (thread instanceof GlobalStreamThread) {
                    GlobalStreamThread.State state2 = (GlobalStreamThread.State) threadStateTransitionValidator;
                    this.globalThreadState = state2;
                    if (state2 == GlobalStreamThread.State.RUNNING) {
                        maybeSetRunning();
                    } else if (state2 == GlobalStreamThread.State.DEAD) {
                        KafkaStreams.this.log.error("Global thread has died. The streams application or client will now close to ERROR.");
                        KafkaStreams.this.closeToError();
                    }
                }
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    private boolean waitOnState(State state, long j) {
        long milliseconds = this.time.milliseconds();
        synchronized (this.stateLock) {
            boolean z = false;
            long j2 = 0;
            while (this.state != state) {
                try {
                    if (j <= j2) {
                        this.log.debug("Cannot transit to {} within {}ms", state, Long.valueOf(j));
                        if (z) {
                            Thread.currentThread().interrupt();
                        }
                        return false;
                    }
                    try {
                        this.stateLock.wait(j - j2);
                    } catch (InterruptedException e) {
                        z = true;
                    }
                    j2 = this.time.milliseconds() - milliseconds;
                } catch (Throwable th) {
                    if (z) {
                        Thread.currentThread().interrupt();
                    }
                    throw th;
                }
            }
            if (z) {
                Thread.currentThread().interrupt();
            }
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean setState(State state) {
        synchronized (this.stateLock) {
            State state2 = this.state;
            if (this.state == State.PENDING_SHUTDOWN && state != State.NOT_RUNNING) {
                return false;
            }
            if (this.state == State.NOT_RUNNING && (state == State.PENDING_SHUTDOWN || state == State.NOT_RUNNING)) {
                return false;
            }
            if (this.state == State.REBALANCING && state == State.REBALANCING) {
                return false;
            }
            if (this.state == State.ERROR && (state == State.PENDING_ERROR || state == State.ERROR)) {
                return false;
            }
            if (this.state == State.PENDING_ERROR && state != State.ERROR) {
                return false;
            }
            if (!this.state.isValidTransition(state)) {
                throw new IllegalStateException("Stream-client " + this.clientId + ": Unexpected state transition from " + state2 + " to " + state);
            }
            this.log.info("State transition from {} to {}", state2, state);
            this.state = state;
            this.stateLock.notifyAll();
            if (this.stateListener == null) {
                return true;
            }
            this.stateListener.onChange(state, state2);
            return true;
        }
    }

    public State state() {
        return this.state;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isRunningOrRebalancing() {
        boolean isRunningOrRebalancing;
        synchronized (this.stateLock) {
            isRunningOrRebalancing = this.state.isRunningOrRebalancing();
        }
        return isRunningOrRebalancing;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean hasStartedOrFinishedShuttingDown() {
        boolean hasStartedOrFinishedShuttingDown;
        synchronized (this.stateLock) {
            hasStartedOrFinishedShuttingDown = this.state.hasStartedOrFinishedShuttingDown();
        }
        return hasStartedOrFinishedShuttingDown;
    }

    private void validateIsRunningOrRebalancing() {
        synchronized (this.stateLock) {
            if (this.state.hasNotStarted()) {
                throw new StreamsNotStartedException("KafkaStreams has not been started, you can retry after calling start()");
            }
            if (!this.state.isRunningOrRebalancing()) {
                throw new IllegalStateException("KafkaStreams is not running. State is " + this.state + ".");
            }
        }
    }

    public void setStateListener(StateListener stateListener) {
        synchronized (this.stateLock) {
            if (!this.state.hasNotStarted()) {
                throw new IllegalStateException("Can only set StateListener before calling start(). Current state is: " + this.state);
            }
            this.stateListener = stateListener;
        }
    }

    @Deprecated
    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        synchronized (this.stateLock) {
            if (!this.state.hasNotStarted()) {
                throw new IllegalStateException("Can only set UncaughtExceptionHandler before calling start(). Current state is: " + this.state);
            }
            this.oldHandler = true;
            processStreamThread(streamThread -> {
                streamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
            });
            if (this.globalStreamThread != null) {
                this.globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
            }
        }
    }

    public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        Consumer<Throwable> consumer = th -> {
            handleStreamsUncaughtException(th, streamsUncaughtExceptionHandler);
        };
        synchronized (this.stateLock) {
            if (!this.state.hasNotStarted()) {
                throw new IllegalStateException("Can only set UncaughtExceptionHandler before calling start(). Current state is: " + this.state);
            }
            this.streamsUncaughtExceptionHandler = consumer;
            Objects.requireNonNull(streamsUncaughtExceptionHandler);
            processStreamThread(streamThread -> {
                streamThread.setStreamsUncaughtExceptionHandler(consumer);
            });
            if (this.globalStreamThread != null) {
                this.globalStreamThread.setUncaughtExceptionHandler(consumer);
            }
        }
    }

    private void defaultStreamsUncaughtExceptionHandler(Throwable th) {
        if (!this.oldHandler) {
            handleStreamsUncaughtException(th, th2 -> {
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
            });
            return;
        }
        this.threads.remove(Thread.currentThread());
        if (th instanceof RuntimeException) {
            throw ((RuntimeException) th);
        }
        if (!(th instanceof Error)) {
            throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", th);
        }
        throw ((Error) th);
    }

    private void replaceStreamThread(Throwable th) {
        if (this.globalStreamThread != null && Thread.currentThread().getName().equals(this.globalStreamThread.getName())) {
            this.log.warn("The global thread cannot be replaced. Reverting to shutting down the client.");
            this.log.error("Encountered the following exception during processing  The streams client is going to shut down now. ", th);
            closeToError();
        }
        ((StreamThread) Thread.currentThread()).shutdown();
        addStreamThread();
        if (th instanceof RuntimeException) {
            throw ((RuntimeException) th);
        }
        if (!(th instanceof Error)) {
            throw new RuntimeException("Unexpected checked exception caught in the uncaught exception handler", th);
        }
        throw ((Error) th);
    }

    private boolean wrappedExceptionIsIn(Throwable th, Set<Class<? extends Throwable>> set) {
        return th.getCause() != null && set.contains(th.getCause().getClass());
    }

    private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(Throwable th, StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        return wrappedExceptionIsIn(th, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS) ? StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT : streamsUncaughtExceptionHandler.handle(th);
    }

    private void handleStreamsUncaughtException(Throwable th, StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse actionForThrowable = getActionForThrowable(th, streamsUncaughtExceptionHandler);
        if (this.oldHandler) {
            this.log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler.The old handler will be ignored as long as a new handler is set.");
        }
        switch (actionForThrowable) {
            case REPLACE_THREAD:
                this.log.error("Replacing thread in the streams uncaught exception handler", th);
                replaceStreamThread(th);
                return;
            case SHUTDOWN_CLIENT:
                this.log.error("Encountered the following exception during processing and Kafka Streams opted to " + actionForThrowable + ". The streams client is going to shut down now. ", th);
                closeToError();
                return;
            case SHUTDOWN_APPLICATION:
                if (getNumLiveStreamThreads() == 1) {
                    this.log.warn("Attempt to shut down the application requires adding a thread to communicate the shutdown. No processing will be done on this thread");
                    addStreamThread();
                }
                if (th instanceof Error) {
                    this.log.error("This option requires running threads to shut down the application.but the uncaught exception was an Error, which means this runtime is no longer in a well-defined state. Attempting to send the shutdown command anyway.", th);
                }
                if (Thread.currentThread().equals(this.globalStreamThread) && getNumLiveStreamThreads() == 0) {
                    this.log.error("Exception in global thread caused the application to attempt to shutdown. This action will succeed only if there is at least one StreamThread running on this client. Currently there are no running threads so will now close the client.");
                    closeToError();
                    return;
                } else {
                    processStreamThread(streamThread -> {
                        streamThread.sendShutdownRequest(AssignorError.SHUTDOWN_REQUESTED);
                    });
                    this.log.error("Encountered the following exception during processing and sent shutdown request for the entire application.", th);
                    return;
                }
            default:
                return;
        }
    }

    public void setGlobalStateRestoreListener(StateRestoreListener stateRestoreListener) {
        synchronized (this.stateLock) {
            if (!this.state.hasNotStarted()) {
                throw new IllegalStateException("Can only set GlobalStateRestoreListener before calling start(). Current state is: " + this.state);
            }
            this.globalStateRestoreListener = stateRestoreListener;
        }
    }

    public Map<MetricName, ? extends Metric> metrics() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        processStreamThread(streamThread -> {
            linkedHashMap.putAll(streamThread.producerMetrics());
            linkedHashMap.putAll(streamThread.consumerMetrics());
            linkedHashMap.putAll(streamThread.adminClientMetrics());
        });
        if (this.globalStreamThread != null) {
            linkedHashMap.putAll(this.globalStreamThread.consumerMetrics());
        }
        linkedHashMap.putAll(this.metrics.metrics());
        return Collections.unmodifiableMap(linkedHashMap);
    }

    public KafkaStreams(Topology topology, Properties properties) {
        this(topology, new StreamsConfig(properties), new DefaultKafkaClientSupplier());
    }

    public KafkaStreams(Topology topology, Properties properties, KafkaClientSupplier kafkaClientSupplier) {
        this(topology, new StreamsConfig(properties), kafkaClientSupplier, Time.SYSTEM);
    }

    public KafkaStreams(Topology topology, Properties properties, Time time) {
        this(topology, new StreamsConfig(properties), new DefaultKafkaClientSupplier(), time);
    }

    public KafkaStreams(Topology topology, Properties properties, KafkaClientSupplier kafkaClientSupplier, Time time) {
        this(topology, new StreamsConfig(properties), kafkaClientSupplier, time);
    }

    public KafkaStreams(Topology topology, StreamsConfig streamsConfig) {
        this(topology, streamsConfig, new DefaultKafkaClientSupplier());
    }

    public KafkaStreams(Topology topology, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier) {
        this(new TopologyMetadata(topology.internalTopologyBuilder, streamsConfig), streamsConfig, kafkaClientSupplier);
    }

    public KafkaStreams(Topology topology, StreamsConfig streamsConfig, Time time) {
        this(new TopologyMetadata(topology.internalTopologyBuilder, streamsConfig), streamsConfig, new DefaultKafkaClientSupplier(), time);
    }

    private KafkaStreams(Topology topology, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier, Time time) throws StreamsException {
        this(new TopologyMetadata(topology.internalTopologyBuilder, streamsConfig), streamsConfig, kafkaClientSupplier, time);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaStreams(TopologyMetadata topologyMetadata, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier) throws StreamsException {
        this(topologyMetadata, streamsConfig, kafkaClientSupplier, Time.SYSTEM);
    }

    private KafkaStreams(TopologyMetadata topologyMetadata, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier, Time time) throws StreamsException {
        this.changeThreadCount = new Object();
        this.stateLock = new Object();
        this.state = State.CREATED;
        this.config = streamsConfig;
        this.time = time;
        this.topologyMetadata = topologyMetadata;
        this.topologyMetadata.buildAndRewriteTopology();
        boolean hasGlobalTopology = topologyMetadata.hasGlobalTopology();
        try {
            this.stateDirectory = new StateDirectory(streamsConfig, time, topologyMetadata.hasPersistentStores(), topologyMetadata.hasNamedTopologies());
            this.processId = this.stateDirectory.initializeProcessId();
            String string = streamsConfig.getString("client.id");
            String string2 = streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
            if (string.length() <= 0) {
                this.clientId = string2 + "-" + this.processId;
            } else {
                this.clientId = string;
            }
            this.log = new LogContext(String.format("stream-client [%s] ", this.clientId)).logger(getClass());
            this.clientSupplier = kafkaClientSupplier;
            this.adminClient = kafkaClientSupplier.getAdmin(streamsConfig.getAdminConfigs(ClientUtils.getSharedAdminClientId(this.clientId)));
            this.log.info("Kafka Streams version: {}", ClientMetrics.version());
            this.log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId());
            this.metrics = getMetrics(streamsConfig, time, this.clientId);
            this.streamsMetrics = new StreamsMetricsImpl(this.metrics, this.clientId, streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), time);
            ClientMetrics.addVersionMetric(this.streamsMetrics);
            ClientMetrics.addCommitIdMetric(this.streamsMetrics);
            ClientMetrics.addApplicationIdMetric(this.streamsMetrics, streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
            ClientMetrics.addTopologyDescriptionMetric(this.streamsMetrics, (metricConfig, j) -> {
                return this.topologyMetadata.topologyDescriptionString();
            });
            ClientMetrics.addStateMetric(this.streamsMetrics, (metricConfig2, j2) -> {
                return this.state;
            });
            this.threads = Collections.synchronizedList(new LinkedList());
            ClientMetrics.addNumAliveStreamThreadMetric(this.streamsMetrics, (metricConfig3, j3) -> {
                return Integer.valueOf(getNumLiveStreamThreads());
            });
            this.streamsMetadataState = new StreamsMetadataState(this.topologyMetadata, parseHostInfo(streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
            this.oldHandler = false;
            this.streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
            this.delegatingStateRestoreListener = new DelegatingStateRestoreListener();
            this.totalCacheSize = streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG).longValue();
            int numStreamThreads = topologyMetadata.getNumStreamThreads(streamsConfig);
            long cacheSizePerThread = getCacheSizePerThread(numStreamThreads);
            GlobalStreamThread.State state = null;
            if (hasGlobalTopology) {
                this.globalStreamThread = new GlobalStreamThread(topologyMetadata.globalTaskTopology(), streamsConfig, kafkaClientSupplier.getGlobalConsumer(streamsConfig.getGlobalConsumerConfigs(this.clientId)), this.stateDirectory, cacheSizePerThread, this.streamsMetrics, time, this.clientId + "-GlobalStreamThread", this.delegatingStateRestoreListener, this.streamsUncaughtExceptionHandler);
                state = this.globalStreamThread.state();
            }
            this.threadState = new HashMap(numStreamThreads);
            this.streamStateListener = new StreamStateListener(this.threadState, state);
            GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(this.topologyMetadata.globalStateStores());
            if (hasGlobalTopology) {
                this.globalStreamThread.setStateListener(this.streamStateListener);
            }
            this.queryableStoreProvider = new QueryableStoreProvider(globalStateStoreProvider);
            for (int i = 1; i <= numStreamThreads; i++) {
                createAndAddStreamThread(cacheSizePerThread, i);
            }
            this.stateDirCleaner = setupStateDirCleaner();
            this.rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(this.clientId, streamsConfig);
        } catch (ProcessorStateException e) {
            throw new StreamsException(e);
        }
    }

    private StreamThread createAndAddStreamThread(long j, int i) {
        StreamThread create = StreamThread.create(this.topologyMetadata, this.config, this.clientSupplier, this.adminClient, this.processId, this.clientId, this.streamsMetrics, this.time, this.streamsMetadataState, j, this.stateDirectory, this.delegatingStateRestoreListener, i, this::closeToError, this.streamsUncaughtExceptionHandler);
        create.setStateListener(this.streamStateListener);
        this.threads.add(create);
        this.threadState.put(Long.valueOf(create.getId()), create.state());
        this.queryableStoreProvider.addStoreProviderForThread(create.getName(), new StreamThreadStateStoreProvider(create));
        return create;
    }

    private static Metrics getMetrics(StreamsConfig streamsConfig, Time time, String str) {
        MetricConfig timeWindow = new MetricConfig().samples(streamsConfig.getInt("metrics.num.samples").intValue()).recordLevel(Sensor.RecordingLevel.forName(streamsConfig.getString("metrics.recording.level"))).timeWindow(streamsConfig.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS);
        List configuredInstances = streamsConfig.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", str));
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.configure(streamsConfig.originals());
        configuredInstances.add(jmxReporter);
        return new Metrics(timeWindow, (List<MetricsReporter>) configuredInstances, time, new KafkaMetricsContext(JMX_PREFIX, streamsConfig.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)));
    }

    public Optional<String> addStreamThread() {
        StreamThread createAndAddStreamThread;
        if (!isRunningOrRebalancing()) {
            this.log.warn("Cannot add a stream thread when Kafka Streams client is in state {}", this.state);
            return Optional.empty();
        }
        synchronized (this.changeThreadCount) {
            int nextThreadIndex = getNextThreadIndex();
            int numLiveStreamThreads = getNumLiveStreamThreads();
            long cacheSizePerThread = getCacheSizePerThread(numLiveStreamThreads + 1);
            this.log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}", Integer.valueOf(nextThreadIndex), Integer.valueOf(numLiveStreamThreads + 1), Long.valueOf(cacheSizePerThread));
            resizeThreadCache(cacheSizePerThread);
            createAndAddStreamThread = createAndAddStreamThread(cacheSizePerThread, nextThreadIndex);
        }
        synchronized (this.stateLock) {
            if (isRunningOrRebalancing()) {
                createAndAddStreamThread.start();
                return Optional.of(createAndAddStreamThread.getName());
            }
            this.log.warn("Terminating the new thread because the Kafka Streams client is in state {}", this.state);
            createAndAddStreamThread.shutdown();
            this.threads.remove(createAndAddStreamThread);
            long cacheSizePerThread2 = getCacheSizePerThread(getNumLiveStreamThreads());
            this.log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", Long.valueOf(cacheSizePerThread2));
            resizeThreadCache(cacheSizePerThread2);
            return Optional.empty();
        }
    }

    public Optional<String> removeStreamThread() {
        return removeStreamThread(Long.MAX_VALUE);
    }

    public Optional<String> removeStreamThread(Duration duration) {
        return removeStreamThread(ApiUtils.validateMillisecondDuration(duration, ApiUtils.prepareMillisCheckFailMsgPrefix(duration, "timeout")));
    }

    private Optional<String> removeStreamThread(long j) throws TimeoutException {
        long milliseconds = this.time.milliseconds();
        if (isRunningOrRebalancing()) {
            synchronized (this.changeThreadCount) {
                Iterator it = new ArrayList(this.threads).iterator();
                while (it.hasNext()) {
                    StreamThread streamThread = (StreamThread) it.next();
                    boolean z = !streamThread.getName().equals(Thread.currentThread().getName());
                    if (streamThread.isAlive() && (z || getNumLiveStreamThreads() == 1)) {
                        this.log.info("Removing StreamThread " + streamThread.getName());
                        Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
                        streamThread.requestLeaveGroupDuringShutdown();
                        streamThread.shutdown();
                        if (streamThread.getName().equals(Thread.currentThread().getName())) {
                            this.log.info("{} is the last remaining thread and must remove itself, therefore we cannot wait for it to complete shutdown as this will result in deadlock.", streamThread.getName());
                        } else {
                            long milliseconds2 = j - (this.time.milliseconds() - milliseconds);
                            if (milliseconds2 <= 0 || !streamThread.waitOnThreadState(StreamThread.State.DEAD, milliseconds2)) {
                                this.log.warn("{} did not shutdown in the allotted time.", streamThread.getName());
                            } else {
                                this.log.info("Successfully removed {} in {}ms", streamThread.getName(), Long.valueOf(this.time.milliseconds() - milliseconds));
                                this.threads.remove(streamThread);
                                this.queryableStoreProvider.removeStoreProviderForThread(streamThread.getName());
                            }
                        }
                        long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
                        this.log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", Long.valueOf(cacheSizePerThread));
                        resizeThreadCache(cacheSizePerThread);
                        if (groupInstanceID.isPresent() && z) {
                            MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
                            try {
                                this.adminClient.removeMembersFromConsumerGroup(this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG), new RemoveMembersFromConsumerGroupOptions(Collections.singletonList(memberToRemove))).memberResult(memberToRemove).get(j - (this.time.milliseconds() - milliseconds), TimeUnit.MILLISECONDS);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            } catch (ExecutionException e2) {
                                this.log.error("Could not remove static member {} from consumer group {} due to: {}", groupInstanceID.get(), this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG), e2);
                                throw new StreamsException("Could not remove static member " + groupInstanceID.get() + " from consumer group " + this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG) + " for the following reason: ", e2.getCause());
                            } catch (java.util.concurrent.TimeoutException e3) {
                                this.log.error("Could not remove static member {} from consumer group {} due to a timeout: {}", groupInstanceID.get(), this.config.getString(StreamsConfig.APPLICATION_ID_CONFIG), e3);
                                throw new TimeoutException(e3.getMessage(), e3);
                            }
                        }
                        if (j - (this.time.milliseconds() - milliseconds) <= 0) {
                            throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");
                        }
                        return Optional.of(streamThread.getName());
                    }
                }
                this.log.warn("There are no threads eligible for removal");
            }
        } else {
            this.log.warn("Cannot remove a stream thread when Kafka Streams client is in state  " + state());
        }
        return Optional.empty();
    }

    private int getNumLiveStreamThreads() {
        int i;
        AtomicInteger atomicInteger = new AtomicInteger(0);
        synchronized (this.threads) {
            processStreamThread(streamThread -> {
                if (streamThread.state() == StreamThread.State.DEAD) {
                    this.log.debug("Trimming thread {} from the threads list since it's state is {}", streamThread.getName(), StreamThread.State.DEAD);
                    this.threads.remove(streamThread);
                } else if (streamThread.state() == StreamThread.State.PENDING_SHUTDOWN) {
                    this.log.debug("Skipping thread {} from num live threads computation since it's state is {}", streamThread.getName(), StreamThread.State.PENDING_SHUTDOWN);
                } else {
                    atomicInteger.incrementAndGet();
                }
            });
            i = atomicInteger.get();
        }
        return i;
    }

    private int getNextThreadIndex() {
        HashSet hashSet = new HashSet();
        AtomicInteger atomicInteger = new AtomicInteger(1);
        synchronized (this.threads) {
            processStreamThread(streamThread -> {
                if (streamThread.state() == StreamThread.State.DEAD) {
                    this.threads.remove(streamThread);
                    return;
                }
                hashSet.add(streamThread.getName());
                int parseInt = Integer.parseInt(streamThread.getName().substring(streamThread.getName().lastIndexOf("-") + 1));
                if (parseInt > atomicInteger.get()) {
                    atomicInteger.set(parseInt);
                }
            });
            String str = this.clientId + "-StreamThread-";
            for (int i = 1; i <= atomicInteger.get(); i++) {
                if (!hashSet.contains(str + i)) {
                    return i;
                }
            }
            return this.threads.size() + 1;
        }
    }

    private long getCacheSizePerThread(int i) {
        if (i == 0) {
            return this.totalCacheSize;
        }
        return this.totalCacheSize / (i + (this.topologyMetadata.hasGlobalTopology() ? 1 : 0));
    }

    private void resizeThreadCache(long j) {
        processStreamThread(streamThread -> {
            streamThread.resizeCache(j);
        });
        if (this.globalStreamThread != null) {
            this.globalStreamThread.resize(j);
        }
    }

    private ScheduledExecutorService setupStateDirCleaner() {
        return Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, this.clientId + "-CleanupThread");
            thread.setDaemon(true);
            return thread;
        });
    }

    private static ScheduledExecutorService maybeCreateRocksDBMetricsRecordingService(String str, StreamsConfig streamsConfig) {
        if (Sensor.RecordingLevel.forName(streamsConfig.getString("metrics.recording.level")) == Sensor.RecordingLevel.DEBUG) {
            return Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, str + "-RocksDBMetricsRecordingTrigger");
                thread.setDaemon(true);
                return thread;
            });
        }
        return null;
    }

    private static HostInfo parseHostInfo(String str) {
        HostInfo buildFromEndpoint = HostInfo.buildFromEndpoint(str);
        return buildFromEndpoint == null ? StreamsMetadataState.UNKNOWN_HOST : buildFromEndpoint;
    }

    public synchronized void start() throws IllegalStateException, StreamsException {
        if (!setState(State.REBALANCING)) {
            throw new IllegalStateException("The client is either already started or already stopped, cannot re-start");
        }
        this.log.debug("Starting Streams client");
        if (this.globalStreamThread != null) {
            this.globalStreamThread.start();
        }
        processStreamThread((v0) -> {
            v0.start();
        });
        Long l = this.config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
        this.stateDirCleaner.scheduleAtFixedRate(() -> {
            if (this.state == State.RUNNING) {
                this.stateDirectory.cleanRemovedTasks(l.longValue());
            }
        }, l.longValue(), l.longValue(), TimeUnit.MILLISECONDS);
        if (this.rocksDBMetricsRecordingService != null) {
            this.rocksDBMetricsRecordingService.scheduleAtFixedRate(this.streamsMetrics.rocksDBMetricsRecordingTrigger(), 0L, 1L, TimeUnit.MINUTES);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        close(Long.MAX_VALUE);
    }

    private Thread shutdownHelper(boolean z) {
        this.stateDirCleaner.shutdownNow();
        if (this.rocksDBMetricsRecordingService != null) {
            this.rocksDBMetricsRecordingService.shutdownNow();
        }
        return new Thread(() -> {
            processStreamThread((v0) -> {
                v0.shutdown();
            });
            this.topologyMetadata.wakeupThreads();
            processStreamThread(streamThread -> {
                try {
                    if (!streamThread.isRunning()) {
                        streamThread.join();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            if (this.globalStreamThread != null) {
                this.globalStreamThread.shutdown();
            }
            if (this.globalStreamThread != null && !this.globalStreamThread.stillRunning()) {
                try {
                    this.globalStreamThread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                this.globalStreamThread = null;
            }
            this.stateDirectory.close();
            this.adminClient.close();
            this.streamsMetrics.removeAllClientLevelSensorsAndMetrics();
            this.metrics.close();
            if (z) {
                setState(State.ERROR);
            } else {
                setState(State.NOT_RUNNING);
            }
        }, "kafka-streams-close-thread");
    }

    private boolean close(long j) {
        if (this.state.hasCompletedShutdown()) {
            this.log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", this.state);
            return true;
        }
        if (this.state.isShuttingDown()) {
            this.log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", this.state);
            if (this.state == State.PENDING_ERROR && waitOnState(State.ERROR, j)) {
                this.log.info("Streams client stopped to ERROR completely");
                return true;
            }
            if (this.state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, j)) {
                this.log.info("Streams client stopped to NOT_RUNNING completely");
                return true;
            }
            this.log.warn("Streams client cannot transition to {}} completely within the timeout", this.state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING : State.ERROR);
            return false;
        }
        if (!setState(State.PENDING_SHUTDOWN)) {
            this.log.error("Failed to transition to PENDING_SHUTDOWN, current state is {}", this.state);
            throw new StreamsException("Failed to shut down while in state " + this.state);
        }
        Thread shutdownHelper = shutdownHelper(false);
        shutdownHelper.setDaemon(true);
        shutdownHelper.start();
        if (waitOnState(State.NOT_RUNNING, j)) {
            this.log.info("Streams client stopped completely");
            return true;
        }
        this.log.info("Streams client cannot stop completely within the timeout");
        return false;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeToError() {
        if (!setState(State.PENDING_ERROR)) {
            this.log.info("Skipping shutdown since we are already in " + state());
            return;
        }
        Thread shutdownHelper = shutdownHelper(true);
        shutdownHelper.setDaemon(true);
        shutdownHelper.start();
    }

    public synchronized boolean close(Duration duration) throws IllegalArgumentException {
        long validateMillisecondDuration = ApiUtils.validateMillisecondDuration(duration, ApiUtils.prepareMillisCheckFailMsgPrefix(duration, "timeout"));
        if (validateMillisecondDuration < 0) {
            throw new IllegalArgumentException("Timeout can't be negative.");
        }
        this.log.debug("Stopping Streams client with timeoutMillis = {} ms.", Long.valueOf(validateMillisecondDuration));
        return close(validateMillisecondDuration);
    }

    public void cleanUp() {
        if (!this.state.hasNotStarted() && !this.state.hasCompletedShutdown()) {
            throw new IllegalStateException("Cannot clean up while running.");
        }
        this.stateDirectory.clean();
    }

    @Deprecated
    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadata() {
        validateIsRunningOrRebalancing();
        return (Collection) this.streamsMetadataState.getAllMetadata().stream().map(streamsMetadata -> {
            return new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(), streamsMetadata.stateStoreNames(), streamsMetadata.topicPartitions(), streamsMetadata.standbyStateStoreNames(), streamsMetadata.standbyTopicPartitions());
        }).collect(Collectors.toSet());
    }

    public Collection<StreamsMetadata> metadataForAllStreamsClients() {
        validateIsRunningOrRebalancing();
        return this.streamsMetadataState.getAllMetadata();
    }

    @Deprecated
    public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(String str) {
        validateIsRunningOrRebalancing();
        return (Collection) this.streamsMetadataState.getAllMetadataForStore(str).stream().map(streamsMetadata -> {
            return new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(), streamsMetadata.stateStoreNames(), streamsMetadata.topicPartitions(), streamsMetadata.standbyStateStoreNames(), streamsMetadata.standbyTopicPartitions());
        }).collect(Collectors.toSet());
    }

    public Collection<StreamsMetadata> streamsMetadataForStore(String str) {
        validateIsRunningOrRebalancing();
        return this.streamsMetadataState.getAllMetadataForStore(str);
    }

    public <K> KeyQueryMetadata queryMetadataForKey(String str, K k, Serializer<K> serializer) {
        validateIsRunningOrRebalancing();
        return this.streamsMetadataState.getKeyQueryMetadataForKey(str, (String) k, (Serializer<String>) serializer);
    }

    public <K> KeyQueryMetadata queryMetadataForKey(String str, K k, StreamPartitioner<? super K, ?> streamPartitioner) {
        validateIsRunningOrRebalancing();
        return this.streamsMetadataState.getKeyQueryMetadataForKey(str, (String) k, (StreamPartitioner<? super String, ?>) streamPartitioner);
    }

    public <T> T store(StoreQueryParameters<T> storeQueryParameters) {
        validateIsRunningOrRebalancing();
        String storeName = storeQueryParameters.storeName();
        if (this.topologyMetadata.hasStore(storeName)) {
            return (T) this.queryableStoreProvider.getStore(storeQueryParameters);
        }
        throw new UnknownStateStoreException("Cannot get state store " + storeName + " because no such store is registered in the topology.");
    }

    protected void processStreamThread(Consumer<StreamThread> consumer) {
        Iterator it = new ArrayList(this.threads).iterator();
        while (it.hasNext()) {
            consumer.accept((StreamThread) it.next());
        }
    }

    @Deprecated
    public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
        return (Set) metadataForLocalThreads().stream().map(threadMetadata -> {
            return new org.apache.kafka.streams.processor.ThreadMetadata(threadMetadata.threadName(), threadMetadata.threadState(), threadMetadata.consumerClientId(), threadMetadata.restoreConsumerClientId(), threadMetadata.producerClientIds(), threadMetadata.adminClientId(), (Set) threadMetadata.activeTasks().stream().map(taskMetadata -> {
                return new org.apache.kafka.streams.processor.TaskMetadata(taskMetadata.taskId().toString(), taskMetadata.topicPartitions(), taskMetadata.committedOffsets(), taskMetadata.endOffsets(), taskMetadata.timeCurrentIdlingStarted());
            }).collect(Collectors.toSet()), (Set) threadMetadata.standbyTasks().stream().map(taskMetadata2 -> {
                return new org.apache.kafka.streams.processor.TaskMetadata(taskMetadata2.taskId().toString(), taskMetadata2.topicPartitions(), taskMetadata2.committedOffsets(), taskMetadata2.endOffsets(), taskMetadata2.timeCurrentIdlingStarted());
            }).collect(Collectors.toSet()));
        }).collect(Collectors.toSet());
    }

    public Set<ThreadMetadata> metadataForLocalThreads() {
        HashSet hashSet = new HashSet();
        processStreamThread(streamThread -> {
            synchronized (streamThread.getStateLock()) {
                if (streamThread.state() != StreamThread.State.DEAD) {
                    hashSet.add(streamThread.threadMetadata());
                }
            }
        });
        return hashSet;
    }

    public Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags() {
        TreeMap treeMap = new TreeMap();
        LinkedList linkedList = new LinkedList();
        HashMap hashMap = new HashMap();
        processStreamThread(streamThread -> {
            for (Task task : streamThread.allTasks().values()) {
                linkedList.addAll(task.changelogPartitions());
                hashMap.putAll(task.changelogOffsets());
            }
        });
        this.log.debug("Current changelog positions: {}", hashMap);
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> fetchEndOffsets = ClientUtils.fetchEndOffsets(linkedList, this.adminClient);
        this.log.debug("Current end offsets :{}", fetchEndOffsets);
        for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry : fetchEndOffsets.entrySet()) {
            long longValue = ((Long) hashMap.getOrDefault(entry.getKey(), 0L)).longValue();
            long offset = entry.getValue().offset();
            ((Map) treeMap.computeIfAbsent(this.streamsMetadataState.getStoreForChangelogTopic(entry.getKey().topic()), str -> {
                return new TreeMap();
            })).put(Integer.valueOf(entry.getKey().partition()), new LagInfo(longValue == -2 ? offset : longValue, offset));
        }
        return Collections.unmodifiableMap(treeMap);
    }
}
