package org.apache.flume.sink;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.api.AbstractRpcClient;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.source.SyslogSourceConfigurationConstants;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/AbstractRpcSink.class */
public abstract class AbstractRpcSink extends AbstractSink implements Configurable, BatchSizeSupported {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) AbstractRpcSink.class);
    private String hostname;
    private Integer port;
    private RpcClient client;
    private Properties clientProps;
    private SinkCounter sinkCounter;
    private int cxnResetInterval;
    private AtomicBoolean resetConnectionFlag;
    private final int DEFAULT_CXN_RESET_INTERVAL = 0;
    private final ScheduledExecutorService cxnResetExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Rpc Sink Reset Thread").build());
    private int batchSize;

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        this.clientProps = new Properties();
        this.hostname = context.getString(SyslogSourceConfigurationConstants.CONFIG_KEEP_FIELDS_HOSTNAME);
        this.port = context.getInteger("port");
        Preconditions.checkState(this.hostname != null, "No hostname specified");
        Preconditions.checkState(this.port != null, "No port specified");
        this.clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
        this.clientProps.setProperty("hosts.h1", this.hostname + TMultiplexedProtocol.SEPARATOR + this.port);
        UnmodifiableIterator<Map.Entry<String, String>> it = context.getParameters().entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> next = it.next();
            this.clientProps.setProperty(next.getKey(), next.getValue());
        }
        this.batchSize = AbstractRpcClient.parseBatchSize(this.clientProps);
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(getName());
        }
        this.cxnResetInterval = context.getInteger("reset-connection-interval", 0).intValue();
        if (this.cxnResetInterval == 0) {
            logger.info("Connection reset is set to " + String.valueOf(0) + ". Will not reset connection to next hop");
        }
    }

    protected abstract RpcClient initializeRpcClient(Properties properties);

    private void createConnection() throws FlumeException {
        if (this.client == null) {
            logger.info("Rpc sink {}: Building RpcClient with hostname: {}, port: {}", getName(), this.hostname, this.port);
            try {
                this.resetConnectionFlag = new AtomicBoolean(false);
                this.client = initializeRpcClient(this.clientProps);
                Preconditions.checkNotNull(this.client, "Rpc Client could not be initialized. " + getName() + " could not be started");
                this.sinkCounter.incrementConnectionCreatedCount();
                if (this.cxnResetInterval > 0) {
                    this.cxnResetExecutor.schedule(new Runnable() { // from class: org.apache.flume.sink.AbstractRpcSink.1
                        @Override // java.lang.Runnable
                        public void run() {
                            AbstractRpcSink.this.resetConnectionFlag.set(true);
                        }
                    }, this.cxnResetInterval, TimeUnit.SECONDS);
                }
                logger.debug("Rpc sink {}: Created RpcClient: {}", getName(), this.client);
            } catch (Exception e) {
                this.sinkCounter.incrementConnectionFailedCount();
                if (!(e instanceof FlumeException)) {
                    throw new FlumeException(e);
                }
                throw ((FlumeException) e);
            }
        }
    }

    private void resetConnection() {
        try {
            destroyConnection();
            createConnection();
        } catch (Throwable th) {
            logger.error("Error while trying to expire connection", th);
        }
    }

    private void destroyConnection() {
        if (this.client != null) {
            logger.debug("Rpc sink {} closing Rpc client: {}", getName(), this.client);
            try {
                this.client.close();
                this.sinkCounter.incrementConnectionClosedCount();
            } catch (FlumeException e) {
                this.sinkCounter.incrementConnectionFailedCount();
                logger.error("Rpc sink " + getName() + ": Attempt to close Rpc client failed. Exception follows.", (Throwable) e);
            }
        }
        this.client = null;
    }

    private void verifyConnection() throws FlumeException {
        if (this.client == null) {
            createConnection();
        } else {
            if (this.client.isActive()) {
                return;
            }
            destroyConnection();
            createConnection();
        }
    }

    @Override // org.apache.flume.sink.AbstractSink, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("Starting {}...", this);
        this.sinkCounter.start();
        try {
            createConnection();
        } catch (FlumeException e) {
            logger.warn("Unable to create Rpc client using hostname: " + this.hostname + ", port: " + this.port, (Throwable) e);
            destroyConnection();
        }
        super.start();
        logger.info("Rpc sink {} started.", getName());
    }

    @Override // org.apache.flume.sink.AbstractSink, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        logger.info("Rpc sink {} stopping...", getName());
        destroyConnection();
        this.cxnResetExecutor.shutdown();
        try {
            if (this.cxnResetExecutor.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.cxnResetExecutor.shutdownNow();
            }
        } catch (Exception e) {
            logger.error("Interrupted while waiting for connection reset executor to shut down");
        }
        this.sinkCounter.stop();
        super.stop();
        logger.info("Rpc sink {} stopped. Metrics: {}", getName(), this.sinkCounter);
    }

    @Override // org.apache.flume.sink.AbstractSink
    public String toString() {
        return "RpcSink " + getName() + " { host: " + this.hostname + ", port: " + this.port + " }";
    }

    @Override // org.apache.flume.Sink
    public Sink.Status process() throws EventDeliveryException {
        Event take;
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        if (this.resetConnectionFlag.get()) {
            resetConnection();
            this.resetConnectionFlag.set(false);
        }
        try {
            try {
                transaction.begin();
                verifyConnection();
                LinkedList newLinkedList = Lists.newLinkedList();
                for (int i = 0; i < this.client.getBatchSize() && (take = channel.take()) != null; i++) {
                    newLinkedList.add(take);
                }
                int size = newLinkedList.size();
                int batchSize = this.client.getBatchSize();
                if (size == 0) {
                    this.sinkCounter.incrementBatchEmptyCount();
                    status = Sink.Status.BACKOFF;
                } else {
                    if (size < batchSize) {
                        this.sinkCounter.incrementBatchUnderflowCount();
                    } else {
                        this.sinkCounter.incrementBatchCompleteCount();
                    }
                    this.sinkCounter.addToEventDrainAttemptCount(size);
                    this.client.appendBatch(newLinkedList);
                }
                transaction.commit();
                this.sinkCounter.addToEventDrainSuccessCount(size);
                transaction.close();
            } catch (Throwable th) {
                transaction.rollback();
                if (th instanceof Error) {
                    throw ((Error) th);
                }
                if (!(th instanceof ChannelException)) {
                    this.sinkCounter.incrementEventWriteFail();
                    destroyConnection();
                    throw new EventDeliveryException("Failed to send events", th);
                }
                logger.error("Rpc Sink " + getName() + ": Unable to get event from channel " + channel.getName() + ". Exception follows.", th);
                this.sinkCounter.incrementChannelReadFail();
                status = Sink.Status.BACKOFF;
                transaction.close();
            }
            return status;
        } catch (Throwable th2) {
            transaction.close();
            throw th2;
        }
    }

    @VisibleForTesting
    RpcClient getUnderlyingClient() {
        return this.client;
    }

    @Override // org.apache.flume.conf.BatchSizeSupported
    public long getBatchSize() {
        return this.batchSize;
    }
}
