package com.ibm.ws.sib.comms.client.proxyqueue.queue;

import com.ibm.ejs.ras.TraceNLS;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.websphere.sib.exception.SIErrorException;
import com.ibm.websphere.sib.exception.SIException;
import com.ibm.websphere.sib.exception.SIResourceException;
import com.ibm.ws.ffdc.FFDCFilter;
import com.ibm.ws.sib.comms.CommsConstants;
import com.ibm.ws.sib.comms.client.ConsumerSessionProxy;
import com.ibm.ws.sib.comms.client.proxyqueue.AsynchConsumerProxyQueue;
import com.ibm.ws.sib.comms.client.proxyqueue.ProxyQueue;
import com.ibm.ws.sib.comms.client.proxyqueue.impl.ConversationHelper;
import com.ibm.ws.sib.comms.client.proxyqueue.impl.LockedMessageEnumerationImpl;
import com.ibm.ws.sib.comms.common.CommsByteBuffer;
import com.ibm.ws.sib.mfp.JsMessage;
import com.ibm.ws.sib.utils.ras.SibTr;
import com.ibm.wsspi.sib.core.exception.SIConnectionDroppedException;
import com.ibm.wsspi.sib.core.exception.SIConnectionLostException;
import java.util.Iterator;
import java.util.LinkedList;
import net.sf.ehcache.config.TimeoutBehaviorConfiguration;

/* loaded from: input_file:wlp/lib/com.ibm.ws.messaging.comms.client_1.0.17.jar:com/ibm/ws/sib/comms/client/proxyqueue/queue/AsynchConsumerQueue.class */
/**
 * {@link Queue} implementation that buffers {@link QueueData} entries for
 * asynchronous consumers and hands them to the consumer callback one batch at
 * a time (see {@link #deliverBatch(int, short, ConversationHelper)}).
 *
 * <p>Queue contents and the counters are only modified while holding this
 * object's monitor ({@code synchronized} methods / blocks). A separate lock
 * object, exposed through {@link #getConcurrentAccessLock()}, is used for the
 * wait/notify hand-shake between {@link #waitUntilEmpty(short)} and
 * {@link #deliverBatch(int, short, ConversationHelper)}.
 */
public class AsynchConsumerQueue implements Queue {
    private static final String CLASS_NAME = AsynchConsumerQueue.class.getName();
    private static final TraceComponent tc = SibTr.register(AsynchConsumerQueue.class, "SIBCommunications", CommsConstants.MSG_BUNDLE);
    private static final TraceNLS nls = TraceNLS.getTraceNLS(CommsConstants.MSG_BUNDLE);

    /** When {@code false}, {@link #put} refuses new data while a batch is already ready. */
    private final boolean ordered;

    /** Number of complete batches currently sitting on the queue. */
    private int batchesReady = 0;

    /** The buffered data, oldest entry first. */
    private final LinkedList<QueueData> queue = new LinkedList<>();

    /** Running total of completed batches (only reported by {@link #toString()}). */
    private long batchesReceived = 0;

    /** Running total of completed messages (only reported by {@link #toString()}). */
    private long messagesReceived = 0;

    /** Monitor used by {@link #waitUntilEmpty} (wait) and {@link #deliverBatch} (notifyAll). */
    private final Object concurrentAccessLock = new Object();

