package com.ibm.ws.microprofile.reactive.messaging.kafka;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.websphere.ras.annotation.TraceObjectField;
import com.ibm.websphere.ras.annotation.TraceOptions;
import com.ibm.ws.ffdc.FFDCFilter;
import com.ibm.ws.ffdc.annotation.FFDCIgnore;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.ConsumerRebalanceListener;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.ConsumerRecord;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.ConsumerRecords;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaAdapterFactory;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaConsumer;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.OffsetAndMetadata;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.TopicPartition;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.WakeupException;
import com.ibm.ws.ras.instrument.annotation.InjectedFFDC;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

@InjectedFFDC
@TraceObjectField(fieldName = "tc", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
@TraceOptions
/* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/KafkaInput.class */
public class KafkaInput<K, V> implements ConsumerRebalanceListener {
    /** Trace component for this class, registered in the "REACTIVEMESSAGE" trace group. */
    private static final TraceComponent tc = Tr.register(KafkaInput.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");
    /** Poll timeout of Long.MAX_VALUE milliseconds, used for the blocking poll in executePollActions(). */
    private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
    /** Consumer that is subscribed, polled, committed through and closed by this class. */
    private final KafkaConsumer<K, V> kafkaConsumer;
    /** Executor that pollKafkaAsync() submits the blocking poll to when a zero-timeout poll yields nothing. */
    private final ExecutorService executor;
    /** Topics passed to subscribe(); the constructor always builds a single-element collection. */
    private final Collection<String> topics;
    /** Lazily created by getPublisher(); null until first requested. */
    private PublisherBuilder<Message<V>> publisher;
    /** Factory used to create TopicPartition instances and incoming messages in wrapInMessageStream(). */
    private final KafkaAdapterFactory kafkaAdapterFactory;
    /** Incremented for every emitted message and decremented in the ack callback; gates polling in createPublisher(). */
    private final ThresholdCounter unackedMessageCounter;
    /** Creates a PartitionTracker for each partition handed to onPartitionsAssigned(). */
    private final PartitionTrackerFactory partitionTrackerFactory;
    // NOTE(review): this class does not visibly implement Serializable; presumably emitted by build tooling - confirm.
    static final long serialVersionUID = 8893472182362959538L;
    /** Set to true by pollKafkaAsync() once subscribe() has been called. */
    private boolean subscribed = false;
    /** Cleared by shutdown(); checked by the poll loops, runPendingActions() and the stream's takeWhile. */
    private volatile boolean running = true;
    /** Set by the nack callback in wrapInMessageStream(); when non-null, errorMapper() fails the stream with it. */
    private volatile Throwable error = null;
    /** Current trackers by partition; the rebalance callbacks copy the map, modify the copy and reassign this field. */
    private volatile Map<TopicPartition, PartitionTracker> partitionTrackers = Collections.emptyMap();
    /** Partitions being processed by onPartitionsAssigned(); reset to null when that method completes normally. */
    private volatile Collection<TopicPartition> newPartitionsPending = null;
    /** Held around the subscribe/poll/close calls on kafkaConsumer and while queued actions run. */
    private final ReentrantLock lock = new ReentrantLock();
    /** Actions queued by runAction() and drained by runPendingActions(). */
    private final ConcurrentLinkedQueue<KafkaConsumerAction> tasks = new ConcurrentLinkedQueue<>();

    /**
     * A unit of work to be executed against the Kafka consumer.
     * <p>
     * Instances are queued by {@code runAction} and invoked from {@code runPendingActions},
     * which passes in the consumer owned by the enclosing {@code KafkaInput}.
     */
    @FunctionalInterface
    /* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/KafkaInput$KafkaConsumerAction.class */
    public interface KafkaConsumerAction {
        /**
         * Performs the action.
         *
         * @param kafkaConsumer the consumer to operate on
         */
        void run(KafkaConsumer<?, ?> kafkaConsumer);
    }

    /**
     * Pairs a message with the {@code PartitionTracker} looked up for the partition its record came from.
     * <p>
     * {@code wrapInMessageStream} uses the pairing to drop messages whose tracker has been
     * closed before unwrapping the message again.
     *
     * @param <V> payload type of the wrapped message
     */
    /* JADX INFO: Access modifiers changed from: private */
    @InjectedFFDC
    @TraceObjectField(fieldName = "$$$tc$$$", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
    @TraceOptions
    /* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/KafkaInput$TrackedMessage.class */
    public static class TrackedMessage<V> {
        /** The message to emit downstream. */
        private final Message<V> message;
        /** Tracker associated with the message's partition. */
        private final PartitionTracker tracker;
        // NOTE(review): this class does not visibly implement Serializable; presumably emitted by build tooling - confirm.
        static final long serialVersionUID = -1747247204024794888L;
        // Synthetic trace component; the TraceObjectField annotation above refers to it by name.
        private static final /* synthetic */ TraceComponent $$$tc$$$ = Tr.register("com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaInput$TrackedMessage", TrackedMessage.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");

        /**
         * @param message          the message to wrap
         * @param partitionTracker the tracker associated with the message's partition
         */
        public TrackedMessage(Message<V> message, PartitionTracker partitionTracker) {
            this.message = message;
            this.tracker = partitionTracker;
        }
    }

    /**
     * Creates an input that reads a single topic through the given consumer.
     *
     * @param kafkaAdapterFactory     factory for adapter objects (topic partitions, incoming messages)
     * @param partitionTrackerFactory factory for the per-partition trackers created on assignment
     * @param kafkaConsumer           the consumer to subscribe and poll
     * @param executorService         executor used for blocking polls
     * @param str                     the topic to subscribe to
     * @param i                       threshold for the unacknowledged-message counter; a value of zero
     *                                or less selects {@code ThresholdCounter.UNLIMITED}
     */
    public KafkaInput(KafkaAdapterFactory kafkaAdapterFactory, PartitionTrackerFactory partitionTrackerFactory, KafkaConsumer<K, V> kafkaConsumer, ExecutorService executorService, String str, int i) {
        this.kafkaAdapterFactory = kafkaAdapterFactory;
        this.partitionTrackerFactory = partitionTrackerFactory;
        this.kafkaConsumer = kafkaConsumer;
        this.executor = executorService;
        this.topics = Collections.singleton(str);
        this.unackedMessageCounter = (i > 0) ? new ThresholdCounterImpl(i) : ThresholdCounter.UNLIMITED;
    }

    /**
     * Returns the publisher for this input, building it on the first call and
     * handing back the same instance afterwards.
     *
     * @return the publisher builder emitting the incoming messages
     */
    public PublisherBuilder<Message<V>> getPublisher() {
        if (this.publisher != null) {
            return this.publisher;
        }
        PublisherBuilder<Message<V>> created = createPublisher();
        this.publisher = created;
        return created;
    }

    /**
     * Assembles the message stream.
     * <p>
     * An endless generator drives the pipeline: for each tick the stream waits until the
     * unacknowledged-message counter is below its threshold, then polls Kafka. The resulting
     * batches are flattened into individual messages, each of which bumps the counter. The
     * stream ends once {@code running} is false, fails with the recorded {@code error} if one
     * has been set, and shuts this input down when an error passes through.
     *
     * @return the assembled publisher builder
     */
    private PublisherBuilder<Message<V>> createPublisher() {
        PublisherBuilder<Message<V>> stream = ReactiveStreams.generate(() -> 0)
                .flatMapCompletionStage(tick -> this.unackedMessageCounter.waitForBelowThreshold().thenCompose(ready -> pollKafkaAsync()))
                .flatMap(Function.identity())
                .peek(emitted -> this.unackedMessageCounter.increment())
                .takeWhile(emitted -> this.running)
                .flatMap(this::errorMapper)
                .onError(failure -> shutdown());
        return stream;
    }

    /**
     * Turns a message into a one-element stream, or into a failed stream when an
     * error has been recorded on this input.
     *
     * @param message the message about to be emitted
     * @return a stream of just {@code message}, or a stream failing with the recorded error
     */
    private PublisherBuilder<Message<V>> errorMapper(Message<V> message) {
        if (this.error == null) {
            return ReactiveStreams.of(message);
        }
        return ReactiveStreams.failed(this.error);
    }

    /**
     * Queues an asynchronous commit of the given offset for the tracker's partition.
     * <p>
     * The commit runs as a consumer action via {@code runAction}. If the tracker is already
     * closed when the action runs, no commit is attempted and the returned stage fails.
     *
     * @param partitionTracker  tracker identifying the partition to commit for
     * @param offsetAndMetadata the offset to commit
     * @return a stage that completes with {@code null} once the commit succeeds, or exceptionally
     *         if the commit fails, the partition is closed, or queuing the action throws
     */
    public CompletionStage<Void> commitOffsets(PartitionTracker partitionTracker, OffsetAndMetadata offsetAndMetadata) {
        final CompletableFuture<Void> outcome = new CompletableFuture<>();
        try {
            final Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partitionTracker.getTopicPartition(), offsetAndMetadata);
            runAction(consumer -> {
                if (partitionTracker.isClosed()) {
                    if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                        Tr.debug(this, tc, "Rejecting commit attempt because partition is closed", new Object[]{this});
                    }
                    outcome.completeExceptionally(new Exception("Partition is closed"));
                    return;
                }
                consumer.commitAsync(offsets, (committed, failure) -> {
                    if (failure == null) {
                        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                            Tr.debug(tc, "Committed offsets successfully", new Object[]{committed});
                        }
                        outcome.complete(null);
                    } else {
                        Tr.warning(tc, "kafka.read.offsets.commit.warning.CWMRX1001W", new Object[]{failure});
                        outcome.completeExceptionally(failure);
                    }
                });
            });
        } catch (Throwable th) {
            FFDCFilter.processException(th, "com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaInput", "174", this, new Object[]{partitionTracker, offsetAndMetadata});
            Tr.warning(tc, "kafka.read.offsets.commit.warning.CWMRX1001W", new Object[]{th});
            outcome.completeExceptionally(th);
        }
        return outcome;
    }

    /**
     * Completion handler that logs an error message when a poll has failed.
     *
     * @param t  the poll result (ignored)
     * @param th the failure, or {@code null} if the poll succeeded
     * @return always {@code null}
     */
    private static <T> Void logPollFailure(T t, Throwable th) {
        if (th != null) {
            Tr.error(tc, "kafka.poll.error.CWMRX1002E", new Object[]{th});
        }
        return null;
    }

    /**
     * Stops this input: clears the running flag, wakes the consumer, then closes the
     * consumer and every current partition tracker while holding the lock.
     */
    public void shutdown() {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
            Tr.event(tc, "Shutting down Kafka connection", new Object[0]);
        }
        this.running = false;
        // Wake the consumer before waiting for the lock; executePollActions() polls with
        // an effectively unbounded timeout while holding it.
        this.kafkaConsumer.wakeup();
        this.lock.lock();
        try {
            this.kafkaConsumer.close();
            for (PartitionTracker tracker : this.partitionTrackers.values()) {
                tracker.close();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Polls Kafka for the next batch of records.
     * <p>
     * On the first call the consumer is subscribed to the topics. A non-blocking poll is then
     * attempted on the calling thread if the lock is free; if that yields no records (or the
     * lock is busy), a blocking poll is handed to the executor.
     * <p>
     * If this input is no longer running, or the executor rejects the task, the returned
     * stage is never completed.
     *
     * @return a stage that completes with a stream of the polled messages, or exceptionally
     *         if the blocking poll fails
     */
    @FFDCIgnore({WakeupException.class, RejectedExecutionException.class})
    private CompletionStage<PublisherBuilder<Message<V>>> pollKafkaAsync() {
        if (!this.subscribed) {
            this.lock.lock();
            try {
                this.kafkaConsumer.subscribe(this.topics, this);
                this.subscribed = true;
            } finally {
                this.lock.unlock();
            }
        }
        if (!this.running) {
            if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                Tr.event(tc, "Not running, returning incomplete future", new Object[0]);
            }
            return new CompletableFuture<>();
        }
        CompletableFuture<PublisherBuilder<Message<V>>> result = new CompletableFuture<>();
        result.handle(KafkaInput::logPollFailure);

        ConsumerRecords<K, V> records = null;
        while (this.lock.tryLock()) {
            try {
                records = this.kafkaConsumer.poll(Duration.ZERO);
                break;
            } catch (WakeupException e) {
                // A wakeup signals queued actions; run them (the lock is reentrant) and retry the poll.
                runPendingActions();
            } finally {
                // Always release what tryLock() acquired; otherwise the executor thread in
                // executePollActions() would block forever waiting for this lock.
                this.lock.unlock();
            }
        }

        if (records != null && !records.isEmpty()) {
            result.complete(wrapInMessageStream(records));
            return result;
        }
        try {
            this.executor.submit(() -> executePollActions(result));
        } catch (RejectedExecutionException e) {
            if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                Tr.event(tc, "Asynchronous execution rejected, returning incomplete future", new Object[0]);
            }
            return new CompletableFuture<>();
        }
        return result;
    }

    /**
     * Blocking poll loop, run on the executor.
     * <p>
     * While this input is running, pending consumer actions are executed and the consumer is
     * polled with an effectively unbounded timeout. A {@code WakeupException} restarts the
     * loop so newly queued actions get run; any other failure completes the future
     * exceptionally and ends the loop. If the input stops running before a poll returns,
     * the future is left incomplete.
     * <p>
     * The lock is acquired exactly once and released exactly once, regardless of how the
     * loop exits. Actions queued while the lock was held are run after it is released.
     *
     * @param completableFuture the future to complete with the polled message stream
     */
    @FFDCIgnore({WakeupException.class})
    private void executePollActions(CompletableFuture<PublisherBuilder<Message<V>>> completableFuture) {
        this.lock.lock();
        try {
            while (this.running) {
                try {
                    runPendingActions();
                    completableFuture.complete(wrapInMessageStream(this.kafkaConsumer.poll(FOREVER)));
                    break;
                } catch (WakeupException e) {
                    // Woken up on purpose (queued action or shutdown): go round again and
                    // re-check the running flag.
                }
            }
        } catch (Throwable th) {
            FFDCFilter.processException(th, "com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaInput", "278", this, new Object[]{completableFuture});
            completableFuture.completeExceptionally(th);
        } finally {
            this.lock.unlock();
        }
        runPendingActions();
    }

    /**
     * Converts a batch of polled records into a stream of messages.
     * <p>
     * Each record is paired with the tracker registered for its partition (taken from a
     * snapshot of the tracker map read at call time). Acknowledging a message decrements the
     * unacknowledged-message counter and reports the record as done to its tracker; a negative
     * acknowledgement is logged and recorded in {@code error}. Messages whose tracker is
     * closed are dropped.
     *
     * @param consumerRecords the records returned by a poll
     * @return a stream of messages for the records whose partition tracker is still open
     */
    private PublisherBuilder<Message<V>> wrapInMessageStream(ConsumerRecords<K, V> consumerRecords) {
        final Map<TopicPartition, PartitionTracker> trackers = this.partitionTrackers;
        return ReactiveStreams.fromIterable(consumerRecords)
                .map(kafkaRecord -> {
                    try {
                        TopicPartition partition = this.kafkaAdapterFactory.newTopicPartition(kafkaRecord.topic(), kafkaRecord.partition());
                        // NOTE(review): if no tracker is registered for the partition this is null
                        // and the filter below would throw - confirm that cannot happen.
                        PartitionTracker owner = trackers.get(partition);
                        Message<V> incoming = this.kafkaAdapterFactory.newIncomingKafkaMessage(kafkaRecord, () -> {
                            this.unackedMessageCounter.decrement();
                            return owner.recordDone(kafkaRecord.offset(), kafkaRecord.leaderEpoch());
                        }, nackCause -> {
                            logNackedMessage(kafkaRecord, nackCause);
                            this.error = nackCause;
                            return CompletableFuture.completedFuture(null);
                        });
                        return new TrackedMessage<>(incoming, owner);
                    } catch (Throwable th) {
                        Tr.error(tc, "internal.kafka.connector.error.CWMRX1000E", new Object[]{th});
                        throw th;
                    }
                })
                .filter(tracked -> !tracked.tracker.isClosed())
                .map(tracked -> tracked.message);
    }

    /**
     * Reports a negatively acknowledged message: logs an error naming the record and the
     * cause, then passes the cause to FFDC.
     *
     * @param consumerRecord the record whose message was nacked
     * @param th             the reason given for the nack
     */
    private static void logNackedMessage(ConsumerRecord<?, ?> consumerRecord, Throwable th) {
        final Object[] inserts = {consumerRecord, th};
        Tr.error(tc, "kafka.input.message.nacked.CWMRX1011E", inserts);
        FFDCFilter.processException(th, KafkaInput.class.getName(), "message-nack");
    }

    /**
     * Schedules an action to run against the Kafka consumer.
     * <p>
     * The action is queued, the consumer is woken, and an attempt is made to run the queue
     * straight away on the calling thread.
     *
     * @param kafkaConsumerAction the action to run
     */
    public void runAction(KafkaConsumerAction kafkaConsumerAction) {
        // The queue is unbounded, so offer() always succeeds, exactly like add().
        this.tasks.offer(kafkaConsumerAction);
        this.kafkaConsumer.wakeup();
        this.runPendingActions();
    }

    /**
     * Runs all queued consumer actions, provided the lock can be taken without blocking.
     * <p>
     * If the lock is unavailable the method returns immediately. Nothing is run once this
     * input has stopped running. After the queue has been drained and the lock released, the
     * queue is checked again so that an action queued in the meantime is also run.
     * <p>
     * An exception thrown by an action is reported and rethrown to the caller.
     */
    public void runPendingActions() {
        while (!this.tasks.isEmpty() && this.lock.tryLock()) {
            try {
                if (!this.running) {
                    return;
                }
                KafkaConsumerAction action;
                // Stop once the queue is empty so the lock is released.
                while ((action = this.tasks.poll()) != null) {
                    try {
                        action.run(this.kafkaConsumer);
                    } catch (Throwable th) {
                        FFDCFilter.processException(th, "com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaInput", "368", this, new Object[0]);
                        Tr.error(tc, "internal.kafka.connector.error.CWMRX1000E", new Object[]{th});
                        throw th;
                    }
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Rebalance callback: closes and forgets the tracker of every revoked partition.
     * <p>
     * The tracker map is copied, updated and then published in a single assignment.
     *
     * @param collection the partitions that have been revoked
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled() && this.newPartitionsPending != null) {
            Tr.debug(this, tc, "onPartitionsRevoked called while new partitions pending:", new Object[]{this.newPartitionsPending});
        }
        Map<TopicPartition, PartitionTracker> remaining = new HashMap<>(this.partitionTrackers);
        for (TopicPartition revoked : collection) {
            PartitionTracker tracker = remaining.get(revoked);
            if (tracker == null) {
                if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    Tr.debug(this, tc, "Partition revoked that we didn't know we were assigned", new Object[]{revoked});
                }
                continue;
            }
            tracker.close();
            remaining.remove(revoked);
        }
        this.partitionTrackers = remaining;
    }

    /**
     * Rebalance callback: creates a tracker for every newly assigned partition, starting at
     * the consumer's current position for that partition.
     * <p>
     * The partitions are held in {@code newPartitionsPending} while trackers are created, and
     * that field is reset to {@code null} only after all trackers have been created. If it is
     * already non-null on entry, the new partitions are added to the existing set.
     *
     * @param collection the partitions that have been assigned
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        Collection<TopicPartition> pending = this.newPartitionsPending;
        if (pending == null) {
            pending = new HashSet<>(collection);
            this.newPartitionsPending = pending;
        } else {
            if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                Tr.debug(this, tc, "onPartitionsAssigned called with pending partitions", new Object[]{pending});
            }
            pending.addAll(collection);
        }
        Map<TopicPartition, PartitionTracker> updated = new HashMap<>(this.partitionTrackers);
        for (TopicPartition assigned : pending) {
            updated.put(assigned, this.partitionTrackerFactory.create(this, assigned, this.kafkaConsumer.position(assigned)));
        }
        this.newPartitionsPending = null;
        this.partitionTrackers = updated;
    }
}
