package com.ibm.ws.sib.msgstore.persistence.dispatcher;

import com.ibm.websphere.ras.TraceComponent;
import com.ibm.ws.ffdc.FFDCFilter;
import com.ibm.ws.sib.admin.JsMessagingEngine;
import com.ibm.ws.sib.msgstore.MessageStoreConstants;
import com.ibm.ws.sib.msgstore.PersistenceException;
import com.ibm.ws.sib.msgstore.PersistentDataEncodingException;
import com.ibm.ws.sib.msgstore.SevereMessageStoreException;
import com.ibm.ws.sib.msgstore.SeverePersistenceException;
import com.ibm.ws.sib.msgstore.cache.links.AbstractItemLink;
import com.ibm.ws.sib.msgstore.impl.MessageStoreImpl;
import com.ibm.ws.sib.msgstore.persistence.BatchingContext;
import com.ibm.ws.sib.msgstore.persistence.BatchingContextFactory;
import com.ibm.ws.sib.msgstore.persistence.impl.Tuple;
import com.ibm.ws.sib.msgstore.task.Task;
import com.ibm.ws.sib.msgstore.transactions.impl.PersistentTransaction;
import com.ibm.ws.sib.msgstore.transactions.impl.TransactionState;
import com.ibm.ws.sib.utils.ras.SibTr;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import net.sf.ehcache.concurrent.Sync;
import net.sf.ehcache.constructs.CacheDecoratorFactory;

/* loaded from: input_file:wlp/lib/com.ibm.ws.messaging.msgstore_1.0.20.jar:com/ibm/ws/sib/msgstore/persistence/dispatcher/SpillDispatcher.class */
public class SpillDispatcher extends DispatcherBase {
    // Trace component for this class, registered under the "SIBMessageStore" trace group.
    private static TraceComponent tc = SibTr.register(SpillDispatcher.class, "SIBMessageStore", MessageStoreConstants.MSG_BUNDLE);
    // Lower byte bound for a batch. In the visible part of this file it is only written to trace output.
    private long _minBytesPerBatch;
    // A single task of at least this many bytes is placed at the head of its dispatch queue by addTask();
    // notifyDispatchArrived() also compares a worker's dispatched byte total against it.
    private long _maxBytesPerBatch;
    // Task-count threshold used by notifyDispatchArrived() while the worker has no write error recorded.
    private int _maxTasksPerBatch;
    // Ceiling on a worker's dispatched byte total; addTask() parks non-delete tasks on the waiting queue
    // when accepting them would exceed it.
    private long _maxDispatchedBytesPerThread;
    // Number of successful writes a worker has to exceed after a write error before it clears its error state.
    public int _writesToResetErrorState;
    // NOTE(review): presumably the upper bound on worker threads; not referenced in this part of the file.
    private int _maxThreads;
    // Count of worker threads; the worker's run() clears _running when it observes zero.
    private int _numThreads;
    // Owning message store; workers use it for the trace context and to report local errors.
    private MessageStoreImpl _msi;
    // Worker threads, indexed by the worker's thread number (see SpillDispatcherThread.cleanup()).
    private Thread[] _threads;
    // NOTE(review): presumably the runnables backing _threads, in the same order; confirm where they are created.
    private SpillDispatcherThread[] _workers;
    // NOTE(review): not referenced in this part of the file; presumably creates the contexts used to write batches.
    private BatchingContextFactory _bcfactory;
    // Polled by the workers under synchronized (SpillDispatcher.this) to decide when to leave their loop.
    private boolean _stopRequested;
    // Cleared by the last worker to exit.
    private boolean _running;
    // NOTE(review): not referenced in this part of the file; presumably counts workers currently in a write-error state.
    private int _threadWriteErrorsOutstanding;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:wlp/lib/com.ibm.ws.messaging.msgstore_1.0.20.jar:com/ibm/ws/sib/msgstore/persistence/dispatcher/SpillDispatcher$DispatchingLock.class */
    /**
     * Stateless marker type used purely as a monitor object.
     * Each SpillDispatcherThread creates one instance and synchronizes, waits and notifies on it
     * to guard its queues. Having a dedicated class (rather than a plain Object) makes the lock
     * identifiable by type, e.g. in thread dumps.
     */
    public static final class DispatchingLock {
        // Not instantiable outside the enclosing SpillDispatcher source file.
        private DispatchingLock() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:wlp/lib/com.ibm.ws.messaging.msgstore_1.0.20.jar:com/ibm/ws/sib/msgstore/persistence/dispatcher/SpillDispatcher$QueueElement.class */
    /**
     * One entry on a dispatcher queue: a task, the notifier to signal once the task has been
     * dealt with, and a few status flags.
     * The operation kind (add / delete / neither) and the data size are sampled from the task
     * once, in the constructor, and not re-read afterwards.
     */
    public static class QueueElement {
        // Captured at construction time.
        private Task _task;
        private int _dataSize;
        private DispatchNotifier _waiter;
        private boolean _isAdd;
        private boolean _isDelete;

        // Status flags, all initially false.
        private boolean _notified;
        private boolean _cancelled;
        private boolean _batched;

        /**
         * @param task             the task to queue; its size and operation kind are read immediately
         * @param dispatchNotifier signalled (at most once) through {@link #notifyDispatch()}
         */
        public QueueElement(Task task, DispatchNotifier dispatchNotifier) {
            _task = task;
            _dataSize = task.getPersistableInMemorySizeApproximation(TransactionState.STATE_COMMITTED);
            _waiter = dispatchNotifier;
            _isAdd = task.isCreateOfPersistentRepresentation();
            _isDelete = task.isDeleteOfPersistentRepresentation();
        }

        /** True when debug-level trace should be emitted for this class. */
        private static boolean debugTraceOn() {
            return TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled();
        }

        /** @return the task wrapped by this element */
        public Task getTask() {
            return _task;
        }

        /** @return the size recorded for the task when this element was created */
        public int getDataSize() {
            return _dataSize;
        }

        /** Marks this element as cancelled; the flag is never reset. */
        public void setCancelled() {
            _cancelled = true;
        }

        /** @return whether {@link #setCancelled()} has been called (the value is also traced at debug level) */
        public boolean isCancelled() {
            if (debugTraceOn()) {
                SibTr.debug(this, SpillDispatcher.tc, "isCancelled=" + _cancelled);
            }
            return _cancelled;
        }

        /** @param z the new value of the batched flag */
        public void setBatched(boolean z) {
            _batched = z;
        }

        /** @return the current batched flag (the value is also traced at debug level) */
        public boolean isBatched() {
            if (debugTraceOn()) {
                SibTr.debug(this, SpillDispatcher.tc, "isBatched=" + _batched);
            }
            return _batched;
        }

        /** @return true when the task creates a persistent representation */
        public final boolean isAdd() {
            if (debugTraceOn()) {
                SibTr.debug(this, SpillDispatcher.tc, "isAdd=" + _isAdd);
            }
            return _isAdd;
        }

        /** @return true when the task is neither an add nor a delete */
        public final boolean isUpdate() {
            final boolean update = !_isAdd && !_isDelete;
            if (debugTraceOn()) {
                SibTr.debug(this, SpillDispatcher.tc, "isUpdate=" + update);
            }
            return update;
        }

        /** @return true when the task deletes a persistent representation */
        public final boolean isDelete() {
            if (debugTraceOn()) {
                SibTr.debug(this, SpillDispatcher.tc, "isDelete=" + _isDelete);
            }
            return _isDelete;
        }

        /** Signals the notifier the first time this is called; later calls do nothing. */
        public void notifyDispatch() {
            if (!_notified) {
                _notified = true;
                _waiter.notifyDispatch();
            }
        }
    }

