package com.ibm.ws.microprofile.reactive.messaging.kafka;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.websphere.ras.annotation.TraceObjectField;
import com.ibm.websphere.ras.annotation.TraceOptions;
import com.ibm.ws.ffdc.FFDCFilter;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.ConsumerRebalanceListener;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.ConsumerRecord;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaAdapterFactory;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.OffsetAndMetadata;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.TopicPartition;
import com.ibm.ws.ras.instrument.annotation.InjectedFFDC;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Tracks which consumed Kafka records have been acknowledged and commits their offsets,
 * partition by partition, through a pluggable {@link CommitAction}.
 * <p>
 * One {@link PartitionAckTracker} exists for every partition passed to
 * {@link #onPartitionsAssigned(Collection)}; it is removed and closed again when the partition is
 * revoked or when {@link #shutdown()} is called. A running count of sent-but-unacked records
 * controls the stage handed out by {@link #waitForAckThreshold()}.
 */
@InjectedFFDC
@TraceObjectField(fieldName = "tc", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
@TraceOptions
public class AckTracker implements ConsumerRebalanceListener {
    private static final TraceComponent tc = Tr.register(AckTracker.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");
    static final long serialVersionUID = 5797060897036150652L;

    /** Factory for the adapter-level objects used here (partitions, offsets, commit-failed exceptions). */
    private final KafkaAdapterFactory kafkaAdapterFactory;
    /** Handed to each partition's batcher and used to run the commit-completion callbacks. */
    private final ScheduledExecutorService executor;
    /** Outstanding-ack count at which {@link #ackThresholdStage} is swapped for an incomplete stage. */
    private final int ackThreshold;

    /** Trackers for the currently assigned partitions, keyed by partition. */
    private final Map<TopicPartition, PartitionAckTracker> partitionTrackers = Collections.synchronizedMap(new HashMap<>());
    /** Count of records sent but not yet acked; this object is also the monitor used when updating {@link #ackThresholdStage}. */
    private final AtomicInteger outstandingAcks = new AtomicInteger();
    /** Starts out complete; replaced by a fresh incomplete future when the count reaches the threshold. */
    private CompletableFuture<Void> ackThresholdStage = CompletableFuture.completedFuture(null);
    /** Commit strategy; until {@link #setCommitAction(CommitAction)} is called it reports immediate success without committing. */
    private CommitAction commitAction = (partition, offset) -> CompletableFuture.completedFuture(null);

    /**
     * Strategy used to commit an offset for a single partition.
     * <p>
     * A partition tracker invokes this once per non-empty batch of acked records and uses the
     * outcome of the returned stage to complete, or fail, every ack in that batch.
     */
    @FunctionalInterface
    /* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/AckTracker$CommitAction.class */
    public interface CommitAction {
        /**
         * Commits {@code offsetAndMetadata} for {@code topicPartition}.
         *
         * @param topicPartition the partition whose offset is being committed
         * @param offsetAndMetadata the offset (plus leader epoch / metadata) to commit
         * @return a stage whose normal or exceptional completion is propagated to the acks in the batch
         */
        CompletionStage<Void> doCommit(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Per-record bookkeeping: the record's offset and leader epoch, plus the future that is
     * attached once the record has been acked ({@code null} until then).
     */
    @InjectedFFDC
    @TraceObjectField(fieldName = "$$$tc$$$", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
    @TraceOptions
    public static class MessageAckData {
        static final long serialVersionUID = -3157412437713254902L;
        private static final /* synthetic */ TraceComponent $$$tc$$$ = Tr.register(MessageAckData.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");

        /** Offset of the tracked record within its partition. */
        private final long offset;
        /** Leader epoch reported by the record, passed through to the commit. */
        private final Optional<Integer> leaderEpoch;
        /** Result future of the ack; stays {@code null} while the record is unacked. */
        private CompletableFuture<Void> completion;

        /**
         * @param j the record's offset
         * @param optional the record's leader epoch
         */
        public MessageAckData(long j, Optional<Integer> optional) {
            this.leaderEpoch = optional;
            this.offset = j;
        }

        /** Returns the leader epoch captured when the record was sent. */
        public Optional<Integer> getLeaderEpoch() {
            return this.leaderEpoch;
        }

        /** Returns the ack's result future, or {@code null} if the record has not been acked. */
        public CompletableFuture<Void> getCompletion() {
            return this.completion;
        }

        /** Attaches the future that will report the outcome of committing this record. */
        public void setCompletion(CompletableFuture<Void> completableFuture) {
            this.completion = completableFuture;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Ack bookkeeping for one partition: keeps sent records in order, moves the acked prefix
     * into a batcher, and commits the offset after the last record of each batch.
     */
    @InjectedFFDC
    @TraceObjectField(fieldName = "$$$tc$$$", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
    @TraceOptions
    public class PartitionAckTracker {
        static final long serialVersionUID = 8789116834583355532L;
        private static final /* synthetic */ TraceComponent $$$tc$$$ = Tr.register(PartitionAckTracker.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");

        /** The partition this tracker is responsible for. */
        private final TopicPartition partition;
        /** Records sent but not yet handed to the batcher, in send order; also used as the lock for ack state. */
        private final List<MessageAckData> unackedMessages;
        /** Collects acked records and passes them in batches to {@code commitAckBatch}. */
        private final Batcher<MessageAckData> messageBatcher;

        /**
         * Creates a tracker for {@code topicPartition} whose batcher is configured with a maximum
         * batch size of 2000 and a maximum batch time of 500 ms, running on the outer tracker's executor.
         */
        private PartitionAckTracker(TopicPartition topicPartition) {
            final int maxBatchSize = 2000;
            final Duration maxBatchTime = Duration.ofMillis(500L);
            this.messageBatcher = Batcher.create(MessageAckData.class)
                    .withMaxBatchSize(maxBatchSize)
                    .withMaxBatchTime(maxBatchTime)
                    .withProcessBatchAction(this::commitAckBatch)
                    .withExecutor(AckTracker.this.executor)
                    .build();
            this.partition = topicPartition;
            this.unackedMessages = new LinkedList<>();
        }

        /**
         * Registers a record that is about to be delivered downstream.
         * <p>
         * The record is appended to the ordered list of unacked messages and the outer tracker's
         * outstanding-ack count is incremented.
         *
         * @param consumerRecord the record being delivered
         * @return the bookkeeping entry to pass to {@link #recordAck(MessageAckData)} later
         */
        public MessageAckData recordSent(ConsumerRecord<?, ?> consumerRecord) {
            final MessageAckData entry = new MessageAckData(consumerRecord.offset(), consumerRecord.leaderEpoch());
            synchronized (this.unackedMessages) {
                // Appended at the tail so the list stays in send order.
                this.unackedMessages.add(entry);
            }
            AckTracker.this.incrementOutstandingAcks();
            return entry;
        }

        /**
         * Records that a previously sent record has been acknowledged.
         * <p>
         * If this tracker's partition is no longer present in the outer tracker's map, the
         * returned stage fails with the factory's commit-failed exception. Otherwise a result
         * future is attached to the record, every leading acked record is moved into the batcher,
         * and the outstanding-ack count is decremented.
         * <p>
         * The returned stage is the same future that is attached to the record, so it reflects
         * the outcome of the commit covering this record rather than completing immediately.
         *
         * @param messageAckData the entry returned by {@code recordSent} for the acked record
         * @return a stage that completes when the commit covering this record succeeds, and
         *         completes exceptionally if the partition is closed or the commit fails
         */
        public CompletionStage<Void> recordAck(MessageAckData messageAckData) {
            CompletableFuture<Void> ackResult = new CompletableFuture<>();
            try {
                if (AckTracker.this.partitionTrackers.get(this.partition) == null) {
                    // Partition has been revoked or the tracker shut down: the offset can no longer be committed.
                    // NOTE(review): this branch does not decrement the outstanding-ack count even though
                    // recordSent incremented it - confirm that is intended.
                    ackResult.completeExceptionally(AckTracker.this.kafkaAdapterFactory.newCommitFailedException());
                } else {
                    synchronized (this.unackedMessages) {
                        messageAckData.setCompletion(ackResult);
                        batchPendingAcks();
                    }
                    AckTracker.this.decrementOutstandingAcks();
                }
                // Hand back the tracked future; returning a fresh completed future would report
                // success before (and regardless of whether) the offset is actually committed.
                return ackResult;
            } catch (Throwable th) {
                FFDCFilter.processException(th, "com.ibm.ws.microprofile.reactive.messaging.kafka.AckTracker$PartitionAckTracker", "237", this, new Object[]{messageAckData});
                ackResult.completeExceptionally(th);
                throw th;
            }
        }

        /**
         * Moves the leading run of acked records from the unacked list into the batcher.
         * <p>
         * Processing stops at the first record that has no completion attached, so a record is
         * only batched once every record sent before it has been acked.
         */
        private void batchPendingAcks() {
            synchronized (this.unackedMessages) {
                for (Iterator<MessageAckData> cursor = this.unackedMessages.iterator(); cursor.hasNext();) {
                    MessageAckData candidate = cursor.next();
                    boolean acked = candidate.getCompletion() != null;
                    if (!acked) {
                        // Everything behind an unacked record has to wait.
                        return;
                    }
                    this.messageBatcher.addToBatch(candidate);
                    cursor.remove();
                }
            }
        }

        /**
         * Commits the offset for a batch of acked records and relays the outcome to each of them.
         * <p>
         * The committed position is the offset of the last record in the batch plus one, together
         * with that record's leader epoch. When the commit stage finishes, every record's
         * completion is completed normally or exceptionally to match, on the tracker's executor.
         *
         * @param list the acked records to commit, in order; an empty list is ignored
         */
        private void commitAckBatch(List<MessageAckData> list) {
            if (list.isEmpty()) {
                return;
            }
            final MessageAckData last = list.get(list.size() - 1);
            final CommitAction action = AckTracker.this.commitAction;
            CompletionStage<Void> commitStage = action.doCommit(
                    this.partition,
                    AckTracker.this.kafkaAdapterFactory.newOffsetAndMetadata(last.offset + 1, last.getLeaderEpoch(), (String) null));
            commitStage.handleAsync((ignored, failure) -> {
                for (MessageAckData acked : list) {
                    if (failure == null) {
                        acked.getCompletion().complete(null);
                    } else {
                        acked.getCompletion().completeExceptionally(failure);
                    }
                }
                return null;
            }, AckTracker.this.executor);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Closes the batcher and fails every record that has been acked but is still waiting in
         * the unacked list, using the factory's commit-failed exception. Records without a
         * completion attached are left untouched.
         */
        public void close() {
            this.messageBatcher.close();
            synchronized (this.unackedMessages) {
                for (MessageAckData pending : this.unackedMessages) {
                    CompletableFuture<Void> completion = pending.getCompletion();
                    if (completion == null) {
                        continue;
                    }
                    completion.completeExceptionally(AckTracker.this.kafkaAdapterFactory.newCommitFailedException());
                }
            }
        }
    }

    /**
     * Creates a tracker with no partitions assigned.
     *
     * @param kafkaAdapterFactory factory for partitions, offsets and commit-failed exceptions
     * @param scheduledExecutorService executor given to the partition batchers and used for commit callbacks
     * @param i the outstanding-ack count at which the ack-threshold stage becomes incomplete
     */
    public AckTracker(KafkaAdapterFactory kafkaAdapterFactory, ScheduledExecutorService scheduledExecutorService, int i) {
        this.ackThreshold = i;
        this.executor = scheduledExecutorService;
        this.kafkaAdapterFactory = kafkaAdapterFactory;
    }

    /**
     * Removes and closes the tracker of every revoked partition; partitions without a tracker
     * are skipped.
     *
     * @param collection the partitions that have been revoked
     */
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        boolean traceEvents = TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled();
        if (traceEvents) {
            Tr.event(tc, collection.size() + " removed partitions", new Object[]{collection});
        }
        for (TopicPartition revoked : collection) {
            PartitionAckTracker tracker = this.partitionTrackers.remove(revoked);
            if (tracker == null) {
                continue;
            }
            tracker.close();
        }
    }

    /**
     * Creates a tracker for every newly assigned partition that does not already have one.
     *
     * @param collection the partitions that have been assigned
     */
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        boolean traceEvents = TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled();
        if (traceEvents) {
            Tr.event(tc, collection.size() + " new partitions", new Object[]{collection});
        }
        for (TopicPartition assigned : collection) {
            boolean alreadyTracked = this.partitionTrackers.get(assigned) != null;
            if (alreadyTracked) {
                continue;
            }
            this.partitionTrackers.put(assigned, new PartitionAckTracker(assigned));
        }
    }

    /**
     * Starts tracking a consumed record and returns the action to run when it is acknowledged.
     * <p>
     * If the record's partition has a tracker, the record is registered with it and the supplier
     * delegates to that tracker's {@code recordAck}. Otherwise the supplier always yields the same
     * stage, already failed with the factory's commit-failed exception.
     *
     * @param consumerRecord the record being delivered
     * @return a supplier that performs the ack and returns its result stage
     */
    public Supplier<CompletionStage<Void>> trackRecord(ConsumerRecord<?, ?> consumerRecord) {
        TopicPartition key = this.kafkaAdapterFactory.newTopicPartition(consumerRecord.topic(), consumerRecord.partition());
        PartitionAckTracker tracker = this.partitionTrackers.get(key);
        if (tracker == null) {
            // No tracker for this partition: any ack is reported as a failed commit.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(this.kafkaAdapterFactory.newCommitFailedException());
            return () -> failed;
        }
        MessageAckData ackData = tracker.recordSent(consumerRecord);
        return () -> tracker.recordAck(ackData);
    }

    /**
     * Replaces the action used to commit offsets. Batches committed after this call use the
     * new action; the initial action completes immediately without committing anything.
     *
     * @param commitAction the commit strategy to use from now on
     */
    public void setCommitAction(CommitAction commitAction) {
        this.commitAction = commitAction;
    }

    /**
     * Returns the current ack-threshold stage.
     * <p>
     * The stage is already complete unless the outstanding-ack count has reached the configured
     * threshold, in which case it completes once the count drops from the threshold again.
     * <p>
     * The field is read under the same monitor that guards its replacement in
     * {@code incrementOutstandingAcks}; it is not volatile, so an unsynchronized read could
     * observe a stale, already-completed stage.
     *
     * @return the stage to wait on before requesting more records
     */
    public CompletionStage<Void> waitForAckThreshold() {
        synchronized (this.outstandingAcks) {
            return this.ackThresholdStage;
        }
    }

    /**
     * Removes and closes every partition tracker while holding the tracker map's lock.
     * An exception thrown while closing one tracker is reported through FFDC and does not
     * prevent the remaining trackers from being closed.
     */
    public void shutdown() {
        synchronized (this.partitionTrackers) {
            for (Iterator<PartitionAckTracker> cursor = this.partitionTrackers.values().iterator(); cursor.hasNext();) {
                PartitionAckTracker tracker = cursor.next();
                // Remove first so the tracker is gone from the map even if close() fails.
                cursor.remove();
                try {
                    tracker.close();
                } catch (Exception e) {
                    FFDCFilter.processException(e, "com.ibm.ws.microprofile.reactive.messaging.kafka.AckTracker", "147", this, new Object[0]);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Increments the outstanding-ack count and, when the new count equals the threshold,
     * replaces the ack-threshold stage with a fresh incomplete future.
     */
    public void incrementOutstandingAcks() {
        synchronized (this.outstandingAcks) {
            final int current = this.outstandingAcks.incrementAndGet();
            if (current != this.ackThreshold) {
                return;
            }
            if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                Tr.event(tc, "Too many outstanding unacked messages, stopping polling kafka. Current count: " + current, new Object[0]);
            }
            this.ackThresholdStage = new CompletableFuture<>();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Decrements the outstanding-ack count and, when the count before the decrement equalled
     * the threshold, completes the ack-threshold stage that was current at that moment.
     * The stage is completed outside the monitor.
     */
    public void decrementOutstandingAcks() {
        final int previous;
        final CompletableFuture<Void> stage;
        synchronized (this.outstandingAcks) {
            previous = this.outstandingAcks.getAndDecrement();
            // Capture the stage under the lock so the one paired with this count is completed.
            stage = this.ackThresholdStage;
        }
        if (previous != this.ackThreshold) {
            return;
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
            Tr.event(tc, "Outstanding unacked message count dropped below threshold, resuming polling kafka. Current count: " + previous, new Object[0]);
        }
        stage.complete(null);
    }
}
