從非同步 Bean 和 CommonJ 移轉至 EE Concurrency 的範例

您可以將使用非同步 Bean 及 CommonJ 的計時器和工作管理員 API 的應用程式移轉為使用 Concurrency Utilities for Java™ EE。

程式碼範例中所用的資源

這個頁面的程式碼範例假設應用程式已注入或查閱下列資源:

@Resource(lookup = "wm/default")
private com.ibm.websphere.asynchbeans.WorkManager abWorkManager;

@Resource(lookup = "wm/default")
private commonj.work.WorkManager cjWorkManager;

@Resource
private ContextService contextService;

@Resource(name = "java:app/env/jdbc/dsRef")
private DataSource dataSource;

@Resource
private ManagedScheduledExecutorService executor;

@Resource
private ManagedThreadFactory threadFactory;

@Resource(name = "java:comp/env/tm/default", lookup = "tm/default", shareable = false)
private TimerManager timerManager;

@Resource
private UserTransaction tran;

基本作業實作

這一節提供若干簡單作業實作的範例,這份文件其餘部分的其他範例會用到這些實作。 非同步 Bean 需要個別的 AlarmListener 介面,以用於排定在將來執行的作業。 CommonJ 需要個別的 TimerListener 介面,以用於排定在將來執行的作業。 Concurrency Utilities for Java EE 作業可能是 RunnableCallable,不論提交作業來立即執行或排定在將來執行,它都容許使用其中一個介面。 在某些情況下,可能會將非同步 Bean 或 CommonJ Work 當作 Runnable 提交給受管理執行程式,不進行任何變更。 受管理執行程式取消及岔斷執行中之執行緒的能力會取代 Work 的 release 方法。 LONGRUNNING_HINT 執行內容會取代 Work 的 isDaemon 方法。

非同步 Bean 和 CommonJ 兩者用來尋找下一個質數的 Work 作業範例:

public class PrimeFinderWork implements
    com.ibm.websphere.asynchbeans.Work, commonj.work.Work {
    private long num;
    private volatile boolean released;
    private long result;

    public PrimeFinderWork(long startingValue) {
        num = startingValue;
    }

    public boolean isDaemon() {
        return false;
    }
    public void release() {
        released = true;
    }

    public void run() {
        while (!isPrime(num))
            if (released || Thread.currentThread().isInterrupted())
                throw new RuntimeException(new InterruptedException());
            else
                num++;
        result = num++;
    }

    public long getPrimeNumber() {
        if (result > 0)
            return result;
        else
            throw new IllegalStateException();
    }
}

非同步 Bean 用來尋找下一個質數的 AlarmListener 作業範例:

public class PrimeFinderAlarmListener implements AlarmListener {
    private volatile boolean aborted;
    private int count;
    private long num;
    private long result;

    public PrimeFinderAlarmListener(long startingValue) {
        num = startingValue;
    }

    public void abort() {
        aborted = true;
    }

    public void fired(Alarm alarm) {
        while (!isPrime(num))
            if (aborted || Thread.currentThread().isInterrupted())
                throw new RuntimeException(new InterruptedException());
            else
                num++;
        result = num++;
        // optionally reschedule:
        Object delays = alarm.getContext();
        if (delays instanceof Integer)
            alarm.reset((Integer) delays);
        else if (delays instanceof int[] && count < ((int[]) delays).length)
            alarm.reset(((int[]) delays)[count++]);
    }

    public long getPrimeNumber() {
        if (result > 0)
            return result;
        else
            throw new IllegalStateException();
    }

非同步 Bean 用來尋找下一個質數的 TimerListener 作業範例:

public class PrimeFinderTimerListener implements CancelTimerListener, TimerListener {
    private volatile boolean aborted;
    private int count;
    private final long[] delays;
    private long num;
    private long result;

    public PrimeFinderTimerListener(long startingValue, long... delays) {
        num = startingValue;
        this.delays = delays;
    }

    public void timerCancel(Timer timer) {
        aborted = true;
    }

    public void timerExpired(Timer timer) {
        while (!isPrime(num))
            if (aborted || Thread.currentThread().isInterrupted())
                throw new RuntimeException(new InterruptedException());
            else
                num++;
        result = num++;
        // optionally reschedule:
        if (count < delays.length)
            try {
                TimerManager timerManager = (TimerManager) new InitialContext().lookup(
                    "java:comp/env/tm/default");
                timerManager.schedule(this, delays[count++]);
            } catch (NamingException x) {
                throw new RuntimeException(x);
            }
    }

    public long getPrimeNumber() {
        if (result > 0)
            return result;
        else
            throw new IllegalStateException();
    }
}

Concurrency Utilities for Java EE 用來尋找下一個質數的 Runnable 作業範例:

public class PrimeFinderRunnable implements Runnable {
    private long num;
    private long result;

    public PrimeFinderRunnable(long startingValue) {
        num = startingValue;
    }

    public void run() {
        while (!isPrime(num))
            if (Thread.currentThread().isInterrupted())
                throw new RuntimeException(new InterruptedException());
            else
                num++;
        result = num++;
    }