    /* loaded from: input_file:wlp/lib/com.ibm.ws.messaging.msgstore_1.0.20.jar:com/ibm/ws/sib/msgstore/persistence/dispatcher/SpillDispatcher$SpillDispatcherThread.class */
    private class SpillDispatcherThread implements Runnable {
        // Index of this worker; used to look up its Thread in SpillDispatcher._threads and
        // passed to threadWriteErrorCleared().
        private int _threadNum;
        // Name supplied at construction; not otherwise used in the visible part of this class.
        private String _threadName;
        // QueueElements that addTask() could not admit to a dispatch queue yet.
        private LinkedList _waitingQueue;
        // Dispatch queues, one per operation kind (add / update / delete). All hold QueueElements.
        private LinkedList _dispatchAddingQueue;
        private LinkedList _dispatchUpdateQueue;
        private LinkedList _dispatchRemoveQueue;
        // Running byte total: increased when addTask() puts an element on a dispatch queue,
        // decreased when addTask() cancels a queued element.
        private long _dispatchedBytes;
        // Milliseconds run() sleeps before retrying after more than one consecutive write error.
        private long _writeErrorRetryDelay;
        // False only while run() is parked in _dispatchingLock.wait().
        private boolean _threadActive;
        // Monitor guarding the queues and flags of this worker.
        private final DispatchingLock _dispatchingLock = new DispatchingLock();
        // Error state: while true, notifyDispatchArrived() uses a task threshold of 1; run() clears it
        // once enough good writes have been seen.
        private boolean _writeErrorOccurred = false;
        // Successful writes counted by run() while _writeErrorOccurred is set.
        private int _goodWritesSinceLastError = 0;
        // run() resets this to 0 on a successful write in error state; a value above 1 triggers the retry sleep.
        private int _consecutiveWriteErrors = 0;
        // While true, addTask() signals the notifier of an element immediately even though it is left waiting.
        private boolean _isContributingToThreadWriteErrors = false;
        // True only while run() is in its retry sleep.
        private boolean _interruptible = false;
        // Reset to false by run() each time it comes back from _dispatchingLock.wait().
        private boolean _notifyOutstanding = false;

        SpillDispatcherThread(int i, String str) {
            this._threadNum = i;
            this._threadName = str;
            SpillDispatcher.access$408(SpillDispatcher.this);
            this._waitingQueue = new LinkedList();
            this._threadActive = true;
            this._dispatchAddingQueue = new LinkedList();
            this._dispatchUpdateQueue = new LinkedList();
            this._dispatchRemoveQueue = new LinkedList();
        }

