package com.ibm.ws.microprofile.reactive.messaging.kafka;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.websphere.ras.annotation.TraceObjectField;
import com.ibm.websphere.ras.annotation.TraceOptions;
import com.ibm.ws.ffdc.FFDCFilter;
import com.ibm.ws.ffdc.annotation.FFDCIgnore;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.ConsumerRecord;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.ConsumerRecords;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaAdapterFactory;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaConsumer;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.OffsetAndMetadata;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.TopicPartition;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.WakeupException;
import com.ibm.ws.ras.instrument.annotation.InjectedFFDC;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

@InjectedFFDC
@TraceObjectField(fieldName = "tc", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
@TraceOptions
/* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/KafkaInput.class */
public class KafkaInput<K, V> {
    private static final TraceComponent tc = Tr.register(KafkaInput.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");
    private static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE);
    private final KafkaConsumer<K, V> kafkaConsumer;
    private final ExecutorService executor;
    private final Collection<String> topics;
    private PublisherBuilder<Message<V>> publisher;
    private final AckTracker ackTracker;
    static final long serialVersionUID = 5473976236015471039L;
    private boolean subscribed = false;
    private volatile boolean running = true;
    private final ReentrantLock lock = new ReentrantLock();
    private final ConcurrentLinkedQueue<KafkaConsumerAction> tasks = new ConcurrentLinkedQueue<>();

    @FunctionalInterface
    /* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/KafkaInput$KafkaConsumerAction.class */
    public interface KafkaConsumerAction {
        void run(KafkaConsumer<?, ?> kafkaConsumer);
    }

    public KafkaInput(KafkaAdapterFactory kafkaAdapterFactory, KafkaConsumer<K, V> kafkaConsumer, ExecutorService executorService, String str, AckTracker ackTracker) {
        this.kafkaConsumer = kafkaConsumer;
        this.executor = executorService;
        this.topics = Collections.singleton(str);
        this.ackTracker = ackTracker;
        if (ackTracker != null) {
            this.ackTracker.setCommitAction(this::commitOffsets);
        }
    }

    public PublisherBuilder<Message<V>> getPublisher() {
        if (this.publisher == null) {
            this.publisher = createPublisher();
        }
        return this.publisher;
    }

    private PublisherBuilder<Message<V>> createPublisher() {
        return this.ackTracker != null ? ReactiveStreams.generate(() -> {
            return 0;
        }).flatMapCompletionStage(num -> {
            return this.ackTracker.waitForAckThreshold().thenCompose(r3 -> {
                return pollKafkaAsync();
            });
        }).flatMap(Function.identity()).map(this::wrapInMessage).takeWhile(message -> {
            return this.running;
        }) : ReactiveStreams.generate(() -> {
            return 0;
        }).flatMapCompletionStage(num2 -> {
            return pollKafkaAsync();
        }).flatMap(Function.identity()).map(consumerRecord -> {
            return Message.of(consumerRecord.value());
        }).takeWhile(message2 -> {
            return this.running;
        });
    }

    private CompletionStage<Void> commitOffsets(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        CompletableFuture completableFuture = new CompletableFuture();
        try {
            Map singletonMap = Collections.singletonMap(topicPartition, offsetAndMetadata);
            runAction(kafkaConsumer -> {
                kafkaConsumer.commitAsync(singletonMap, (map, exc) -> {
                    if (exc != null) {
                        Tr.warning(tc, "kafka.read.offsets.commit.warning.CWMRX1001W", new Object[]{exc});
                        completableFuture.completeExceptionally(exc);
                    } else {
                        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                            Tr.debug(tc, "Committed offsets successfully", new Object[]{map});
                        }
                        completableFuture.complete(null);
                    }
                });
            });
            return completableFuture;
        } catch (Throwable th) {
            FFDCFilter.processException(th, "com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaInput", "128", this, new Object[]{topicPartition, offsetAndMetadata});
            Tr.warning(tc, "kafka.read.offsets.commit.warning.CWMRX1001W", new Object[]{th});
            completableFuture.completeExceptionally(th);
            return completableFuture;
        }
    }

    private static <T> Void logPollFailure(T t, Throwable th) {
        if (th == null) {
            return null;
        }
        Tr.error(tc, "kafka.poll.error.CWMRX1002E", new Object[]{th});
        return null;
    }

    private Message<V> wrapInMessage(ConsumerRecord<K, V> consumerRecord) {
        try {
            return Message.of(consumerRecord.value(), this.ackTracker.trackRecord(consumerRecord));
        } catch (Throwable th) {
            FFDCFilter.processException(th, "com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaInput", "145", this, new Object[]{consumerRecord});
            Tr.error(tc, "internal.kafka.connector.error.CWMRX1000E", new Object[]{th});
            throw th;
        }
    }

    public void shutdown() {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
            Tr.event(tc, "Shutting down Kafka connection", new Object[0]);
        }
        this.running = false;
        this.kafkaConsumer.wakeup();
        this.lock.lock();
        try {
            this.kafkaConsumer.close();
            if (this.ackTracker != null) {
                this.ackTracker.shutdown();
            }
        } finally {
            this.lock.unlock();
        }
    }

    @FFDCIgnore({WakeupException.class, RejectedExecutionException.class})
    private CompletionStage<PublisherBuilder<ConsumerRecord<K, V>>> pollKafkaAsync() {
        if (!this.subscribed) {
            this.lock.lock();
            try {
                if (this.ackTracker != null) {
                    this.kafkaConsumer.subscribe(this.topics, this.ackTracker);
                } else {
                    this.kafkaConsumer.subscribe(this.topics);
                }
                this.subscribed = true;
            } finally {
                this.lock.unlock();
            }
        }
        if (!this.running) {
            if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                Tr.event(tc, "Not running, returning incomplete future", new Object[0]);
            }
            return new CompletableFuture();
        }
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.handle((v0, v1) -> {
            return logPollFailure(v0, v1);
        });
        ConsumerRecords consumerRecords = null;
        while (true) {
            if (!this.lock.tryLock()) {
                break;
            }
            try {
                consumerRecords = this.kafkaConsumer.poll(Duration.ZERO);
                break;
            } catch (WakeupException e) {
                runPendingActions();
            } catch (Throwable th) {
                throw th;
            }
        }
        if (consumerRecords == null || consumerRecords.isEmpty()) {
            try {
                this.executor.submit(() -> {
                    executePollActions(completableFuture);
                });
            } catch (RejectedExecutionException e2) {
                if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                    Tr.event(tc, "Asynchronous execution rejected, returning incomplete future", new Object[0]);
                }
                return new CompletableFuture();
            }
        } else {
            completableFuture.complete(ReactiveStreams.fromIterable(consumerRecords));
        }
        return completableFuture;
    }

    @FFDCIgnore({WakeupException.class})
    private void executePollActions(CompletableFuture<PublisherBuilder<ConsumerRecord<K, V>>> completableFuture) {
        this.lock.lock();
        while (this.running) {
            try {
                try {
                    runPendingActions();
                    completableFuture.complete(ReactiveStreams.fromIterable(this.kafkaConsumer.poll(FOREVER)));
                    break;
                } catch (WakeupException e) {
                } catch (Throwable th) {
                    FFDCFilter.processException(th, "com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaInput", "245", this, new Object[]{completableFuture});
                    completableFuture.completeExceptionally(th);
                }
            } finally {
                this.lock.unlock();
            }
        }
        runPendingActions();
    }

    public void runAction(KafkaConsumerAction kafkaConsumerAction) {
        this.tasks.add(kafkaConsumerAction);
        this.kafkaConsumer.wakeup();
        runPendingActions();
    }

    private void runPendingActions() {
        while (!this.tasks.isEmpty() && this.lock.tryLock()) {
            try {
                if (!this.running) {
                    return;
                }
                while (true) {
                    KafkaConsumerAction poll = this.tasks.poll();
                    if (poll != null) {
                        try {
                            poll.run(this.kafkaConsumer);
                        } catch (Throwable th) {
                            FFDCFilter.processException(th, "com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaInput", "288", this, new Object[0]);
                            Tr.error(tc, "internal.kafka.connector.error.CWMRX1000E", new Object[]{th});
                            throw th;
                        }
                    }
                }
            } finally {
                this.lock.unlock();
            }
        }
    }
}