    public long getPrimeNumber() {
        if (result > 0)
            return result;
        else
            throw new IllegalStateException();
    }
}

Concurrency Utilities for Java EE 用來尋找下一個質數的 Callable 作業範例:

public class PrimeFinderTask implements Callable<Long> {
    private long num;

    public PrimeFinderTask(long startingValue) {
        num = startingValue;
    }

    public Long call() throws InterruptedException {
        while (!isPrime(num))
            if (Thread.currentThread().isInterrupted())
                throw new InterruptedException();
            else
                num++;
        return num++;
    }
}

非同步 Bean 用來執行基本資料庫插入的 Work 作業範例:

public class DBInsertWorkAB implements Work, Serializable {
    private static final long serialVersionUID = 2606824039439594442L;
    private transient Thread executionThread;
    private final String code;
    private final String name;
    private boolean released;
    private volatile int result = -1;

    public DBInsertWorkAB(String code, String name) {
        this.code = code;
        this.name = name;
    }

    public int getResult() {
        return result;
    }

    public synchronized void release() {
        released = true;
        if (executionThread != null)
            executionThread.interrupt();
    }

    public void run() {
	synchronized (this) {
            if (released)
                throw new RuntimeException("Work was canceled");
            executionThread = Thread.currentThread();
        }
        try {
            DataSource ds = (DataSource) new InitialContext().lookup(
                "java:app/env/jdbc/dsRef");
            Connection con = ds.getConnection();
            try {
                PreparedStatement stmt = con.prepareStatement(
                    "INSERT INTO AIRPORTS VALUES(?,?)");
                stmt.setString(1, code);
                stmt.setString(2, name);
                result = stmt.executeUpdate();
            		} finally {
                con.close();
            }
        } catch (NamingException x) {
            throw new RuntimeException(x);
        } catch (SQLException x) {
            throw new RuntimeException(x);
        		} finally {
            synchronized (this) {
                executionThread = null;
            }
        }
    }
}

非同步 Bean 用來執行基本資料庫插入的 AlarmListener 作業範例:

public class DBInsertAlarmListener implements AlarmListener {
    private volatile int result = -1;

    public int getResult() {
        return result;
    }

    public void fired(Alarm alarm) {
        String[] alarmContext = (String[]) alarm.getContext();
        try {
            DataSource ds = (DataSource) new InitialContext().lookup(
                "java:app/env/jdbc/dsRef");
            Connection con = ds.getConnection();
            try {
                PreparedStatement stmt = con.prepareStatement(
                    "INSERT INTO AIRPORTS VALUES(?,?)");
                stmt.setString(1, alarmContext[0]);
                stmt.setString(2, alarmContext[1]);
                result = stmt.executeUpdate();
            		} finally {
                con.close();
            }
        } catch (NamingException x) {
            throw new RuntimeException(x);
        } catch (SQLException x) {
            throw new RuntimeException(x);
        }
    }
}

CommonJ 用來執行基本資料庫插入的 Work 作業範例:

public class DBInsertWorkCJ implements Work, Serializable {
    private static final long serialVersionUID = -8801347489043041978L;
    private transient Thread executionThread;
    private final String code;
    private final String name;
    private boolean isDaemon;
    private boolean released;
    private volatile int result = -1;

    public DBInsertWorkCJ(String code, String name) {
        this.code = code;
        this.name = name;
    }

    public int getResult() {
        return result;
    }

    public boolean isDaemon() {
        return isDaemon;
    }

    public synchronized void release() {
        released = true;
        if (executionThread != null)
            executionThread.interrupt();
    }

    public void run() {
	synchronized (this) {
            if (released)
                throw new RuntimeException("Work was canceled");
            executionThread = Thread.currentThread();
        }
        try {
            DataSource ds = (DataSource) new InitialContext().lookup(
                "java:app/env/jdbc/dsRef");
            Connection con = ds.getConnection();
            try {
                PreparedStatement stmt = con.prepareStatement(
                    "INSERT INTO AIRPORTS VALUES(?,?)");
                stmt.setString(1, code);
                stmt.setString(2, name);
                result = stmt.executeUpdate();
            		} finally {
                con.close();
            }
        } catch (NamingException x) {
            throw new RuntimeException(x);
        } catch (SQLException x) {
            throw new RuntimeException(x);
        		} finally {
            synchronized (this) {
                executionThread = null;
            }
        }
    }

CommonJ 用來執行基本資料庫插入的 TimerListener 作業範例:

public class DBInsertTimerListener implements TimerListener {
    private volatile int result = -1;

    private final String code;
    private final String name;

    public DBInsertTimerListener(String code, String name) {
        this.code = code;
        this.name = name;
    }

    public int getResult() {
        return result;
    }

