package io.smallrye.reactive.messaging.impl;

import io.smallrye.reactive.messaging.StreamRegistar;
import io.smallrye.reactive.messaging.StreamRegistry;
import io.smallrye.reactive.messaging.spi.PublisherFactory;
import io.smallrye.reactive.messaging.spi.SubscriberFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:io/smallrye/reactive/messaging/impl/ConfiguredStreamFactory.class */
public class ConfiguredStreamFactory implements StreamRegistar {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConfiguredStreamFactory.class);
    private static final String SOURCE_CONFIG_PREFIX = "smallrye.messaging.source";
    private static final String SINK_CONFIG_PREFIX = "smallrye.messaging.sink";
    private final List<PublisherFactory> sourceFactories;
    private final List<SubscriberFactory> sinkFactories;
    private final Config config;
    private final StreamRegistry registry;
    private final Map<String, Publisher<? extends Message>> sources;
    private final Map<String, Subscriber<? extends Message>> sinks;

    ConfiguredStreamFactory() {
        this.sources = new HashMap();
        this.sinks = new HashMap();
        this.sourceFactories = null;
        this.sinkFactories = null;
        this.config = null;
        this.registry = null;
    }

    @Inject
    public ConfiguredStreamFactory(@Any Instance<PublisherFactory> instance, @Any Instance<SubscriberFactory> instance2, Instance<Config> instance3, @Any Instance<StreamRegistry> instance4) {
        this.sources = new HashMap();
        this.sinks = new HashMap();
        this.registry = (StreamRegistry) instance4.get();
        if (instance3.isUnsatisfied()) {
            this.sourceFactories = Collections.emptyList();
            this.sinkFactories = Collections.emptyList();
            this.config = null;
        } else {
            this.sourceFactories = (List) instance.stream().collect(Collectors.toList());
            this.sinkFactories = (List) instance2.stream().collect(Collectors.toList());
            LOGGER.info("Found source types: {}", instance.stream().map((v0) -> {
                return v0.type();
            }).collect(Collectors.toList()));
            LOGGER.info("Found sink types: {}", instance2.stream().map((v0) -> {
                return v0.type();
            }).collect(Collectors.toList()));
            this.config = (Config) instance3.stream().findFirst().orElseThrow(() -> {
                return new IllegalStateException("Unable to retrieve the config");
            });
        }
    }

    static Map<String, Map<String, String>> extractConfigurationFor(String str, Config config) {
        Iterable propertyNames = config.getPropertyNames();
        HashMap hashMap = new HashMap();
        propertyNames.forEach(str2 -> {
            if (str2.startsWith(str)) {
                String substring = str2.substring(str.length() + 1);
                if (!substring.contains(".")) {
                    ((Map) hashMap.computeIfAbsent(substring, str2 -> {
                        return new HashMap();
                    })).put("name", config.getValue(str2, String.class));
                    return;
                }
                String substring2 = substring.substring(0, substring.indexOf(46));
                ((Map) hashMap.computeIfAbsent(substring2, str3 -> {
                    return new HashMap();
                })).put(substring.substring(substring.indexOf(46) + 1), config.getValue(str2, String.class));
            }
        });
        return hashMap;
    }

    @Override // io.smallrye.reactive.messaging.StreamRegistar
    public CompletionStage<Void> initialize() {
        if (this.config == null) {
            LOGGER.info("No MicroProfile Config found, skipping");
            return CompletableFuture.completedFuture(null);
        }
        LOGGER.info("Stream manager initializing...");
        Map<String, Map<String, String>> extractConfigurationFor = extractConfigurationFor(SOURCE_CONFIG_PREFIX, this.config);
        Map<String, Map<String, String>> extractConfigurationFor2 = extractConfigurationFor(SINK_CONFIG_PREFIX, this.config);
        ArrayList arrayList = new ArrayList();
        extractConfigurationFor.forEach((str, map) -> {
            arrayList.add(createSourceFromConfig(str, map));
        });
        extractConfigurationFor2.forEach((str2, map2) -> {
            arrayList.add(createSinkFromConfig(str2, map2));
        });
        return CompletableFuture.allOf((CompletableFuture[]) arrayList.stream().map((v0) -> {
            return v0.toCompletableFuture();
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).whenComplete((r5, th) -> {
            if (th != null) {
                LOGGER.error("Unable to create the publisher or subscriber during initialization", th);
                return;
            }
            LOGGER.info("Publishers created during initialization: {}", this.sources.keySet());
            LOGGER.info("Subscribers created during initialization: {}", this.sinks.keySet());
            Map<String, Publisher<? extends Message>> map3 = this.sources;
            StreamRegistry streamRegistry = this.registry;
            streamRegistry.getClass();
            map3.forEach(streamRegistry::register);
            Map<String, Subscriber<? extends Message>> map4 = this.sinks;
            StreamRegistry streamRegistry2 = this.registry;
            streamRegistry2.getClass();
            map4.forEach(streamRegistry2::register);
        });
    }

    private CompletionStage createSourceFromConfig(String str, Map<String, String> map) {
        String str2 = (String) Optional.ofNullable(map.get("type")).map((v0) -> {
            return v0.toString();
        }).orElseThrow(() -> {
            return new IllegalArgumentException("Invalid source, no type for " + str);
        });
        return this.sourceFactories.stream().filter(publisherFactory -> {
            return publisherFactory.type().getName().equalsIgnoreCase(str2);
        }).findFirst().orElseThrow(() -> {
            return new IllegalArgumentException("Unknown source type for " + str + ", supported types are " + this.sourceFactories.stream().map(publisherFactory2 -> {
                return publisherFactory2.type().getName();
            }).collect(Collectors.toList()));
        }).createPublisher(map).thenAccept(publisher -> {
            this.sources.put(str, publisher);
        });
    }

    private CompletionStage createSinkFromConfig(String str, Map<String, String> map) {
        String str2 = (String) Optional.ofNullable(map.get("type")).map((v0) -> {
            return v0.toString();
        }).orElseThrow(() -> {
            return new IllegalArgumentException("Invalid sink, no type for " + str);
        });
        return this.sinkFactories.stream().filter(subscriberFactory -> {
            return subscriberFactory.type().getName().equalsIgnoreCase(str2);
        }).findFirst().orElseThrow(() -> {
            return new IllegalArgumentException("Unknown sink type for " + str + ", supported types are " + this.sinkFactories.stream().map(subscriberFactory2 -> {
                return subscriberFactory2.type().getName();
            }).collect(Collectors.toList()));
        }).createSubscriber(map).thenAccept(subscriber -> {
            this.sinks.put(str, subscriber);
        });
    }
}
