package com.ibm.ws.sib.comms.server.clientsupport;

import com.ibm.websphere.ras.TraceComponent;
import com.ibm.websphere.sib.exception.SIException;
import com.ibm.ws.ffdc.FFDCFilter;
import com.ibm.ws.sib.comms.CommsConstants;
import com.ibm.ws.sib.comms.common.CommsUtils;
import com.ibm.ws.sib.comms.server.ConversationState;
import com.ibm.ws.sib.jfapchannel.JFapChannelConstants;
import com.ibm.ws.sib.utils.ras.SibTr;
import com.ibm.wsspi.sib.core.AsynchConsumerCallback;
import com.ibm.wsspi.sib.core.LockedMessageEnumeration;
import com.ibm.wsspi.sib.core.SIBusMessage;

/* loaded from: input_file:wlp/lib/com.ibm.ws.messaging.comms.server_1.0.16.jar:com/ibm/ws/sib/comms/server/clientsupport/CATAsynchReadAheadReader.class */
public class CATAsynchReadAheadReader implements AsynchConsumerCallback {
    private static String CLASS_NAME = CATAsynchReadAheadReader.class.getName();
    private static final TraceComponent tc = SibTr.register(CATAsynchReadAheadReader.class, "SIBCommunications", CommsConstants.MSG_BUNDLE);
    final CATProxyConsumer consumerSession;
    final CATMainConsumer mainConsumer;

    public CATAsynchReadAheadReader(CATProxyConsumer cATProxyConsumer, CATMainConsumer cATMainConsumer) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "<init>", new Object[]{cATProxyConsumer, cATMainConsumer});
        }
        this.consumerSession = cATProxyConsumer;
        this.mainConsumer = cATMainConsumer;
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(tc, "<init>");
        }
    }

    @Override // com.ibm.wsspi.sib.core.AsynchConsumerCallback
    public void consumeMessages(LockedMessageEnumeration lockedMessageEnumeration) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "consumeMessages", lockedMessageEnumeration);
        }
        if (this.mainConsumer.getConversation().getConnectionReference().isClosed()) {
            if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                SibTr.debug(this, tc, "The connection is closed so we shouldn't consume anymore messages. Consumer Session should be closed soon");
            }
            stopConsumer();
        } else {
            try {
                if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    SibTr.debug(this, tc, "Getting next locked message");
                }
                SIBusMessage nextLocked = lockedMessageEnumeration.nextLocked();
                if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    SibTr.debug(this, tc, "Received message", nextLocked);
                }
                int sendMessage = this.consumerSession.sendMessage(nextLocked);
                if (!CommsUtils.isRecoverable(nextLocked, this.consumerSession.getUnrecoverableReliability())) {
                    if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                        SibTr.debug(this, tc, "Deleting the message");
                    }
                    lockedMessageEnumeration.deleteCurrent(null);
                }
                this.consumerSession.setLowestPriority(JFapChannelConstants.getJFAPPriority(nextLocked.getPriority()));
                synchronized (this.consumerSession) {
                    int sentBytes = this.consumerSession.getSentBytes() + sendMessage;
                    this.consumerSession.setSentBytes(sentBytes);
                    if (sendMessage == 0 || sentBytes >= this.consumerSession.getRequestedBytes()) {
                        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                            SibTr.debug(this, tc, "Stopping consumer session (sent bytes >= requested bytes || msgLen = 0)");
                        }
                        stopConsumer();
                    }
                }
            } catch (Throwable th) {
                ConversationState conversationState = (ConversationState) this.consumerSession.getConversation().getAttachment();
                if (!(th instanceof SIException) || !conversationState.hasMETerminated()) {
                    FFDCFilter.processException(th, CLASS_NAME + ".consumeMessages", CommsConstants.CATASYNCHRHREADER_CONSUME_MSGS_01, this);
                }
                if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    SibTr.debug(tc, th.getMessage(), th);
                }
                StaticCATHelper.sendAsyncExceptionToClient(th, CommsConstants.CATASYNCHRHREADER_CONSUME_MSGS_01, this.consumerSession.getClientSessionId(), this.consumerSession.getConversation(), 0);
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "consumeMessages");
        }
    }

    public void stopConsumer() {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "stopConsumer");
        }
        try {
            synchronized (this.consumerSession) {
                this.consumerSession.getConsumerSession().stop();
                this.consumerSession.started = false;
            }
        } catch (Throwable th) {
            FFDCFilter.processException(th, CLASS_NAME + ".consumeMessages", CommsConstants.CATASYNCHRHREADER_CONSUME_MSGS_02, this);
            if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                SibTr.debug(this, tc, "Unable to stop consumer session due to Throwable: " + th);
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "stopConsumer");
        }
    }

    static {
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            SibTr.debug(tc, "Source info: @(#)SIB/ws/code/sib.comms.server.impl/src/com/ibm/ws/sib/comms/server/clientsupport/CATAsynchReadAheadReader.java, SIB.comms, WASX.SIB, aa1225.01 1.38");
        }
    }
}
