package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.1.2.jar:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.class */
/**
 * Supplies processors that receive {@link SubscriptionWrapper} records keyed by a foreign key,
 * keep the subscription store up to date, and forward the resulting change downstream.
 *
 * <p>For every record with a non-null foreign key the processor:
 * <ol>
 *   <li>rejects wrappers whose version is not {@code 0};</li>
 *   <li>serializes the (foreign key, primary key) pair through the {@link CombinedKeySchema};</li>
 *   <li>deletes the store entry for the two {@code DELETE_KEY_*} instructions, otherwise
 *       overwrites it with the incoming wrapper and the record timestamp;</li>
 *   <li>forwards a {@link CombinedKey} together with a {@link Change} holding the incoming
 *       value and the value that was previously stored.</li>
 * </ol>
 *
 * @param <K>  type of the primary key carried inside the subscription wrapper
 * @param <KO> type of the foreign key the incoming records are keyed by
 */
public class SubscriptionStoreReceiveProcessorSupplier<K, KO> implements ProcessorSupplier<KO, SubscriptionWrapper<K>> {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionStoreReceiveProcessorSupplier.class);

    /** The only {@link SubscriptionWrapper} version this processor accepts. */
    private static final byte SUPPORTED_WRAPPER_VERSION = 0;

    private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
    private final CombinedKeySchema<KO, K> keySchema;

    /**
     * @param storeBuilder      builder identifying the subscription store the processors read and write
     * @param combinedKeySchema schema used to turn a (foreign key, primary key) pair into store key bytes
     */
    public SubscriptionStoreReceiveProcessorSupplier(StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder, CombinedKeySchema<KO, K> combinedKeySchema) {
        this.storeBuilder = storeBuilder;
        this.keySchema = combinedKeySchema;
    }

    /**
     * Creates a new processor instance; each instance resolves its own store and metrics sensor
     * in {@code init}.
     *
     * @return a processor bound to this supplier's store builder and key schema
     */
    @Override // org.apache.kafka.streams.processor.ProcessorSupplier, java.util.function.Supplier
    public Processor<KO, SubscriptionWrapper<K>> get() {
        return new AbstractProcessor<KO, SubscriptionWrapper<K>>() {
            private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> store;
            private Sensor droppedRecordsSensor;

            /**
             * Looks up the dropped-records sensor and the subscription store, and initializes
             * the shared key schema with the given context.
             *
             * @param processorContext the context handed in by the runtime; it is cast to
             *                         {@link InternalProcessorContext}, so any other
             *                         implementation fails with a {@link ClassCastException}
             */
            @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
            @SuppressWarnings({"unchecked", "rawtypes"}) // the store's generic parameters cannot be checked at runtime
            public void init(ProcessorContext processorContext) {
                super.init(processorContext);
                InternalProcessorContext internalProcessorContext = (InternalProcessorContext) processorContext;
                this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
                    Thread.currentThread().getName(),
                    internalProcessorContext.taskId().toString(),
                    internalProcessorContext.metrics());
                this.store = (TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>) internalProcessorContext.getStateStore(SubscriptionStoreReceiveProcessorSupplier.this.storeBuilder);
                SubscriptionStoreReceiveProcessorSupplier.this.keySchema.init(processorContext);
            }

            /**
             * Applies one subscription record to the store and forwards the resulting change.
             *
             * @param ko                  the foreign key; a {@code null} key causes the record to be
             *                            logged, counted on the dropped-records sensor and skipped
             * @param subscriptionWrapper the subscription payload carrying the primary key and instruction
             * @throws UnsupportedVersionException if the wrapper's version is not {@code 0}
             */
            @Override // org.apache.kafka.streams.processor.Processor
            public void process(KO ko, SubscriptionWrapper<K> subscriptionWrapper) {
                if (ko == null) {
                    LOG.warn("Skipping record due to null foreign key. value=[{}] topic=[{}] partition=[{}] offset=[{}]", subscriptionWrapper, context().topic(), context().partition(), context().offset());
                    this.droppedRecordsSensor.record();
                    return;
                }
                if (subscriptionWrapper.getVersion() != SUPPORTED_WRAPPER_VERSION) {
                    throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
                }

                final Bytes subscriptionKey = SubscriptionStoreReceiveProcessorSupplier.this.keySchema.toBytes(ko, subscriptionWrapper.getPrimaryKey());
                final ValueAndTimestamp<SubscriptionWrapper<K>> newValue = ValueAndTimestamp.make(subscriptionWrapper, context().timestamp());
                // Read the previous entry before mutating the store so it can be reported as the old value.
                final ValueAndTimestamp<SubscriptionWrapper<K>> oldValue = this.store.get(subscriptionKey);

                final boolean isDelete =
                    subscriptionWrapper.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_AND_PROPAGATE)
                        || subscriptionWrapper.getInstruction().equals(SubscriptionWrapper.Instruction.DELETE_KEY_NO_PROPAGATE);
                if (isDelete) {
                    this.store.delete(subscriptionKey);
                } else {
                    this.store.put(subscriptionKey, newValue);
                }

                // The incoming wrapper is forwarded as the "new" value even for deletes, so
                // downstream processors can still inspect its instruction.
                final Change<ValueAndTimestamp<SubscriptionWrapper<K>>> change = new Change<>(newValue, oldValue);
                context().forward(
                    new CombinedKey<>(ko, subscriptionWrapper.getPrimaryKey()),
                    change,
                    To.all().withTimestamp(newValue.timestamp()));
            }
        };
    }
}