    public void timerExpired(Timer timer) {
        try {
            DataSource ds = (DataSource) new InitialContext().lookup(
                "java:app/env/jdbc/dsRef");
            Connection con = ds.getConnection();
            try {
                PreparedStatement stmt = con.prepareStatement(
                    "INSERT INTO AIRPORTS VALUES(?,?)");
                stmt.setString(1, code);
                stmt.setString(2, name);
                result = stmt.executeUpdate();
            		} finally {
                con.close();
            }
        } catch (NamingException x) {
            throw new RuntimeException(x);
        } catch (SQLException x) {
            throw new RuntimeException(x);
        }
    }
}

Concurrency Utilities for Java EE 用來執行基本資料庫插入的 Callable 作業範例:

public class DBInsertTask implements Callable<Integer>, Serializable {
    private static final long serialVersionUID = 5556464104788801400L;
    private final String code;
    private final String name;

    public DBInsertTask(String code, String name) {
        this.code = code;
        this.name = name;
    }

    public Integer call() throws NamingException, SQLException {
        DataSource ds = (DataSource) new InitialContext().lookup(
            "java:app/env/jdbc/dsRef");
        Connection con = ds.getConnection();
        try {
            PreparedStatement stmt = con.prepareStatement(
                "INSERT INTO AIRPORTS VALUES(?,?)");
            stmt.setString(1, code);
            stmt.setString(2, name);
            return stmt.executeUpdate();
        		} finally {
            con.close();
        }
    }
}

提交作業

三個程式設計模型都提供了將基本作業提交給聯合排存的執行緒來執行並取得結果的方式。

非同步 Bean 範例:

WorkItem workItem = abWorkManager.startWork(
    new DBInsertWorkAB("DLH", "Duluth International Airport"));
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS)) {
    DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
    int numUpdates = work.getResult();
}

CommonJ 範例:

WorkItem workItem = cjWorkManager.schedule(
    new DBInsertWorkCJ("HIB", "Chisholm-Hibbing Airport"));
if (cjWorkManager.waitForAll(Collections.singletonList(workItem), TIMEOUT_MS)) {
    DBInsertWorkCJ work = (DBInsertWorkCJ) workItem.getResult();
    int numUpdates = work.getResult();
}

Concurrency Utilities for Java EE 範例:

Future<Integer> future = executor.submit(
    new DBInsertTask("INL", "Falls International Airport"));
int numUpdates = future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);

提交作業時的其他選項

當您提交作業時,可以選擇性地指派接聽器和啟動逾時值,並指示是否預期執行一段很長的時間。 在非同步 Bean 中,啟動逾時值只能作為一個參數來使用,但對 CommonJ 和 Concurrency Utilities for Java EE 而言,啟動逾時值可以在 WorkListenerManagedTaskListener 內實作。

非同步 Bean 範例:

long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
WorkItem workItem = abWorkManager.startWork(
    new DBInsertWorkAB("SGS", "South Saint Paul Municipal Airport"),
    startTimeout,
    new WorkListenerAB(),
    isLongRunning);
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, Integer.MAX_VALUE)) {
    DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
    int numUpdates = work.getResult();
}

CommonJ 範例:

long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
DBInsertWorkCJ work = new DBInsertWorkCJ("STP", "Saint Paul Downtown Airport");
work.setDaemon(isLongRunning);
WorkItem workItem = cjWorkManager.schedule(
    work, new WorkListenerCJ(work, startTimeout));
Collection<WorkItem> items = Collections.singleton(workItem);
if (cjWorkManager.waitForAll(items, WorkManager.INDEFINITE)) {
    work = (DBInsertWorkCJ) workItem.getResult();
    int numUpdates = work.getResult();
}

Concurrency Utilities for Java EE 範例:

long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
Callable<Integer> contextualTask = ManagedExecutors.managedTask(
    new DBInsertTask("LVN", "Airlake Airport"),
    Collections.singletonMap(ManagedTask.LONGRUNNING_HINT,
        Boolean.toString(isLongRunning)),
    new TaskListener(startTimeout));
Future<Integer> future = executor.submit(contextualTask);
int numUpdates = future.get();

等待完成一個作業群組

三個程式設計模型都提供了若干等待作業群組完成的方法。 下列範例指定最長等待時間。有可能會無限期等待,或可能使用 Concurrency Utilities for Java EE 在 future 變數上循序執行 get

ArrayList<WorkItem> items = new ArrayList<WorkItem>(3);
items.add(abWorkManager.startWork(
    new DBInsertWorkAB("COQ", "Cloquet/Carlton County Airport")));
items.add(abWorkManager.startWork(
    new DBInsertWorkAB("CQM", "Cook Municipal Airport")));
items.add(abWorkManager.startWork(
    new DBInsertWorkAB("CKN", "Crookston Municipal Airport")));
boolean allCompleted = abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS);
int numUpdates = 0;
for (WorkItem workItem : items) {
    if (workItem.getStatus() == WorkEvent.WORK_COMPLETED) {
        DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
        numUpdates += work.getResult();
    } else
        ((Work) workItem.getEventTrigger(Work.class)).release();
}

CommonJ 範例:

List<DBInsertWorkCJ> workList = Arrays.asList(
    new DBInsertWorkCJ("DTL", "Detroit Lakes Airport"),
    new DBInsertWorkCJ("TOB", "Dodge Center Airport"),
    new DBInsertWorkCJ("DYT", "Sky Harbor Airport"));
