package com.ibm.rmm.ptl.mstp.transmitter;

import com.ibm.jms.JMSStringResources;
import com.ibm.rmm.intrn.util.Sutils;
import com.ibm.rmm.intrn.util.TokenBucket;
import com.ibm.rmm.ptl.ifc.transmitter.FullBufferListener;
import com.ibm.rmm.ptl.ifc.transmitter.PTransmitterIf;
import com.ibm.rmm.ptl.ifc.transmitter.StreamTIf;
import com.ibm.rmm.util.LogEventListener;
import com.ibm.rmm.util.RmmAddress;
import com.ibm.rmm.util.RmmLogger;
import com.ibm.rmm.util.StackTracer;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Properties;

/* loaded from: input_file:MQJMS/rmm.jar:com/ibm/rmm/ptl/mstp/transmitter/PTransmitter.class */
/**
 * Transmitter-side entry point implementing {@link PTransmitterIf}.
 *
 * <p>All transmitter state lives in {@code static} fields, so every instance of this class
 * shares the same NACK socket, stream table and worker objects. The {@code synchronized}
 * methods lock on the instance, not on the class.
 *
 * <p>NOTE(review): the package-private static fields are presumably read by the other classes
 * in this package (PacketFireout, NackServer, Repairer, ...) - confirm before changing
 * their names or visibility.
 */
public class PTransmitter implements PTransmitterIf {
    /** UDP socket obtained in {@link #init}; handed to the {@code NackServer}. */
    private static DatagramSocket nackSocket;
    /** Local port of {@link #nackSocket}. */
    static int nackPort;
    /** Raw local address bytes, or a 4-byte pseudo address when no local address is known. */
    static byte[] ipAddress;
    static boolean logError;
    /** Packet size passed to {@link #init}; used as divisor for the buffer limits. */
    static short packetSize;
    static int tracingLevel;
    // Never assigned inside this class; may therefore be null when read in init().
    private static LogEventListener tracingListener;
    // Never assigned inside this class; may therefore be null when read in init().
    private static LogEventListener problemListener;
    /** Created only when {@code Config.limitRate != 0}; otherwise left as is (possibly null). */
    static TokenBucket tokenBucket;
    static int maxTrans;
    static int nPending;
    /** {@code 4000000 / packetSize}, computed in {@link #init}. */
    static int nPendingMax;
    /** {@code Config.maxStreamHistorySize / packetSize}, computed in {@link #init}. */
    static int nSentMax;
    /** Interface address passed to {@link #init}. */
    static InetAddress mcInterf;
    static FullBufferListener fullBufListener;
    /** Maps stream id ({@code Short}) to its {@code StreamT}. */
    static Hashtable streamHT;
    static boolean isRunning;
    /** Next stream id to hand out; wraps around on {@code short} overflow. */
    private static short idSeed;
    static PacketFireout streamFireout;
    static MemCleaner streamCleaner;
    static NackServer nackServer;
    static Repairer streamRepairer;
    static ControlPacketSender hbSender;
    private static Object lock = new Object();
    /** Set once a stream has been created with the first boolean argument {@code true}. */
    private static boolean needRelServices = false;
    /** Set once {@link #startRelServices()} has run. */
    private static boolean relSrvRun = false;

