package com.zulong.keel.realtime.db.sink;

import com.zulong.keel.realtime.Config;
import com.zulong.keel.realtime.KStreamManager;
import com.zulong.keel.realtime.KuduClientManager;
import com.zulong.keel.realtime.RealTimeLoggerManager;
import com.zulong.keel.realtime.handler.LogHandler;
import com.zulong.keel.realtime.handler.kudu.ActiveLogHandler;
import com.zulong.keel.realtime.handler.kudu.AddcashLogHandler;
import com.zulong.keel.realtime.handler.kudu.AddeviceidLogHandler;
import com.zulong.keel.realtime.handler.kudu.AdeventLogHandler;
import com.zulong.keel.realtime.handler.kudu.DefaultKuduLogHandler;
import com.zulong.keel.realtime.handler.kudu.DownloaderStepHandler;
import com.zulong.keel.realtime.handler.kudu.InAppEventLogHandler;
import com.zulong.keel.realtime.handler.kudu.ReinstallLogHandler;
import com.zulong.keel.realtime.handler.kudu.RoleloginLogHandler;
import com.zulong.keel.realtime.handler.kudu.SdkSteplogreportLogHandler;
import com.zulong.keel.realtime.handler.kudu.StepLogReportLogHandler;
import com.zulong.keel.realtime.handler.kudu.ZlclickLogHandler;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.PartialRow;