List<WorkItem> items = new ArrayList<WorkItem>(workList.size());
for (DBInsertWorkCJ work : workList)
    items.add(cjWorkManager.schedule(work));
boolean allCompleted = cjWorkManager.waitForAll(items, TIMEOUT_MS);
int numUpdates = 0;
for (int i = 0; i < items.size(); i++) {
    WorkItem workItem = items.get(i);
    if (workItem.getStatus() == WorkEvent.WORK_COMPLETED) {
        DBInsertWorkCJ work = (DBInsertWorkCJ) workItem.getResult();
        numUpdates += work.getResult();
    } else
        workList.get(i).release();
}

Concurrency Utilities for Java EE 範例:

List<DBInsertTask> tasks = Arrays.asList(
    new DBInsertTask("CFE", "Buffalo Municipal Airport"),
    new DBInsertTask("CHU", "Caledonia-Houston County Airport"),
    new DBInsertTask("CBG", "Cambridge Municipal Airport"));
int numUpdates = 0;
List<Future<Integer>> futures = executor.invokeAll(tasks, TIMEOUT_MS, TimeUnit.MILLISECONDS);
for (Future<Integer> future : futures)
    numUpdates += future.get();

等待完成群組內的一項單一作業

三個程式設計模型都提供了若干等待群組內單一作業完成的方法。 下列範例指定最大的等待時間量,但也有可能無限期等待。

非同步 Bean 範例:

ArrayList<WorkItem> items = new ArrayList<WorkItem>(3);
items.add(abWorkManager.startWork(new PrimeFinderWork(20)));
items.add(abWorkManager.startWork(new PrimeFinderWork(50)));
items.add(abWorkManager.startWork(new PrimeFinderWork(80)));
boolean anyCompleted = abWorkManager.join(items, WorkManager.JOIN_OR, TIMEOUT_MS);
long prime = -1;
for (WorkItem workItem : items) {
    if (workItem.getStatus() == WorkEvent.WORK_COMPLETED) {
        PrimeFinderWork work = (PrimeFinderWork) workItem.getResult();
        prime = work.getPrimeNumber();
    } else
        ((Work) workItem.getEventTrigger(Work.class)).release();
}

CommonJ 範例:

List<PrimeFinderWork> workList = Arrays.asList(
    new PrimeFinderWork(20),
    new PrimeFinderWork(50),
    new PrimeFinderWork(80));
List<WorkItem> items = new ArrayList<WorkItem>(workList.size());
for (PrimeFinderWork work : workList)
    items.add(cjWorkManager.schedule(work));
Collection<WorkItem> completedItems = cjWorkManager.waitForAny(items, TIMEOUT_MS);
long prime = -1;
for (int i = 0; i < items.size(); i++) {
    WorkItem workItem = items.get(i);
    if (completedItems.contains(workItem)) {
        PrimeFinderWork work = (PrimeFinderWork) workItem.getResult();
        prime = work.getPrimeNumber();
    } else
        workList.get(i).release();
}

Concurrency Utilities for Java EE 範例:

List<PrimeFinderTask> tasks = Arrays.asList(
    new PrimeFinderTask(20),
    new PrimeFinderTask(50),
    new PrimeFinderTask(80));
long prime = executor.invokeAny(tasks, TIMEOUT_MS, TimeUnit.MILLISECONDS);

當作業失敗時,取得原因異常狀況

三個程式設計模型都提供了若干在作業執行失敗時取得原因異常狀況的方法。 這可以利用接聽器來完成(這一頁稍後可找到範例),或從 WorkItemFuture 取得作業結果而完成。 這會引發 WorkExceptionExecutionException,其中包含原始異常狀況來作為原因。

非同步 Bean 範例:

boolean isLongRunning = false;
WorkItem workItem = abWorkManager.startWork(
    new DBInsertWorkAB("KADC", "Wadena Municipal Airport"),
    isLongRunning);
Throwable exception = null;
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS))
    try {
        DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
        int numUpdates = work.getResult();
    } catch (WorkException x) {
        exception = x.getCause();
    }

CommonJ 範例:

boolean isLongRunning = false;
DBInsertWorkCJ work = new DBInsertWorkCJ("KBDH", "Willmar Municipal Airport");
work.setDaemon(isLongRunning);
WorkItem workItem = cjWorkManager.schedule(work);
Throwable exception = null;
if (cjWorkManager.waitForAll(Collections.singleton(workItem), TIMEOUT_MS))
    try {
        work = (DBInsertWorkCJ) workItem.getResult();
        int numUpdates = work.getResult();
    } catch (WorkException x) {
        exception = x.getCause();
    }

Concurrency Utilities for Java EE 範例:

boolean isLongRunning = false;
Callable<Integer> task = ManagedExecutors.managedTask(
    new DBInsertTask("KACQ", "Waseca Municipal Airport"),
    Collections.singletonMap(ManagedTask.LONGRUNNING_HINT,
        Boolean.toString(isLongRunning)),
    null);
