package org.apache.kafka.streams.state.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBufferChangelogDeserializationHelper;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.class */
public final class InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements TimeOrderedKeyValueBuffer<K, V, Change<V>> {
    // Serializers handed to the record collector when changelog records are sent (see logValue / logTombstone).
    private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
    private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
    // Changelog format markers; restoreBatch compares the version header's value against these arrays.
    private static final byte[] V_1_CHANGELOG_HEADER_VALUE = {1};
    private static final byte[] V_2_CHANGELOG_HEADER_VALUE = {2};
    private static final byte[] V_3_CHANGELOG_HEADER_VALUE = {3};
    // Headers attached to every value record written by logValue: a single version header carrying the V3 marker.
    static final RecordHeaders CHANGELOG_HEADERS = new RecordHeaders(new Header[]{new RecordHeader(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_KEY, V_3_CHANGELOG_HEADER_VALUE)});
    // Scope string passed to StateStoreMetrics when the two buffer sensors are created.
    private static final String METRIC_SCOPE = "in-memory-suppression";
    // Serialized record key -> the BufferKey under which that record currently sits in sortedMap.
    private final Map<Bytes, BufferKey> index;
    // Buffered records in BufferKey order; evictWhile expects the first entry's time to equal minTimestamp.
    private final TreeMap<BufferKey, BufferValue> sortedMap;
    // Keys added or evicted since the last flush(); only populated when loggingEnabled is true.
    private final Set<Bytes> dirtyKeys;
    private final String storeName;
    private final boolean loggingEnabled;
    // May start out null; setSerdesIfNull fills in whichever serde is missing.
    private Serde<K> keySerde;
    private FullChangeSerde<V> valueSerde;
    // Running total of computeRecordSize over all buffered records.
    private long memBufferSize;
    // Smallest buffer time currently held; reset to the WAIT_FOREVER sentinel when the buffer is empty.
    private long minTimestamp;
    // The fields below are assigned during init(...).
    private InternalProcessorContext context;
    private String changelogTopic;
    private Sensor bufferSizeSensor;
    private Sensor bufferCountSensor;
    private StreamsMetricsImpl streamsMetrics;
    private String taskId;
    // Set to true at the end of init and to false at the start of close().
    private volatile boolean open;
    // Partition of the owning task; restoreBatch rejects records from any other partition.
    private int partition;

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer$Builder.class */
    public static class Builder<K, V> implements StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> {
        private final String storeName;
        private final Serde<K> keySerde;
        private final Serde<V> valueSerde;
        private boolean loggingEnabled = true;
        private Map<String, String> logConfig = new HashMap();

