package com.zulong.keel.bi.advtracking.kafka;

import com.zulong.keel.bi.advtracking.db.mongo.DynamicMongoTemplate;
import com.zulong.keel.bi.advtracking.db.mongo.MongoTableEnum;
import com.zulong.keel.bi.advtracking.db.mongo.entity.ClickEntity;
import com.zulong.keel.bi.advtracking.util.JsonUtil;
import com.zulong.keel.bi.advtracking.util.SpringContextUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/classes/com/zulong/keel/bi/advtracking/kafka/AdvClickMongoSinkForeachAction.class */
public class AdvClickMongoSinkForeachAction implements ForeachAction<String, String> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AdvClickMongoSinkForeachAction.class);
    final Map<String, Map<String, LinkedBlockingQueue<ClickEntity>>> mongoClickHolder = new HashMap();
    final DynamicMongoTemplate dynamicMongoTemplate = (DynamicMongoTemplate) SpringContextUtil.getBean(DynamicMongoTemplate.class);
    final ScheduledThreadPoolExecutor mongoSinkWatcher = new ScheduledThreadPoolExecutor(1);
    final String projectId;
    final Integer mongoBatchSize;

    /* loaded from: input_file:BOOT-INF/classes/com/zulong/keel/bi/advtracking/kafka/AdvClickMongoSinkForeachAction$MongoBatchWatcher.class */
    class MongoBatchWatcher implements Runnable {
        MongoBatchWatcher() {
        }

        @Override // java.lang.Runnable
        public void run() {
            for (Map.Entry<String, Map<String, LinkedBlockingQueue<ClickEntity>>> entry : AdvClickMongoSinkForeachAction.this.mongoClickHolder.entrySet()) {
                String key = entry.getKey();
                for (Map.Entry<String, LinkedBlockingQueue<ClickEntity>> entry2 : entry.getValue().entrySet()) {
                    try {
                        String key2 = entry2.getKey();
                        ArrayList arrayList = new ArrayList();
                        entry2.getValue().drainTo(arrayList);
                        if (!arrayList.isEmpty()) {
                            AdvClickMongoSinkForeachAction.this.dynamicMongoTemplate.bulkSave(AdvClickMongoSinkForeachAction.this.projectId, arrayList, key2);
                            AdvClickMongoSinkForeachAction.log.info("[checkBatchInterval] bulk save mongo success,projectId={},consumerThreadName={},mongoTableName={},count={}", AdvClickMongoSinkForeachAction.this.projectId, key, key2, Integer.valueOf(arrayList.size()));
                        }
                    } catch (Exception e) {
                        AdvClickMongoSinkForeachAction.log.error("[checkBatchInterval] bulk save mongo exception,projectId={}", AdvClickMongoSinkForeachAction.this.projectId, e);
                    }
                }
            }
        }
    }

    public AdvClickMongoSinkForeachAction(String str, Integer num, Integer num2) {
        this.projectId = str;
        this.mongoBatchSize = num;
        this.mongoSinkWatcher.scheduleWithFixedDelay(new MongoBatchWatcher(), num2.intValue(), num2.intValue(), TimeUnit.SECONDS);
    }

    @Override // org.apache.kafka.streams.kstream.ForeachAction
    public void apply(String str, String str2) {
        MongoTableEnum fromValue;
        ClickEntity clickEntity;
        try {
            if (StringUtils.hasText(str)) {
                String[] split = str.split("-");
                if (split.length < 1 || (fromValue = MongoTableEnum.fromValue(split[0])) == null || (clickEntity = (ClickEntity) JsonUtil.jsonToObject(str2, ClickEntity.class)) == null) {
                    return;
                }
                Map<String, LinkedBlockingQueue<ClickEntity>> computeIfAbsent = this.mongoClickHolder.computeIfAbsent(Thread.currentThread().getName(), str3 -> {
                    return new HashMap();
                });
                LinkedBlockingQueue<ClickEntity> linkedBlockingQueue = computeIfAbsent.get(fromValue.getName());
                if (linkedBlockingQueue == null) {
                    linkedBlockingQueue = new LinkedBlockingQueue<>();
                    computeIfAbsent.put(fromValue.getName(), linkedBlockingQueue);
                }
                linkedBlockingQueue.add(clickEntity);
                if (linkedBlockingQueue.size() == this.mongoBatchSize.intValue()) {
                    ArrayList arrayList = new ArrayList();
                    linkedBlockingQueue.drainTo(arrayList);
                    if (arrayList.isEmpty()) {
                        return;
                    }
                    this.dynamicMongoTemplate.bulkSave(this.projectId, arrayList, fromValue.getName());
                    log.info("[kStreamForeach] bulk save mongo success,projectId={},mongoTableName={},count={}", this.projectId, fromValue.getName(), Integer.valueOf(arrayList.size()));
                }
            }
        } catch (Exception e) {
            log.error("[kStreamForeach] other exception,logRecord={}", str2, e);
        }
    }
}
