package io.smallrye.reactive.messaging;

import io.smallrye.reactive.messaging.MediatorConfiguration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.lang3.ClassUtils;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.CompletionSubscriber;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/smallrye/reactive/messaging/SubscriberMediator.class */
/**
 * Mediator for a method whose configured shape is {@link Shape#SUBSCRIBER}.
 *
 * <p>{@link #initialize(Object)} turns the configured method into a {@link SubscriberBuilder}, picking the
 * pipeline from the configured consumption type and the method's return type. {@link #run()} then attaches
 * that subscriber to the upstream publisher supplied through {@link #connectToUpstream(PublisherBuilder)}.
 *
 * <p>{@code Message} is used as a raw type on purpose: the overridden signatures in this class use the raw
 * type as the stream element type. {@code invoke(...)}, {@code managePreProcessingAck()} and
 * {@code getAckOrCompletion(...)} are inherited; their contracts are not visible in this file.
 */
public class SubscriberMediator extends AbstractMediator {

    /** Upstream publisher; {@code null} until {@link #connectToUpstream(PublisherBuilder)} is called. */
    private PublisherBuilder<? extends Message> source;

    /** Subscriber computed by {@link #initialize(Object)}. */
    private SubscriberBuilder<Message, Void> subscriber;

    /** Holds the subscription received in {@code onSubscribe} once {@link #run()} has wired the stream. */
    private final AtomicReference<Subscription> subscription = new AtomicReference<>();

    /**
     * Creates the mediator.
     *
     * @param mediatorConfiguration the configuration of the mediated method
     * @throws IllegalArgumentException if the configuration's shape is not {@link Shape#SUBSCRIBER}
     */
    public SubscriberMediator(MediatorConfiguration mediatorConfiguration) {
        super(mediatorConfiguration);
        if (mediatorConfiguration.shape() != Shape.SUBSCRIBER) {
            throw new IllegalArgumentException(
                    "Expected a Subscriber shape, received a " + mediatorConfiguration.shape());
        }
    }

    /**
     * Delegates to the parent initialization, then computes the subscriber matching the configured
     * consumption type.
     *
     * @param obj passed unchanged to {@code super.initialize}
     * @throws IllegalArgumentException if the consumption type is not one of the four handled values
     */
    @Override
    public void initialize(Object obj) {
        super.initialize(obj);
        switch (this.configuration.consumption()) {
            case STREAM_OF_MESSAGE:
            case STREAM_OF_PAYLOAD:
                processMethodReturningASubscriber();
                break;
            case MESSAGE:
            case PAYLOAD:
                if (ClassUtils.isAssignable(this.configuration.getMethod().getReturnType(), CompletionStage.class)) {
                    processMethodReturningACompletionStage();
                } else {
                    processMethodReturningVoid();
                }
                break;
            default:
                throw new IllegalArgumentException(
                        "Unexpected consumption type: " + this.configuration.consumption());
        }
        assert this.subscriber != null;
    }

    /**
     * @return the subscriber computed by {@link #initialize(Object)}, or {@code null} if it has not run yet
     */
    @Override
    public SubscriberBuilder<Message, Void> getComputedSubscriber() {
        return this.subscriber;
    }

    /**
     * @return {@code true} once an upstream publisher has been set
     */
    @Override
    public boolean isConnected() {
        return this.source != null;
    }

    /**
     * Stores the upstream publisher that {@link #run()} will subscribe to.
     *
     * @param publisherBuilder the upstream publisher
     */
    @Override
    public void connectToUpstream(PublisherBuilder<? extends Message> publisherBuilder) {
        this.source = publisherBuilder;
    }

    /**
     * Builds the computed subscriber and attaches it to the upstream publisher through a delegating
     * subscriber that records the subscription and logs errors.
     *
     * @throws WeavingException if {@code onError} has already been signalled by the time the stream's
     *         {@code run()} call returns; the signalled error is passed as the cause
     */
    @Override
    public void run() {
        assert this.source != null;
        assert this.subscriber != null;

        final Logger logger = LoggerFactory.getLogger(this.configuration.methodAsString());
        // Captures an error signalled while the stream is being started, so it can be rethrown below.
        final AtomicReference<Throwable> syncErrorCatcher = new AtomicReference<>();
        final CompletionSubscriber<Message, Void> delegate = this.subscriber.build();

        this.source.to(new Subscriber<Message>() {
            @Override
            public void onSubscribe(Subscription s) {
                SubscriberMediator.this.subscription.set(s);
                delegate.onSubscribe(s);
            }

            @Override
            public void onNext(Message message) {
                delegate.onNext(message);
            }

            @Override
            public void onError(Throwable failure) {
                logger.error("Error caught during the stream processing", failure);
                syncErrorCatcher.set(failure);
                delegate.onError(failure);
            }

            @Override
            public void onComplete() {
                delegate.onComplete();
            }
        }).run();

        final Throwable failure = syncErrorCatcher.get();
        if (failure != null) {
            throw new WeavingException(this.configuration.getIncoming(), failure);
        }
    }

