package com.ibm.ws.microprofile.reactive.messaging.kafka;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.websphere.ras.annotation.TraceObjectField;
import com.ibm.websphere.ras.annotation.TraceOptions;
import com.ibm.ws.ffdc.FFDCFilter;
import com.ibm.ws.ffdc.annotation.FFDCIgnore;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaAdapterException;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaAdapterFactory;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaProducer;
import com.ibm.ws.ras.instrument.annotation.InjectedFFDC;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

@ApplicationScoped
@InjectedFFDC
@TraceObjectField(fieldName = "tc", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
@Connector(KafkaConnectorConstants.CONNECTOR_NAME)
@TraceOptions
/* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/KafkaOutgoingConnector.class */
public class KafkaOutgoingConnector implements OutgoingConnectorFactory {
    private static final TraceComponent tc = Tr.register(KafkaOutgoingConnector.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");

    @Inject
    private KafkaAdapterFactory kafkaAdapterFactory;
    private final List<KafkaOutput<?, ?>> kafkaOutputs = Collections.synchronizedList(new ArrayList());
    static final long serialVersionUID = 369199060001387231L;

    public SubscriberBuilder<Message<Object>, Void> getSubscriberBuilder(Config config) {
        String str = (String) config.getValue("channel-name", String.class);
        try {
            int intValue = ((Integer) config.getOptionalValue(KafkaConnectorConstants.CREATION_RETRY_SECONDS, Integer.class).orElse(0)).intValue();
            HashMap hashMap = new HashMap();
            hashMap.put(KafkaConnectorConstants.KEY_SERIALIZER, KafkaConnectorConstants.STRING_SERIALIZER);
            hashMap.put(KafkaConnectorConstants.VALUE_SERIALIZER, KafkaConnectorConstants.STRING_SERIALIZER);
            hashMap.putAll((Map) StreamSupport.stream(config.getPropertyNames().spliterator(), false).filter(str2 -> {
                return !KafkaConnectorConstants.NON_KAFKA_PROPS.contains(str2);
            }).collect(Collectors.toMap(Function.identity(), str3 -> {
                return (String) config.getValue(str3, String.class);
            })));
            KafkaOutput<?, ?> kafkaOutput = new KafkaOutput<>(this.kafkaAdapterFactory, (String) config.getOptionalValue(KafkaConnectorConstants.TOPIC, String.class).orElse(null), str, getKafkaProducerWithRetry(hashMap, intValue, str));
            this.kafkaOutputs.add(kafkaOutput);
            return ReactiveStreams.builder().to(kafkaOutput.getSubscriber());
        } catch (Exception e) {
            FFDCFilter.processException(e, "com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaOutgoingConnector", "83", this, new Object[]{config});
            throw new KafkaConnectorException(Tr.formatMessage(tc, "kafka.create.outgoing.error.CWMRX1008E", new Object[]{str, e.getMessage()}), e);
        }
    }

    @FFDCIgnore({KafkaAdapterException.class})
    private <K, V> KafkaProducer<K, V> getKafkaProducerWithRetry(Map<String, Object> map, int i, String str) throws InterruptedException {
        if (i == 0) {
            return this.kafkaAdapterFactory.newKafkaProducer(map);
        }
        long nanos = Duration.ofSeconds(i).toNanos();
        long nanoTime = System.nanoTime();
        while (true) {
            try {
                return this.kafkaAdapterFactory.newKafkaProducer(map);
            } catch (KafkaAdapterException e) {
                if (System.nanoTime() - nanoTime > nanos) {
                    throw e;
                }
                Tr.warning(tc, "kafka.create.outgoing.retry.CWMRX1010W", new Object[]{str, e.getMessage()});
                Thread.sleep(1000L);
            }
        }
    }

    @PreDestroy
    private void shutdown() {
        synchronized (this.kafkaOutputs) {
            Iterator<KafkaOutput<?, ?>> it = this.kafkaOutputs.iterator();
            while (it.hasNext()) {
                it.next().shutdown(Duration.ZERO);
            }
        }
    }
}
