package io.smallrye.reactive.messaging.providers;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.converters.ReactiveTypeConverter;
import io.smallrye.reactive.converters.Registry;
import io.smallrye.reactive.messaging.MediatorConfiguration;
import io.smallrye.reactive.messaging.providers.helpers.KeyMultiUtils;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.i18n.ProviderExceptions;
import io.smallrye.reactive.messaging.providers.i18n.ProviderMessages;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.function.Function;
import mutiny.zero.flow.adapters.AdaptersToFlow;
import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Publisher;

/* loaded from: input_file:io/smallrye/reactive/messaging/providers/StreamTransformerMediator.class */
/**
 * Mediator for methods that receive a whole stream as their argument and return a stream.
 *
 * <p>{@link #initialize(Object)} selects, from the configured consumption kind and the stream API
 * family in use, a {@link #function} that adapts the upstream {@link Multi} to the argument type,
 * calls the inherited {@code invoke(...)}, and adapts the returned publisher back into a
 * {@code Multi} of {@link Message}. {@link #connectToUpstream(Multi)} then applies that function
 * once and stores the resulting stream.
 */
public class StreamTransformerMediator extends AbstractMediator {

    /** Upstream-to-downstream transformation; {@code null} until {@link #initialize(Object)} has run. */
    Function<Multi<? extends Message<?>>, Multi<? extends Message<?>>> function;

    /** Downstream stream; {@code null} until {@link #connectToUpstream(Multi)} has run. */
    private Multi<? extends Message<?>> publisher;

    /**
     * Creates the mediator and rejects configurations that mix message streams with payload streams.
     *
     * @param mediatorConfiguration the configuration describing the mediated method
     * @throws RuntimeException the exception built by {@code ProviderExceptions.ex} when the method
     *         consumes messages but produces payloads, or consumes payloads but produces messages
     *         (NOTE(review): the concrete exception type is declared outside this file)
     */
    public StreamTransformerMediator(MediatorConfiguration mediatorConfiguration) {
        super(mediatorConfiguration);
        MediatorConfiguration.Consumption consumption = mediatorConfiguration.consumption();
        MediatorConfiguration.Production production = mediatorConfiguration.production();

        boolean consumesMessages = consumption == MediatorConfiguration.Consumption.STREAM_OF_MESSAGE
                || consumption == MediatorConfiguration.Consumption.KEYED_MULTI_MESSAGE;
        boolean consumesPayloads = consumption == MediatorConfiguration.Consumption.STREAM_OF_PAYLOAD
                || consumption == MediatorConfiguration.Consumption.KEYED_MULTI;

        if (consumesMessages && production == MediatorConfiguration.Production.STREAM_OF_PAYLOAD) {
            throw ProviderExceptions.ex.definitionProducePayloadStreamAndConsumeMessageStream(mediatorConfiguration.methodAsString());
        }
        if (consumesPayloads && production == MediatorConfiguration.Production.STREAM_OF_MESSAGE) {
            throw ProviderExceptions.ex.definitionProduceMessageStreamAndConsumePayloadStream(mediatorConfiguration.methodAsString());
        }
    }

    /**
     * Applies the transformation to the (converted) upstream and stores the decorated result.
     *
     * @param multi the upstream stream
     * @throws NullPointerException if {@link #initialize(Object)} has not set the function yet
     */
    @Override
    public void connectToUpstream(Multi<? extends Message<?>> multi) {
        Objects.requireNonNull(this.function);
        Multi<? extends Message<?>> converted = convert(multi);
        this.publisher = decorate(this.function.apply(converted));
    }

    /**
     * Returns the stream produced by {@link #connectToUpstream(Multi)}.
     *
     * @return the downstream stream, never {@code null}
     * @throws NullPointerException if the mediator is not connected yet
     */
    @Override
    public Multi<? extends Message<?>> getStream() {
        Objects.requireNonNull(this.publisher);
        return this.publisher;
    }

    /**
     * @return {@code true} once {@link #connectToUpstream(Multi)} has stored a stream
     */
    @Override
    public boolean isConnected() {
        return this.publisher != null;
    }