    /**
     * Creates an empty queue.
     *
     * @param z the "ordered" flag; when {@code false} at most one ready batch
     *          may be present when {@link #put} is called
     */
    public AsynchConsumerQueue(boolean z) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "<init>", "" + z);
        }
        this.ordered = z;
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "<init>");
        }
    }

    /**
     * Appends an entry to the tail of the queue. If the entry is not a chunked
     * message it is counted as received immediately; chunked messages are
     * counted later by {@link #appendToLastMessage}.
     *
     * @param queueData the entry to append
     * @param s         identifier that is only used for tracing in this method
     * @throws SIErrorException if this queue is not ordered and a batch is
     *                          already ready (an FFDC record is written first)
     */
    @Override
    public void put(QueueData queueData, short s) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "put", new Object[]{queueData, "" + ((int) s)});
        }
        synchronized (this) {
            if (!this.ordered && this.batchesReady == 1) {
                SIErrorException alreadyReady = new SIErrorException(nls.getFormattedMessage("ASYNC_BATCH_ALREADY_READY_SICO1031", (Object[]) null, (String) null));
                FFDCFilter.processException(alreadyReady, CLASS_NAME + ".put", CommsConstants.ASYNCHPQ_PUT_01, this);
                throw alreadyReady;
            }
            this.queue.addLast(queueData);
            if (!queueData.isChunkedMessage()) {
                notifyMessageReceived(queueData.isLastInBatch());
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            SibTr.debug(this, tc, "Put has completed: " + this);
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "put");
        }
    }

    /**
     * Adds a slice of data to the entry at the tail of the queue and, when
     * {@code z} is {@code true}, counts that entry as a received message.
     *
     * <p>{@link LinkedList#getLast()} throws {@code NoSuchElementException}
     * when the queue is empty, so a prior {@link #put} is required.
     *
     * @param commsByteBuffer the slice, passed straight to {@code QueueData.addSlice}
     * @param z               forwarded to {@code addSlice}; presumably marks the
     *                        final slice of the message - TODO confirm
     */
    @Override
    public void appendToLastMessage(CommsByteBuffer commsByteBuffer, boolean z) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "appendToLastMessage", new Object[]{commsByteBuffer, Boolean.valueOf(z)});
        }
        synchronized (this) {
            QueueData tail = this.queue.getLast();
            tail.addSlice(commsByteBuffer, z);
            if (z) {
                notifyMessageReceived(tail.isLastInBatch());
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            SibTr.debug(this, tc, "Append has completed: " + this);
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "appendToLastMessage");
        }
    }

    /**
     * Updates the counters for one completed message. Both callers invoke this
     * while holding this object's monitor.
     *
     * @param z {@code true} when the message was the last one in its batch, in
     *          which case the batch counters are bumped as well
     */
    private void notifyMessageReceived(boolean z) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "notifyMessageReceived", Boolean.valueOf(z));
        }
        if (z) {
            this.batchesReceived++;
            this.batchesReady++;
        }
        this.messagesReceived++;
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "notifyMessageReceived");
        }
    }

    /**
     * Reports whether there is nothing deliverable for the given id.
     *
     * @param s the proxy queue id to check
     * @return {@code false} only when the queue is non-empty, its head entry
     *         belongs to proxy queue {@code s} and at least one batch is ready;
     *         {@code true} otherwise
     */
    @Override
    public synchronized boolean isEmpty(short s) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "isEmpty", "sessionId=" + ((int) s));
        }
        boolean empty = true;
        if (!isQueueEmpty() && this.queue.getFirst().getProxyQueue().getId() == s && this.batchesReady > 0) {
            empty = false;
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "isEmpty", "" + empty);
        }
        return empty;
    }

    /**
     * @return {@code true} when the queue holds no entries at all, regardless
     *         of which proxy queue they belong to
     */
    @Override
    public synchronized boolean isQueueEmpty() {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "isQueueEmpty");
        }
        boolean empty = this.queue.isEmpty();
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "isQueueEmpty", "" + empty);
        }
        return empty;
    }

    /**
     * Removes every entry whose proxy queue id equals {@code s}.
     *
     * <p>The entries are removed in a single pass through the list iterator;
     * indexed {@code get}/{@code remove} on a {@link LinkedList} would walk the
     * list again for every element.
     *
     * @param s the proxy queue id whose entries are discarded
     */
    @Override
    public synchronized void purge(short s) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "purge", "" + ((int) s));
        }
        int removedCount = 0;
        boolean lastRemovedEndsBatch = false;
        Iterator<QueueData> entries = this.queue.iterator();
        while (entries.hasNext()) {
            QueueData candidate = entries.next();
            if (candidate.getProxyQueue().getId() == s) {
                if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    SibTr.debug(this, tc, "Deleting item: " + candidate);
                }
                // Only the flag of the last removed entry is kept, matching the
                // existing accounting below.
                lastRemovedEndsBatch = candidate.isLastInBatch();
                entries.remove();
                removedCount++;
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            SibTr.debug(this, tc, "Removing " + removedCount + " entries");
        }
        // NOTE(review): batchesReady drops by at most one, and only when the
        // last removed entry closed a batch. If several ready batches for the
        // same id can be queued this under-counts - confirm the intent.
        if (lastRemovedEndsBatch) {
            this.batchesReady--;
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            SibTr.debug(this, tc, "Purge completed. Queue is now: " + this);
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "purge");
        }
    }

    /**
     * @return a diagnostic summary: identity hash, current depth, the ordered
     *         flag and the three counters. Reads the state without locking.
     */
    public String toString() {
        return "AsyncQueue@" + Integer.toHexString(hashCode()) + ":- CurDepth: " + this.queue.size() + ", ordered: " + this.ordered + ", messagesReceived: " + this.messagesReceived + ", batchesReceived: " + this.batchesReceived + ", batchesReady: " + this.batchesReady;
    }

    /**
     * @return the lock object that {@link #waitUntilEmpty} waits on and
     *         {@link #deliverBatch} notifies
     */
    @Override
    public Object getConcurrentAccessLock() {
        return this.concurrentAccessLock;
    }

    /**
     * Removes entries from the head of the queue and returns their messages.
     * Removal stops after {@code i} entries or after the first entry flagged
     * as last-in-batch, whichever comes first, so trailing array slots are
     * {@code null} when the batch is shorter than {@code i}.
     *
     * <p>If the new head of the queue belongs to a different proxy queue than
     * the last removed entry, that proxy queue is nudged.
     *
     * @param i                  length of the returned array
     * @param conversationHelper only traced; not otherwise used here
     * @return the messages of the removed entries
     * @throws SIErrorException if no batch is ready (an FFDC record is written first)
     */
    private synchronized JsMessage[] getBatch(int i, ConversationHelper conversationHelper) throws SIResourceException, SIConnectionDroppedException {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "getBatch", new Object[]{Integer.valueOf(i), conversationHelper});
        }
        if (this.batchesReady == 0) {
            if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                SibTr.debug(this, tc, "No batches are ready!!");
            }
            SIErrorException notReady = new SIErrorException(nls.getFormattedMessage("ASYNC_BATCH_NOT_READY_SICO1033", (Object[]) null, (String) null));
            FFDCFilter.processException(notReady, CLASS_NAME + ".getBatch", CommsConstants.ASYNCHPQ_GETBATCH_01, this);
            throw notReady;
        }
        short lastRemovedId = 0;
        JsMessage[] batch = new JsMessage[i];
        for (int slot = 0; slot < batch.length; slot++) {
            QueueData head = this.queue.removeFirst();
            lastRemovedId = head.getProxyQueue().getId();
            batch[slot] = (JsMessage) head.getMessage();
            if (head.isLastInBatch()) {
                break;
            }
        }
        this.batchesReady--;
        if (!isQueueEmpty()) {
            AsynchConsumerProxyQueue nextOwner = (AsynchConsumerProxyQueue) this.queue.get(0).getProxyQueue();
            if (nextOwner.getId() != lastRemovedId) {
                if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    SibTr.debug(tc, "Next data on the queue is for a different session:", nextOwner);
                }
                nextOwner.nudge();
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(tc, "getBatch", batch);
        }
        return batch;
    }

    /**
     * Takes the next batch off the queue and passes it to the asynchronous
     * consumer callback of the proxy queue that owns the head entry.
     *
     * <p>Any {@code Throwable} raised while building or delivering the batch is
     * recorded via FFDC and reported through
     * {@code ConsumerSessionProxy.deliverAsyncException}; in that case no new
     * batch is requested and {@code unlockAll()} is called on the session proxy.
     *
     * <p>NOTE(review): {@code notifyAll()} is invoked on the concurrent access
     * lock without a {@code synchronized} block in this method, which throws
     * {@code IllegalMonitorStateException} unless the calling thread already
     * owns that monitor - presumably the caller holds it; confirm. The head
     * entry is also read without holding this object's monitor and
     * {@code queue.get(0)} throws on an empty queue.
     *
     * @param i                  maximum batch size, passed to {@code getBatch}
     * @param s                  only traced; not otherwise used here
     * @param conversationHelper used to request the next message batch
     */
    @Override
    public void deliverBatch(int i, short s, ConversationHelper conversationHelper) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "deliverBatch", new Object[]{"" + i, "" + ((int) s), conversationHelper});
        }
        AsynchConsumerProxyQueue proxyQueue = (AsynchConsumerProxyQueue) this.queue.get(0).getProxyQueue();
        proxyQueue.setAsynchConsumerThread(Thread.currentThread());
        ConsumerSessionProxy sessionProxy = (ConsumerSessionProxy) proxyQueue.getDestinationSessionProxy();
        sessionProxy.resetCallbackThreadState();
        boolean unlockAllMessages = false;
        try {
            LockedMessageEnumerationImpl lockedMessages = new LockedMessageEnumerationImpl(proxyQueue, this, getBatch(i, conversationHelper), Thread.currentThread(), proxyQueue.getLMEOperationMonitor());
            if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                SibTr.debug(this, tc, " ** About to call user callback **");
            }
            proxyQueue.getAsynchConsumerCallback().consumeMessages(lockedMessages);
            if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                SibTr.debug(this, tc, " ** User callback has returned  **");
            }
            // Anything the callback left in the enumeration is unlocked here.
            int remaining = lockedMessages.getRemainingMessageCount();
            if (remaining != 0) {
                if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    SibTr.debug(this, tc, "There are still " + remaining + " locked messages! - Unlocking them");
                }
                lockedMessages.unlockUnseen();
            }
        } catch (Throwable th) {
            FFDCFilter.processException(th, CLASS_NAME + ".deliverBatch", CommsConstants.ASYNCHPQ_DELIVER_01, this);
            if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                SibTr.debug(this, tc, "exception thrown");
            }
            if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                // NOTE(review): the event label is taken from an ehcache constant,
                // which looks like a decompiler substitution - confirm.
                SibTr.event(tc, TimeoutBehaviorConfiguration.EXCEPTION_TYPE_NAME, th);
            }
            // The consumer thread is cleared before the exception is reported;
            // this ordering is kept deliberately.
            proxyQueue.setAsynchConsumerThread(null);
            sessionProxy.deliverAsyncException(th);
            unlockAllMessages = true;
        }
        proxyQueue.setAsynchConsumerThread(null);
        // Wake any thread blocked in waitUntilEmpty().
        this.concurrentAccessLock.notifyAll();
        try {
            if (sessionProxy.performInCallbackActions()) {
                if (!unlockAllMessages && proxyQueue.getStarted() && !proxyQueue.isStopping()) {
                    conversationHelper.requestNextMessageBatch();
                } else if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    SibTr.debug(this, tc, "Not requesting next message batch: unlockAllMessages=" + unlockAllMessages + "isStarted=" + proxyQueue.getStarted() + "isStopping=" + proxyQueue.isStopping());
                }
            }
        } catch (SIException e) {
            FFDCFilter.processException(e, CLASS_NAME + ".deliverBatch", CommsConstants.ASYNCHPQ_DELIVER_04, this);
            sessionProxy.deliverAsyncException(e);
        }
        if (unlockAllMessages) {
            try {
                sessionProxy.unlockAll();
            } catch (SIException e2) {
                // Recorded and traced only; nothing is propagated to the caller.
                FFDCFilter.processException(e2, CLASS_NAME + ".deliverBatch", CommsConstants.ASYNCHPQ_DELIVER_03, this);
                if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    SibTr.debug(this, tc, "exception thrown");
                }
                if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                    SibTr.event(tc, TimeoutBehaviorConfiguration.EXCEPTION_TYPE_NAME, e2);
                }
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "deliverBatch");
        }
    }

    /**
     * Not supported by this queue type.
     *
     * @param s only traced
     * @return never returns normally
     * @throws SIErrorException always
     */
    @Override
    public JsMessage get(short s) throws SIResourceException, SIConnectionLostException, SIConnectionDroppedException {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "get", Short.valueOf(s));
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            SibTr.debug(this, tc, "Method not permitted for async queues");
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "get", "SIErrorException");
        }
        throw new SIErrorException();
    }

    /**
     * No-op for this queue type apart from entry/exit tracing.
     */
    @Override
    public void unlockAll() {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "unlockAll");
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "unlockAll");
        }
    }

    /**
     * Blocks until the queue no longer contains any entry for proxy queue
     * {@code s}, waiting on the concurrent access lock between checks.
     *
     * <p>The wait is not abandoned on interruption: an
     * {@code InterruptedException} is traced and waiting resumes. Because
     * {@code Object.wait()} clears the interrupt status when it throws, the
     * status is re-asserted once the loop has finished so that the caller can
     * still observe the interruption.
     *
     * @param s the proxy queue id to wait for
     */
    @Override
    public void waitUntilEmpty(short s) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "waitUntilEmpty", "sessionId=" + ((int) s));
        }
        boolean interrupted = false;
        try {
            synchronized (this.concurrentAccessLock) {
                while (!isQueueEmptyForSessionId(s)) {
                    try {
                        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                            SibTr.debug(this, tc, "waiting");
                        }
                        this.concurrentAccessLock.wait();
                    } catch (InterruptedException e) {
                        if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                            SibTr.exception((Object) this, tc, (Exception) e);
                        }
                        // Restored after the loop; restoring it here would make
                        // the next wait() throw immediately and spin.
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "waitUntilEmpty");
        }
    }

    /**
     * Scans the whole queue (not just the head) for an entry owned by proxy
     * queue {@code s}, stopping at the first match.
     *
     * @param s the proxy queue id to look for
     * @return {@code true} when no entry for {@code s} is queued
     */
    private synchronized boolean isQueueEmptyForSessionId(short s) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "isQueueEmptyForSessionId", "sessionId=" + ((int) s));
        }
        boolean empty = true;
        for (QueueData entry : this.queue) {
            ProxyQueue owner = entry.getProxyQueue();
            if (owner.getId() == s) {
                if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                    SibTr.debug(this, tc, "Found match: ", owner);
                }
                empty = false;
                break;
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "isQueueEmptyForSessionId", Boolean.valueOf(empty));
        }
        return empty;
    }

    // Emits the source-level identification string to trace when the class is loaded.
    static {
        if (TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
            SibTr.debug(tc, "Source info: @(#) SIB/ws/code/sib.comms.client.impl/src/com/ibm/ws/sib/comms/client/proxyqueue/queue/AsynchConsumerQueue.java, SIB.comms, WASX.SIB, uu1215.01 1.44");
        }
    }
}
