package com.urbancode.devilfish.services.jms;

import com.urbancode.commons.util.Exceptions;
import com.urbancode.commons.util.concurrent.ServiceThread;
import com.urbancode.devilfish.server.jms.JmsUtils;
import com.urbancode.devilfish.server.jms.ServiceReply;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/urbancode/devilfish/services/jms/ReceiveThread.class */
/**
 * Service thread that receives JMS messages from a single destination and hands each
 * {@link ServiceReply} payload to the queue registered under the message's JMS correlation ID.
 *
 * <p>The synchronizer map is supplied by the creator of this thread; this class only ever
 * removes entries from it.
 */
class ReceiveThread extends ServiceThread {
    private static final Logger log = Logger.getLogger(ReceiveThread.class);

    /** JMS session used to create the message consumer. */
    private final Session session;

    /** Destination the consumer reads from. */
    private final Destination source;

    /** Queues awaiting a reply, keyed by JMS correlation ID. */
    private final Map<String, BlockingQueue<ServiceReply>> synchronizerMap;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the receive thread.
     *
     * @param countDownLatch latch forwarded unchanged to the {@link ServiceThread} constructor
     * @param session        JMS session used to create the consumer; must not be null
     * @param destination    destination to receive from; must not be null
     * @param map            correlation-ID to reply-queue map; must not be null
     * @throws NullPointerException if {@code session}, {@code destination} or {@code map} is null
     */
    public ReceiveThread(CountDownLatch countDownLatch, Session session, Destination destination, Map<String, BlockingQueue<ServiceReply>> map) {
        super(MessageService.class.getSimpleName() + "-" + ReceiveThread.class.getSimpleName(), countDownLatch);
        if (session == null) {
            throw new NullPointerException("session");
        }
        if (destination == null) {
            throw new NullPointerException("source");
        }
        if (map == null) {
            throw new NullPointerException("synchronizerMap");
        }
        this.session = session;
        this.source = destination;
        this.synchronizerMap = map;
    }

    /**
     * Receives messages until this thread is interrupted, dispatching each one through
     * {@link #completeReceive(Message)}.
     *
     * <p>A single consumer is created up front and closed exactly once when the loop exits.
     * Closing it inside the loop would leave every iteration after the first calling
     * {@code receive()} on an already-closed consumer.
     *
     * @throws Exception if the consumer cannot be created
     */
    protected void doRun() throws Exception {
        MessageConsumer consumer = this.session.createConsumer(this.source);
        try {
            while (!isInterrupted()) {
                try {
                    // Blocks until a message arrives; the JMS API returns null when the
                    // consumer is closed concurrently, in which case there is nothing to dispatch.
                    Message message = consumer.receive();
                    if (message != null) {
                        completeReceive(message);
                    }
                } catch (Exception e) {
                    if (Exceptions.isCauseInterruption(e)) {
                        // Restore the interrupt flag so the loop condition sees it and exits.
                        Thread.currentThread().interrupt();
                    } else {
                        // Any other failure is logged and the loop keeps receiving.
                        log.error("Non-fatal errror in JMS receive: " + e, e);
                    }
                }
            }
        } finally {
            JmsUtils.close(consumer);
        }
    }

    /**
     * Extracts the reply from {@code message} and delivers it to the queue registered for the
     * message's correlation ID.
     *
     * @param message the received JMS message
     * @throws JMSException         if the correlation ID or payload cannot be read
     * @throws InterruptedException declared by {@link #synchronizeReply(String, ServiceReply)}
     */
    protected void completeReceive(Message message) throws JMSException, InterruptedException {
        synchronizeReply(message.getJMSCorrelationID(), extractReply(message));
    }

    /**
     * Pulls the {@link ServiceReply} payload out of a message.
     *
     * @param message the received JMS message
     * @return the payload, or {@code null} (after logging a warning) when the message is not an
     *         {@link ObjectMessage} or its object is not a {@link ServiceReply}
     * @throws JMSException if the message object cannot be read
     */
    protected ServiceReply extractReply(Message message) throws JMSException {
        Logger logger = getLogger();
        ServiceReply serviceReply = null;
        if (message instanceof ObjectMessage) {
            Serializable object = ((ObjectMessage) message).getObject();
            if (object instanceof ServiceReply) {
                serviceReply = (ServiceReply) object;
            } else {
                logger.warn("Expected ServiceReply, but got " + object);
            }
        } else {
            logger.warn("Expected object message, but got " + message);
        }
        return serviceReply;
    }

    /**
     * Removes the queue registered under {@code str} and, if one exists, offers it the reply.
     *
     * <p>A {@code null} reply is never offered: {@link BlockingQueue} implementations reject
     * null elements with a {@link NullPointerException}. The map entry is still removed in that
     * case, exactly as it was before the guard was added.
     *
     * @param str          JMS correlation ID identifying the registered queue
     * @param serviceReply the reply to deliver; may be null when extraction failed
     * @throws InterruptedException never thrown by this implementation; kept in the signature
     *                              for overriding subclasses
     */
    protected void synchronizeReply(String str, ServiceReply serviceReply) throws InterruptedException {
        BlockingQueue<ServiceReply> waiter = this.synchronizerMap.remove(str);
        if (waiter == null) {
            return;
        }
        if (serviceReply == null) {
            getLogger().warn("Discarding null reply for correlation ID " + str);
            return;
        }
        waiter.offer(serviceReply);
    }

    /**
     * Returns the logger used for receive-side warnings.
     *
     * @return this class's static logger
     */
    protected Logger getLogger() {
        return log;
    }
}