    /**
     * Runs the parent initialization, then installs the transformation matching the configured
     * consumption kind. For plain streams, builder types are checked first, then Reactive Streams
     * types; anything else is handled as a {@link Flow.Publisher}.
     *
     * @param obj forwarded unchanged to {@code super.initialize}
     */
    @Override
    public void initialize(Object obj) {
        super.initialize(obj);
        switch (this.configuration.consumption()) {
            case STREAM_OF_MESSAGE:
                if (this.configuration.usesBuilderTypes()) {
                    processMethodConsumingAPublisherBuilderOfMessages();
                } else if (this.configuration.usesReactiveStreams()) {
                    processMethodConsumingAReactiveStreamsPublisherOfMessages();
                } else {
                    processMethodConsumingAPublisherOfMessages();
                }
                break;
            case STREAM_OF_PAYLOAD:
                if (this.configuration.usesBuilderTypes()) {
                    processMethodConsumingAPublisherBuilderOfPayload();
                } else if (this.configuration.usesReactiveStreams()) {
                    processMethodConsumingAReactiveStreamsPublisherOfPayload();
                } else {
                    processMethodConsumingAPublisherOfPayload();
                }
                break;
            case KEYED_MULTI:
                processMethodConsumingAPublisherOfKeyValue();
                break;
            case KEYED_MULTI_MESSAGE:
                processMethodConsumingAPublisherOfKeyValueMessage();
                break;
            default:
                throw ProviderExceptions.ex.illegalArgumentForUnexpectedConsumption(this.configuration.consumption());
        }
        // Every non-throwing branch above assigns the function.
        assert this.function != null;
    }

    /** Installs the function for a method taking and returning a {@link PublisherBuilder} of messages. */
    private void processMethodConsumingAPublisherBuilderOfMessages() {
        this.function = upstream -> {
            Multi<? extends Message<?>> messages = MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration);
            PublisherBuilder<? extends Message<?>> argument = ReactiveStreams.fromPublisher(AdaptersToReactiveStreams.publisher(messages));
            // The invocation result is untyped here; the element type is taken on trust.
            @SuppressWarnings("unchecked")
            PublisherBuilder<? extends Message<?>> result = (PublisherBuilder<? extends Message<?>>) invoke(argument);
            Objects.requireNonNull(result, ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
            return MultiUtils.publisher(AdaptersToFlow.publisher(result.buildRs()));
        };
    }

    /** Installs the function for a method taking and returning a Reactive Streams {@link Publisher} of messages. */
    private void processMethodConsumingAReactiveStreamsPublisherOfMessages() {
        this.function = upstream -> {
            Multi<? extends Message<?>> messages = MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration);
            Publisher<? extends Message<?>> argument = convertToDesiredReactiveStreamPublisherType(messages);
            // The invocation result is untyped here; the element type is taken on trust.
            @SuppressWarnings("unchecked")
            Publisher<? extends Message<?>> result = (Publisher<? extends Message<?>>) invoke(argument);
            Objects.requireNonNull(result, ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
            return MultiUtils.publisher(AdaptersToFlow.publisher(result));
        };
    }

    /** Installs the function for a method taking and returning a {@link Flow.Publisher} of messages. */
    private void processMethodConsumingAPublisherOfMessages() {
        this.function = upstream -> {
            Multi<? extends Message<?>> messages = MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration);
            Flow.Publisher<? extends Message<?>> argument = convertToDesiredPublisherType(messages);
            // The invocation result is untyped here; the element type is taken on trust.
            @SuppressWarnings("unchecked")
            Flow.Publisher<? extends Message<?>> result = (Flow.Publisher<? extends Message<?>>) invoke(argument);
            Objects.requireNonNull(result, ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
            return MultiUtils.publisher(result);
        };
    }

    /**
     * Adapts a {@link Multi} to the type of the method's first parameter.
     *
     * @param multi the stream to adapt
     * @param <T> the item type
     * @return {@code multi} itself when the parameter type is {@link Multi} or when no converter is
     *         registered for it; otherwise the converter's output cast to {@link Flow.Publisher}
     */
    @SuppressWarnings("unchecked") // the converter's result type is only known at runtime
    private <T> Flow.Publisher<T> convertToDesiredPublisherType(Multi<T> multi) {
        Class<?> parameterType = this.configuration.getParameterDescriptor().getTypes().get(0);
        if (parameterType.equals(Multi.class)) {
            return multi;
        }
        Optional<? extends ReactiveTypeConverter<?>> converter = Registry.lookup(parameterType);
        if (converter.isPresent()) {
            return (Flow.Publisher<T>) converter.get().fromFlowPublisher(multi);
        }
        return multi;
    }

