package com.urbancode.devilfish.server.jms;

import com.urbancode.commons.util.Exceptions;
import com.urbancode.commons.util.concurrent.ServiceThread;
import com.urbancode.devilfish.server.Service;
import com.urbancode.devilfish.server.ServiceRegistry;
import java.io.Serializable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/urbancode/devilfish/server/jms/ReceiveThread.class */
class ReceiveThread extends ServiceThread {
    private static final Logger log = Logger.getLogger(ReceiveThread.class);
    private final ServiceRegistry registry;
    private final Session session;
    private final Destination source;
    private final String endpointId;
    private final CompletionService<RequestTask> requestCompletionQueue;
    private MessageConsumer consumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReceiveThread(CountDownLatch countDownLatch, ServiceRegistry serviceRegistry, Session session, Destination destination, String str, CompletionService<RequestTask> completionService) {
        super(JmsDevilfishServer.class.getSimpleName() + "-" + ReceiveThread.class.getSimpleName(), countDownLatch);
        if (serviceRegistry == null) {
            throw new NullPointerException("registry");
        }
        if (session == null) {
            throw new NullPointerException("session");
        }
        if (destination == null) {
            throw new NullPointerException("source");
        }
        if (str == null) {
            throw new NullPointerException("endpointId");
        }
        if (completionService == null) {
            throw new NullPointerException("completionService");
        }
        this.registry = serviceRegistry;
        this.session = session;
        this.source = destination;
        this.endpointId = str;
        this.requestCompletionQueue = completionService;
    }

    protected void doRun() throws Exception {
        this.consumer = this.session.createConsumer(this.source, createJmsMessageSelector(this.endpointId));
        while (!isInterrupted()) {
            try {
                try {
                    Message receive = this.consumer.receive();
                    if (receive != null) {
                        queueRequest(extractRequest(receive), receive.getJMSReplyTo(), receive.getJMSCorrelationID());
                    }
                } catch (Exception e) {
                    if (Exceptions.isCauseInterruption(e)) {
                        Thread.currentThread().interrupt();
                    } else {
                        log.error("Non-fatal error in JMS receive: " + e, e);
                    }
                }
            } finally {
                JmsUtils.close(this.consumer);
            }
        }
    }

    protected String createJmsMessageSelector(String str) {
        return "endpointId = '" + str + "'";
    }

    protected ServiceRequest extractRequest(Message message) throws JMSException {
        ServiceRequest serviceRequest = null;
        if (message instanceof ObjectMessage) {
            Serializable object = ((ObjectMessage) message).getObject();
            if (object instanceof ServiceRequest) {
                serviceRequest = (ServiceRequest) object;
            } else {
                log.warn("Expected ServiceRequest, but got " + object);
            }
        } else {
            log.warn("Expected object message, but got " + message);
        }
        return serviceRequest;
    }

    protected void queueRequest(ServiceRequest serviceRequest, Destination destination, String str) throws JMSException {
        if (destination == null && str != null) {
            log.warn("Message has a correlationID, by no replyTo");
        }
        if (destination != null && str == null) {
            log.warn("Message has a replyTo, but no correlationID");
        }
        Service service = this.registry.getService(serviceRequest.getServiceName());
        if (service != null) {
            queueTask(serviceRequest, service, destination, str);
        }
    }

    protected void queueTask(ServiceRequest serviceRequest, Service service, Destination destination, String str) {
        this.requestCompletionQueue.submit(new RequestTask(service, serviceRequest, destination, str));
    }

    protected Logger getLogger() {
        return log;
    }
}