Future<Integer> future = executor.submit(task);
Throwable exception = null;
try {
    int numUpdates = future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (ExecutionException x) {
    exception = x.getCause();
}

排定在間隔之後執行一次性作業

三個程式設計模型都提供了將基本作業排定在未來某時由聯合排存的執行緒來執行並取得結果的方式。

非同步 Bean 範例:

AsynchScope asynchScope = abWorkManager.findOrCreateAsynchScope("MyScope");
AlarmManager alarmManager = asynchScope.getAlarmManager();
Alarm alarm = alarmManager.create(
    new DBInsertAlarmListener(),
    new String[] { "MSP", "Minneapolis-Saint Paul International Airport"},
    (int) TimeUnit.SECONDS.toMillis(1));
DBInsertAlarmListener alarmListener = (DBInsertAlarmListener) alarm.getAlarmListener();
// Poll for result to appear
for (long start = System.nanoTime();
    alarmListener.getResult() < 0 && System.nanoTime() - start < TIMEOUT_NS;
    Thread.sleep(200)) ;
int numUpdates = alarmListener.getResult();

CommonJ 範例:

Timer timer = timerManager.schedule(
    new DBInsertTimerListener("STC", "Saint Cloud Regional Airport"),
    TimeUnit.SECONDS.toMillis(1));
DBInsertTimerListener timerListener = (DBInsertTimerListener) timer.getTimerListener();
// Poll for result to appear
for (long start = System.nanoTime();
    timerListener.getResult() < 0 && System.nanoTime() - start < TIMEOUT_NS;
    Thread.sleep(200)) ;
int numUpdates = timerListener.getResult();

Concurrency Utilities for Java EE 範例:

ScheduledFuture<Integer> future = executor.schedule(
    new DBInsertTask("RST", "Rochester International Airport"),
    1,
    TimeUnit.SECONDS);
int numUpdates = future.get(TIMEOUT_NS, TimeUnit.NANOSECONDS);

排定固定速率的重複作業,以及查詢下次執行之前的間隔

CommonJ 和 Concurrency Utilities for Java EE 程式設計模型提供一種排定依固定速率來執行重複作業的方法(例如,在每小時的開頭)。 不保證即時排程。 在這個時間之後,作業可以在任何點開始,但不能更快。 CommonJ 和 Concurrency Utilities for Java EE 程式設計模型也提供一種便利方法,用來計算在下次執行之前的延遲。

CommonJ 範例:

Timer timer = timerManager.scheduleAtFixedRate(
    new PrimeFinderTimerListener(120),
    TimeUnit.MINUTES.toMillis(90),
    TimeUnit.MINUTES.toMillis(30));
long nextExecTime = timer.getScheduledExecutionTime();
long delay = TimeUnit.MILLISECONDS.toSeconds(
    nextExecTime - System.currentTimeMillis());

Concurrency Utilities for Java EE 範例:

ScheduledFuture<?> future = executor.scheduleAtFixedRate(
    new PrimeFinderRunnable(120), 90, 30, TimeUnit.MINUTES);
long delay = future.getDelay(TimeUnit.SECONDS);

在執行和取消作業之間,以固定延遲來排定重複作業

CommonJ 和 Concurrency Utilities for Java EE 程式設計模型提供一種方法,用來排定在每次執行結束與下次開始之間,依固定間隔來執行重複作業。 不保證即時排程。 在這個時間之後,作業可以在任何點開始,但不能更快。 非同步 Bean 在 Alarm 上提供 reset 方法,可用來實現這個相同的行為。 三個程式設計模型都提供了取消進一步啟動執行排定作業的方式。

非同步 Bean 範例:

AsynchScope asynchScope = abWorkManager.findOrCreateAsynchScope("MyScope");
AlarmManager alarmManager = asynchScope.getAlarmManager();
Alarm alarm = alarmManager.create(
    new PrimeFinderAlarmListener(90), 50, 10);
// ... eventually cancel the alarm
alarm.cancel();

CommonJ 範例:

Timer timer = timerManager.schedule(
    new PrimeFinderTimerListener(90), 50, 50);
// ... eventually cancel the timer
timer.cancel();

Concurrency Utilities for Java EE 範例:

ScheduledFuture<?> future = executor.scheduleWithFixedDelay(
    new PrimeFinderRunnable(90), 50, 50, TimeUnit.MILLISECONDS);
// ... eventually cancel the task
future.cancel(false);

排定依變動間隔來執行重複作業

在三個程式設計模型中,都有可能用每次的執行來計算間隔,過了這個間隔,重複作業便進行下一次的重複。 非同步 Bean 提供警示的重設方法。 Concurrency Utilities for Java EE 可讓您外掛一個 Trigger 來計算下次的執行時間。 CommonJ 未提供其中任何一種方法,但在 CommonJ 及其他程式設計模型中,有可能在前一次執行完成時,對作業重新排程。 在下列範例中,將作業排定為執行正好四次,每次執行前的延遲各不相同。 重設、重新排程,或計算下一次執行的程式碼,可以在包括這一節的 Trigger 內找到,以及包括在基本作業實作一節之下的 AlarmListenerTimerListener 實作內找到。

非同步 Bean 範例:

int initialDelay = 50;
int[] subsequentDelays = new int[] { 40, 80, 70 };
AsynchScope asynchScope = abWorkManager.findOrCreateAsynchScope("MyScope");
AlarmManager alarmManager = asynchScope.getAlarmManager();
Alarm alarm = alarmManager.create(
    new PrimeFinderAlarmListener(60),
    subsequentDelays,
    initialDelay);
Thread.sleep(5000);
PrimeFinderAlarmListener alarmListener =
    (PrimeFinderAlarmListener) alarm.getAlarmListener();
long prime = alarmListener.getPrimeNumber();

CommonJ 範例:

long initialDelay = 50;
long [] subsequentDelays = new long[] { 40, 80, 70 };
Timer timer = timerManager.schedule(
    new PrimeFinderTimerListener(60, subsequentDelays),
    initialDelay);
Thread.sleep(5000);
PrimeFinderTimerListener timerListener = (PrimeFinderTimerListener) timer.getTimerListener();
long prime = timerListener.getPrimeNumber();

Concurrency Utilities for Java EE 範例:

ScheduledFuture<Long> future = executor.schedule(
    new PrimeFinderTask(60),
    new DelayTrigger(50, 40, 80, 70));
Thread.sleep(5000);
long prime = future.get();

public class DelayTrigger implements Trigger {
    private int count;
    private long[] delays;
    volatile boolean isSuspended;

    public DelayTrigger(long... delays) {
        this.delays = delays;
    }

    public Date getNextRunTime(LastExecution previousExecution, Date taskScheduledTime) {
        if (delays.length > count)
            return new Date(System.currentTimeMillis() + delays[count++]);
        else
            return null;
    }

    public boolean skipRun(LastExecution previousExecution, Date scheduledRunTime) {
        return isSuspended;
    }
}

暫停及回復執行作業

CommonJ TimerManager 提供若干介面,用來暫停及回復作業的執行。 Concurrency Utilities for Java EE 在更精細的基礎上,透過可外掛之 Trigger 機制的 skipRun 方法來提供這個能力。 只要 Trigger 的實作能夠支援,便可以將單一 Trigger 實例提供給任意數目的作業。 在下列範例中,Trigger 撰寫成可用來排定單一作業。

CommonJ 範例:

Timer timer = timerManager.schedule(
    new PrimeFinderTimerListener(100),
    new Date(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(5)));
timerManager.suspend();
// ... resume at a later point
if (timerManager.isSuspending() || timerManager.isSuspended())
    timerManager.resume();

Concurrency Utilities for Java EE 範例:

DelayTrigger trigger = new DelayTrigger(
    System.currentTimeMillis() + TimeUnit.DAYS.toMillis(5));
ScheduledFuture<Long> future = executor.schedule(
    new PrimeFinderTask(100), trigger);
trigger.isSuspended = true;
// ... resume at a later point
if (trigger.isSuspended)
    trigger.isSuspended = false;

停止進一步執行作業

非同步 Bean 提供用來毀損 AsynchScope 的方法,它會取消 AlarmManagers 在這個範圍內所建立的所有警示。 CommonJ TimerManager 提供若干介面來停止進一步啟動執行,且會等待所有執行中的作業全部停止。 由於每次查閱 TimerManager 都會產生一個可以獨立於其他實例來停止的新實例,所以能夠如此。 在 Concurrency Utilities for Java EE 中,相同的 ManagedScheduledExecutorService 可以跨越各次的查閱來共用,且不接受 shutdownisTerminatedawaitTermination 之類的生命週期作業(遵循規格)。 不過,使用 Concurrency Utilities for Java EE 時,您可以向未受管理的 ScheduledExecutorService 提供 ManagedThreadFactory

非同步 Bean 範例:

alarmManager.create(
    new PrimeFinderAlarmListener(100),
    null,
    (int) TimeUnit.HOURS.toMillis(1));
alarmManager.create(
    new PrimeFinderAlarmListener(200),
    null,
    (int) TimeUnit.HOURS.toMillis(2));
// ... eventually destroy the asynch scope to cancel all alarms
asynchScope.destroy();

CommonJ 範例:

TimerManager timerManager = (TimerManager) new InitialContext().lookup(
    "java:comp/env/tm/default");
timerManager.schedule(
    new PrimeFinderTimerListener(100),
    new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)));