    /**
     * Adapts a {@link Multi} to a Reactive Streams {@link Publisher} matching the method's first
     * parameter type.
     *
     * @param multi the stream to adapt
     * @param <T> the item type
     * @return the converter's output when a converter is registered for the parameter type,
     *         otherwise the result of {@code AdaptersToReactiveStreams.publisher(multi)}
     */
    @SuppressWarnings("unchecked") // the converter's result type is only known at runtime
    private <T> Publisher<T> convertToDesiredReactiveStreamPublisherType(Multi<T> multi) {
        Class<?> parameterType = this.configuration.getParameterDescriptor().getTypes().get(0);
        Optional<? extends ReactiveTypeConverter<?>> converter = Registry.lookup(parameterType);
        Publisher<T> adapted = AdaptersToReactiveStreams.publisher(multi);
        if (converter.isPresent()) {
            adapted = (Publisher<T>) converter.get().fromFlowPublisher(multi);
        }
        return adapted;
    }

    /**
     * Installs the function for a method taking and returning a {@link PublisherBuilder} of payloads:
     * payloads are extracted before the call and each returned item is wrapped with {@code Message.of}.
     */
    private void processMethodConsumingAPublisherBuilderOfPayload() {
        this.function = upstream -> {
            Multi<?> payloads = MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transform(Message::getPayload);
            PublisherBuilder<?> argument = ReactiveStreams.fromPublisher(AdaptersToReactiveStreams.publisher(payloads));
            PublisherBuilder<?> result = (PublisherBuilder<?>) invoke(argument);
            Objects.requireNonNull(result, ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
            return MultiUtils.publisher(AdaptersToFlow.publisher(result.buildRs()))
                    .onItem().transform(Message::of);
        };
    }

    /**
     * Installs the function for a method taking and returning a Reactive Streams {@link Publisher}
     * of payloads: payloads are extracted before the call and each returned item is wrapped with
     * {@code Message.of}. Unlike the sibling variants, the result is wrapped with
     * {@code Multi.createFrom().publisher(...)} rather than {@code MultiUtils.publisher(...)}.
     */
    private void processMethodConsumingAReactiveStreamsPublisherOfPayload() {
        this.function = upstream -> {
            Multi<?> payloads = MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transform(Message::getPayload);
            Publisher<?> argument = convertToDesiredReactiveStreamPublisherType(payloads);
            Publisher<?> result = (Publisher<?>) invoke(argument);
            Objects.requireNonNull(result, ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
            return Multi.createFrom().publisher(AdaptersToFlow.publisher(result))
                    .onItem().transform(Message::of);
        };
    }

    /**
     * Installs the function for a method taking and returning a {@link Flow.Publisher} of payloads:
     * payloads are extracted before the call and each returned item is wrapped with {@code Message.of}.
     */
    private void processMethodConsumingAPublisherOfPayload() {
        this.function = upstream -> {
            Multi<?> payloads = MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration)
                    .onItem().transform(Message::getPayload);
            Flow.Publisher<?> argument = convertToDesiredPublisherType(payloads);
            Flow.Publisher<?> result = (Flow.Publisher<?>) invoke(argument);
            Objects.requireNonNull(result, ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
            return MultiUtils.publisher(result)
                    .onItem().transform(Message::of);
        };
    }

    /**
     * Installs the function for a method consuming keyed streams of payloads: the method is invoked
     * once per keyed stream emitted by {@code KeyMultiUtils.convertToKeyedMulti}, the returned
     * publishers are flattened, and each item is wrapped with {@code Message.of}.
     */
    private void processMethodConsumingAPublisherOfKeyValue() {
        this.function = upstream -> {
            Multi<? extends Message<?>> messages = MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration);
            return KeyMultiUtils.convertToKeyedMulti(messages, extractors(), this.configuration)
                    .flatMap(keyedMulti -> {
                        Flow.Publisher<?> result = (Flow.Publisher<?>) invoke(keyedMulti);
                        Objects.requireNonNull(result, ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
                        return result;
                    })
                    .map(Message::of);
        };
    }

    /**
     * Installs the function for a method consuming keyed streams of messages: the method is invoked
     * once per keyed stream emitted by {@code KeyMultiUtils.convertToKeyedMultiMessage} and the
     * returned publishers are flattened as-is.
     */
    private void processMethodConsumingAPublisherOfKeyValueMessage() {
        this.function = upstream -> {
            Multi<? extends Message<?>> messages = MultiUtils.handlePreProcessingAcknowledgement(upstream, this.configuration);
            return KeyMultiUtils.convertToKeyedMultiMessage(messages, extractors(), this.configuration)
                    .flatMap(keyedMulti -> {
                        // The invocation result is untyped here; the element type is taken on trust.
                        @SuppressWarnings("unchecked")
                        Flow.Publisher<? extends Message<?>> result = (Flow.Publisher<? extends Message<?>>) invoke(keyedMulti);
                        Objects.requireNonNull(result, ProviderMessages.msg.methodReturnedNull(this.configuration.methodAsString()));
                        return result;
                    });
        };
    }
}
