package com.ibm.ws.microprofile.reactive.messaging.kafka;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.websphere.ras.annotation.TraceObjectField;
import com.ibm.websphere.ras.annotation.TraceOptions;
import com.ibm.ws.ffdc.FFDCFilter;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaAdapterFactory;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaProducer;
import com.ibm.ws.ras.instrument.annotation.InjectedFFDC;
import io.openliberty.microprofile.reactive.messaging.internal.interfaces.MessageAccess;
import io.openliberty.microprofile.reactive.messaging.internal.interfaces.MessageAccessProvider;
import java.time.Duration;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

@InjectedFFDC
@TraceObjectField(fieldName = "tc", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
@TraceOptions
/* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/KafkaOutput.class */
public class KafkaOutput<K, V> {
    private static final TraceComponent tc = Tr.register(KafkaOutput.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");
    private final KafkaProducer<K, V> kafkaProducer;
    private final String configuredTopic;
    private final String channelName;
    private final KafkaAdapterFactory kafkaAdapterFactory;
    static final long serialVersionUID = 4183184007906130842L;
    private volatile boolean running = true;
    private final MessageAccess messageAccess = MessageAccessProvider.getMessageAccess();

    public KafkaOutput(KafkaAdapterFactory kafkaAdapterFactory, String str, String str2, KafkaProducer<K, V> kafkaProducer) {
        this.configuredTopic = str;
        this.kafkaProducer = kafkaProducer;
        this.channelName = str2;
        this.kafkaAdapterFactory = kafkaAdapterFactory;
    }

    public SubscriberBuilder<Message<V>, Void> getSubscriber() {
        return ReactiveStreams.builder().takeWhile(message -> {
            return this.running;
        }).onError(KafkaOutput::reportErrorSignal).forEach(this::sendMessage);
    }

    public void shutdown(Duration duration) {
        this.running = false;
        this.kafkaProducer.close(duration);
    }

    private void sendMessage(Message<V> message) {
        try {
            this.kafkaProducer.send(this.kafkaAdapterFactory.newProducerRecord(this.configuredTopic, this.channelName, message.getPayload()), (recordMetadata, exc) -> {
                if (exc == null) {
                    message.ack();
                } else {
                    reportSendException(exc);
                    this.messageAccess.nack(message, exc);
                }
            });
        } catch (Exception e) {
            FFDCFilter.processException(e, "com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaOutput", "69", this, new Object[]{message});
            reportSendException(e);
            this.messageAccess.nack(message, e);
            throw e;
        }
    }

    private static void reportSendException(Throwable th) {
        Tr.error(tc, "kafka.send.error.CWMRX1003E", new Object[]{th});
    }

    private static void reportErrorSignal(Throwable th) {
        Tr.error(tc, "kafka.output.error.signal.CWMRX1004E", new Object[]{th});
    }
}
