package com.ibm.ws.microprofile.reactive.messaging.kafka;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.websphere.ras.annotation.TraceObjectField;
import com.ibm.websphere.ras.annotation.TraceOptions;
import com.ibm.ws.ras.instrument.annotation.InjectedFFDC;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Collects items into batches and hands each completed batch to a {@link ProcessBatchAction}.
 * <p>
 * A batch is completed when either:
 * <ul>
 * <li>a maximum batch size is configured and the number of collected items reaches it, or</li>
 * <li>a maximum batch time is configured and the timer, which is started when an item is added while no
 * timer is pending, fires.</li>
 * </ul>
 * <p>
 * All mutable state ({@code closed}, {@code batchList}, {@code pendingBatchAction}) is guarded by the monitor
 * of this instance. The batch action is always invoked <em>after</em> the monitor has been released, so a slow
 * or re-entrant action cannot block other callers of {@link #addToBatch(Object)} or {@link #close()}. As a
 * consequence, batches completed by different threads may be processed concurrently.
 * <p>
 * Instances are created through {@link #create(Class)} and the returned {@link BatcherBuilder}.
 *
 * @param <T> the type of the items being batched
 */
@InjectedFFDC
@TraceObjectField(fieldName = "tc", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
@TraceOptions
/* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/Batcher.class */
public class Batcher<T> {
    private static final TraceComponent tc = Tr.register(Batcher.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");

    /** Number of items which completes a batch, or {@code -1} if no size limit is configured. */
    private final int maxBatchSize;

    /** Delay after which a pending batch is processed, or {@code null} if no time limit is configured. */
    private final Duration maxBatchTime;

    /** Callback which receives every completed batch. */
    private final ProcessBatchAction<T> processBatchAction;

    /** Executor used to schedule the time-based flush; may be {@code null} when {@code maxBatchTime} is {@code null}. */
    private final ScheduledExecutorService executor;

    /** Set by {@link #close()}; once {@code true}, new items are ignored. Guarded by {@code this}. */
    private boolean closed;

    /** Items collected for the batch currently being built. Guarded by {@code this}. */
    private List<T> batchList;

    /** The scheduled time-based flush for the current batch, or {@code null} if none is scheduled. Guarded by {@code this}. */
    private Future<?> pendingBatchAction;

    static final long serialVersionUID = 576754537990526995L;

    /**
     * Builder for {@link Batcher} instances.
     * <p>
     * At least one of {@link #withMaxBatchSize(int)} and {@link #withMaxBatchTime(Duration)} must be called, and
     * {@link #withProcessBatchAction(ProcessBatchAction)} is mandatory. If a maximum batch time is set,
     * {@link #withExecutor(ScheduledExecutorService)} is mandatory as well.
     *
     * @param <T> the type of the items being batched
     */
    @InjectedFFDC
    @TraceObjectField(fieldName = "$$$tc$$$", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
    @TraceOptions
    /* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/Batcher$BatcherBuilder.class */
    public static class BatcherBuilder<T> {
        private int maxBatchSize;
        private Duration maxBatchTime;
        private ProcessBatchAction<T> processBatchAction;
        private ScheduledExecutorService executor;
        static final long serialVersionUID = 4550765020788158690L;
        private static final /* synthetic */ TraceComponent $$$tc$$$ = Tr.register("com.ibm.ws.microprofile.reactive.messaging.kafka.Batcher$BatcherBuilder", BatcherBuilder.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");

        private BatcherBuilder() {
            // -1 is the sentinel for "no maximum batch size configured"
            this.maxBatchSize = -1;
        }

        /**
         * Sets the number of items which completes a batch.
         *
         * @param i the maximum batch size; {@code -1} means no size limit
         * @return this builder
         */
        public BatcherBuilder<T> withMaxBatchSize(int i) {
            this.maxBatchSize = i;
            return this;
        }

        /**
         * Sets the delay after which a pending batch is processed even if it is not full.
         *
         * @param duration the maximum batch time; {@code null} means no time limit
         * @return this builder
         */
        public BatcherBuilder<T> withMaxBatchTime(Duration duration) {
            this.maxBatchTime = duration;
            return this;
        }

        /**
         * Sets the callback which receives every completed batch.
         *
         * @param processBatchAction the action to invoke for each completed batch
         * @return this builder
         */
        public BatcherBuilder<T> withProcessBatchAction(ProcessBatchAction<T> processBatchAction) {
            this.processBatchAction = processBatchAction;
            return this;
        }

        /**
         * Sets the executor used to schedule time-based batch processing.
         *
         * @param scheduledExecutorService the executor; required if a maximum batch time is set
         * @return this builder
         */
        public BatcherBuilder<T> withExecutor(ScheduledExecutorService scheduledExecutorService) {
            this.executor = scheduledExecutorService;
            return this;
        }

        /**
         * Validates the configuration and creates the {@link Batcher}.
         *
         * @return a new batcher using the configured values
         * @throws IllegalStateException if a maximum batch time is set without an executor, if neither a maximum
         *                                   batch size nor a maximum batch time is set, or if no batch action is set
         */
        public Batcher<T> build() {
            if (this.maxBatchTime != null && this.executor == null) {
                throw new IllegalStateException(Tr.formatMessage(Batcher.tc, "internal.kafka.connector.error.CWMRX1000E", new Object[]{"Executor must be set if maxBatchTime is set"}));
            }
            if (this.maxBatchSize == -1 && this.maxBatchTime == null) {
                throw new IllegalStateException(Tr.formatMessage(Batcher.tc, "internal.kafka.connector.error.CWMRX1000E", new Object[]{"Either maxBatchSize or maxBatchTime must be set"}));
            }
            if (this.processBatchAction == null) {
                throw new IllegalStateException(Tr.formatMessage(Batcher.tc, "internal.kafka.connector.error.CWMRX1000E", new Object[]{"processBatchAction must be set"}));
            }
            return new Batcher<>(this.maxBatchSize, this.maxBatchTime, this.processBatchAction, this.executor);
        }
    }

    /**
     * Callback which receives completed batches.
     *
     * @param <T> the type of the items in the batch
     */
    @FunctionalInterface
    /* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/Batcher$ProcessBatchAction.class */
    public interface ProcessBatchAction<T> {
        /**
         * Processes one completed batch.
         *
         * @param list the items of the batch, in the order in which they were added
         */
        void processBatch(List<T> list);
    }

    /**
     * Starts building a new {@link Batcher}.
     *
     * @param cls the item type; only used to bind the type parameter {@code T} at the call site
     * @param <T> the type of the items being batched
     * @return a new, unconfigured builder
     */
    public static <T> BatcherBuilder<T> create(Class<T> cls) {
        return new BatcherBuilder<>();
    }

    private Batcher(int i, Duration duration, ProcessBatchAction<T> processBatchAction, ScheduledExecutorService scheduledExecutorService) {
        this.maxBatchSize = i;
        this.maxBatchTime = duration;
        this.processBatchAction = processBatchAction;
        this.executor = scheduledExecutorService;
        this.closed = false;
        this.batchList = new ArrayList<>();
        this.pendingBatchAction = null;
    }

    /**
     * Adds an item to the current batch.
     * <p>
     * If the batcher has been closed, the item is silently ignored. If adding the item fills the batch, the batch
     * action is invoked on the calling thread before this method returns. Otherwise, if a maximum batch time is
     * configured and no timer is pending, a timer is scheduled which will process the batch.
     *
     * @param t the item to add
     */
    public void addToBatch(T t) {
        List<T> completedBatch = null;
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.batchList.add(t);
            if (this.maxBatchSize != -1 && this.batchList.size() >= this.maxBatchSize) {
                completedBatch = startNewBatch();
            } else if (this.maxBatchTime != null && this.pendingBatchAction == null) {
                this.pendingBatchAction = this.executor.schedule(this::processBatch, this.maxBatchTime.toNanos(), TimeUnit.NANOSECONDS);
            }
        }
        // Invoke the callback only after releasing the monitor: it is caller-supplied code which may be slow or
        // may take other locks, and the timer-driven path (processBatch) also runs it without holding the monitor.
        if (completedBatch != null) {
            this.processBatchAction.processBatch(completedBatch);
        }
    }

    /**
     * Closes this batcher.
     * <p>
     * After this call, {@link #addToBatch(Object)} ignores new items. A scheduled time-based flush is cancelled
     * without interrupting it if it is already running. This method does not itself process items which are
     * still waiting in the current batch.
     */
    public void close() {
        synchronized (this) {
            this.closed = true;
            if (this.pendingBatchAction != null) {
                this.pendingBatchAction.cancel(false);
            }
        }
    }

    /**
     * Detaches the current batch and replaces it with a new, empty one. Any pending time-based flush is cancelled
     * because it belonged to the batch being detached.
     *
     * @return the items of the detached batch, or {@code null} if the current batch is empty
     */
    private List<T> startNewBatch() {
        synchronized (this) {
            if (this.batchList.isEmpty()) {
                return null;
            }
            if (this.pendingBatchAction != null) {
                this.pendingBatchAction.cancel(false);
                this.pendingBatchAction = null;
            }
            List<T> detached = this.batchList;
            this.batchList = new ArrayList<>();
            return detached;
        }
    }

    /**
     * Time-based flush, run by the executor: detaches the current batch and, if it is not empty, passes it to the
     * batch action. The action is invoked without holding the monitor.
     */
    private void processBatch() {
        // startNewBatch() synchronizes on this instance itself, so no additional locking is needed here
        List<T> completedBatch = startNewBatch();
        if (completedBatch != null) {
            this.processBatchAction.processBatch(completedBatch);
        }
    }
}