timerManager.schedule(
    new PrimeFinderTimerListener(200),
    new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(2)));
// ... eventually stop the timer manager
timerManager.stop();
if (!timerManager.isStopped())
    timerManager.waitForStop(TIMEOUT_MS);

Concurrency Utilities for Java EE 範例:

ScheduledExecutorService executor =
    Executors.newScheduledThreadPool(1, threadFactory);
executor.schedule(new PrimeFinderTask(100), 1, TimeUnit.HOURS);
executor.schedule(new PrimeFinderTask(200), 2, TimeUnit.HOURS);
// .. eventually shut down the executor
executor.shutdown();
if (!executor.isTerminated())
    executor.awaitTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS);

建構環境定義 Proxy

非同步 Bean 和 Concurrency Utilities for Java EE 程式設計模型可讓您建構環境定義 Proxy。 執行緒環境定義是從建立環境定義 Proxy 的執行緒擷取而來並儲存在其中,當在這個 Proxy 上呼叫介面方法時,會自動套用於執行緒,之後,再將它從執行緒中移除。 非同步 Bean 透過 EventSource 來提供這個功能。 Concurrency Utilities for Java EE 透過 ContextService 來提供這個功能。

非同步 Bean 範例:

EventSource eventSource = abWorkManager.createEventSource();
eventSource.addListener(new DBWriterImpl());
DBWriter dbWriter = (DBWriter) eventSource.getEventTrigger(DBWriter.class);
// Can invoke interface methods from any thread...
int numUpdates = dbWriter.exec(
    "INSERT INTO AIRPORTS VALUES(?,?)", "AIT", "Aitkin Municipal Airport");

