package com.zulong.keel.realtime.kafka.processor;

import com.zulong.keel.realtime.kafka.serde.StringListSerde;
import com.zulong.keel.realtime.kafka.stream.LogFilterPredicate;
import com.zulong.keel.realtime.kafka.stream.LogJsonAdvClickKeyValueMapper;
import com.zulong.keel.realtime.kafka.stream.LogReplaceFieldMapper;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Produced;

/* loaded from: input_file:com/zulong/keel/realtime/kafka/processor/JsonKStreamProcessor.class */
public class JsonKStreamProcessor {
    KafkaStreams kStreams;

    public KafkaStreams build(String str, String str2, Properties properties, String str3) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(str).map((str4, str5) -> {
            return new LogJsonAdvClickKeyValueMapper().apply(str4, str5);
        }).filter((str6, list) -> {
            return new LogFilterPredicate().test(str6, (List<String>) list);
        }).map((str7, list2) -> {
            return new LogReplaceFieldMapper().apply(str7, (List<String>) list2);
        }).to(str2, Produced.with(Serdes.String(), new StringListSerde()));
        this.kStreams = new KafkaStreams(streamsBuilder.build(), properties);
        return this.kStreams;
    }

    public void start() {
        if (this.kStreams != null) {
            this.kStreams.start();
        }
    }

    public void stop() {
        if (this.kStreams != null) {
            this.kStreams.close();
        }
    }
}
