package com.zulong.keel.realtime;

import com.zulong.keel.realtime.kafka.KafkaStreamTimeExtractor;
import com.zulong.keel.realtime.kafka.processor.DefaultKStreamProcessor;
import com.zulong.keel.realtime.kafka.processor.JsonKStreamProcessor;
import com.zulong.keel.realtime.kafka.serde.StringListSerde;
import com.zulong.keel.realtime.kafka.stream.LogTransformKuduForeachAction;
import com.zulong.keel.realtime.kafka.stream.LogTransformMysqlForeachAction;
import com.zulong.keel.realtime.model.KStreamProcessor;
import java.io.File;
import java.io.FileInputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

/* loaded from: input_file:com/zulong/keel/realtime/KStreamManager.class */
public class KStreamManager {
    private static KStreamManager instance;
    Map<String, KafkaStreams> sourceStreamsMap = new HashMap();
    Integer kafkaToNumStreamThreads;
    KafkaStreams mysqlSinkStreams;
    KafkaStreams kuduSinkStreams;

    private KStreamManager() throws Exception {
        load("config/kafka.properties");
    }

    public static void init() throws Exception {
        instance = new KStreamManager();
    }

    public static KStreamManager getInstance() {
        return instance;
    }

    public void load(String str) throws Exception {
        Properties properties = new Properties();
        properties.load(new FileInputStream(new File(str)));
        String property = properties.getProperty("kafka.bootstrap.servers");
        RealTimeLoggerManager.logger().info("Config.load@load kafka config|kafka.bootstrap.servers={}", property);
        List<String> asList = Arrays.asList(properties.getProperty("kafka.sources").split(","));
        RealTimeLoggerManager.logger().info("Config.load@load kafka config|kafka.sources={}", asList);
        String property2 = properties.getProperty("kafka.to.application.mysql.id");
        RealTimeLoggerManager.logger().info("Config.load@load kafka config|kafka.to.application.mysql.id={}", property2);
        String property3 = properties.getProperty("kafka.to.application.kudu.id");
        RealTimeLoggerManager.logger().info("Config.load@load kafka config|kafka.to.application.kudu.id={}", property3);
        String property4 = properties.getProperty("kafka.to.topic");
        RealTimeLoggerManager.logger().info("Config.load@load kafka config|kafka.to.topic={}", property4);
        this.kafkaToNumStreamThreads = Integer.valueOf(properties.getProperty("kafka.to.num.stream.threads"));
        RealTimeLoggerManager.logger().info("Config.load@load kafka config|kafka.to.num.stream.threads={}", this.kafkaToNumStreamThreads);
        String property5 = properties.getProperty("kafka.to.auto.offset.reset");
        RealTimeLoggerManager.logger().info("Config.load@load kafka config|kafka.to.auto.offset.reset={}", property5);
        String property6 = properties.getProperty("kafka.to.consumer.max.poll.records");
        RealTimeLoggerManager.logger().info("Config.load@load kafka config|kafka.to.consumer.max.poll.records={}", property6);
        for (String str2 : asList) {
            Properties properties2 = new Properties();
            properties2.put("bootstrap.servers", property);
            properties2.put("application.id", properties.getProperty(String.format("kafka.source.%s.application.id", str2)));
            properties2.put("num.stream.threads", properties.getProperty(String.format("kafka.source.%s.num.stream.threads", str2)));
            properties2.put("default.key.serde", Serdes.String().getClass());
            properties2.put("default.value.serde", Serdes.String().getClass());
            properties2.put("default.timestamp.extractor", KafkaStreamTimeExtractor.class);
            properties2.put("auto.offset.reset", properties.getProperty(String.format("kafka.source.%s.auto.offset.reset", str2)));
            properties2.put("max.poll.records", properties.getProperty(String.format("kafka.source.%s.consumer.max.poll.records", str2)));
            properties2.put("batch.size", properties.getProperty(String.format("kafka.source.%s.producer.batch.size", str2)));
            properties2.put("linger.ms", properties.getProperty(String.format("kafka.source.%s.producer.linger.ms", str2)));
            String property7 = properties.getProperty(String.format("kafka.source.%s.topic", str2));
            String property8 = properties.getProperty(String.format("kafka.source.%s.processor.type", str2));
            this.sourceStreamsMap.put(str2, KStreamProcessor.JSON.getValue().equals(property8) ? new JsonKStreamProcessor().build(property7, property4, properties2, properties.getProperty(String.format("kafka.source.%s.processor.json.mapper", str2))) : new DefaultKStreamProcessor().build(property7, property4, properties2));
            RealTimeLoggerManager.logger().info("Config.load@load kafka source config|name={},type={},source.topic={},to.topic={},properties={}", str2, property8, property7, property4, properties2);
        }
        Properties properties3 = new Properties();
        properties3.put("bootstrap.servers", property);
        properties3.put("application.id", property2);
        properties3.put("num.stream.threads", this.kafkaToNumStreamThreads);
        properties3.put("default.key.serde", Serdes.String().getClass());
        properties3.put("default.value.serde", StringListSerde.class);
        properties3.put("auto.offset.reset", property5);
        properties3.put("max.poll.records", property6);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(property4).foreach((str3, list) -> {
            new LogTransformMysqlForeachAction().apply(str3, (List<String>) list);
        });
        this.mysqlSinkStreams = new KafkaStreams(streamsBuilder.build(), properties3);
        Properties properties4 = new Properties();
        properties4.put("bootstrap.servers", property);
        properties4.put("application.id", property3);
        properties4.put("num.stream.threads", this.kafkaToNumStreamThreads);
        properties4.put("default.key.serde", Serdes.String().getClass());
        properties4.put("default.value.serde", StringListSerde.class);
        properties4.put("auto.offset.reset", property5);
        properties4.put("max.poll.records", property6);
        StreamsBuilder streamsBuilder2 = new StreamsBuilder();
        streamsBuilder2.stream(property4).foreach((str4, list2) -> {
            new LogTransformKuduForeachAction().apply(str4, (List<String>) list2);
        });
        this.kuduSinkStreams = new KafkaStreams(streamsBuilder2.build(), properties4);
    }

    public void start() {
        Iterator<KafkaStreams> it = this.sourceStreamsMap.values().iterator();
        while (it.hasNext()) {
            it.next().start();
        }
        if (Config.getInstance().isSinkMysqlOpen()) {
            this.mysqlSinkStreams.start();
        }
        if (Config.getInstance().isSinkKuduOpen()) {
            this.kuduSinkStreams.start();
        }
    }

    public void stop() {
        Iterator<KafkaStreams> it = this.sourceStreamsMap.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        if (this.mysqlSinkStreams != null) {
            this.mysqlSinkStreams.close();
        }
        if (this.kuduSinkStreams != null) {
            this.kuduSinkStreams.close();
        }
    }

    public Integer getKafkaToNumStreamThreads() {
        return this.kafkaToNumStreamThreads;
    }
}