Concurrency Utilities for Java EE 範例:

DBWriter dbWriter = contextService.createContextualProxy(
    new DBWriterImpl(), DBWriter.class);
// Can invoke interface methods from any thread...
int numUpdates = dbWriter.exec(
    "INSERT INTO AIRPORTS VALUES(?,?)", "AXN", "Alexandria Municipal Airport");

建構在執行緒交易中執行之多重實例的環境定義 Proxy

非同步 Bean 和 Concurrency Utilities for Java EE 都可讓您指定是否在執行緒交易中執行環境定義 Proxy 的介面方法,或是否在方法期間暫停現行交易,以及套用全新的交易環境定義。 非同步 Bean 的另一個功能是容許單一 Proxy 觸發多個接聽器實例。 不過,Concurrency Utilities for Java EE 沒有這個功能,類似的行為可以通過撰寫封套接聽器,讓它能夠委派給多個其他接聽器來實現。 下列範例呼叫會觸發多個接聽器實例的單一 Proxy,這些全部是在發出呼叫的執行緒的交易之下進行。

非同步 Bean 範例:

boolean runInSameTran = true;
EventSource eventSource = abWorkManager.createEventSource();
eventSource.addListener(new DBInsertTask("MKT", "Mankato Regional Airport"));
eventSource.addListener(new DBInsertTask("ULM", "New Ulm Municipal Airport"));
eventSource.addListener(new DBInsertTask("OWA", "Owatonna Degner Regional Airport"));
Callable<?> eventTrigger = (Callable<?>) eventSource.getEventTrigger(
    Callable.class, runInSameTran);
// Can invoke interface methods from any thread...
tran.begin();
try {
    eventTrigger.call();
		} finally {
    tran.commit();
}

Concurrency Utilities for Java EE 範例:

Callable<?> eventTrigger = contextService.createContextualProxy(
    new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            new DBInsertTask("FFM", "Fergus Falls Municipal Airport").call();
            new DBInsertTask("ONA", "Winona Municipal Airport").call();
            new DBInsertTask("OTG", "Worthington Municipal Airport").call();
            return null;
        }
    },
    Collections.singletonMap(ManagedTask.TRANSACTION,
        ManagedTask.USE_TRANSACTION_OF_EXECUTION_THREAD),
    Callable.class);
// Can invoke interface methods from any thread...
tran.begin();
try {
    eventTrigger.call();
		} finally {
    tran.commit();
}

建立在發出呼叫的執行緒上延遲執行的環境定義作業

非同步 Bean 可讓您建立 WorkWithExecutionContext,這基本上是作業的可序列化環境定義 Proxy,作業稍後可以提交給工作管理員,以便在建立者執行緒的執行緒環境定義之下執行。 如果預期由發出呼叫的執行緒(工作管理員的 doWork 方法)來執行,在 Concurrency Utilities for Java EE 中,環境定義 Proxy 可以完成類似的行為。

非同步 Bean 範例:

WorkWithExecutionContext contextualWork = abWorkManager.create(
    new DBInsertWorkAB("BJI", "Bemidji Regional Airport"));
// Can run the contextual work on any thread...
abWorkManager.doWork(contextualWork);
DBInsertWorkAB work = (DBInsertWorkAB) contextualWork.getWork();
int numUpdates = work.getResult();

Concurrency Utilities for Java EE 範例:

Callable<Integer> contextualTask = contextService.createContextualProxy(
    new DBInsertTask("BRD", "Brainerd Lakes Regional Airport"),
    Callable.class);
// Can run the contextual proxy on any thread...
int numUpdates = contextualTask.call();

建立在聯合排存的執行緒上延遲執行的環境定義作業

非同步 Bean 可讓您建立 WorkWithExecutionContext,這基本上是作業的可序列化環境定義 Proxy,作業稍後可以提交給工作管理員,以便在建立者執行緒的執行緒環境定義之下執行。 如果預期由聯合排存的執行緒(工作管理員的 startWork 方法)來執行,在 Concurrency Utilities for Java EE 中,藉由向受管理執行程式提交作業的環境定義 Proxy,可以完成類似的行為,由於執行緒環境定義的擷取與傳播重複,這個效率比較差。

