package com.zulong.keel.realtime.kafka.stream;

import com.zulong.keel.realtime.RealTimeLogHandlerManager;
import com.zulong.keel.realtime.RealTimeLoggerManager;
import com.zulong.keel.realtime.handler.kudu.KuduLogHandler;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.streams.kstream.ForeachAction;

/* loaded from: input_file:com/zulong/keel/realtime/kafka/stream/LogTransformKuduForeachAction.class */
public class LogTransformKuduForeachAction implements ForeachAction<String, List<String>> {
    public void apply(String str, List<String> list) {
        RealTimeLoggerManager.logger().debug("LogTransformKuduForeachAction.apply@consume log from kafka|log={}", list);
        try {
            String str2 = list.get(RealTimeLogHandlerManager.getInstance().getDefaultModelIndex("logtype") - 1);
            KuduLogHandler kuduLogHandler = (KuduLogHandler) RealTimeLogHandlerManager.getInstance().getKuduSink().getLogHandler(str2);
            if (kuduLogHandler != null && kuduLogHandler.distinctRealTimeLog(str2, list)) {
                List<String> formatRealTimeLog = kuduLogHandler.formatRealTimeLog(str2, list);
                RealTimeLoggerManager.logger().debug("LogTransformKuduForeachAction.apply@etl success|logType={}|log={}", str2, formatRealTimeLog);
                if (kuduLogHandler.needMultiTimezone()) {
                    Iterator<List<String>> it = kuduLogHandler.convertLogTime(formatRealTimeLog).iterator();
                    while (it.hasNext()) {
                        RealTimeLogHandlerManager.getInstance().getKuduSink().addRealTimeLog(str2, it.next());
                    }
                } else {
                    RealTimeLogHandlerManager.getInstance().getKuduSink().addRealTimeLog(str2, formatRealTimeLog);
                }
            }
        } catch (Exception e) {
            RealTimeLoggerManager.logger().error("LogTransformKuduForeachAction.apply@other exception|log={}", list, e);
        }
    }
}
