package com.ibm.ws.microprofile.reactive.messaging.kafka;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.websphere.ras.annotation.TraceObjectField;
import com.ibm.websphere.ras.annotation.TraceOptions;
import com.ibm.ws.ras.instrument.annotation.InjectedFFDC;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * {@link ThresholdCounter} implementation that counts outstanding items against a fixed threshold
 * and exposes a {@link CompletionStage} that is complete while the count is below that threshold.
 * <p>
 * Going by the trace messages emitted here, the count represents unacknowledged messages and the
 * stage is used to pause and resume polling Kafka.
 * <p>
 * Thread-safety: every read and write of the count and of the current stage happens while holding
 * the monitor of {@code counter}, so the two are always observed consistently with each other.
 */
@InjectedFFDC
@TraceObjectField(fieldName = "tc", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
@TraceOptions
public class ThresholdCounterImpl implements ThresholdCounter {
    private static final TraceComponent tc = Tr.register(ThresholdCounterImpl.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");

    /** Count at which a new, incomplete stage is installed. */
    private final int threshold;

    /** Current count; its monitor also guards {@link #counterThresholdStage}. */
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * Stage handed out by {@link #waitForBelowThreshold()}. Starts out completed and is replaced
     * by an incomplete future whenever the count reaches the threshold. Guarded by the monitor of
     * {@link #counter}.
     */
    private CompletableFuture<Void> counterThresholdStage = CompletableFuture.completedFuture(null);
    static final long serialVersionUID = 4586848603809325997L;

    /**
     * Creates a counter starting at zero.
     *
     * @param threshold the count at which {@link #waitForBelowThreshold()} starts returning an
     *                  incomplete stage
     */
    public ThresholdCounterImpl(int threshold) {
        this.threshold = threshold;
    }

    /**
     * Increments the count. When the new count is exactly the threshold, a fresh incomplete stage
     * replaces the current one.
     */
    @Override // com.ibm.ws.microprofile.reactive.messaging.kafka.ThresholdCounter
    public void increment() {
        synchronized (this.counter) {
            int current = this.counter.incrementAndGet();
            if (current == this.threshold) {
                if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                    Tr.event(tc, "Too many outstanding unacked messages, stopping polling kafka. Current count: " + current);
                }
                this.counterThresholdStage = new CompletableFuture<>();
            }
        }
    }

    /**
     * Decrements the count. When the count was exactly the threshold before this call, the stage
     * that was current at that moment is completed.
     */
    @Override // com.ibm.ws.microprofile.reactive.messaging.kafka.ThresholdCounter
    public void decrement() {
        int previous;
        CompletableFuture<Void> stage;
        synchronized (this.counter) {
            previous = this.counter.getAndDecrement();
            // Capture the stage under the lock: a concurrent increment() may install a new stage
            // right after we release it, and that new stage must not be completed by this call.
            stage = this.counterThresholdStage;
        }
        if (previous == this.threshold) {
            if (TraceComponent.isAnyTracingEnabled() && tc.isEventEnabled()) {
                // Report the count after the decrement; 'previous' is the value before it.
                Tr.event(tc, "Outstanding unacked message count dropped below threshold, resuming polling kafka. Current count: " + (previous - 1));
            }
            // Completed outside the synchronized block, so dependent stages that run synchronously
            // on completion do not execute while the monitor is held.
            stage.complete(null);
        }
    }

    /**
     * Returns the current threshold stage.
     *
     * @return a stage that is already complete if the count has not reached the threshold since
     *         the stage was last completed, otherwise a stage that completes when a
     *         {@link #decrement()} takes the count from the threshold to one below it
     */
    @Override // com.ibm.ws.microprofile.reactive.messaging.kafka.ThresholdCounter
    public CompletionStage<Void> waitForBelowThreshold() {
        // The field is neither final nor volatile, so it must be read under the same monitor that
        // guards its writes; otherwise a stale (already completed) stage could be returned.
        synchronized (this.counter) {
            return this.counterThresholdStage;
        }
    }
}