非同步 Bean 範例:

WorkWithExecutionContext contextualWork = abWorkManager.create(
    new DBInsertWorkAB("ELO", "Ely Municipal Airport"));
WorkItem workItem = abWorkManager.startWork(contextualWork);
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS)) {
    DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
    int numUpdates = work.getResult();
}

Concurrency Utilities for Java EE 範例:

Callable<Integer> contextualTask = contextService.createContextualProxy(
    new DBInsertTask("EVM", "Eveleth-Virginia Municipal Airport"),
    Callable.class);
Future<Integer> future = executor.submit(contextualTask);
int numUpdates = future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);

環境定義作業的其他延遲執行選項

當提交要延遲執行的作業時,您可以選擇性地指派接聽器和啟動逾時值,並指示是否預期長時間執行。 在非同步 Bean 中,啟動逾時值只能作為一個參數來使用,但對 CommonJ 和 Concurrency Utilities for Java EE 而言,啟動逾時值可以在 WorkListenerManagedTaskListener 內實作。

非同步 Bean 範例:

long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
WorkWithExecutionContext contextualWork = abWorkManager.create(
    new DBInsertWorkAB("FRM", "Fairmont Municipal Airport"));
WorkItem workItem = abWorkManager.startWork(
    contextualWork, startTimeout, new WorkListenerAB(), isLongRunning);
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, Integer.MAX_VALUE)) {
    DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
    int numUpdates = work.getResult();
}

Concurrency Utilities for Java EE 範例:

long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
Callable<Integer> contextualTask = contextService.createContextualProxy(
    new DBInsertTask("FBL", "Faribault Municipal Airport"),
    Callable.class);
contextualTask = ManagedExecutors.managedTask(
    contextualTask,
    Collections.singletonMap(ManagedTask.LONGRUNNING_HINT,
        Boolean.toString(isLongRunning)),
    new TaskListener(startTimeout));
Future<Integer> future = executor.submit(contextualTask);
int numUpdates = future.get();

子系統監視

非同步 Bean 提供 SubsystemMonitorSubsystemMonitorManager 作為跨越各應用程式或其他構件來進行協調以監視可用性的機制。 Concurrency Utilities for Java EE 未提供任何對等的功能。 如果需要取代這個功能,您可以實作一個所有其他應用程式都能夠辨識的應用程式來扮演 SubsystemMonitorManager 的對等項目,以達到這個目的。

事件處理

非同步 Bean 可供登錄 AlarmManagerAsynchScopeEventSourceSubsystemMonitorWorkManagerWork 上所發生之各類事件的接聽器。 在 Concurrency Utilities for Java EE 中,這些事件大部分都沒有直接的對等項目。 應用程式必須自行負責實作本身用於事件和通知的機制。 在少數情況中,Concurrency Utilities for Java EE 會提供類似的功能。 在某些情況下,您可以利用在更精細的基礎上登錄(在提交作業之時)的 ManagedTaskListener 來取代 AlarmManagerEventsWorkEvents

呼叫環境定義 Proxy 期間所發生之失敗的事件處理

在非同步 Bean 中,當利用環境定義 Proxy 來呼叫作業,且作業引發已宣告的異常狀況時,不會向呼叫者回報這個異常狀況。 相反地,這個異常狀況會在 listenerExceptionThrow 事件上向 EventSourceEvents 接聽器報告。 在 Concurrency Utilities for Java EE 中,呼叫者可以擷取及處理這個異常狀況。

非同步 Bean 範例:

EventSource eventSource = abWorkManager.createEventSource();
eventSource.addListener(new DBWriterImpl());
eventSource.addListener(new EventSourceEvents() {
    public void listenerCountChanged(
        EventSource es, int oldCount, int newCount) {}

    public void listenerExceptionThrown(
        EventSource es, Object listener,
        String methodName, Throwable exception) {
        listenerException.set(exception);
    }

    public void unexpectedException(
        EventSource es, Object runnable, Throwable exception) {}
});
DBWriter dbWriter = (DBWriter) eventSource.getEventTrigger(DBWriter.class);
// Can invoke interface methods from any thread...
try {
    dbWriter.exec(
        "INSERT INTO AIRPORTS VALUES(?,?)", "KAUM", "Austin Municipal Airport");
} catch (Exception x) {
    // expecting this to fail
}
Throwable exception = listenerException.get();

Concurrency Utilities for Java EE 範例:

DBWriter dbWriter = contextService.createContextualProxy(
    new DBWriterImpl(), DBWriter.class);
// Can invoke interface methods from any thread...
SQLException exception = null;
try {
    dbWriter.exec(
        "INSERT INTO AIRPORTS VALUES(?,?)", "KSBU", "Blue Earth Municipal Airport");
} catch (SQLException x) {
    exception = x;
}

指出主題類型的圖示 參照主題



時間戳記圖示 前次更新: last_date
http://www14.software.ibm.com/webapp/wsbroker/redirect?version=cord&product=was-nd-mp&topic=rasb_migrate_to_eeconcurrency
檔名:rasb_migrate_to_eeconcurrency.html