package com.ibm.ws.microprofile.reactive.messaging.kafka;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.websphere.ras.annotation.TraceObjectField;
import com.ibm.websphere.ras.annotation.TraceOptions;
import com.ibm.ws.ffdc.FFDCFilter;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaAdapterFactory;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.TopicPartition;
import com.ibm.ws.ras.instrument.annotation.InjectedFFDC;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@InjectedFFDC
@TraceObjectField(fieldName = "tc", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
@TraceOptions
/* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/CommittingPartitionTracker.class */
public class CommittingPartitionTracker extends PartitionTracker {
    private static final TraceComponent tc = Tr.register(CommittingPartitionTracker.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");
    private final KafkaAdapterFactory factory;
    private final ScheduledExecutorService executor;
    private final KafkaInput<?, ?> kafkaInput;
    private final int maxCommitBatchSize;
    private final Duration maxCommitBatchInterval;
    private final SortedSet<CompletedWork> completedWork;
    private int outstandingUncommittedWork;
    private long committedOffset;
    private Future<?> pendingCommitTask;
    static final long serialVersionUID = -6191053165273479598L;

    @InjectedFFDC
    @TraceObjectField(fieldName = "$$$tc$$$", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
    @TraceOptions
    /* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/CommittingPartitionTracker$CompletedWork.class */
    public static class CompletedWork implements Comparable<CompletedWork> {
        private final long offset;
        private final CompletableFuture<Void> completion;
        private final Optional<Integer> leaderEpoch;
        static final long serialVersionUID = 4117644519278210751L;
        private static final /* synthetic */ TraceComponent $$$tc$$$ = Tr.register("com.ibm.ws.microprofile.reactive.messaging.kafka.CommittingPartitionTracker$CompletedWork", CompletedWork.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");

        public CompletedWork(long j, Optional<Integer> optional, CompletableFuture<Void> completableFuture) {
            this.offset = j;
            this.leaderEpoch = optional;
            this.completion = completableFuture;
        }

        public CompletableFuture<Void> getCompletion() {
            return this.completion;
        }

        public Optional<Integer> getLeaderEpoch() {
            return this.leaderEpoch;
        }

        @Override // java.lang.Comparable
        public int compareTo(CompletedWork completedWork) {
            return Long.compare(this.offset, completedWork.offset);
        }
    }

    public CommittingPartitionTracker(TopicPartition topicPartition, KafkaAdapterFactory kafkaAdapterFactory, KafkaInput<?, ?> kafkaInput, long j, ScheduledExecutorService scheduledExecutorService, int i, Duration duration) {
        super(topicPartition);
        this.completedWork = new TreeSet();
        this.outstandingUncommittedWork = 0;
        this.pendingCommitTask = null;
        this.factory = kafkaAdapterFactory;
        this.kafkaInput = kafkaInput;
        this.executor = scheduledExecutorService;
        this.committedOffset = j;
        this.maxCommitBatchSize = i;
        this.maxCommitBatchInterval = duration;
    }

    @Override // com.ibm.ws.microprofile.reactive.messaging.kafka.PartitionTracker
    public CompletionStage<Void> recordDone(long j, Optional<Integer> optional) {
        CompletableFuture completableFuture = new CompletableFuture();
        try {
            synchronized (this.completedWork) {
                if (j < this.committedOffset ? false : this.completedWork.add(new CompletedWork(j, optional, completableFuture))) {
                    this.outstandingUncommittedWork++;
                    requestCommit();
                } else {
                    completableFuture.completeExceptionally(new IllegalStateException("recordDone called more than once for offset " + j));
                }
            }
        } catch (Throwable th) {
            FFDCFilter.processException(th, "com.ibm.ws.microprofile.reactive.messaging.kafka.CommittingPartitionTracker", "133", this, new Object[]{Long.valueOf(j), optional});
            Tr.error(tc, "internal.kafka.connector.error.CWMRX1000E", new Object[]{th});
            completableFuture.completeExceptionally(new KafkaConnectorException(Tr.formatMessage(tc, "internal.kafka.connector.error.CWMRX1000E", new Object[0]), th));
        }
        return completableFuture;
    }

    private void requestCommit() {
        if (this.maxCommitBatchSize > 0 && this.outstandingUncommittedWork > this.maxCommitBatchSize) {
            if (this.pendingCommitTask != null) {
                if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    Tr.debug(this, tc, "Cancelling scheduled commit task because we're committing right now", new Object[]{this});
                }
                this.pendingCommitTask.cancel(true);
            }
            commitCompletedWork();
            return;
        }
        if (this.pendingCommitTask != null || this.maxCommitBatchInterval.isZero()) {
            return;
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            Tr.debug(this, tc, "Scheduling deferred commit task", new Object[]{this});
        }
        this.pendingCommitTask = this.executor.schedule(this::commitCompletedWork, this.maxCommitBatchInterval.toNanos(), TimeUnit.NANOSECONDS);
    }

    private void commitCompletedWork() {
        synchronized (this.completedWork) {
            if (Thread.interrupted()) {
                if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    Tr.debug(this, tc, "Commit task running but has been cancelled", new Object[]{this});
                }
                return;
            }
            if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                Tr.debug(this, tc, "Checking for new work to commit, last committed offset is " + this.committedOffset, new Object[]{this});
                Tr.debug(this, tc, "Current completed work", new Object[]{this.completedWork});
            }
            long j = this.committedOffset;
            CompletedWork completedWork = null;
            for (CompletedWork completedWork2 : this.completedWork) {
                if (completedWork2.offset >= j) {
                    if (completedWork2.offset != j) {
                        break;
                    }
                    j++;
                    completedWork = completedWork2;
                }
            }
            if (completedWork != null) {
                long j2 = this.committedOffset;
                long j3 = j;
                if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                    Tr.event(this, tc, "Committing from " + j2 + " to " + j3, new Object[]{this});
                }
                commitUpTo(completedWork).whenCompleteAsync((r12, th) -> {
                    processCommittedWork(j2, j3, th);
                }, this.executor);
            }
            this.outstandingUncommittedWork = (int) (this.outstandingUncommittedWork - (j - this.committedOffset));
            this.pendingCommitTask = null;
            this.committedOffset = j;
        }
    }

    private CompletionStage<Void> commitUpTo(CompletedWork completedWork) {
        return this.kafkaInput.commitOffsets(this, this.factory.newOffsetAndMetadata(completedWork.offset + 1, completedWork.leaderEpoch, (String) null));
    }

    private void processCommittedWork(long j, long j2, Throwable th) {
        boolean z = false;
        if (th != null && this.factory.getRetryableExceptionClass().isInstance(th)) {
            z = true;
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            if (th == null) {
                Tr.debug(this, tc, "Commit from " + j + " to " + j2 + " completed successfully", new Object[]{this});
            } else if (z) {
                Tr.debug(this, tc, "Commit from " + j + " to " + j2 + " failed with retriable exception", new Object[]{this, th});
            } else {
                Tr.debug(this, tc, "Commit from " + j + " to " + j2 + " failed with non-retriable exception", new Object[]{this, th});
            }
        }
        if (z) {
            synchronized (this.completedWork) {
                if (j2 == this.committedOffset) {
                    if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                        Tr.debug(this, tc, "Retriable exception is for the most recent commit attempt", new Object[]{this});
                    }
                    CompletedWork completedWork = null;
                    for (CompletedWork completedWork2 : this.completedWork) {
                        if (completedWork2.offset >= j2) {
                            break;
                        } else {
                            completedWork = completedWork2;
                        }
                    }
                    if (completedWork != null) {
                        commitUpTo(completedWork).whenCompleteAsync((r12, th2) -> {
                            processCommittedWork(j, j2, th2);
                        }, this.executor);
                    } else if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                        Tr.debug(this, tc, "Would retry commit to " + j2 + " but completedWork contains nothing before this offset", new Object[0]);
                    }
                }
            }
            return;
        }
        ArrayList arrayList = new ArrayList();
        synchronized (this.completedWork) {
            Iterator<CompletedWork> it = this.completedWork.iterator();
            while (it.hasNext()) {
                CompletedWork next = it.next();
                if (next.offset >= j) {
                    if (next.offset >= j2) {
                        break;
                    }
                    arrayList.add(next);
                    it.remove();
                }
            }
        }
        if (th == null) {
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((CompletedWork) it2.next()).completion.complete(null);
            }
        } else {
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                ((CompletedWork) it3.next()).completion.completeExceptionally(th);
            }
        }
    }

    @Override // com.ibm.ws.microprofile.reactive.messaging.kafka.PartitionTracker
    public void close() {
        synchronized (this.completedWork) {
            try {
                this.kafkaInput.runPendingActions();
                if (!this.completedWork.isEmpty()) {
                    commitCompletedWork();
                }
                super.close();
            } catch (Throwable th) {
                super.close();
                throw th;
            }
        }
    }
}