    /**
     * Configures the shared transmitter state and starts the base services.
     *
     * @param properties configuration forwarded to {@code Config.set}
     * @param s packet size; used as a divisor, so {@code 0} raises {@link ArithmeticException}
     * @param inetAddress stored as {@link #mcInterf}
     */
    @Override // com.ibm.rmm.ptl.ifc.transmitter.PTransmitterIf
    public void init(Properties properties, short s, InetAddress inetAddress) {
        packetSize = s;
        Config.set(properties);
        nPendingMax = 4000000 / packetSize;
        nSentMax = Config.maxStreamHistorySize / packetSize;
        maxTrans = 10;
        if (Config.limitRate != 0) {
            tokenBucket = new TokenBucket(Config.transmissionRateKbps);
        }
        if (Config.limitRate == 2) {
            RmmLogger.baseInfo("Starting Global Congestion Control", "PTL");
        }
        idSeed = (short) 0;
        mcInterf = inetAddress;
        nackSocket = RmmAddress.getUdpSocket();
        nackPort = nackSocket.getLocalPort();
        RmmLogger.baseInfo("Nack port:" + nackPort, "PTL");
        if (RmmAddress.getInstance() == null || RmmAddress.getInstance().getInetAddress() == null) {
            // No usable local address: synthesize a 4-byte pseudo address. The listener fields
            // may be null here, so they are hashed null-safely instead of dereferenced.
            byte[] pseudoAddress = new byte[4];
            pseudoAddress[0] = (byte) System.currentTimeMillis();
            pseudoAddress[1] = (byte) hashOrZero(problemListener);
            pseudoAddress[2] = (byte) hashOrZero(tracingListener);
            pseudoAddress[3] = (byte) hashCode();
            ipAddress = pseudoAddress;
        } else {
            ipAddress = RmmAddress.getInstance().getInetAddress().getAddress();
        }
        streamHT = new Hashtable();
        streamFireout = new PacketFireout();
        streamFireout.setPriority(10);
        hbSender = new ControlPacketSender();
        start();
    }

    /** Returns {@code o.hashCode()}, or {@code 0} when {@code o} is {@code null}. */
    private static int hashOrZero(Object o) {
        return o == null ? 0 : o.hashCode();
    }

    /**
     * Creates a new stream, registers it in {@link #streamHT} under a fresh id and adds it to
     * the fireout service.
     *
     * <p>The first time {@code z} is {@code true}, the NACK server, repairer and memory cleaner
     * are started as well (immediately if the transmitter is already running).
     *
     * @param z when {@code true}, the reliability services are required
     * @param inetAddress forwarded to {@code StreamT.init}
     * @param bArr forwarded to {@code StreamT.init}; presumably the stream tag matched by
     *     {@link #getStream} - TODO confirm
     * @param z2 forwarded to {@code StreamT.init}
     * @return the newly created stream
     */
    @Override // com.ibm.rmm.ptl.ifc.transmitter.PTransmitterIf
    public synchronized StreamTIf createStreamTransmitter(boolean z, InetAddress inetAddress, byte[] bArr, boolean z2) {
        short streamId = idSeed;
        idSeed = (short) (streamId + 1);
        StreamT streamT = new StreamT();
        streamT.init(bArr, streamId, z, inetAddress, z2);
        if (Config.limitRate == 2) {
            streamT.startCongestionControl(Config.transmissionRateKbps);
        }
        streamHT.put(Short.valueOf(streamId), streamT);
        streamFireout.addStream(streamT);
        if (!needRelServices && z) {
            needRelServices = true;
            if (isRunning && !relSrvRun) {
                startRelServices();
            }
        }
        return streamT;
    }

    /**
     * Changes the rate of the shared token bucket.
     *
     * <p>The token bucket only exists when {@code Config.limitRate != 0}; when it is absent the
     * request is logged and ignored instead of failing with a {@link NullPointerException}.
     *
     * @param i new rate, passed to {@code TokenBucket.setRate}
     */
    @Override // com.ibm.rmm.ptl.ifc.transmitter.PTransmitterIf
    public synchronized void changeTransmissionRate(int i) {
        if (tokenBucket == null) {
            RmmLogger.baseWarn("changeTransmissionRate ignored: no token bucket (rate limit not enabled)", new StackTracer(), "PTL");
            return;
        }
        tokenBucket.setRate(i);
    }

    /** Returns the protocol version string, currently {@code "5"}. */
    @Override // com.ibm.rmm.ptl.ifc.transmitter.PTransmitterIf
    public String getProtocolVersion() {
        return "5";
    }