    /**
     * Computes the subscriber for a method whose return type is not a {@link CompletionStage}: the method
     * is invoked with each message's payload and its result is ignored.
     */
    private void processMethodReturningVoid() {
        // The explicit <Message> witness is required: without it the chained call infers Object elements.
        this.subscriber = ReactiveStreams.<Message>builder()
                .flatMapCompletionStage(managePreProcessingAck())
                .map(message -> {
                    invoke(message.getPayload());
                    return message;
                })
                .flatMapCompletionStage(this::acknowledgeAfterProcessingIfRequired)
                .ignore();
    }

    /**
     * Computes the subscriber for a method returning a {@link CompletionStage}. The method receives the
     * payload when the consumption type is {@code PAYLOAD} and the message itself otherwise; the stream
     * continues with the same message once the returned stage completes.
     */
    private void processMethodReturningACompletionStage() {
        final boolean consumesPayload =
                this.configuration.consumption() == MediatorConfiguration.Consumption.PAYLOAD;
        this.subscriber = ReactiveStreams.<Message>builder()
                .flatMapCompletionStage(managePreProcessingAck())
                .flatMapCompletionStage(message -> {
                    final Object argument = consumesPayload ? message.getPayload() : message;
                    // Wildcard cast rather than a raw one, so thenApply keeps Message as the element type.
                    final CompletionStage<?> completion = (CompletionStage<?>) invoke(argument);
                    final CompletionStage<Message> processed = completion.thenApply(ignored -> message);
                    return processed;
                })
                .flatMapCompletionStage(this::acknowledgeAfterProcessingIfRequired)
                .ignore();
    }

    /**
     * Computes the subscriber for a method that itself returns a {@link Subscriber} or a
     * {@link SubscriberBuilder}. The method is invoked once, without arguments, and the returned
     * subscriber is fed payloads ({@code STREAM_OF_PAYLOAD}) or unchanged messages (otherwise).
     *
     * @throws IllegalStateException if the method returns neither a Subscriber nor a SubscriberBuilder
     */
    // Raw types are kept here: the element type of the user's subscriber is only known at runtime and
    // SubscriberWrapper's type parameters are not visible from this file.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void processMethodReturningASubscriber() {
        final Object returned = invoke();
        final Subscriber userSubscriber;
        if (returned instanceof Subscriber) {
            userSubscriber = (Subscriber) returned;
        } else if (returned instanceof SubscriberBuilder) {
            userSubscriber = ((SubscriberBuilder) returned).build();
        } else {
            throw new IllegalStateException(
                    "Invalid return type: " + returned + " - expected a Subscriber or a SubscriberBuilder");
        }

        final Function extractor;
        if (this.configuration.consumption() == MediatorConfiguration.Consumption.STREAM_OF_PAYLOAD) {
            extractor = item -> ((Message) item).getPayload();
        } else {
            extractor = Function.identity();
        }

        this.subscriber = ReactiveStreams.<Message>builder()
                .flatMapCompletionStage(managePreProcessingAck())
                .via(new SubscriberWrapper(userSubscriber, extractor))
                .ignore();
    }

    /**
     * Post-processing acknowledgement step shared by the per-message pipelines.
     *
     * @param message the message that has just been processed
     * @return the result of {@code getAckOrCompletion(message)} when the configured strategy is
     *         {@code POST_PROCESSING}; otherwise an already completed stage holding {@code message}
     */
    private CompletionStage<? extends Message> acknowledgeAfterProcessingIfRequired(Message message) {
        if (this.configuration.getAcknowledgment() == Acknowledgment.Strategy.POST_PROCESSING) {
            return getAckOrCompletion(message);
        }
        return CompletableFuture.completedFuture(message);
    }
}