        public Builder(String str, Serde<K> serde, Serde<V> serde2) {
            this.storeName = str;
            this.keySerde = serde;
            this.valueSerde = serde2;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> withCachingEnabled() {
            return this;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> withCachingDisabled() {
            return this;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> withLoggingEnabled(Map<String, String> map) {
            this.logConfig = map;
            return this;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public StoreBuilder<InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>>> withLoggingDisabled() {
            this.loggingEnabled = false;
            return this;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public InMemoryTimeOrderedKeyValueChangeBuffer<K, V, Change<V>> build() {
            return new InMemoryTimeOrderedKeyValueChangeBuffer<>(this.storeName, this.loggingEnabled, this.keySerde, this.valueSerde);
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public Map<String, String> logConfig() {
            return loggingEnabled() ? Collections.unmodifiableMap(this.logConfig) : Collections.emptyMap();
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public boolean loggingEnabled() {
            return this.loggingEnabled;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public String name() {
            return this.storeName;
        }
    }

    /**
     * Creates an empty, not-yet-open buffer. Instances are created through {@link Builder#build()}.
     *
     * @param storeName      name reported by {@link #name()}
     * @param loggingEnabled whether changes are tracked in {@code dirtyKeys} and written on {@link #flush()}
     * @param keySerde       key serde; a null value can be filled in later via {@link #setSerdesIfNull}
     * @param valueSerde     value serde, passed through {@code FullChangeSerde.wrap} before being stored
     */
    private InMemoryTimeOrderedKeyValueChangeBuffer(String storeName, boolean loggingEnabled, Serde<K> keySerde, Serde<V> valueSerde) {
        // Diamond instead of raw collection types, so the assignments are checked by the compiler.
        this.index = new HashMap<>();
        this.sortedMap = new TreeMap<>();
        this.dirtyKeys = new HashSet<>();
        this.memBufferSize = 0L;
        // Sentinel for "nothing buffered"; cleanPut lowers it with Math.min as records arrive.
        this.minTimestamp = NetworkClientDelegate.PollResult.WAIT_FOREVER;
        this.storeName = storeName;
        this.loggingEnabled = loggingEnabled;
        this.keySerde = keySerde;
        this.valueSerde = FullChangeSerde.wrap(valueSerde);
    }

    /** @return the store name given at construction */
    @Override
    public String name() {
        return storeName;
    }

    /** @return always {@code false} */
    @Override
    public boolean persistent() {
        return false;
    }

    /**
     * Fills in any serde that is still null from the given getter; serdes that are already
     * set are left untouched.
     *
     * @param serdeGetter source of the fallback key and value serdes
     */
    @Override
    public void setSerdesIfNull(SerdeGetter serdeGetter) {
        if (keySerde == null) {
            keySerde = serdeGetter.keySerde();
        }
        if (valueSerde == null) {
            valueSerde = FullChangeSerde.wrap(serdeGetter.valueSerde());
        }
    }

    /**
     * Deprecated initialization entry point taking the old {@link ProcessorContext}.
     * Resolves the internal context and changelog topic, then runs the shared initialization.
     *
     * @param processorContext context the store is initialized with
     * @param stateStore       store instance to register with the context
     */
    @Override
    @Deprecated
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        context = ProcessorContextUtils.asInternalProcessorContext(processorContext);
        changelogTopic = ProcessorContextUtils.changelogFor(processorContext, name(), Boolean.TRUE);
        init(stateStore);
    }

    /**
     * Initialization entry point taking a {@link StateStoreContext}.
     * Resolves the internal context and changelog topic, then runs the shared initialization.
     *
     * @param stateStoreContext context the store is initialized with
     * @param stateStore        store instance to register with the context
     */
    @Override
    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        context = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
        changelogTopic = ProcessorContextUtils.changelogFor(stateStoreContext, name(), Boolean.TRUE);
        init(stateStore);
    }

    /**
     * Shared initialization; expects {@code context} to be set already by one of the public
     * {@code init} overloads.
     *
     * @param stateStore store instance to register, with {@code restoreBatch} as its restore callback
     */
    private void init(StateStore stateStore) {
        taskId = context.taskId().toString();
        streamsMetrics = context.metrics();

        // Both sensors are keyed by task id, metric scope and store name.
        bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor(taskId, METRIC_SCOPE, storeName, streamsMetrics);
        bufferCountSensor = StateStoreMetrics.suppressionBufferCountSensor(taskId, METRIC_SCOPE, storeName, streamsMetrics);

        context.register(stateStore, this::restoreBatch);
        updateBufferMetrics();
        open = true;
        partition = context.taskId().partition();
    }

    /** @return {@code true} between a completed init and the next {@link #close()} */
    @Override
    public boolean isOpen() {
        return open;
    }

    /**
     * Not supported by this store.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Position getPosition() {
        final String reason = "This store does not keep track of the position.";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Marks the store closed, drops all buffered state, records the now-empty buffer metrics
     * and removes this store's sensors and metrics.
     */
    @Override
    public void close() {
        open = false;

        // Reset every piece of in-memory state to its freshly constructed value.
        index.clear();
        sortedMap.clear();
        dirtyKeys.clear();
        memBufferSize = 0L;
        minTimestamp = NetworkClientDelegate.PollResult.WAIT_FOREVER;

        // Record the emptied buffer before the sensors are removed.
        updateBufferMetrics();
        streamsMetrics.removeAllStoreLevelSensorsAndMetrics(taskId, storeName);
    }

    /**
     * Writes one changelog record per dirty key and then clears the dirty set.
     * Does nothing when logging is disabled.
     */
    @Override
    public void flush() {
        if (!loggingEnabled) {
            return;
        }
        for (final Bytes dirtyKey : dirtyKeys) {
            final BufferKey bufferKey = index.get(dirtyKey);
            if (bufferKey != null) {
                // Still buffered: log its current value.
                logValue(dirtyKey, bufferKey, sortedMap.get(bufferKey));
            } else {
                // No longer in the index: log a tombstone instead.
                logTombstone(dirtyKey);
            }
        }
        dirtyKeys.clear();
    }

    /**
     * Sends the current value of a buffered record to the changelog topic.
     *
     * <p>The payload is the serialized {@link BufferValue} followed by the record's buffer time.
     *
     * @param key       serialized record key
     * @param bufferKey buffer key whose {@code time()} is appended to the payload
     * @param value     buffered value to serialize
     */
    private void logValue(Bytes key, BufferKey bufferKey, BufferValue value) {
        // Reserve room after the serialized value for the trailing buffer time.
        final ByteBuffer buffer = value.serialize(Long.BYTES);
        buffer.putLong(bufferKey.time());
        final byte[] payload = buffer.array();

        // Key and payload are passed with their real types (Bytes / byte[]), matching
        // KEY_SERIALIZER and VALUE_SERIALIZER; casting them to String / Bytes is not a legal conversion.
        ((RecordCollector.Supplier) context).recordCollector().send(
            changelogTopic,
            key,
            payload,
            CHANGELOG_HEADERS,
            partition,
            null,
            KEY_SERIALIZER,
            VALUE_SERIALIZER,
            null,
            null
        );
    }

    /**
     * Sends a tombstone (null value, null headers) for the given key to the changelog topic.
     *
     * @param key serialized record key to delete from the changelog
     */
    private void logTombstone(Bytes key) {
        // The key is passed as Bytes, matching KEY_SERIALIZER; casting it to String is not a legal conversion.
        ((RecordCollector.Supplier) context).recordCollector().send(
            changelogTopic,
            key,
            null,
            null,
            partition,
            null,
            KEY_SERIALIZER,
            VALUE_SERIALIZER,
            null,
            null
        );
    }

    /**
     * Restore callback: replays a batch of changelog records into the buffer and then records
     * the buffer metrics once.
     *
     * @param batch changelog records to apply, in order
     * @throws IllegalStateException    if a record belongs to a partition other than this store's
     * @throws IllegalArgumentException if a record carries an unrecognized version header value
     */
    private void restoreBatch(Collection<ConsumerRecord<byte[], byte[]>> batch) {
        for (final ConsumerRecord<byte[], byte[]> record : batch) {
            if (record.partition() != partition) {
                throw new IllegalStateException(String.format("record partition [%d] is being restored by the wrong suppress partition [%d]", record.partition(), partition));
            }
            final Bytes key = Bytes.wrap(record.key());
            if (record.value() == null) {
                restoreTombstone(key);
            } else {
                restoreValue(record, key);
            }
        }
        updateBufferMetrics();
    }

    /** Removes the record for {@code key}, if present, and repairs the size and minTimestamp bookkeeping. */
    private void restoreTombstone(Bytes key) {
        final BufferKey bufferKey = index.remove(key);
        if (bufferKey == null) {
            return;
        }
        final BufferValue removed = sortedMap.remove(bufferKey);
        if (removed != null) {
            memBufferSize -= computeRecordSize(bufferKey.key(), removed);
        }
        // Only a record that carried the minimum time can change the minimum.
        if (bufferKey.time() == minTimestamp) {
            if (sortedMap.isEmpty()) {
                minTimestamp = NetworkClientDelegate.PollResult.WAIT_FOREVER;
            } else {
                minTimestamp = sortedMap.firstKey().time();
            }
        }
    }

    /** Deserializes a value record according to its version header and inserts it via cleanPut. */
    private void restoreValue(ConsumerRecord<byte[], byte[]> record, Bytes key) {
        final Header versionHeader = record.headers().lastHeader(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_KEY);
        final TimeOrderedKeyValueBufferChangelogDeserializationHelper.DeserializationResult restored;
        if (versionHeader == null) {
            // No version header: handled as the V0 format, which takes the buffered prior value (or null).
            restored = TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV0(record, key, index.containsKey(key) ? internalPriorValueForBuffered(key) : null);
        } else if (Arrays.equals(versionHeader.value(), V_3_CHANGELOG_HEADER_VALUE)) {
            restored = TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV3(record, key);
        } else if (Arrays.equals(versionHeader.value(), V_2_CHANGELOG_HEADER_VALUE)) {
            restored = TimeOrderedKeyValueBufferChangelogDeserializationHelper.duckTypeV2(record, key);
        } else if (Arrays.equals(versionHeader.value(), V_1_CHANGELOG_HEADER_VALUE)) {
            restored = TimeOrderedKeyValueBufferChangelogDeserializationHelper.deserializeV1(record, key, index.containsKey(key) ? internalPriorValueForBuffered(key) : null);
        } else {
            throw new IllegalArgumentException("Restoring apparently invalid changelog record: " + record);
        }
        cleanPut(restored.time(), restored.key(), restored.bufferValue());
    }

    /**
     * Evicts records from the front of the buffer for as long as {@code supplier} returns true.
     *
     * <p>Each evicted record is deserialized, handed to {@code consumer}, and then removed from
     * the buffer. With logging enabled its key is marked dirty. Buffer metrics are recorded once
     * at the end if anything was evicted.
     *
     * <p>The supplier is consulted once up front and again before every eviction.
     *
     * @param supplier predicate deciding whether eviction should continue
     * @param consumer callback receiving each evicted record
     * @throws IllegalStateException if the first buffered record's time differs from {@code minTimestamp}
     */
    @Override
    public void evictWhile(Supplier<Boolean> supplier, Consumer<TimeOrderedKeyValueBuffer.Eviction<K, Change<V>>> consumer) {
        final Iterator<Map.Entry<BufferKey, BufferValue>> entries = sortedMap.entrySet().iterator();
        int evictions = 0;

        if (supplier.get()) {
            Map.Entry<BufferKey, BufferValue> head = entries.hasNext() ? entries.next() : null;

            while (head != null && supplier.get()) {
                final BufferKey bufferKey = head.getKey();
                if (bufferKey.time() != minTimestamp) {
                    throw new IllegalStateException("minTimestamp [" + minTimestamp + "] did not match the actual min timestamp [" + bufferKey.time() + "]");
                }

                final Bytes serializedKey = bufferKey.key();
                final K key = keySerde.deserializer().deserialize(changelogTopic, serializedKey.get());
                final BufferValue bufferValue = head.getValue();
                consumer.accept(new TimeOrderedKeyValueBuffer.Eviction<>(key, valueSerde.deserializeParts(changelogTopic, new Change<>(bufferValue.newValue(), bufferValue.oldValue())), bufferValue.context()));

                // The callback has run; now drop the record from both structures.
                entries.remove();
                index.remove(serializedKey);
                if (loggingEnabled) {
                    dirtyKeys.add(serializedKey);
                }
                memBufferSize -= computeRecordSize(serializedKey, bufferValue);

                // Advance to the next record (if any) so minTimestamp tracks the new head.
                head = entries.hasNext() ? entries.next() : null;
                minTimestamp = head == null ? NetworkClientDelegate.PollResult.WAIT_FOREVER : head.getKey().time();

                evictions++;
            }
        }

        if (evictions > 0) {
            updateBufferMetrics();
        }
    }

    /**
     * Looks up the prior value stored for a buffered key.
     *
     * @param k key to look up
     * @return an undefined {@code Maybe} when the key is not buffered; otherwise a defined
     *         {@code Maybe} wrapping the deserialized prior value paired with timestamp {@code -1}
     */
    @Override
    public Maybe<ValueAndTimestamp<V>> priorValueForBuffered(K k) {
        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, k));
        if (index.containsKey(serializedKey)) {
            final byte[] serializedPrior = internalPriorValueForBuffered(serializedKey);
            final V prior = valueSerde.innerSerde().deserializer().deserialize(changelogTopic, serializedPrior);
            return Maybe.defined(ValueAndTimestamp.make(prior, -1L));
        }
        return Maybe.undefined();
    }

    /**
     * Returns the serialized prior value of the record buffered under {@code key}.
     *
     * @param key serialized record key
     * @return the buffered record's {@code priorValue()}
     * @throws NoSuchElementException if the key is not in the index
     */
    private byte[] internalPriorValueForBuffered(Bytes key) {
        final BufferKey bufferKey = index.get(key);
        if (bufferKey != null) {
            return sortedMap.get(bufferKey).priorValue();
        }
        throw new NoSuchElementException("Key [" + key + "] is not in the buffer.");
    }

    /**
     * Buffers a record, replacing any record already buffered under the same key.
     *
     * <p>If the key is already buffered, the existing record's prior value is carried over;
     * otherwise the incoming change's old value becomes the prior value.
     *
     * @param j                      buffer time for the record
     * @param record                 record to buffer; its value must not be null
     * @param processorRecordContext record context stored with the value; must not be null
     * @return always {@code true}
     * @throws NullPointerException if the record value or the record context is null
     */
    @Override
    public boolean put(long j, Record<K, Change<V>> record, ProcessorRecordContext processorRecordContext) {
        Objects.requireNonNull(record.value(), "value cannot be null");
        Objects.requireNonNull(processorRecordContext, "recordContext cannot be null");

        final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, record.key()));
        final Change<byte[]> serializedChange = valueSerde.serializeParts(changelogTopic, record.value());

        final BufferValue alreadyBuffered = getBuffered(serializedKey);
        final byte[] priorValue;
        if (alreadyBuffered != null) {
            priorValue = alreadyBuffered.priorValue();
        } else {
            priorValue = serializedChange.oldValue;
        }

        final BufferValue updated = new BufferValue(priorValue, serializedChange.oldValue, serializedChange.newValue, processorRecordContext);
        cleanPut(j, serializedKey, updated);

        if (loggingEnabled) {
            dirtyKeys.add(serializedKey);
        }
        updateBufferMetrics();
        return true;
    }

    /**
     * @param key serialized record key
     * @return the value buffered under {@code key}, or {@code null} if the key is not in the index
     */
    private BufferValue getBuffered(Bytes key) {
        final BufferKey bufferKey = index.get(key);
        return bufferKey == null ? null : sortedMap.get(bufferKey);
    }

    /**
     * Inserts or replaces a record without touching {@code dirtyKeys} or the metrics.
     *
     * <p>A key that is already buffered keeps its existing {@link BufferKey} (and therefore its
     * original time); only the value and the size accounting change. A new key is indexed under
     * {@code time} and may lower {@code minTimestamp}.
     *
     * @param time  buffer time, used only when the key is not yet buffered
     * @param key   serialized record key
     * @param value value to store
     */
    private void cleanPut(long time, Bytes key, BufferValue value) {
        final BufferKey existingKey = index.get(key);
        if (existingKey == null) {
            final BufferKey newKey = new BufferKey(time, key);
            index.put(key, newKey);
            sortedMap.put(newKey, value);
            minTimestamp = Math.min(minTimestamp, time);
            memBufferSize += computeRecordSize(key, value);
            return;
        }

        // Replace in place and adjust the size by the difference between new and old value.
        final BufferValue replaced = sortedMap.put(existingKey, value);
        final long added = computeRecordSize(key, value);
        final long removed = replaced == null ? 0L : computeRecordSize(key, replaced);
        memBufferSize += added - removed;
    }

    /** @return the number of keys currently in the index */
    @Override
    public int numRecords() {
        return index.size();
    }

    /** @return the running size estimate of all buffered records, as tracked in {@code memBufferSize} */
    @Override
    public long bufferSize() {
        return memBufferSize;
    }

    /** @return the smallest buffer time currently tracked, or the empty-buffer sentinel */
    @Override
    public long minTimestamp() {
        return minTimestamp;
    }

    /**
     * Estimates the memory held by one buffered record: 8 bytes for the buffer time, the
     * serialized key length, and, when a value is present, its resident memory size estimate.
     *
     * @param key   serialized record key
     * @param value buffered value; may be null, in which case only time and key are counted
     * @return the estimated size in bytes
     */
    private static long computeRecordSize(Bytes key, BufferValue value) {
        // Widen to long before adding, so a very large key length cannot overflow int arithmetic.
        long size = (long) Long.BYTES + key.get().length;
        if (value != null) {
            size += value.residentMemorySizeEstimate();
        }
        return size;
    }

    /**
     * Records the current buffer size and record count on their sensors, each stamped with a
     * fresh reading of the context's current system time.
     */
    private void updateBufferMetrics() {
        final long bytesBuffered = memBufferSize;
        bufferSizeSensor.record(bytesBuffered, context.currentSystemTimeMs());

        final int recordsBuffered = index.size();
        bufferCountSensor.record(recordsBuffered, context.currentSystemTimeMs());
    }

    /**
     * @return a multi-line debug string with the store's name, topic, flags, counters and the
     *         full contents of its collections
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("InMemoryTimeOrderedKeyValueChangeBuffer{");
        text.append("storeName='").append(storeName).append("'");
        text.append(", changelogTopic='").append(changelogTopic).append("'");
        text.append(", open=").append(open);
        text.append(", loggingEnabled=").append(loggingEnabled);
        text.append(", minTimestamp=").append(minTimestamp);
        text.append(", memBufferSize=").append(memBufferSize);
        text.append(", \n\tdirtyKeys=").append(dirtyKeys);
        text.append(", \n\tindex=").append(index);
        text.append(", \n\tsortedMap=").append(sortedMap);
        text.append('}');
        return text.toString();
    }
}