        /**
         * Worker loop. Until the dispatcher requests a stop, each iteration:
         * <ol>
         * <li>builds a batch under the dispatching lock, waiting on the lock while there is
         *     nothing to write;</li>
         * <li>sleeps for the retry delay first if more than one consecutive write error is
         *     outstanding;</li>
         * <li>writes the batch, then either completes it or hands it to
         *     {@code handleWriteError}.</li>
         * </ol>
         * Any unexpected {@code Throwable} ends the loop and is reported through FFDC. The exit
         * bookkeeping runs exactly once, however the loop ends: the thread count is decremented,
         * a local error is reported if no stop was requested, and {@code _running} is cleared
         * when this was the last worker.
         */
        @Override
        public void run() {
            if (SpillDispatcher.this._msi != null && SpillDispatcher.this._msi._getMessagingEngine() != null) {
                SibTr.push(SpillDispatcher.this._msi._getMessagingEngine());
            }
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.entry(this, SpillDispatcher.tc, "run");
            }
            LinkedList batch = null;
            boolean stopped = false;
            try {
                while (!stopped) {
                    boolean batchAvailable = false;
                    synchronized (SpillDispatcher.this) {
                        if (SpillDispatcher.this._stopRequested) {
                            stopped = true;
                        }
                    }
                    // Keep trying to build a batch until there is something to write or we are told to stop.
                    while (!batchAvailable && !stopped) {
                        synchronized (this._dispatchingLock) {
                            promoteWaiters();
                            batch = buildBatch();
                            batchAvailable = !batch.isEmpty();
                            if (!batchAvailable) {
                                this._threadActive = false;
                                try {
                                    if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                        SibTr.debug(this, SpillDispatcher.tc, "Spill dispatcher started indefinite wait", this);
                                    }
                                    this._dispatchingLock.wait(0L);
                                    if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                        SibTr.debug(this, SpillDispatcher.tc, "Spill dispatcher completed wait", this);
                                    }
                                } catch (InterruptedException e) {
                                    // Deliberately ignored: an interrupt is treated like a notify and the
                                    // enclosing loop re-checks the queues and the stop flag.
                                } finally {
                                    // Reset the wait-state flags however the wait ended.
                                    this._threadActive = true;
                                    this._notifyOutstanding = false;
                                }
                            }
                        }
                        synchronized (SpillDispatcher.this) {
                            if (SpillDispatcher.this._stopRequested) {
                                stopped = true;
                            }
                        }
                    }
                    // Back off before retrying when the previous writes failed repeatedly.
                    if (!stopped && this._writeErrorOccurred && this._consecutiveWriteErrors > 1) {
                        synchronized (this._dispatchingLock) {
                            this._interruptible = true;
                        }
                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                            SibTr.debug(this, SpillDispatcher.tc, "Dispatcher started retry wait of " + this._writeErrorRetryDelay + " ms", this);
                        }
                        try {
                            Thread.sleep(this._writeErrorRetryDelay);
                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                SibTr.debug(this, SpillDispatcher.tc, "Dispatcher completed wait", this);
                            }
                        } catch (InterruptedException e2) {
                            // cleanup() interrupts this thread when isInterruptible() reports true, to cut
                            // the sleep short. The interrupt status is intentionally not re-asserted,
                            // because that would make the next wait() above return immediately.
                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                SibTr.debug(this, SpillDispatcher.tc, "Dispatcher interrupted during wait", this);
                            }
                        } finally {
                            synchronized (this._dispatchingLock) {
                                this._interruptible = false;
                            }
                        }
                    }
                    if (!stopped && batchAvailable) {
                        // Remains true unless the write attempt below returns normally.
                        boolean failedWithException = true;
                        try {
                            if (writeBatch(batch)) {
                                batchCompleted(batch);
                                batchAvailable = false;
                                if (this._writeErrorOccurred) {
                                    this._consecutiveWriteErrors = 0;
                                    this._goodWritesSinceLastError++;
                                    if (this._goodWritesSinceLastError > SpillDispatcher.this._writesToResetErrorState) {
                                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                            SibTr.debug(this, SpillDispatcher.tc, "Good write threshold passed. Re-enabling batch mode.");
                                        }
                                        this._writeErrorOccurred = false;
                                        if (this._isContributingToThreadWriteErrors) {
                                            this._isContributingToThreadWriteErrors = false;
                                            SpillDispatcher.this.threadWriteErrorCleared(this._threadNum);
                                        }
                                    }
                                }
                            }
                            failedWithException = false;
                        } finally {
                            // batchAvailable is still true when the batch was not completed, either because
                            // writeBatch() returned false or because something threw before completion.
                            if (batchAvailable) {
                                handleWriteError(failedWithException, batch.size() - batchCancelled(batch));
                            }
                        }
                    }
                }
            } catch (Throwable th) {
                FFDCFilter.processException(th, "com.ibm.ws.sib.msgstore.persistence.dispatcher.SpillDispatcherThread.run", "1:769:1.48", this);
                if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEventEnabled()) {
                    SibTr.event(this, SpillDispatcher.tc, "Unexpected exception caught in SpillDispatcher thread!", th);
                }
            } finally {
                synchronized (SpillDispatcher.this) {
                    // Direct field access replaces the compiler-generated accessor access$410
                    // (javac's post-decrement accessor), which is not a method declared in source.
                    SpillDispatcher.this._numThreads--;
                    // Leaving the loop without a stop request means the worker died unexpectedly.
                    if (!SpillDispatcher.this._stopRequested && SpillDispatcher.this._msi != null) {
                        SpillDispatcher.this._msi.reportLocalError();
                    }
                    if (SpillDispatcher.this._numThreads == 0) {
                        SpillDispatcher.this._running = false;
                    }
                }
            }
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.exit(this, SpillDispatcher.tc, "run");
            }
            if (SpillDispatcher.this._msi == null || SpillDispatcher.this._msi._getMessagingEngine() == null) {
                return;
            }
            SibTr.pop();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Releases everything parked on this worker. Under the dispatching lock it signals the
         * notifier of every waiting element, empties the waiting queue, interrupts the worker
         * thread if {@code isInterruptible()} says so, and finally notifies the lock so that a
         * worker blocked in {@code wait()} wakes up.
         */
        public void cleanup() {
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.entry(this, SpillDispatcher.tc, "cleanup");
            }
            synchronized (this._dispatchingLock) {
                // Nobody should stay blocked on an element that will never be dispatched.
                for (Object waiting : this._waitingQueue) {
                    ((QueueElement) waiting).notifyDispatch();
                }
                this._waitingQueue.clear();

                if (isInterruptible()) {
                    Thread workerThread = SpillDispatcher.this._threads[this._threadNum];
                    workerThread.interrupt();
                }
                this._dispatchingLock.notify();
            }
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.exit(this, SpillDispatcher.tc, "cleanup");
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Queues a task on this worker.
         * <p>
         * For a delete task, the method first tries to cancel matching work that has not been
         * batched yet: it searches the adding queue, then the update queue, then the waiting
         * queue for elements that share the new task's link. If the matching add was cancelled,
         * the delete itself is cancelled too and its notifier is signalled straight away.
         * <p>
         * Otherwise the task is enqueued as follows:
         * <ul>
         * <li>a task of at least {@code _maxBytesPerBatch} bytes goes to the front of the
         *     dispatch queue for its operation kind;</li>
         * <li>a delete, or any task that fits under {@code _maxDispatchedBytesPerThread} while
         *     nothing is waiting, goes to the back of its dispatch queue;</li>
         * <li>everything else goes to the waiting queue.</li>
         * </ul>
         * All queue manipulation happens under the dispatching lock.
         *
         * @param task             the task to queue
         * @param j                only written to the entry trace by this method
         * @param dispatchNotifier signalled once the task has been dispatched or cancelled
         * @return {@code true} only when the task was left on the waiting queue without its
         *         notifier being signalled; {@code false} in every other case
         * @throws SevereMessageStoreException if an update for the same link is found already
         *         batched after the matching add has been cancelled
         */
        public boolean addTask(Task task, long j, DispatchNotifier dispatchNotifier) throws SevereMessageStoreException {
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.entry(this, SpillDispatcher.tc, "addTask", new Object[]{task, Long.valueOf(j), dispatchNotifier});
            }
            boolean leftWaiting = false;
            QueueElement newElement = new QueueElement(task, dispatchNotifier);
            int newTaskBytes = newElement.getDataSize();
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                SibTr.debug(this, SpillDispatcher.tc, "bytesForTask=" + newTaskBytes);
            }
            synchronized (this._dispatchingLock) {
                if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                    SibTr.debug(this, SpillDispatcher.tc, "_dispatchAddingQueue.size()=" + this._dispatchAddingQueue.size());
                    SibTr.debug(this, SpillDispatcher.tc, "_dispatchUpdateQueue.size()=" + this._dispatchUpdateQueue.size());
                    SibTr.debug(this, SpillDispatcher.tc, "_dispatchRemoveQueue.size()=" + this._dispatchRemoveQueue.size());
                    SibTr.debug(this, SpillDispatcher.tc, "_waitingQueue.size()=" + this._waitingQueue.size());
                    SibTr.debug(this, SpillDispatcher.tc, "_dispatchedBytes=" + this._dispatchedBytes);
                    SibTr.debug(this, SpillDispatcher.tc, "_maxDispatchedBytesPerThread=" + SpillDispatcher.this._maxDispatchedBytesPerThread);
                }
                // Set when the add matching this delete was cancelled, which makes the delete redundant.
                boolean cancelNewTask = false;
                if (newElement.isDelete()) {
                    if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                        SibTr.debug(this, SpillDispatcher.tc, "New task IS a delete. ATTEMPTING cancellation.");
                    }
                    boolean addCancelled = false;
                    boolean keepSearching = true;
                    // A count of one means the delete itself is the only operation outstanding.
                    if (((Tuple) task.getPersistable()).persistableOperationsOutstanding() == 1) {
                        keepSearching = false;
                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                            SibTr.debug(this, SpillDispatcher.tc, "No outstanding operations. STOPPING cancellation.");
                        }
                    }

                    // --- ADDING queue: only worth searching while the add has not been written yet.
                    if (keepSearching && !((Tuple) task.getPersistable()).persistableRepresentationWasCreated()) {
                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                            SibTr.debug(this, SpillDispatcher.tc, "Searching the ADDING queue.");
                        }
                        boolean addAlreadyBatched = false;
                        Iterator addingIt = this._dispatchAddingQueue.iterator();
                        while (!addAlreadyBatched && !addCancelled && addingIt.hasNext()) {
                            QueueElement queued = (QueueElement) addingIt.next();
                            Task queuedTask = queued.getTask();
                            AbstractItemLink queuedLink = queuedTask.getLink();
                            if (!queued.isBatched()) {
                                // Re-check under the element's monitor: it may have been batched meanwhile.
                                synchronized (queued) {
                                    if (queued.isBatched()) {
                                        if (task.getLink() == queuedLink) {
                                            addAlreadyBatched = true;
                                        }
                                    } else if (queuedLink != null && queuedLink == task.getLink() && queuedLink.isRemoving() && !((Tuple) queuedTask.getPersistable()).persistableRepresentationWasCreated()) {
                                        addingIt.remove();
                                        this._dispatchedBytes -= queued.getDataSize();
                                        ((Tuple) queuedTask.getPersistable()).persistableOperationCancelled();
                                        if (((Tuple) task.getPersistable()).persistableOperationsOutstanding() == 1) {
                                            keepSearching = false;
                                        }
                                        queued.setCancelled();
                                        addCancelled = true;
                                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                            SibTr.debug(this, SpillDispatcher.tc, "Task matched and removed from ADDING queue: New Task: " + task + ", Cancelled Task: " + queuedTask);
                                        }
                                    }
                                }
                            } else if (task.getLink() == queuedLink) {
                                addAlreadyBatched = true;
                            }
                        }
                    }

                    // --- UPDATE queue.
                    if (keepSearching && !this._dispatchUpdateQueue.isEmpty()) {
                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                            SibTr.debug(this, SpillDispatcher.tc, "Searching the UPDATE queue.");
                        }
                        Iterator updateIt = this._dispatchUpdateQueue.iterator();
                        while (keepSearching && updateIt.hasNext()) {
                            QueueElement queued = (QueueElement) updateIt.next();
                            Task queuedTask = queued.getTask();
                            AbstractItemLink queuedLink = queuedTask.getLink();
                            if (!queued.isBatched()) {
                                synchronized (queued) {
                                    if (queued.isBatched()) {
                                        if (task.getLink() == queuedLink && addCancelled) {
                                            // An update is about to be written for an item whose add was just cancelled.
                                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEventEnabled()) {
                                                SibTr.event(this, SpillDispatcher.tc, "Update task batched after cancellation of add!");
                                            }
                                            // Guarded like the matching entry trace so entry/exit stay paired.
                                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                                                SibTr.exit(this, SpillDispatcher.tc, "addTask");
                                            }
                                            throw new SevereMessageStoreException("Update task batched after cancellation of add!");
                                        }
                                    } else if (queuedLink != null && queuedLink == task.getLink() && queuedLink.isRemoving()) {
                                        updateIt.remove();
                                        this._dispatchedBytes -= queued.getDataSize();
                                        ((Tuple) queuedTask.getPersistable()).persistableOperationCancelled();
                                        if (((Tuple) task.getPersistable()).persistableOperationsOutstanding() == 1) {
                                            keepSearching = false;
                                        }
                                        queued.setCancelled();
                                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                            SibTr.debug(this, SpillDispatcher.tc, "Task matched and removed from UPDATE queue: New Task: " + task + ", Cancelled Task: " + queuedTask);
                                        }
                                    }
                                }
                            } else if (task.getLink() == queuedLink && addCancelled) {
                                if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEventEnabled()) {
                                    SibTr.event(this, SpillDispatcher.tc, "Update task batched after cancellation of add!");
                                }
                                // Guarded like the matching entry trace so entry/exit stay paired.
                                if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                                    SibTr.exit(this, SpillDispatcher.tc, "addTask");
                                }
                                throw new SevereMessageStoreException("Update task batched after cancellation of add!");
                            }
                        }
                    }

                    // --- WAITING queue: these elements cannot have been batched, so no per-element lock is taken.
                    if (keepSearching && !this._waitingQueue.isEmpty()) {
                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                            SibTr.debug(this, SpillDispatcher.tc, "Searching the WAITING queue.");
                        }
                        Iterator waitingIt = this._waitingQueue.iterator();
                        while (keepSearching && waitingIt.hasNext()) {
                            QueueElement queued = (QueueElement) waitingIt.next();
                            Task queuedTask = queued.getTask();
                            AbstractItemLink queuedLink = queuedTask.getLink();
                            if (queuedLink != null && queuedLink == task.getLink() && queuedLink.isRemoving() && !queuedTask.isDeleteOfPersistentRepresentation()) {
                                waitingIt.remove();
                                // NOTE(review): this method adds an element to the waiting queue WITHOUT adding
                                // its size to _dispatchedBytes, yet the size is subtracted here. Confirm against
                                // promoteWaiters() that this accounting is intended.
                                this._dispatchedBytes -= queued.getDataSize();
                                ((Tuple) queuedTask.getPersistable()).persistableOperationCancelled();
                                if (((Tuple) task.getPersistable()).persistableOperationsOutstanding() == 1) {
                                    keepSearching = false;
                                }
                                // Release whoever is blocked on the cancelled element.
                                queued.notifyDispatch();
                                if (queued.isAdd()) {
                                    addCancelled = true;
                                }
                                if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                    SibTr.debug(this, SpillDispatcher.tc, "Task matched and removed from WAITING queue: New Task: " + task + ", Cancelled Task: " + queuedTask);
                                }
                            }
                        }
                    }
                    cancelNewTask = addCancelled;
                } else if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                    SibTr.debug(this, SpillDispatcher.tc, "New task is NOT a delete. SKIPPING cancellation.");
                }

                if (cancelNewTask) {
                    // The add never reached the store, so the delete has nothing to do.
                    ((Tuple) task.getPersistable()).persistableOperationCancelled();
                    newElement.notifyDispatch();
                } else if (newTaskBytes >= SpillDispatcher.this._maxBytesPerBatch) {
                    // Oversized task: jump to the head of the queue for its operation kind.
                    if (newElement.isAdd()) {
                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                            SibTr.debug(this, SpillDispatcher.tc, "Adding to FRONT of dispatch ADDING queue");
                        }
                        this._dispatchAddingQueue.addFirst(newElement);
                    } else if (newElement.isUpdate()) {
                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                            SibTr.debug(this, SpillDispatcher.tc, "Adding to FRONT of dispatch UPDATE queue");
                        }
                        this._dispatchUpdateQueue.addFirst(newElement);
                    } else if (newElement.isDelete()) {
                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                            SibTr.debug(this, SpillDispatcher.tc, "Adding to FRONT of dispatch REMOVE queue");
                        }
                        this._dispatchRemoveQueue.addFirst(newElement);
                    }
                    this._dispatchedBytes += newTaskBytes;
                    newElement.notifyDispatch();
                } else if (newElement.isDelete() || (this._waitingQueue.isEmpty() && this._dispatchedBytes + newTaskBytes <= SpillDispatcher.this._maxDispatchedBytesPerThread)) {
                    // Deletes are always admitted; other tasks only when nothing is waiting and the byte cap allows it.
                    if (newElement.isAdd()) {
                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                            SibTr.debug(this, SpillDispatcher.tc, "Adding to dispatch ADDING queue");
                        }
                        this._dispatchAddingQueue.add(newElement);
                    } else if (newElement.isUpdate()) {
                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                            SibTr.debug(this, SpillDispatcher.tc, "Adding to dispatch UPDATE queue");
                        }
                        this._dispatchUpdateQueue.add(newElement);
                    } else if (newElement.isDelete()) {
                        if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                            SibTr.debug(this, SpillDispatcher.tc, "Adding to dispatch REMOVE queue");
                        }
                        this._dispatchRemoveQueue.add(newElement);
                    }
                    this._dispatchedBytes += newTaskBytes;
                    newElement.notifyDispatch();
                } else {
                    if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                        SibTr.debug(this, SpillDispatcher.tc, "Adding to WAITING queue");
                    }
                    this._waitingQueue.add(newElement);
                    if (this._isContributingToThreadWriteErrors) {
                        // While this worker is flagged for write errors the notifier is signalled at once.
                        newElement.notifyDispatch();
                    } else {
                        leftWaiting = true;
                    }
                }
            }
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.exit(this, SpillDispatcher.tc, "addTask", Boolean.valueOf(leftWaiting));
            }
            return leftWaiting;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Wakes this worker's dispatching thread when enough work is queued for a batch.
         *
         * <p>Under {@code _dispatchingLock}, a single {@code notify()} is issued when no
         * notification is already outstanding, the thread is not active, and any of the
         * following holds: the three dispatch queues together hold at least the batch task
         * limit (1 after a write error, otherwise {@code _maxTasksPerBatch}), the dispatched
         * bytes have reached {@code _minBytesPerBatch} (the same threshold {@code buildBatch}
         * uses), or the waiting queue is not empty.
         */
        public void notifyDispatchArrived() {
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.entry(this, SpillDispatcher.tc, "notifyDispatchArrived");
            }
            synchronized (this._dispatchingLock) {
                int maxTasksInBatch = this._writeErrorOccurred ? 1 : SpillDispatcher.this._maxTasksPerBatch;
                int addCount = this._dispatchAddingQueue.size();
                int updateCount = this._dispatchUpdateQueue.size();
                int removeCount = this._dispatchRemoveQueue.size();
                if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                    SibTr.debug(this, SpillDispatcher.tc, "_notifyOutstanding=" + this._notifyOutstanding);
                    SibTr.debug(this, SpillDispatcher.tc, "_threadActive=" + this._threadActive);
                    SibTr.debug(this, SpillDispatcher.tc, "_dispatchAddingQueue.size()=" + addCount);
                    SibTr.debug(this, SpillDispatcher.tc, "_dispatchUpdateQueue.size()=" + updateCount);
                    SibTr.debug(this, SpillDispatcher.tc, "_dispatchRemoveQueue.size()=" + removeCount);
                    SibTr.debug(this, SpillDispatcher.tc, "_waitingQueue.size()=" + this._waitingQueue.size());
                    SibTr.debug(this, SpillDispatcher.tc, "maxTasksInBatch=" + maxTasksInBatch);
                    SibTr.debug(this, SpillDispatcher.tc, "_dispatchedBytes=" + this._dispatchedBytes);
                    SibTr.debug(this, SpillDispatcher.tc, "_minBytesPerBatch=" + SpillDispatcher.this._minBytesPerBatch);
                    SibTr.debug(this, SpillDispatcher.tc, "_maxBytesPerBatch=" + SpillDispatcher.this._maxBytesPerBatch);
                }
                // Same "is a batch worth building" test that buildBatch applies, so the thread
                // is woken exactly when it would have something to write.
                boolean enoughWork = addCount + updateCount + removeCount >= maxTasksInBatch
                        || this._dispatchedBytes >= SpillDispatcher.this._minBytesPerBatch
                        || !this._waitingQueue.isEmpty();
                if (!this._notifyOutstanding && !this._threadActive && enoughWork) {
                    // Record the pending wake-up so later callers do not notify again.
                    // NOTE(review): assumes the dispatching thread clears this flag when it
                    // wakes; that code is outside this method - confirm.
                    this._notifyOutstanding = true;
                    this._dispatchingLock.notify();
                }
            }
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.exit(this, SpillDispatcher.tc, "notifyDispatchArrived");
            }
        }

        /**
         * Selects queued elements for the next batch.
         *
         * <p>A batch is attempted only when the dispatch queues together hold at least the
         * batch task limit (1 after a write error, otherwise {@code _maxTasksPerBatch}), the
         * dispatched bytes have reached {@code _minBytesPerBatch}, or the waiting queue is not
         * empty. Elements are taken from the REMOVE queue if it is non-empty; otherwise from
         * the ADDING queue followed by the UPDATE queue; otherwise from the UPDATE queue alone.
         * Cancelled elements, and updates whose link is being removed, are skipped. Selection
         * stops at the task limit, at {@code _maxBytesPerBatch}, when the chosen queue(s) are
         * exhausted, or when one of the size checks below fails.
         *
         * @return the elements chosen for the batch (each marked as batched); empty if none
         */
        private LinkedList buildBatch() {
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.entry(this, SpillDispatcher.tc, "buildBatch");
            }
            int maxTasksInBatch = this._writeErrorOccurred ? 1 : SpillDispatcher.this._maxTasksPerBatch;
            LinkedList batch = new LinkedList();
            int addCount = this._dispatchAddingQueue.size();
            int updateCount = this._dispatchUpdateQueue.size();
            int removeCount = this._dispatchRemoveQueue.size();
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                SibTr.debug(this, SpillDispatcher.tc, "_dispatchAddingQueue.size()=" + addCount);
                SibTr.debug(this, SpillDispatcher.tc, "_dispatchUpdateQueue.size()=" + updateCount);
                SibTr.debug(this, SpillDispatcher.tc, "_dispatchRemoveQueue.size()=" + removeCount);
                SibTr.debug(this, SpillDispatcher.tc, "_waitingQueue.size()=" + this._waitingQueue.size());
                SibTr.debug(this, SpillDispatcher.tc, "maxTasksInBatch=" + maxTasksInBatch);
                SibTr.debug(this, SpillDispatcher.tc, "_dispatchedBytes=" + this._dispatchedBytes);
                SibTr.debug(this, SpillDispatcher.tc, "_minBytesPerBatch=" + SpillDispatcher.this._minBytesPerBatch);
                SibTr.debug(this, SpillDispatcher.tc, "_maxBytesPerBatch=" + SpillDispatcher.this._maxBytesPerBatch);
            }
            if (addCount + updateCount + removeCount >= maxTasksInBatch || this._dispatchedBytes >= SpillDispatcher.this._minBytesPerBatch || !this._waitingQueue.isEmpty()) {
                int batchSize = 0;
                long bytesInBatch = 0;
                long remainingDispatchedBytes = this._dispatchedBytes;
                boolean batching = true;
                // Which queue the iterator is walking: 0 = REMOVE, 1 = ADDING, 2 = UPDATE.
                int queueInUse;
                Iterator it;
                if (removeCount > 0) {
                    it = this._dispatchRemoveQueue.iterator();
                    queueInUse = 0;
                } else if (addCount > 0) {
                    it = this._dispatchAddingQueue.iterator();
                    queueInUse = 1;
                } else {
                    it = this._dispatchUpdateQueue.iterator();
                    queueInUse = 2;
                }
                while (batching && batchSize < maxTasksInBatch && bytesInBatch < SpillDispatcher.this._maxBytesPerBatch) {
                    if (!it.hasNext()) {
                        if (queueInUse == 1) {
                            // ADDING queue exhausted: carry on with the UPDATE queue.
                            it = this._dispatchUpdateQueue.iterator();
                            queueInUse = 2;
                        } else {
                            // REMOVE or UPDATE queue exhausted: nothing further to batch.
                            batching = false;
                        }
                        continue;
                    }
                    QueueElement queueElement = (QueueElement) it.next();
                    synchronized (queueElement) {
                        Task task = queueElement.getTask();
                        if (queueElement.isCancelled()) {
                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                SibTr.debug(this, SpillDispatcher.tc, "Task cancelled from under us: " + task);
                            }
                        } else if (queueElement.isUpdate() && task.getLink().isRemoving()) {
                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                SibTr.debug(this, SpillDispatcher.tc, "Task skipped as it is a candidate for cancellation: " + task);
                            }
                        } else {
                            int dataSize = queueElement.getDataSize();
                            // The first element is always eligible on size, even if it alone
                            // exceeds the per-batch byte limit.
                            if (dataSize + bytesInBatch < SpillDispatcher.this._maxBytesPerBatch || batchSize == 0) {
                                if (remainingDispatchedBytes >= SpillDispatcher.this._minBytesPerBatch) {
                                    bytesInBatch += dataSize;
                                    batchSize++;
                                    remainingDispatchedBytes -= dataSize;
                                    queueElement.setBatched(true);
                                    batch.add(queueElement);
                                    if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                        SibTr.debug(this, SpillDispatcher.tc, "Task added to batch: " + task);
                                    }
                                } else {
                                    // NOTE(review): the decompiled code only traced here; stopping
                                    // is inferred from the trace message - confirm.
                                    batching = false;
                                    if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                        SibTr.debug(this, SpillDispatcher.tc, "Stopping batch creation as dispatchedBytes < minBytesPerBatch");
                                    }
                                }
                            } else {
                                // NOTE(review): stopping is inferred from the trace message - confirm.
                                batching = false;
                                if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                    SibTr.debug(this, SpillDispatcher.tc, "Stopping batch creation as (bytesInBatch + newTask.size) > maxBytesPerBatch");
                                }
                            }
                        }
                    }
                }
            } else if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                SibTr.debug(this, SpillDispatcher.tc, "No batch built");
            }
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.exit(this, SpillDispatcher.tc, "buildBatch", Integer.valueOf(batch.size()));
            }
            return batch;
        }

        /**
         * Persists every non-cancelled element of a batch through one batching context.
         *
         * <p>An element is cancelled first if it is a delete whose persistable representation
         * was never created, or if {@code ensureDataAvailable()} throws for its task. A
         * batching context is created lazily, only once a non-cancelled element is found.
         *
         * @param linkedList the batch of {@code QueueElement}s to write
         * @return {@code true} if the batch executed, or if there was nothing to execute;
         *         {@code false} if executing it raised a (non-severe) {@code PersistenceException}
         * @throws SevereMessageStoreException wrapping a {@code SeverePersistenceException}
         *         raised while executing the batch
         */
        private boolean writeBatch(LinkedList linkedList) throws SevereMessageStoreException {
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.entry(this, SpillDispatcher.tc, "writeBatch", Integer.valueOf(linkedList.size()));
            }
            BatchingContext context = null;
            // Tallies of cancelled work; nothing in this method reads them.
            int cancelledCount = 0;
            int cancelledBytes = 0;
            for (Iterator elements = linkedList.iterator(); elements.hasNext();) {
                QueueElement element = (QueueElement) elements.next();
                Task task = element.getTask();
                if (element.isDelete()) {
                    Tuple tuple = (Tuple) task.getPersistable();
                    if (!tuple.persistableRepresentationWasCreated()) {
                        element.setCancelled();
                    }
                }
                try {
                    task.ensureDataAvailable();
                } catch (PersistentDataEncodingException encodingFailure) {
                    if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                        SibTr.debug(this, SpillDispatcher.tc, "ensureDataAvailable encountered item which couldn't encode");
                    }
                    element.setCancelled();
                } catch (SevereMessageStoreException notInStore) {
                    if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                        SibTr.debug(this, SpillDispatcher.tc, "ensureDataAvailable encountered item not in store");
                    }
                    element.setCancelled();
                }
                if (!element.isCancelled()) {
                    if (context == null) {
                        context = SpillDispatcher.this._bcfactory.createBatchingContext();
                    }
                    task.persist(context, TransactionState.STATE_COMMITTED);
                }
                if (element.isCancelled()) {
                    cancelledCount++;
                    cancelledBytes += element.getDataSize();
                }
            }
            boolean written;
            if (context == null) {
                // Every element was cancelled (or the batch was empty): nothing to execute.
                written = true;
            } else {
                written = false;
                try {
                    context.executeBatch();
                    written = true;
                } catch (SeverePersistenceException severe) {
                    FFDCFilter.processException(severe, "com.ibm.ws.sib.msgstore.persistence.impl.SpillDispatcher.writeBatch", "1:1757:1.48", this);
                    if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEventEnabled()) {
                        SibTr.event(this, SpillDispatcher.tc, "Exception persisting batch", severe);
                    }
                    throw new SevereMessageStoreException((Throwable) severe);
                } catch (PersistenceException failure) {
                    // Non-severe failure: reported through the false return value.
                    FFDCFilter.processException(failure, "com.ibm.ws.sib.msgstore.persistence.impl.SpillDispatcher.writeBatch", "1:1766:1.48", this);
                    if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEventEnabled()) {
                        SibTr.event(this, SpillDispatcher.tc, "Exception persisting batch", failure);
                    }
                }
            }
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.exit(this, SpillDispatcher.tc, "writeBatch", Boolean.valueOf(written));
            }
            return written;
        }

        /**
         * Finishes off a batch: tells each element's persistable whether its operation
         * completed or was cancelled, drops the element from its dispatch queue and reduces
         * {@code _dispatchedBytes} by the element's data size. All queue changes happen
         * under {@code _dispatchingLock}.
         *
         * <p>For adds and deletes the head of the corresponding queue is removed; for
         * updates the element itself is removed.
         *
         * @param linkedList the batch of {@code QueueElement}s that was processed
         */
        private void batchCompleted(LinkedList linkedList) throws SevereMessageStoreException {
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.entry(this, SpillDispatcher.tc, "batchCompleted");
            }
            final int batchSize = linkedList.size();
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                SibTr.debug(this, SpillDispatcher.tc, "batchSize=" + batchSize);
            }
            if (batchSize > 0) {
                synchronized (this._dispatchingLock) {
                    for (Iterator elements = linkedList.iterator(); elements.hasNext();) {
                        QueueElement element = (QueueElement) elements.next();
                        boolean cancelled = element.isCancelled();
                        Tuple tuple = (Tuple) element.getTask().getPersistable();
                        if (cancelled) {
                            tuple.persistableOperationCancelled();
                        } else {
                            tuple.persistableOperationCompleted();
                        }
                        if (element.isAdd()) {
                            QueueElement head = (QueueElement) this._dispatchAddingQueue.remove(0);
                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                SibTr.debug(this, SpillDispatcher.tc, "Task removed from ADDING queue after batch completion: " + head.getTask());
                            }
                        } else if (element.isUpdate()) {
                            this._dispatchUpdateQueue.remove(element);
                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                SibTr.debug(this, SpillDispatcher.tc, "Task removed from UPDATE queue after batch completion: " + element.getTask());
                            }
                        } else if (element.isDelete()) {
                            QueueElement head = (QueueElement) this._dispatchRemoveQueue.remove(0);
                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                SibTr.debug(this, SpillDispatcher.tc, "Task removed from REMOVE queue after batch completion: " + head.getTask());
                            }
                        }
                        this._dispatchedBytes -= element.getDataSize();
                    }
                }
            }
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.exit(this, SpillDispatcher.tc, "batchCompleted");
            }
        }

        /**
         * Clears the cancelled elements of a batch out of the dispatch queues.
         *
         * <p>For each cancelled element, under {@code _dispatchingLock}: its persistable is
         * told the operation was cancelled, the element is removed from the queue matching
         * its kind (ADDING, UPDATE or REMOVE), and {@code _dispatchedBytes} is reduced by its
         * data size. Elements that are not cancelled are left untouched.
         *
         * @param linkedList the batch of {@code QueueElement}s to examine
         * @return the number of cancelled elements found in the batch
         */
        private int batchCancelled(LinkedList linkedList) throws SevereMessageStoreException {
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.entry(this, SpillDispatcher.tc, "batchCancelled");
            }
            final int batchSize = linkedList.size();
            int cancelledCount = 0;
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                SibTr.debug(this, SpillDispatcher.tc, "batchSize=" + batchSize);
            }
            if (batchSize > 0) {
                synchronized (this._dispatchingLock) {
                    for (Iterator elements = linkedList.iterator(); elements.hasNext();) {
                        QueueElement element = (QueueElement) elements.next();
                        if (!element.isCancelled()) {
                            continue;
                        }
                        ((Tuple) element.getTask().getPersistable()).persistableOperationCancelled();
                        if (element.isAdd()) {
                            boolean removed = this._dispatchAddingQueue.remove(element);
                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                if (removed) {
                                    SibTr.debug(this, SpillDispatcher.tc, "Cancelled element removed from ADDING queue.");
                                } else {
                                    SibTr.debug(this, SpillDispatcher.tc, "Cancelled element not found in ADDING queue!");
                                }
                            }
                        } else if (element.isUpdate()) {
                            boolean removed = this._dispatchUpdateQueue.remove(element);
                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                if (removed) {
                                    SibTr.debug(this, SpillDispatcher.tc, "Cancelled element removed from UPDATE queue.");
                                } else {
                                    SibTr.debug(this, SpillDispatcher.tc, "Cancelled element not found in UPDATE queue!");
                                }
                            }
                        } else if (element.isDelete()) {
                            boolean removed = this._dispatchRemoveQueue.remove(element);
                            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isDebugEnabled()) {
                                if (removed) {
                                    SibTr.debug(this, SpillDispatcher.tc, "Cancelled element removed from REMOVE queue.");
                                } else {
                                    SibTr.debug(this, SpillDispatcher.tc, "Cancelled element not found in REMOVE queue!");
                                }
                            }
                        }
                        this._dispatchedBytes -= element.getDataSize();
                        cancelledCount++;
                    }
                }
            }
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.exit(this, SpillDispatcher.tc, "batchCancelled", Integer.valueOf(cancelledCount));
            }
            return cancelledCount;
        }

        /**
         * Updates this worker's error state after a failed write.
         *
         * <p>If no write error is currently recorded (or the consecutive-error count is zero)
         * the error state is initialised: the error flag is set, the good-write counter is
         * zeroed, the consecutive-error count becomes 1, {@code i} is stored in the dispatcher's
         * {@code _writesToResetErrorState} and the retry delay is set to 1. Otherwise the error
         * is treated as a repeat.
         *
         * <p>When {@code z} is true or the error is a repeat, the worker registers itself (once)
         * with the dispatcher via {@code threadWriteErrorOccurred}, bumps the consecutive-error
         * count, sets the retry delay to 5000 times that count (count capped at 5), and calls
         * {@code notifyDispatch()} on every element in the waiting queue.
         *
         * @param z when true, escalate even if this is the first error recorded
         * @param i value stored in {@code _writesToResetErrorState} when the error state is
         *          initialised
         */
        private void handleWriteError(boolean z, int i) {
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                // Integer.valueOf matches the rest of the file and avoids the deprecated constructor.
                SibTr.entry(this, SpillDispatcher.tc, "handleWriteError", new Object[]{Boolean.valueOf(z), Integer.valueOf(i)});
            }
            boolean repeatedError = false;
            if (!this._writeErrorOccurred || this._consecutiveWriteErrors == 0) {
                this._writeErrorOccurred = true;
                this._goodWritesSinceLastError = 0;
                this._consecutiveWriteErrors = 1;
                SpillDispatcher.this._writesToResetErrorState = i;
                this._writeErrorRetryDelay = 1L;
            } else {
                repeatedError = true;
            }
            if (z || repeatedError) {
                if (!this._isContributingToThreadWriteErrors) {
                    this._isContributingToThreadWriteErrors = true;
                    SpillDispatcher.this.threadWriteErrorOccurred(this._threadNum);
                }
                this._consecutiveWriteErrors++;
                this._writeErrorRetryDelay = 5000 * (this._consecutiveWriteErrors > 5 ? 5 : this._consecutiveWriteErrors);
                synchronized (this._dispatchingLock) {
                    // Release every element still parked in the waiting queue.
                    Iterator it = this._waitingQueue.iterator();
                    while (it.hasNext()) {
                        ((QueueElement) it.next()).notifyDispatch();
                    }
                }
            }
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.exit(this, SpillDispatcher.tc, "handleWriteError");
            }
        }

        /**
         * Moves elements from the waiting queue onto the dispatch queues while there is room.
         *
         * <p>Runs under {@code _dispatchingLock} and only when the waiting queue is non-empty
         * and {@code _dispatchedBytes} is below {@code _maxDispatchedBytesPerThread}. Elements
         * are promoted in queue order; promotion stops at the first element that would push
         * the dispatched bytes past the per-thread limit, unless the dispatched bytes are
         * still below {@code _minBytesPerBatch}. Each promoted element has
         * {@code notifyDispatch()} called, is removed from the waiting queue, is appended to
         * the ADDING, UPDATE or REMOVE queue according to its kind, and has its data size
         * added to {@code _dispatchedBytes}.
         */
        private void promoteWaiters() {
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.entry(this, SpillDispatcher.tc, "promoteWaiters");
            }
            synchronized (this._dispatchingLock) {
                if (!this._waitingQueue.isEmpty() && this._dispatchedBytes < SpillDispatcher.this._maxDispatchedBytesPerThread) {
                    for (Iterator waiters = this._waitingQueue.iterator(); waiters.hasNext();) {
                        QueueElement waiter = (QueueElement) waiters.next();
                        boolean belowMinimum = this._dispatchedBytes < SpillDispatcher.this._minBytesPerBatch;
                        if (!belowMinimum && this._dispatchedBytes + waiter.getDataSize() > SpillDispatcher.this._maxDispatchedBytesPerThread) {
                            break;
                        }
                        waiter.notifyDispatch();
                        waiters.remove();
                        if (waiter.isAdd()) {
                            this._dispatchAddingQueue.add(waiter);
                        } else if (waiter.isUpdate()) {
                            this._dispatchUpdateQueue.add(waiter);
                        } else if (waiter.isDelete()) {
                            this._dispatchRemoveQueue.add(waiter);
                        }
                        this._dispatchedBytes += waiter.getDataSize();
                    }
                }
            }
            if (TraceComponent.isAnyTracingEnabled() && SpillDispatcher.tc.isEntryEnabled()) {
                SibTr.exit(this, SpillDispatcher.tc, "promoteWaiters");
            }
        }

        /**
         * Returns the current value of this worker's {@code _interruptible} flag.
         *
         * @return the {@code _interruptible} field
         */
        public boolean isInterruptible() {
            return this._interruptible;
        }

        /**
         * Returns this worker's thread name, so trace output identifies the worker by name.
         *
         * @return the {@code _threadName} field
         */
        public String toString() {
            return this._threadName;
        }
    }

    /**
     * Creates a dispatcher whose thread count is read from the
     * {@code PROP_JDBC_SPILL_THREADS} configuration parameter (default "8", bounds 1 and 32
     * as passed to {@code obtainIntConfigParameter}).
     *
     * @param messageStoreImpl the owning message store, also used to look up configuration
     * @param batchingContextFactory factory for the batching contexts used to write batches
     */
    public SpillDispatcher(MessageStoreImpl messageStoreImpl, BatchingContextFactory batchingContextFactory) {
        this(messageStoreImpl, batchingContextFactory, obtainIntConfigParameter(messageStoreImpl, MessageStoreConstants.PROP_JDBC_SPILL_THREADS, "8", 1, 32));
    }

    public SpillDispatcher(MessageStoreImpl messageStoreImpl, BatchingContextFactory batchingContextFactory, int i) {
        this._stopRequested = false;
        this._running = false;
        this._threadWriteErrorsOutstanding = 0;
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "<init>", new Object[]{messageStoreImpl, batchingContextFactory, Integer.valueOf(i)});
        }
        this._msi = messageStoreImpl;
        this._bcfactory = batchingContextFactory;
        this._maxThreads = i;
        this._minBytesPerBatch = obtainLongConfigParameter(this._msi, MessageStoreConstants.PROP_JDBC_SPILL_MIN_BYTES_PER_BATCH, MessageStoreConstants.PROP_JDBC_SPILL_MIN_BYTES_PER_BATCH_DEFAULT, 10000L, 100000000L);
        this._maxBytesPerBatch = obtainLongConfigParameter(this._msi, MessageStoreConstants.PROP_JDBC_SPILL_MAX_BYTES_PER_BATCH, "4000000", 100000L, 100000000L);
        this._maxTasksPerBatch = obtainIntConfigParameter(this._msi, MessageStoreConstants.PROP_JDBC_SPILL_MAX_TASKS_PER_BATCH, "64", 1, 10000);
        this._maxDispatchedBytesPerThread = obtainLongConfigParameter(this._msi, MessageStoreConstants.PROP_JDBC_SPILL_MAX_DISPATCHED_BYTES_PER_THREAD, MessageStoreConstants.PROP_JDBC_SPILL_MAX_DISPATCHED_BYTES_PER_THREAD_DEFAULT, 100000L, 1000000000L);
        if (this._minBytesPerBatch > this._maxDispatchedBytesPerThread) {
            this._minBytesPerBatch = this._maxDispatchedBytesPerThread;
        }
        if (this._minBytesPerBatch > this._maxBytesPerBatch) {
            this._minBytesPerBatch = this._maxBytesPerBatch;
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "<init>");
        }
    }

    /**
     * Reports whether the dispatcher is usable: it is running, no stop has been requested,
     * and no worker thread has an outstanding write error.
     *
     * @return {@code true} only when all three conditions hold
     */
    @Override // com.ibm.ws.sib.msgstore.persistence.Dispatcher
    public synchronized boolean isHealthy() {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "isHealthy");
        }
        boolean healthy = false;
        if (this._running && !this._stopRequested) {
            healthy = (this._threadWriteErrorsOutstanding == 0);
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "isHealthy", Boolean.valueOf(healthy));
        }
        return healthy;
    }

    /**
     * Distributes a collection of tasks across the worker threads.
     *
     * <p>Each task is prepared ({@code copyDataIfVulnerable}, {@code persistableOperationBegun})
     * and handed to the worker chosen by its persistable's unique id modulo the thread count.
     * Every worker that received at least one task is then notified. If any {@code addTask}
     * call reported that the caller must wait, this method blocks on the shared
     * {@code DispatchNotifier}; a {@code PersistenceException} from that wait is recorded
     * (FFDC and event trace) and not rethrown.
     *
     * @param collection the {@code Task}s to dispatch; {@code null} is a no-op
     * @param persistentTransaction not used by this method
     * @param z passed through to the {@code DispatchNotifier} constructor
     */
    @Override // com.ibm.ws.sib.msgstore.persistence.Dispatcher
    public void dispatch(Collection collection, PersistentTransaction persistentTransaction, boolean z) throws PersistenceException, SevereMessageStoreException {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "dispatch", new Object[]{collection, Boolean.valueOf(z)});
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (collection != null) {
            boolean mustWait = false;
            DispatchNotifier dispatchNotifier = new DispatchNotifier(collection.size(), z);
            boolean[] workerUsed = new boolean[this._maxThreads];
            boolean[] workerNeedsWait = new boolean[this._maxThreads];
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                Task task = (Task) it.next();
                task.copyDataIfVulnerable();
                ((Tuple) task.getPersistable()).persistableOperationBegun();
                int workerIndex = (int) (task.getPersistable().getUniqueId() % this._maxThreads);
                // Accumulate rather than overwrite: several tasks can map to the same worker,
                // and a later "no wait" result must not erase an earlier "wait" result.
                workerNeedsWait[workerIndex] |= this._workers[workerIndex].addTask(task, currentTimeMillis, dispatchNotifier);
                workerUsed[workerIndex] = true;
            }
            for (int i = 0; i < this._maxThreads; i++) {
                if (workerUsed[i]) {
                    this._workers[i].notifyDispatchArrived();
                }
                mustWait |= workerNeedsWait[i];
            }
            if (mustWait) {
                try {
                    dispatchNotifier.waitForDispatch();
                } catch (PersistenceException e) {
                    FFDCFilter.processException(e, "com.ibm.ws.sib.msgstore.persistence.impl.SpillDispatcher.writeBatch", "1:326:1.48", this);
                    if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                        SibTr.event(this, tc, "Exception persisting batch", e);
                    }
                }
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "dispatch");
        }
    }

    /**
     * Starts the dispatcher: marks it as running and creates and starts one daemon worker
     * thread per configured slot.
     *
     * <p>Threads are named {@code sib.SpillDispatcher-<prefix><index>}, where the prefix is
     * the messaging engine's UUID followed by {@code CacheDecoratorFactory.DASH} when an
     * engine is available, and empty otherwise. Each thread runs at normal priority adjusted
     * by the {@code PROP_JDBC_SPILL_THREAD_PRIORITY_DELTA} configuration parameter.
     */
    @Override // com.ibm.ws.sib.msgstore.persistence.Dispatcher
    public void start() {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "start");
        }
        final int priorityDelta = obtainIntConfigParameter(this._msi, MessageStoreConstants.PROP_JDBC_SPILL_THREAD_PRIORITY_DELTA, "0", -4, 5);
        this._threads = new Thread[this._maxThreads];
        this._workers = new SpillDispatcherThread[this._maxThreads];
        synchronized (this) {
            this._stopRequested = false;
            this._running = true;
        }
        String namePrefix = "";
        if (this._msi != null) {
            JsMessagingEngine engine = this._msi._getMessagingEngine();
            if (engine != null) {
                namePrefix = engine.getUuid().toString() + CacheDecoratorFactory.DASH;
            }
        }
        for (int index = 0; index < this._maxThreads; index++) {
            String threadName = "sib.SpillDispatcher-" + namePrefix + index;
            SpillDispatcherThread worker = new SpillDispatcherThread(index, threadName);
            this._workers[index] = worker;
            Thread thread = new Thread(worker, threadName);
            this._threads[index] = thread;
            thread.setDaemon(true);
            thread.setPriority(Thread.NORM_PRIORITY + priorityDelta);
            thread.start();
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "start");
        }
    }

    /**
     * Requests a stop and waits for the worker threads to finish.
     *
     * <p>Only the first call made while the dispatcher is running has any effect. That call
     * invokes {@code cleanup()} on every worker and then joins each thread, sharing a single
     * deadline of {@code Sync.ONE_MINUTE} from now across all joins (each join waits at
     * least 1 ms). A thread still alive after its join is reported in debug trace only.
     *
     * <p>If the calling thread is interrupted while joining, the remaining joins still take
     * place and the interrupt status is restored before returning, so the caller can
     * observe it.
     *
     * @param i the stop mode; only traced by this method
     */
    @Override // com.ibm.ws.sib.msgstore.persistence.Dispatcher
    public void stop(int i) {
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.entry(this, tc, "stop", Integer.valueOf(i));
        }
        boolean z = false;
        synchronized (this) {
            if (this._running && !this._stopRequested) {
                z = true;
                this._stopRequested = true;
            }
        }
        if (z) {
            for (int i2 = 0; i2 < this._maxThreads; i2++) {
                this._workers[i2].cleanup();
            }
            boolean interrupted = false;
            long deadline = System.currentTimeMillis() + Sync.ONE_MINUTE;
            for (int i3 = 0; i3 < this._maxThreads; i3++) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    // join(0) would wait forever, so always wait at least 1 ms.
                    remaining = 1;
                }
                try {
                    if (this._threads[i3] != null) {
                        this._threads[i3].join(remaining);
                        if (this._threads[i3].isAlive() && TraceComponent.isAnyTracingEnabled() && tc.isDebugEnabled()) {
                            SibTr.debug(this, tc, "Cannot join dispatcher thread " + this._workers[i3]);
                        }
                    }
                } catch (InterruptedException e) {
                    // Keep joining the remaining threads; the interrupt is re-asserted below.
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            SibTr.exit(this, tc, "stop");
        }
    }

    /**
     * Records that a worker thread has entered the write-error state by incrementing the
     * count of outstanding thread write errors.
     *
     * @param i the number of the reporting worker thread; only traced
     */
    public synchronized void threadWriteErrorOccurred(int i) {
        final boolean traceEntry = TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled();
        if (traceEntry) {
            SibTr.entry(this, tc, "threadWriteErrorOccurred", Integer.valueOf(i));
        }
        this._threadWriteErrorsOutstanding += 1;
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            String state = "_threadWriteErrorsOutstanding=" + this._threadWriteErrorsOutstanding;
            SibTr.exit(this, tc, "threadWriteErrorOccurred", state);
        }
    }

    /**
     * Records that a worker thread has left the write-error state by decrementing the count
     * of outstanding thread write errors. The count never drops below zero.
     *
     * @param i the number of the reporting worker thread; only traced
     */
    public synchronized void threadWriteErrorCleared(int i) {
        final boolean traceEntry = TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled();
        if (traceEntry) {
            SibTr.entry(this, tc, "threadWriteErrorCleared", Integer.valueOf(i));
        }
        if (this._threadWriteErrorsOutstanding > 0) {
            this._threadWriteErrorsOutstanding -= 1;
        }
        if (TraceComponent.isAnyTracingEnabled() && tc.isEntryEnabled()) {
            String state = "_threadWriteErrorsOutstanding=" + this._threadWriteErrorsOutstanding;
            SibTr.exit(this, tc, "threadWriteErrorCleared", state);
        }
    }

    /**
     * Returns the superclass description followed by state markers: " (STOP REQUESTED)"
     * when a stop has been requested, " (STOPPED)" when the dispatcher is not running, and
     * " (ERROR)" when any thread write error is outstanding.
     */
    public String toString() {
        StringBuilder description = new StringBuilder(super.toString());
        if (this._stopRequested) {
            description.append(" (STOP REQUESTED)");
        }
        if (!this._running) {
            description.append(" (STOPPED)");
        }
        if (this._threadWriteErrorsOutstanding > 0) {
            description.append(" (ERROR)");
        }
        return description.toString();
    }

    /**
     * Synthetic accessor: post-increments {@code _numThreads} on the given dispatcher.
     *
     * @return the value of {@code _numThreads} before the increment
     */
    static /* synthetic */ int access$408(SpillDispatcher spillDispatcher) {
        return spillDispatcher._numThreads++;
    }

    /**
     * Synthetic accessor: post-decrements {@code _numThreads} on the given dispatcher.
     *
     * @return the value of {@code _numThreads} before the decrement
     */
    static /* synthetic */ int access$410(SpillDispatcher spillDispatcher) {
        return spillDispatcher._numThreads--;
    }
}