    /**
     * Looks up a registered stream by tag.
     *
     * @param bArr tag to compare against each stream's {@code getTag()}
     * @return the first stream whose tag matches, or {@code null} if none does
     */
    @Override // com.ibm.rmm.ptl.ifc.transmitter.PTransmitterIf
    public StreamTIf getStream(byte[] bArr) {
        Enumeration elements = streamHT.elements();
        while (elements.hasMoreElements()) {
            StreamTIf streamTIf = (StreamTIf) elements.nextElement();
            if (Sutils.compareByteArrays(streamTIf.getTag(), bArr)) {
                return streamTIf;
            }
        }
        return null;
    }

    /** Returns {@code true} between {@link #init} and {@link #stop}. */
    @Override // com.ibm.rmm.ptl.ifc.transmitter.PTransmitterIf
    public synchronized boolean isRunning() {
        return isRunning;
    }

    /** Returns an enumeration over all registered streams. */
    @Override // com.ibm.rmm.ptl.ifc.transmitter.PTransmitterIf
    public synchronized Enumeration listStreams() {
        return streamHT.elements();
    }

    /**
     * Installs the shared full-buffer listener, logging a warning when one is replaced.
     *
     * @param fullBufferListener the listener to install
     */
    @Override // com.ibm.rmm.ptl.ifc.transmitter.PTransmitterIf
    public void setFullBufferListener(FullBufferListener fullBufferListener) {
        if (fullBufListener != null) {
            RmmLogger.baseWarn("Replacing FullBufferListener", new StackTracer(), "PTL");
        }
        fullBufListener = fullBufferListener;
    }

    /** Creates and starts the NACK server, the repairer and the memory cleaner. */
    private void startRelServices() {
        nackServer = new NackServer(nackSocket);
        nackServer.setPriority(10);
        nackServer.start();
        streamRepairer = new Repairer();
        streamRepairer.setPriority(10);
        streamRepairer.start();
        streamCleaner = new MemCleaner();
        streamCleaner.start();
        relSrvRun = true;
    }

    /** Marks the transmitter running and starts the base (and, if needed, reliability) services. */
    private void start() {
        isRunning = true;
        streamFireout.start();
        hbSender.start();
        if (needRelServices) {
            startRelServices();
        }
    }

    /**
     * Closes every registered stream, waits {@code Config.cpTimeout} milliseconds and then
     * interrupts all services.
     *
     * <p>The wait happens while holding this instance's monitor. If the calling thread is
     * interrupted during the wait, shutdown still completes and the thread's interrupt status
     * is restored before returning. Calling this before {@link #init} is a no-op apart from
     * logging and the wait.
     */
    @Override // com.ibm.rmm.ptl.ifc.transmitter.PTransmitterIf
    public synchronized void stop() {
        RmmLogger.baseLog(1, new Object[]{"PTransmitter"}, null, "PTL");
        // NOTE(review): the divisor is presumably 1000 (ms -> s) that the decompiler replaced
        // with an equal-valued named constant - confirm and replace with a literal.
        RmmLogger.baseInfo("Waiting for " + (Config.cpTimeout / JMSStringResources.MQJMS_EXCEPTION_MSG_CREATE_ERROR)
                + "sec (control packet timeout)\n"
                + "to let transmitter send pending packets and receivers complete the reception", "PTL");
        if (streamHT != null) {
            Enumeration streams = streamHT.elements();
            while (streams.hasMoreElements()) {
                ((StreamT) streams.nextElement()).close();
            }
        }
        boolean interrupted = false;
        try {
            Thread.sleep(Config.cpTimeout);
        } catch (InterruptedException e) {
            // Keep shutting down, but remember the interrupt so it is not lost.
            interrupted = true;
        }
        isRunning = false;
        if (hbSender != null) {
            hbSender.interrupt();
        }
        if (streamFireout != null) {
            streamFireout.interrupt();
        }
        if (nackServer != null) {
            nackServer.interrupt();
        }
        if (streamRepairer != null) {
            streamRepairer.interrupt();
        }
        if (streamCleaner != null) {
            streamCleaner.interrupt();
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
