package org.apache.kafka.streams.state.internals;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.class */
public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V, V> {
    private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer();
    private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer();
    private final long gracePeriod;
    private final RocksDBTimeOrderedKeyValueBytesStore store;
    private long minTimestamp;
    private Serde<K> keySerde;
    private Serde<V> valueSerde;
    private final String topic;
    private final boolean loggingEnabled;
    private int partition;
    private String changelogTopic;
    private InternalProcessorContext context;
    private boolean minValid = false;
    private int numRecords = 0;
    private long bufferSize = 0;
    private int seqnum = 0;

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer$Builder.class */
    public static class Builder<K, V> implements StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> {
        private final String storeName;
        private boolean loggingEnabled = true;
        private Map<String, String> logConfig = new HashMap();
        private final Duration grace;
        private final String topic;

        public Builder(String str, Duration duration, String str2) {
            this.storeName = str;
            this.grace = duration;
            this.topic = str2;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> withCachingEnabled() {
            return this;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> withCachingDisabled() {
            return this;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> withLoggingEnabled(Map<String, String> map) {
            this.logConfig = map;
            return this;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> withLoggingDisabled() {
            this.loggingEnabled = false;
            return this;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public TimeOrderedKeyValueBuffer<K, V, V> build() {
            return new RocksDBTimeOrderedKeyValueBuffer(new RocksDBTimeOrderedKeyValueBytesStoreSupplier(this.storeName).get(), this.grace, this.topic, this.loggingEnabled);
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public Map<String, String> logConfig() {
            return loggingEnabled() ? Collections.unmodifiableMap(this.logConfig) : Collections.emptyMap();
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public boolean loggingEnabled() {
            return this.loggingEnabled;
        }

        @Override // org.apache.kafka.streams.state.StoreBuilder
        public String name() {
            return this.storeName;
        }
    }

    public RocksDBTimeOrderedKeyValueBuffer(RocksDBTimeOrderedKeyValueBytesStore rocksDBTimeOrderedKeyValueBytesStore, Duration duration, String str, boolean z) {
        this.store = rocksDBTimeOrderedKeyValueBytesStore;
        this.gracePeriod = duration.toMillis();
        this.minTimestamp = rocksDBTimeOrderedKeyValueBytesStore.minTimestamp();
        this.topic = str;
        this.loggingEnabled = z;
    }

    @Override // org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer
    public void setSerdesIfNull(SerdeGetter serdeGetter) {
        this.keySerde = this.keySerde == null ? serdeGetter.keySerde() : this.keySerde;
        this.valueSerde = this.valueSerde == null ? serdeGetter.valueSerde() : this.valueSerde;
    }

    private long observedStreamTime() {
        return this.store.observedStreamTime;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public String name() {
        return this.store.name();
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    @Deprecated
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        this.store.init(processorContext, stateStore);
        this.context = ProcessorContextUtils.asInternalProcessorContext(processorContext);
        this.partition = processorContext.taskId().partition();
        if (this.loggingEnabled) {
            this.changelogTopic = ProcessorContextUtils.changelogFor(processorContext, name(), Boolean.TRUE);
        }
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        this.store.init(stateStoreContext, stateStore);
        this.context = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
        this.partition = stateStoreContext.taskId().partition();
        if (this.loggingEnabled) {
            this.changelogTopic = ProcessorContextUtils.changelogFor(stateStoreContext, name(), Boolean.TRUE);
        }
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public void flush() {
        this.store.flush();
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public void close() {
        this.store.close();
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean persistent() {
        return this.store.persistent();
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean isOpen() {
        return this.store.isOpen();
    }

    @Override // org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer
    public void evictWhile(Supplier<Boolean> supplier, Consumer<TimeOrderedKeyValueBuffer.Eviction<K, V>> consumer) {
        if (supplier.get().booleanValue()) {
            long j = 0;
            if (this.minValid) {
                j = minTimestamp();
            }
            KeyValueIterator<Bytes, byte[]> fetchAll = this.store.fetchAll(j, observedStreamTime() - this.gracePeriod);
            Throwable th = null;
            while (fetchAll.hasNext() && supplier.get().booleanValue()) {
                try {
                    KeyValue<K, V> next = fetchAll.next();
                    BufferValue deserialize = BufferValue.deserialize(ByteBuffer.wrap((byte[]) next.value));
                    K deserialize2 = this.keySerde.deserializer().deserialize(this.topic, PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(((Bytes) next.key).get()));
                    if (deserialize.context().timestamp() < this.minTimestamp && this.minValid) {
                        throw new IllegalStateException("minTimestamp [" + this.minTimestamp + "] did not match the actual min timestamp [" + deserialize.context().timestamp() + "]");
                    }
                    this.minTimestamp = deserialize.context().timestamp();
                    this.minValid = true;
                    consumer.accept(new TimeOrderedKeyValueBuffer.Eviction<>(deserialize2, this.valueSerde.deserializer().deserialize(this.topic, deserialize.newValue()), deserialize.context()));
                    this.store.remove((Bytes) next.key);
                    if (this.loggingEnabled) {
                        logTombstone((Bytes) next.key);
                    }
                    this.numRecords--;
                    this.bufferSize -= computeRecordSize((Bytes) next.key, deserialize);
                } catch (Throwable th2) {
                    if (fetchAll != null) {
                        if (0 != 0) {
                            try {
                                fetchAll.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            fetchAll.close();
                        }
                    }
                    throw th2;
                }
            }
            if (this.numRecords == 0) {
                this.minTimestamp = NetworkClientDelegate.PollResult.WAIT_FOREVER;
            } else {
                this.minTimestamp = (observedStreamTime() - this.gracePeriod) + 1;
            }
            if (fetchAll != null) {
                if (0 == 0) {
                    fetchAll.close();
                    return;
                }
                try {
                    fetchAll.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            }
        }
    }

    @Override // org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer
    public Maybe<ValueAndTimestamp<V>> priorValueForBuffered(K k) {
        return Maybe.undefined();
    }

    @Override // org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer
    public boolean put(long j, Record<K, V> record, ProcessorRecordContext processorRecordContext) {
        Objects.requireNonNull(record.value(), "value cannot be null");
        Objects.requireNonNull(record.key(), "key cannot be null");
        Objects.requireNonNull(processorRecordContext, "recordContext cannot be null");
        if (observedStreamTime() - this.gracePeriod > record.timestamp()) {
            return false;
        }
        maybeUpdateSeqnumForDups();
        Bytes wrap = Bytes.wrap(PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(this.keySerde.serializer().serialize(this.topic, record.key()), record.timestamp(), this.seqnum).get());
        BufferValue bufferValue = new BufferValue(null, null, this.valueSerde.serializer().serialize(this.topic, record.value()), processorRecordContext);
        this.store.put(wrap, bufferValue.serialize(0).array());
        if (this.loggingEnabled) {
            logValue(wrap, new BufferKey(0L, wrap), bufferValue);
        }
        this.bufferSize += computeRecordSize(wrap, bufferValue);
        this.numRecords++;
        this.minTimestamp = Math.min(minTimestamp(), record.timestamp());
        return true;
    }

    @Override // org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer
    public int numRecords() {
        return this.numRecords;
    }

    @Override // org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer
    public long bufferSize() {
        return this.bufferSize;
    }

    @Override // org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer
    public long minTimestamp() {
        return this.minTimestamp;
    }

    private static long computeRecordSize(Bytes bytes, BufferValue bufferValue) {
        long length = 0 + bytes.get().length;
        if (bufferValue != null) {
            length += bufferValue.residentMemorySizeEstimate();
        }
        return length;
    }

    private void maybeUpdateSeqnumForDups() {
        this.seqnum = (this.seqnum + 1) & Integer.MAX_VALUE;
    }

    private void logValue(Bytes bytes, BufferKey bufferKey, BufferValue bufferValue) {
        ByteBuffer serialize = bufferValue.serialize(8);
        serialize.putLong(bufferKey.time());
        ((RecordCollector.Supplier) this.context).recordCollector().send(this.changelogTopic, (String) bytes, (Bytes) serialize.array(), (Headers) null, Integer.valueOf(this.partition), (Long) null, (Serializer<String>) KEY_SERIALIZER, (Serializer<Bytes>) VALUE_SERIALIZER, (String) null, (InternalProcessorContext<Void, Void>) null);
    }

    private void logTombstone(Bytes bytes) {
        ((RecordCollector.Supplier) this.context).recordCollector().send(this.changelogTopic, (String) bytes, (Bytes) null, (Headers) null, Integer.valueOf(this.partition), (Long) null, (Serializer<String>) KEY_SERIALIZER, (Serializer<Bytes>) VALUE_SERIALIZER, (String) null, (InternalProcessorContext<Void, Void>) null);
    }
}