/* loaded from: input_file:com/zulong/keel/realtime/db/sink/KuduSink.class */
public class KuduSink implements Sink {
    // Log type -> handler. Populated by init(Set) and read by getLogHandler(String).
    private final Map<String, LogHandler> logType2Handlers = new HashMap();
    // Thread index -> (log type -> queue of pending records). One inner map is created per
    // index in init(Set); addRealTimeLog appends to these queues and stop() drains them.
    private final Map<Integer, Map<String, LinkedBlockingQueue<List<String>>>> thread2Caches = new HashMap();
    // Single session obtained from KuduClientManager; every insert and flush in this class goes through it.
    private final KuduSession kuduSession = KuduClientManager.getInstance().getKuduSession();
    // Log types that init(Set) skips, i.e. for which no handler or queue is created.
    private final Set<String> noKuduLogTypes;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.zulong.keel.realtime.db.sink.KuduSink$1, reason: invalid class name */
    /* loaded from: input_file:com/zulong/keel/realtime/db/sink/KuduSink$1.class */
    // Lookup table used by coerceAndSet: indexed by Type.ordinal(), it yields the case number
    // (1..8) that coerceAndSet switches on. The decompiler marks this class as synthetic.
    public static /* synthetic */ class AnonymousClass1 {
        // Sized to the number of Type constants; slots that are never assigned stay 0,
        // which matches no case in coerceAndSet and therefore reaches its default branch.
        static final /* synthetic */ int[] $SwitchMap$org$apache$kudu$Type = new int[Type.values().length];

        static {
            // Each constant is registered in its own try/catch so that a constant missing
            // at link time (NoSuchFieldError) only leaves its own slot unset.
            try {
                $SwitchMap$org$apache$kudu$Type[Type.STRING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kudu$Type[Type.INT8.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kudu$Type[Type.INT16.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kudu$Type[Type.INT32.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kudu$Type[Type.INT64.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kudu$Type[Type.BOOL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kudu$Type[Type.FLOAT.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kudu$Type[Type.DOUBLE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
        }
    }

    /**
     * Creates a sink that remembers which log types must not be set up for Kudu.
     *
     * @param set log types that {@link #init(Set)} will skip; stored by reference, not copied
     */
    public KuduSink(Set<String> set) {
        noKuduLogTypes = set;
    }

    /**
     * Builds the handler registry and the per-thread record queues.
     *
     * <p>For every thread index below the configured stream-thread count, a fresh map of
     * log type to queue is registered in {@code thread2Caches}. Log types listed in
     * {@code noKuduLogTypes} get neither a handler nor a queue.
     *
     * @param set log types to set up
     */
    public void init(Set<String> set) {
        int threadIndex = 0;
        // The thread count is re-read on every pass, exactly as a for-loop condition would.
        while (threadIndex < KStreamManager.getInstance().getKafkaToNumStreamThreads().intValue()) {
            Map<String, LinkedBlockingQueue<List<String>>> queuesByLogType = new HashMap<>();
            for (String logType : set) {
                if (this.noKuduLogTypes.contains(logType)) {
                    continue;
                }
                LogHandler handler = newHandlerInstance(logType);
                if (handler == null) {
                    continue;
                }
                // A new handler is created on each pass; the one from the last pass stays registered.
                this.logType2Handlers.put(logType, handler);
                queuesByLogType.put(logType, new LinkedBlockingQueue<List<String>>());
            }
            this.thread2Caches.put(Integer.valueOf(threadIndex), queuesByLogType);
            threadIndex++;
        }
    }

    /**
     * Looks up the handler registered for a log type.
     *
     * @param str log type
     * @return the registered handler, or {@code null} when none was registered for {@code str}
     */
    @Override // com.zulong.keel.realtime.db.sink.Sink
    public LogHandler getLogHandler(String str) {
        LogHandler handler = this.logType2Handlers.get(str);
        return handler;
    }

    /**
     * Exposes the per-thread record queues.
     *
     * @return the live map of thread index to (log type to queue); it is not a copy
     */
    @Override // com.zulong.keel.realtime.db.sink.Sink
    public Map<Integer, Map<String, LinkedBlockingQueue<List<String>>>> getThread2Caches() {
        Map<Integer, Map<String, LinkedBlockingQueue<List<String>>>> caches = this.thread2Caches;
        return caches;
    }

    /**
     * Writes the records currently held in {@code linkedBlockingQueue} to the handler's Kudu
     * table and, when a backup table is configured for it, to that table as well.
     *
     * <p>At most as many records as the queue held on entry are taken. A record whose
     * conversion or insert fails is logged and skipped; the remaining records are still
     * processed. Any failure outside the per-record handling (table lookup, flush) is logged
     * and ends the call without rethrowing.
     *
     * @param logHandler          supplies the target table name
     * @param linkedBlockingQueue records to write; each record is a list of field values that
     *                            are matched to the table's columns by position
     */
    @Override // com.zulong.keel.realtime.db.sink.Sink
    public void sink(LogHandler logHandler, LinkedBlockingQueue<List<String>> linkedBlockingQueue) {
        String realTimeLogTableName = logHandler.getRealTimeLogTableName();
        int size = linkedBlockingQueue.size();
        try {
            KuduTable primaryTable = KuduClientManager.getInstance().getKuduTable(getFullTableName(realTimeLogTableName));
            List<ColumnSchema> primaryColumns = primaryTable.getSchema().getColumns();
            int primaryColumnCount = primaryColumns.size();
            int applied = 0;
            for (int i = 0; i < size; i++) {
                // poll() instead of remove(): if the queue holds fewer records than the
                // snapshot taken above, stop cleanly so the flush below still runs.
                List<String> logFields = linkedBlockingQueue.poll();
                if (logFields == null) {
                    break;
                }
                try {
                    // Extra fields beyond the table's columns are ignored.
                    int fieldCount = Math.min(logFields.size(), primaryColumnCount);
                    Insert primaryInsert = primaryTable.newInsert();
                    PartialRow primaryRow = primaryInsert.getRow();
                    for (int c = 0; c < fieldCount; c++) {
                        coerceAndSet(logFields.get(c), primaryColumns.get(c), primaryRow);
                    }
                    this.kuduSession.apply(primaryInsert);
                    applied++;
                    String kuduBackupTableName = Config.getInstance().getKuduBackupTableName(realTimeLogTableName);
                    if (kuduBackupTableName != null) {
                        KuduTable backupTable = KuduClientManager.getInstance().getKuduTable(getFullTableName(kuduBackupTableName));
                        List<ColumnSchema> backupColumns = backupTable.getSchema().getColumns();
                        // Bound by the backup schema too, so a backup table with fewer
                        // columns than the primary does not run past its column list.
                        int backupFieldCount = Math.min(fieldCount, backupColumns.size());
                        Insert backupInsert = backupTable.newInsert();
                        PartialRow backupRow = backupInsert.getRow();
                        for (int c = 0; c < backupFieldCount; c++) {
                            coerceAndSet(logFields.get(c), backupColumns.get(c), backupRow);
                        }
                        this.kuduSession.apply(backupInsert);
                    }
                } catch (Exception e) {
                    RealTimeLoggerManager.logger().warn("KuduSink.sink@add kudu insert operation failed|table_name={}|logFields={}", realTimeLogTableName, logFields, e);
                }
            }
            // NOTE(review): the responses returned by apply()/flush() are not inspected, so
            // server-side row errors are not reflected here -- confirm the session's flush mode.
            this.kuduSession.flush();
            // Report the number of records whose primary insert was applied, not the drained
            // size, so skipped records are not counted as successes.
            RealTimeLoggerManager.logger().info("KuduSink.sink@insert logs success|table_name={}|count={}", realTimeLogTableName, Integer.valueOf(applied));
        } catch (Exception e2) {
            RealTimeLoggerManager.logger().error("KuduSink.sink@insert error|table_name={}|count={}", realTimeLogTableName, Integer.valueOf(size), e2);
        }
    }

    /**
     * Queues one record for the calling thread and writes the queue out once it grows past
     * the configured record limit.
     *
     * <p>The queue set is chosen from the number that follows the last {@code '-'} in the
     * current thread's name, taken modulo the configured stream-thread count. When that
     * number cannot be parsed, index 0 is used. Records for a log type that has no queue
     * are silently ignored.
     *
     * @param str  log type of the record
     * @param list field values of the record
     */
    public void addRealTimeLog(String str, List<String> list) {
        String name = Thread.currentThread().getName();
        int i = 0;
        try {
            i = Integer.parseInt(name.substring(name.lastIndexOf("-") + 1));
        } catch (Exception e) {
            // Deliberate best effort: report the unexpected name and fall back to index 0.
            RealTimeLoggerManager.logger().error(String.format("KuduSink.addRealTimeLog@consumer thread name format invalid|threadName=%s", name));
        }
        Map<String, LinkedBlockingQueue<List<String>>> map = this.thread2Caches.get(Integer.valueOf(i % KStreamManager.getInstance().getKafkaToNumStreamThreads().intValue()));
        if (map == null) {
            RealTimeLoggerManager.logger().error("KuduSink.addRealTimeLog@consumer thread map not exist|threadName={}|threadNum={}", name, Integer.valueOf(i));
            return;
        }
        LinkedBlockingQueue<List<String>> linkedBlockingQueue = map.get(str);
        if (linkedBlockingQueue == null) {
            return;
        }
        try {
            // offer() rather than put(): put() throws InterruptedException when the calling
            // thread is interrupted, which would drop the record and clear the interrupt
            // flag in the catch below. The queues created by init are unbounded, so offer()
            // is expected to succeed.
            if (!linkedBlockingQueue.offer(list)) {
                RealTimeLoggerManager.logger().error("KuduSink.addRealTimeLog@add log failed|log_type={}|log={}", str, list);
                return;
            }
            if (linkedBlockingQueue.size() > Config.getInstance().getSinkRecordLimit()) {
                // Move the pending records into a private queue before writing them out.
                LinkedBlockingQueue<List<String>> batch = new LinkedBlockingQueue<>();
                linkedBlockingQueue.drainTo(batch);
                sink(getLogHandler(str), batch);
            }
        } catch (Exception e2) {
            RealTimeLoggerManager.logger().error("KuduSink.addRealTimeLog@add log failed|log_type={}|log={}", str, list, e2);
        }
    }

    /**
     * Writes out whatever is still queued: every non-empty queue of every thread index is
     * drained into a private queue and passed to {@code sink} with the handler of its log type.
     */
    public void stop() {
        for (Map<String, LinkedBlockingQueue<List<String>>> queuesByLogType : this.thread2Caches.values()) {
            for (Map.Entry<String, LinkedBlockingQueue<List<String>>> perLogType : queuesByLogType.entrySet()) {
                LinkedBlockingQueue<List<String>> pending = perLogType.getValue();
                if (pending.isEmpty()) {
                    continue;
                }
                LinkedBlockingQueue<List<String>> batch = new LinkedBlockingQueue<>();
                pending.drainTo(batch);
                sink(getLogHandler(perLogType.getKey()), batch);
            }
        }
    }

    /**
     * Creates the handler for a log type.
     *
     * <p>Known log types get their dedicated handler; every other log type gets a
     * {@link DefaultKuduLogHandler} built from the log type itself, so this method never
     * returns {@code null}. The mapping below was reconstructed from the decompiled two-stage
     * switch (hash lookup followed by an index switch), which did not compile as emitted.
     *
     * @param str log type; must not be {@code null}
     * @return a new handler instance
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public LogHandler newHandlerInstance(String str) {
        switch (str) {
            case "userlogin":
            case "userheartbeat":
            case "userlogout":
                return new ActiveLogHandler();
            case "addcash":
                return new AddcashLogHandler();
            case "adevent":
                return new AdeventLogHandler();
            case "addeviceid":
                return new AddeviceidLogHandler();
            case "deviceactive":
            case "steplogreport":
                return new StepLogReportLogHandler();
            case "reinstall":
                return new ReinstallLogHandler();
            case "inappevent":
                return new InAppEventLogHandler();
            case "downloaderstep":
                return new DownloaderStepHandler();
            case "rolelogin":
                return new RoleloginLogHandler();
            case "sdksteplogreport":
                return new SdkSteplogreportLogHandler();
            case "zlclick":
                return new ZlclickLogHandler();
            default:
                return new DefaultKuduLogHandler(str);
        }
    }

    /**
     * Converts a raw field value to the column's type and stores it in the row.
     *
     * <p>For STRING columns the value is stored as is, except that {@code null} is written as
     * NULL. For numeric and boolean columns a {@code null} or empty value is written as NULL;
     * anything else is parsed with the matching {@code parseXxx} method. INT8 and INT16
     * columns are written with {@code addByte}/{@code addShort}, because {@code addInt} is
     * only accepted for INT32 columns. Columns of any other type are left unset and a
     * warning is logged.
     *
     * @param str          raw field value, may be {@code null}
     * @param columnSchema target column
     * @param partialRow   row being populated
     * @throws Exception if the value cannot be parsed for the column's type (for example
     *                   {@link NumberFormatException}) or the row rejects the value
     */
    private void coerceAndSet(String str, ColumnSchema columnSchema, PartialRow partialRow) throws Exception {
        String columnName = columnSchema.getName();
        boolean absent = str == null || str.isEmpty();
        switch (columnSchema.getType()) {
            case STRING:
                // An empty string is a legitimate value here; only null becomes NULL.
                if (str == null) {
                    partialRow.setNull(columnName);
                } else {
                    partialRow.addString(columnName, str);
                }
                return;
            case INT8:
                if (absent) {
                    partialRow.setNull(columnName);
                } else {
                    partialRow.addByte(columnName, Byte.parseByte(str));
                }
                return;
            case INT16:
                if (absent) {
                    partialRow.setNull(columnName);
                } else {
                    partialRow.addShort(columnName, Short.parseShort(str));
                }
                return;
            case INT32:
                if (absent) {
                    partialRow.setNull(columnName);
                } else {
                    partialRow.addInt(columnName, Integer.parseInt(str));
                }
                return;
            case INT64:
                if (absent) {
                    partialRow.setNull(columnName);
                } else {
                    partialRow.addLong(columnName, Long.parseLong(str));
                }
                return;
            case BOOL:
                if (absent) {
                    partialRow.setNull(columnName);
                } else {
                    partialRow.addBoolean(columnName, Boolean.parseBoolean(str));
                }
                return;
            case FLOAT:
                if (absent) {
                    partialRow.setNull(columnName);
                } else {
                    partialRow.addFloat(columnName, Float.parseFloat(str));
                }
                return;
            case DOUBLE:
                if (absent) {
                    partialRow.setNull(columnName);
                } else {
                    partialRow.addDouble(columnName, Double.parseDouble(str));
                }
                return;
            default:
                RealTimeLoggerManager.logger().warn("got unknown type {} for rawVal '{}'-- ignoring this column", columnSchema.getType(), str);
                return;
        }
    }

    /**
     * Qualifies a table name with the configured Kudu database name.
     *
     * @param str bare table name
     * @return {@code <db>.<table>} when the configuration says tables are used without Impala,
     *         otherwise the same name prefixed with {@code impala::}
     */
    private String getFullTableName(String str) {
        boolean withoutImpala = Config.getInstance().getKuduTableWithoutImpala().booleanValue();
        String qualifiedName = Config.getInstance().getKuduDbName() + "." + str;
        if (withoutImpala) {
            return qualifiedName;
        }
        return "impala::" + qualifiedName;
    }
}
