package com.zulong.keel.bi.advtracking.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.dom4j.Element;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:BOOT-INF/classes/com/zulong/keel/bi/advtracking/kafka/KafkaManager.class */
public class KafkaManager {
    private final Map<String, KafkaStreams> projectKStreamMap = new HashMap();
    private final Map<String, TopicProducer> projectProducerMap = new HashMap();
    private final Map<String, KafkaStreams> projectAdvClickKafkaStreamsMap = new HashMap();
    private final Map<String, TopicProducer> projectAdvClickProducerMap = new HashMap();

    public void init(String str, Integer num, Integer num2, Element element) throws Exception {
        Element element2 = element.element("bilogs");
        KafkaStreams buildConsumerStreams = buildConsumerStreams(element2, new AdvTrackingForeachAction());
        buildConsumerStreams.start();
        this.projectKStreamMap.put(str, buildConsumerStreams);
        this.projectProducerMap.put(str, buildProducer(element2, "bilogs"));
        Element element3 = element.element("advclick");
        KafkaStreams buildConsumerStreams2 = buildConsumerStreams(element3, new AdvClickMongoSinkForeachAction(str, num, num2));
        buildConsumerStreams2.start();
        this.projectAdvClickKafkaStreamsMap.put(str, buildConsumerStreams2);
        this.projectAdvClickProducerMap.put(str, buildProducer(element3, "advclick"));
    }

    public KafkaStreams buildConsumerStreams(Element element, ForeachAction<String, String> foreachAction) throws Exception {
        String elementTextTrim = element.elementTextTrim("bootstrapServers");
        String elementTextTrim2 = element.elementTextTrim("applicationId");
        String elementTextTrim3 = element.elementTextTrim("topic");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", elementTextTrim);
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, elementTextTrim2);
        properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, element.elementTextTrim("numStreamThreads"));
        properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, element.elementTextTrim("commitIntervalMs"));
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        properties.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, KafkaStreamTimeExtractor.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, element.elementTextTrim("autoOffsetReset"));
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, element.elementTextTrim("maxPollRecords"));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(elementTextTrim3).foreach(foreachAction);
        return new KafkaStreams(streamsBuilder.build(), properties);
    }

    public TopicProducer buildProducer(Element element, String str) throws Exception {
        String elementTextTrim = element.elementTextTrim("bootstrapServers");
        String elementTextTrim2 = element.elementTextTrim("applicationId");
        String elementTextTrim3 = element.elementTextTrim("topic");
        int parseInt = Integer.parseInt(element.elementTextTrim("batchSize"));
        long parseLong = Long.parseLong(element.elementTextTrim("lingerMs"));
        long parseLong2 = Long.parseLong(element.elementTextTrim("bufferMemory"));
        String str2 = elementTextTrim2 + "_" + str;
        Properties properties = new Properties();
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.valueOf(parseInt));
        properties.put(ProducerConfig.LINGER_MS_CONFIG, Long.valueOf(parseLong));
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Long.valueOf(parseLong2));
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("client.id", str2);
        properties.put("bootstrap.servers", elementTextTrim);
        return new TopicProducer(str2, elementTextTrim3, new KafkaProducer(properties));
    }

    public TopicProducer getKafkaTopicProducerByProjectId(String str) {
        return this.projectProducerMap.get(str);
    }

    public TopicProducer getAdvClickProducerByProjectId(String str) {
        return this.projectAdvClickProducerMap.get(str);
    }
}
