實作 Concurrency Utilities for Java EE 的範例 API 作業
您可以移轉使用「非同步 Bean」和 CommonJ Timer and Work Manager API 的應用程式,以使用 Concurrency Utilities for Java™ EE。
Concurrency Utilities for Java EE 提供標準化方法,供您在應用程式伺服器中執行並行作業。它取代了「非同步 Bean」和 CommonJ Timer and Work Manager 的程式設計模型。
程式碼範例中使用的資源
這個頁面中的程式碼範例假設應用程式已注入或查閱下列資源:
@Resource(lookup = "wm/default")
private com.ibm.websphere.asynchbeans.WorkManager abWorkManager;
@Resource(lookup = "wm/default")
private commonj.work.WorkManager cjWorkManager;
@Resource
private ContextService contextService;
@Resource(name = "java:app/env/jdbc/dsRef")
private DataSource dataSource;
@Resource
private ManagedScheduledExecutorService executor;
@Resource
private ManagedThreadFactory threadFactory;
@Resource(name = "java:comp/env/tm/default", lookup = "tm/default", shareable = false)
private TimerManager timerManager;
@Resource
private UserTransaction tran;
基本作業實作
本節提供一些簡單的作業實作範例,其他範例會在本文件的其餘地方使用它們。對於排定在未來執行的作業,「非同步 Bean」需要個別的 AlarmListener 介面。對於排定在未來執行的作業,CommonJ 需要個別的 TimerListener 介面。Concurrency Utilities for Java EE 作業可以是 Runnable 或 Callable,且不論要提交作業以立即執行,或是將作業排定在未來執行,任一介面都容許使用。在某些情況下,有可能將「非同步 Bean」或「CommonJ 工作」當成 Runnable 提交給受管理執行程式,而不進行任何變更。工作的 release 方法會以受管理執行程式的功能取代,以取消和中斷正在執行的執行緒。工作的 isDaemon 方法則以 LONGRUNNING_HINT 執行內容取代。
「非同步 Bean」和 CommonJ 兩者的 Work 作業範例,此範例是尋找下一個質數:
public class PrimeFinderWork implements
com.ibm.websphere.asynchbeans.Work, commonj.work.Work {
private long num;
private volatile boolean released;
private long result;
public PrimeFinderWork(long startingValue) {
num = startingValue;
}
public boolean isDaemon() {
return false;
}
public void release() {
released = true;
}
public void run() {
while (!isPrime(num))
if (released || Thread.currentThread().isInterrupted())
throw new RuntimeException(new InterruptedException());
else
num++;
result = num++;
}
public long getPrimeNumber() {
if (result > 0)
return result;
else
throw new IllegalStateException();
}
}
「非同步 Bean」的 AlarmListener 作業範例,此範例是尋找下一個質數:
public class PrimeFinderAlarmListener implements AlarmListener {
private volatile boolean aborted;
private int count;
private long num;
private long result;
public PrimeFinderAlarmListener(long startingValue) {
num = startingValue;
}
public void abort() {
aborted = true;
}
public void fired(Alarm alarm) {
while (!isPrime(num))
if (aborted || Thread.currentThread().isInterrupted())
throw new RuntimeException(new InterruptedException());
else
num++;
result = num++;
// optionally reschedule:
Object delays = alarm.getContext();
if (delays instanceof Integer)
alarm.reset((Integer) delays);
else if (delays instanceof int[] && count < ((int[]) delays).length)
alarm.reset(((int[]) delays)[count++]);
}
public long getPrimeNumber() {
if (result > 0)
return result;
else
throw new IllegalStateException();
}
「非同步 Bean」的 TimerListener 作業範例,此範例是尋找下一個質數:
public class PrimeFinderTimerListener implements CancelTimerListener, TimerListener {
private volatile boolean aborted;
private int count;
private final long[] delays;
private long num;
private long result;
public PrimeFinderTimerListener(long startingValue, long... delays) {
num = startingValue;
this.delays = delays;
}
public void timerCancel(Timer timer) {
aborted = true;
}
public void timerExpired(Timer timer) {
while (!isPrime(num))
if (aborted || Thread.currentThread().isInterrupted())
throw new RuntimeException(new InterruptedException());
else
num++;
result = num++;
// optionally reschedule:
if (count < delays.length)
try {
TimerManager timerManager = (TimerManager) new InitialContext().lookup(
"java:comp/env/tm/default");
timerManager.schedule(this, delays[count++]);
} catch (NamingException x) {
throw new RuntimeException(x);
}
}
public long getPrimeNumber() {
if (result > 0)
return result;
else
throw new IllegalStateException();
}
}
Concurrency Utilities for Java EE 的 Runnable 作業範例,此範例是尋找下一個質數:
public class PrimeFinderRunnable implements Runnable {
private long num;
private long result;
public PrimeFinderRunnable(long startingValue) {
num = startingValue;
}
public void run() {
while (!isPrime(num))
if (Thread.currentThread().isInterrupted())
throw new RuntimeException(new InterruptedException());
else
num++;
result = num++;
}
public long getPrimeNumber() {
if (result > 0)
return result;
else
throw new IllegalStateException();
}
}
Concurrency Utilities for Java EE 的 Callable 作業範例,此範例是尋找下一個質數:
public class PrimeFinderTask implements Callable<Long> {
private long num;
public PrimeFinderTask(long startingValue) {
num = startingValue;
}
public Long call() throws InterruptedException {
while (!isPrime(num))
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();
else
num++;
return num++;
}
}
「非同步 Bean」的 Work 作業範例,此範例是執行基本的資料庫插入:
public class DBInsertWorkAB implements Work, Serializable {
private static final long serialVersionUID = 2606824039439594442L;
private transient Thread executionThread;
private final String code;
private final String name;
private boolean released;
private volatile int result = -1;
public DBInsertWorkAB(String code, String name) {
this.code = code;
this.name = name;
}
public int getResult() {
return result;
}
public synchronized void release() {
released = true;
if (executionThread != null)
executionThread.interrupt();
}
public void run() {
synchronized (this) {
if (released)
throw new RuntimeException("Work was canceled");
executionThread = Thread.currentThread();
}
try {
DataSource ds = (DataSource) new InitialContext().lookup(
"java:app/env/jdbc/dsRef");
Connection con = ds.getConnection();
try {
PreparedStatement stmt = con.prepareStatement(
"INSERT INTO AIRPORTS VALUES(?,?)");
stmt.setString(1, code);
stmt.setString(2, name);
result = stmt.executeUpdate();
} finally {
con.close();
}
} catch (NamingException x) {
throw new RuntimeException(x);
} catch (SQLException x) {
throw new RuntimeException(x);
} finally {
synchronized (this) {
executionThread = null;
}
}
}
}
「非同步 Bean」的 AlarmListener 作業範例,此範例是執行基本的資料庫插入:
public class DBInsertAlarmListener implements AlarmListener {
private volatile int result = -1;
public int getResult() {
return result;
}
public void fired(Alarm alarm) {
String[] alarmContext = (String[]) alarm.getContext();
try {
DataSource ds = (DataSource) new InitialContext().lookup(
"java:app/env/jdbc/dsRef");
Connection con = ds.getConnection();
try {
PreparedStatement stmt = con.prepareStatement(
"INSERT INTO AIRPORTS VALUES(?,?)");
stmt.setString(1, alarmContext[0]);
stmt.setString(2, alarmContext[1]);
result = stmt.executeUpdate();
} finally {
con.close();
}
} catch (NamingException x) {
throw new RuntimeException(x);
} catch (SQLException x) {
throw new RuntimeException(x);
}
}
}
CommonJ 的 Work 作業範例,此範例是執行基本的資料庫插入:
public class DBInsertWorkCJ implements Work, Serializable {
private static final long serialVersionUID = -8801347489043041978L;
private transient Thread executionThread;
private final String code;
private final String name;
private boolean isDaemon;
private boolean released;
private volatile int result = -1;
public DBInsertWorkCJ(String code, String name) {
this.code = code;
this.name = name;
}
public int getResult() {
return result;
}
public boolean isDaemon() {
return isDaemon;
}
public synchronized void release() {
released = true;
if (executionThread != null)
executionThread.interrupt();
}
public void run() {
synchronized (this) {
if (released)
throw new RuntimeException("Work was canceled");
executionThread = Thread.currentThread();
}
try {
DataSource ds = (DataSource) new InitialContext().lookup(
"java:app/env/jdbc/dsRef");
Connection con = ds.getConnection();
try {
PreparedStatement stmt = con.prepareStatement(
"INSERT INTO AIRPORTS VALUES(?,?)");
stmt.setString(1, code);
stmt.setString(2, name);
result = stmt.executeUpdate();
} finally {
con.close();
}
} catch (NamingException x) {
throw new RuntimeException(x);
} catch (SQLException x) {
throw new RuntimeException(x);
} finally {
synchronized (this) {
executionThread = null;
}
}
}
CommonJ 的 TimerListener 作業範例,此範例是執行基本的資料庫插入:
public class DBInsertTimerListener implements TimerListener {
private volatile int result = -1;
private final String code;
private final String name;
public DBInsertTimerListener(String code, String name) {
this.code = code;
this.name = name;
}
public int getResult() {
return result;
}
public void timerExpired(Timer timer) {
try {
DataSource ds = (DataSource) new InitialContext().lookup(
"java:app/env/jdbc/dsRef");
Connection con = ds.getConnection();
try {
PreparedStatement stmt = con.prepareStatement(
"INSERT INTO AIRPORTS VALUES(?,?)");
stmt.setString(1, code);
stmt.setString(2, name);
result = stmt.executeUpdate();
} finally {
con.close();
}
} catch (NamingException x) {
throw new RuntimeException(x);
} catch (SQLException x) {
throw new RuntimeException(x);
}
}
}
Concurrency Utilities for Java EE 的 Callable 作業範例,此範例是執行基本的資料庫插入:
public class DBInsertTask implements Callable<Integer>, Serializable {
private static final long serialVersionUID = 5556464104788801400L;
private final String code;
private final String name;
public DBInsertTask(String code, String name) {
this.code = code;
this.name = name;
}
public Integer call() throws NamingException, SQLException {
DataSource ds = (DataSource) new InitialContext().lookup(
"java:app/env/jdbc/dsRef");
Connection con = ds.getConnection();
try {
PreparedStatement stmt = con.prepareStatement(
"INSERT INTO AIRPORTS VALUES(?,?)");
stmt.setString(1, code);
stmt.setString(2, name);
return stmt.executeUpdate();
} finally {
con.close();
}
}
}
提交作業
這三種程式設計模型都會提供方法,來提交基本作業在排入儲存區的執行緒中執行,並取得結果。
「非同步 Bean」範例:
WorkItem workItem = abWorkManager.startWork(
new DBInsertWorkAB("DLH", "Duluth International Airport"));
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS)) {
DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
int numUpdates = work.getResult();
}
CommonJ 範例:
WorkItem workItem = cjWorkManager.schedule(
new DBInsertWorkCJ("HIB", "Chisholm-Hibbing Airport"));
if (cjWorkManager.waitForAll(Collections.singletonList(workItem), TIMEOUT_MS)) {
DBInsertWorkCJ work = (DBInsertWorkCJ) workItem.getResult();
int numUpdates = work.getResult();
}
Concurrency Utilities for Java EE 範例:
Future<Integer> future = executor.submit(
new DBInsertTask("INL", "Falls International Airport"));
int numUpdates = future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
提交作業時的其他選項
當提交作業時,您可以選擇性地指派接聽器和啟動逾時,並指出預計該作業是否要長期執行。在「非同步 Bean」中,啟動逾時只能以參數形式提供,但是對於 CommonJ 和 Concurrency Utilities for Java EE,啟動逾時可以放在 WorkListener 或 ManagedTaskListener 內實作。
「非同步 Bean」範例:
long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
WorkItem workItem = abWorkManager.startWork(
new DBInsertWorkAB("SGS", "South Saint Paul Municipal Airport"),
startTimeout,
new WorkListenerAB(),
isLongRunning);
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, Integer.MAX_VALUE)) {
DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
int numUpdates = work.getResult();
}
CommonJ 範例:
long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
DBInsertWorkCJ work = new DBInsertWorkCJ("STP", "Saint Paul Downtown Airport");
work.setDaemon(isLongRunning);
WorkItem workItem = cjWorkManager.schedule(
work, new WorkListenerCJ(work, startTimeout));
Collection<WorkItem> items = Collections.singleton(workItem);
if (cjWorkManager.waitForAll(items, WorkManager.INDEFINITE)) {
work = (DBInsertWorkCJ) workItem.getResult();
int numUpdates = work.getResult();
}
Concurrency Utilities for Java EE 範例:
long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
Callable<Integer> contextualTask = ManagedExecutors.managedTask(
new DBInsertTask("LVN", "Airlake Airport"),
Collections.singletonMap(ManagedTask.LONGRUNNING_HINT,
Boolean.toString(isLongRunning)),
new TaskListener(startTimeout));
Future<Integer> future = executor.submit(contextualTask);
int numUpdates = future.get();
等待作業群組完成
這三種程式設計模型都會提供方法,來等待作業群組的完成。下列範例指定要等待的時間量上限 (因為有可能無限期地等待),或在 Concurrency Utilities for Java EE 中,藉由對各個不同的 Future 循序呼叫 Get,以提高精度。
ArrayList<WorkItem> items = new ArrayList<WorkItem>(3);
items.add(abWorkManager.startWork(
new DBInsertWorkAB("COQ", "Cloquet/Carlton County Airport")));
items.add(abWorkManager.startWork(
new DBInsertWorkAB("CQM", "Cook Municipal Airport")));
items.add(abWorkManager.startWork(
new DBInsertWorkAB("CKN", "Crookston Municipal Airport")));
boolean allCompleted = abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS);
int numUpdates = 0;
for (WorkItem workItem : items) {
if (workItem.getStatus() == WorkEvent.WORK_COMPLETED) {
DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
numUpdates += work.getResult();
} else
((Work) workItem.getEventTrigger(Work.class)).release();
}
CommonJ 範例:
List<DBInsertWorkCJ> workList = Arrays.asList(
new DBInsertWorkCJ("DTL", "Detroit Lakes Airport"),
new DBInsertWorkCJ("TOB", "Dodge Center Airport"),
new DBInsertWorkCJ("DYT", "Sky Harbor Airport"));
List<WorkItem> items = new ArrayList<WorkItem>(workList.size());
for (DBInsertWorkCJ work : workList)
items.add(cjWorkManager.schedule(work));
boolean allCompleted = cjWorkManager.waitForAll(items, TIMEOUT_MS);
int numUpdates = 0;
for (int i = 0; i < items.size(); i++) {
WorkItem workItem = items.get(i);
if (workItem.getStatus() == WorkEvent.WORK_COMPLETED) {
DBInsertWorkCJ work = (DBInsertWorkCJ) workItem.getResult();
numUpdates += work.getResult();
} else
workList.get(i).release();
}
Concurrency Utilities for Java EE 範例:
List<DBInsertTask> tasks = Arrays.asList(
new DBInsertTask("CFE", "Buffalo Municipal Airport"),
new DBInsertTask("CHU", "Caledonia-Houston County Airport"),
new DBInsertTask("CBG", "Cambridge Municipal Airport"));
int numUpdates = 0;
List<Future<Integer>> futures = executor.invokeAll(tasks, TIMEOUT_MS, TimeUnit.MILLISECONDS);
for (Future<Integer> future : futures)
numUpdates += future.get();
等待群組內的單一作業完成
這三種程式設計模型都會提供方法,來等待群組內單一作業的完成。下列範例指定要等待的時間量上限,但是也有可能無限期地等待。
「非同步 Bean」範例:
ArrayList<WorkItem> items = new ArrayList<WorkItem>(3);
items.add(abWorkManager.startWork(new PrimeFinderWork(20)));
items.add(abWorkManager.startWork(new PrimeFinderWork(50)));
items.add(abWorkManager.startWork(new PrimeFinderWork(80)));
boolean anyCompleted = abWorkManager.join(items, WorkManager.JOIN_OR, TIMEOUT_MS);
long prime = -1;
for (WorkItem workItem : items) {
if (workItem.getStatus() == WorkEvent.WORK_COMPLETED) {
PrimeFinderWork work = (PrimeFinderWork) workItem.getResult();
prime = work.getPrimeNumber();
} else
((Work) workItem.getEventTrigger(Work.class)).release();
}
CommonJ 範例:
List<PrimeFinderWork> workList = Arrays.asList(
new PrimeFinderWork(20),
new PrimeFinderWork(50),
new PrimeFinderWork(80));
List<WorkItem> items = new ArrayList<WorkItem>(workList.size());
for (PrimeFinderWork work : workList)
items.add(cjWorkManager.schedule(work));
Collection<WorkItem> completedItems = cjWorkManager.waitForAny(items, TIMEOUT_MS);
long prime = -1;
for (int i = 0; i < items.size(); i++) {
WorkItem workItem = items.get(i);
if (completedItems.contains(workItem)) {
PrimeFinderWork work = (PrimeFinderWork) workItem.getResult();
prime = work.getPrimeNumber();
} else
workList.get(i).release();
}
Concurrency Utilities for Java EE 範例:
List<PrimeFinderTask> tasks = Arrays.asList(
new PrimeFinderTask(20),
new PrimeFinderTask(50),
new PrimeFinderTask(80));
long prime = executor.invokeAny(tasks, TIMEOUT_MS, TimeUnit.MILLISECONDS);
當作業失敗時,取得原因異常狀況
這三種程式設計模型都會提供方法,以便在作業執行失敗時取得原因異常狀況。這可以使用接聽器來完成(可在這個頁面後續中找到範例),或是在取得作業結果時,從 WorkItem 或 Future 取得。會引發 WorkException 或 ExecutionException,其中將原始異常狀況當作原因。
「非同步 Bean」範例:
boolean isLongRunning = false;
WorkItem workItem = abWorkManager.startWork(
new DBInsertWorkAB("KADC", "Wadena Municipal Airport"),
isLongRunning);
Throwable exception = null;
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS))
try {
DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
int numUpdates = work.getResult();
} catch (WorkException x) {
exception = x.getCause();
}
CommonJ 範例:
boolean isLongRunning = false;
DBInsertWorkCJ work = new DBInsertWorkCJ("KBDH", "Willmar Municipal Airport");
work.setDaemon(isLongRunning);
WorkItem workItem = cjWorkManager.schedule(work);
Throwable exception = null;
if (cjWorkManager.waitForAll(Collections.singleton(workItem), TIMEOUT_MS))
try {
work = (DBInsertWorkCJ) workItem.getResult();
int numUpdates = work.getResult();
} catch (WorkException x) {
exception = x.getCause();
}
Concurrency Utilities for Java EE 範例:
boolean isLongRunning = false;
Callable<Integer> task = ManagedExecutors.managedTask(
new DBInsertTask("KACQ", "Waseca Municipal Airport"),
Collections.singletonMap(ManagedTask.LONGRUNNING_HINT,
Boolean.toString(isLongRunning)),
null);
Future<Integer> future = executor.submit(task);
Throwable exception = null;
try {
int numUpdates = future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (ExecutionException x) {
exception = x.getCause();
}
排定在間隔之後執行一次性作業
這三種程式設計模型都會提供方法,來排定基本作業在未來的某個時間點,於排入儲存區的執行緒中執行,並取得結果。
「非同步 Bean」範例:
AsynchScope asynchScope = abWorkManager.findOrCreateAsynchScope("MyScope");
AlarmManager alarmManager = asynchScope.getAlarmManager();
Alarm alarm = alarmManager.create(
new DBInsertAlarmListener(),
new String[] { "MSP", "Minneapolis-Saint Paul International Airport"},
(int) TimeUnit.SECONDS.toMillis(1));
DBInsertAlarmListener alarmListener = (DBInsertAlarmListener) alarm.getAlarmListener();
// Poll for result to appear
for (long start = System.nanoTime();
alarmListener.getResult() < 0 && System.nanoTime() - start < TIMEOUT_NS;
Thread.sleep(200)) ;
int numUpdates = alarmListener.getResult();
CommonJ 範例:
Timer timer = timerManager.schedule(
new DBInsertTimerListener("STC", "Saint Cloud Regional Airport"),
TimeUnit.SECONDS.toMillis(1));
DBInsertTimerListener timerListener = (DBInsertTimerListener) timer.getTimerListener();
// Poll for result to appear
for (long start = System.nanoTime();
timerListener.getResult() < 0 && System.nanoTime() - start < TIMEOUT_NS;
Thread.sleep(200)) ;
int numUpdates = timerListener.getResult();
Concurrency Utilities for Java EE 範例:
ScheduledFuture<Integer> future = executor.schedule(
new DBInsertTask("RST", "Rochester International Airport"),
1,
TimeUnit.SECONDS);
int numUpdates = future.get(TIMEOUT_NS, TimeUnit.NANOSECONDS);
排定重複性作業以固定頻率執行,並查詢間隔直到下次執行為止
CommonJ 和 Concurrency Utilities for Java EE 程式設計模型提供方法,來排定重複性作業以固定頻率執行(例如,在每個鐘頭開始時)。不保證即時排程。作業可以在此時間不久之後的任何時間點開始。CommonJ 和 Concurrency Utilities for Java EE 程式設計模型也會提供簡便方法來計算延遲,直到下次執行為止。
CommonJ 範例:
Timer timer = timerManager.scheduleAtFixedRate(
new PrimeFinderTimerListener(120),
TimeUnit.MINUTES.toMillis(90),
TimeUnit.MINUTES.toMillis(30));
long nextExecTime = timer.getScheduledExecutionTime();
long delay = TimeUnit.MILLISECONDS.toSeconds(
nextExecTime - System.currentTimeMillis());
Concurrency Utilities for Java EE 範例:
ScheduledFuture<?> future = executor.scheduleAtFixedRate(
new PrimeFinderRunnable(120), 90, 30, TimeUnit.MINUTES);
long delay = future.getDelay(TimeUnit.SECONDS);
排定重複性作業以固定的執行延遲間隔執行,並取消作業
CommonJ 和 Concurrency Utilities for Java EE 程式設計模型會提供方法,來排定重複性作業以固定間隔(從前次執行結束到下次執行開始之間)執行。不保證即時排程。作業可以在此時間不久之後的任何時間點開始。「非同步 Bean」會在 Alarm 中提供重設方法,同樣可達到這種行為。這三種程式設計模型都會提供方法,來取消排定作業進一步執行,而不啟動。
「非同步 Bean」範例:
AsynchScope asynchScope = abWorkManager.findOrCreateAsynchScope("MyScope");
AlarmManager alarmManager = asynchScope.getAlarmManager();
Alarm alarm = alarmManager.create(
new PrimeFinderAlarmListener(90), 50, 10);
// ... eventually cancel the alarm
alarm.cancel();
CommonJ 範例:
Timer timer = timerManager.schedule(
new PrimeFinderTimerListener(90), 50, 50);
// ... eventually cancel the timer
timer.cancel();
Concurrency Utilities for Java EE 範例:
ScheduledFuture<?> future = executor.scheduleWithFixedDelay(
new PrimeFinderRunnable(90), 50, 50, TimeUnit.MILLISECONDS);
// ... eventually cancel the task
future.cancel(false);
排定重複性作業以不同的間隔執行
在這三種程式設計模型中,可以計算每一次執行的間隔,使重複性作業下次在該間隔之後重複執行。「非同步 Bean」會針對警示提供重設方法。Concurrency Utilities for Java EE 可讓您外掛觸發程式,來計算下次執行時間。CommonJ 雖然不提供上述作法,但在 CommonJ 和其他程式設計模型中,可在前次執行完成時重新排定作業。在下列範例中,是將作業確實排定成執行四次,且每一次執行之前的延遲時間不同。用來重設、重新排程或計算下次執行的程式碼,可在本區段包含的「觸發程式」內找到,以及在基本作業實作區段之下的 AlarmListener 和 TimerListener 實作內找到。
「非同步 Bean」範例:
int initialDelay = 50;
int[] subsequentDelays = new int[] { 40, 80, 70 };
AsynchScope asynchScope = abWorkManager.findOrCreateAsynchScope("MyScope");
AlarmManager alarmManager = asynchScope.getAlarmManager();
Alarm alarm = alarmManager.create(
new PrimeFinderAlarmListener(60),
subsequentDelays,
initialDelay);
Thread.sleep(5000);
PrimeFinderAlarmListener alarmListener =
(PrimeFinderAlarmListener) alarm.getAlarmListener();
long prime = alarmListener.getPrimeNumber();
CommonJ 範例:
long initialDelay = 50;
long [] subsequentDelays = new long[] { 40, 80, 70 };
Timer timer = timerManager.schedule(
new PrimeFinderTimerListener(60, subsequentDelays),
initialDelay);
Thread.sleep(5000);
PrimeFinderTimerListener timerListener = (PrimeFinderTimerListener) timer.getTimerListener();
long prime = timerListener.getPrimeNumber();
Concurrency Utilities for Java EE 範例:
ScheduledFuture<Long> future = executor.schedule(
new PrimeFinderTask(60),
new DelayTrigger(50, 40, 80, 70));
Thread.sleep(5000);
long prime = future.get();
public class DelayTrigger implements Trigger {
private int count;
private long[] delays;
volatile boolean isSuspended;
public DelayTrigger(long... delays) {
this.delays = delays;
}
public Date getNextRunTime(LastExecution previousExecution, Date taskScheduledTime) {
if (delays.length > count)
return new Date(System.currentTimeMillis() + delays[count++]);
else
return null;
}
public boolean skipRun(LastExecution previousExecution, Date scheduledRunTime) {
return isSuspended;
}
}
暫停和回復作業的執行
CommonJ TimerManager 會提供介面,來暫停和回復作業的執行。Concurrency Utilities for Java EE 則透過可外掛「觸發程式」機制的 skipRun 方法,更精細地提供這項功能。單一「觸發程式」實例可提供給任意數量的作業,但前提是必須實作「觸發程式」來支援此作法。在下列範例中,是將「觸發程式」撰寫成用來排定單一作業。
CommonJ 範例:
Timer timer = timerManager.schedule(
new PrimeFinderTimerListener(100),
new Date(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(5)));
timerManager.suspend();
// ... resume at a later point
if (timerManager.isSuspending() || timerManager.isSuspended())
timerManager.resume();
Concurrency Utilities for Java EE 範例:
DelayTrigger trigger = new DelayTrigger(
System.currentTimeMillis() + TimeUnit.DAYS.toMillis(5));
ScheduledFuture<Long> future = executor.schedule(
new PrimeFinderTask(100), trigger);
trigger.isSuspended = true;
// ... resume at a later point
if (trigger.isSuspended)
trigger.isSuspended = false;
停止作業進一步執行
「非同步 Bean」提供方法來銷毀 AsynchScope,這會取消 AlarmManagers 在該範圍中所建立的所有警示。CommonJ TimerManager 提供介面,來阻止進一步啟動執行,並等待所有正在執行的作業停止。這是可行的,因為每一次查閱 TimerManager 會產生一個新實例,並可單獨停止。在 Concurrency Utilities for Java EE 中,查閱之間會共用 ManagedScheduledExecutorService,並且(依照規格)不接受 shutdown、isTerminated 和 awaitTermination 等之類的生命週期作業。不過,使用 Concurrency Utilities for Java EE 時,您可以提供 ManagedThreadFactory 給未受管理的 ScheduledExecutorService,來達到類似的行為。
「非同步 Bean」範例:
alarmManager.create(
new PrimeFinderAlarmListener(100),
null,
(int) TimeUnit.HOURS.toMillis(1));
alarmManager.create(
new PrimeFinderAlarmListener(200),
null,
(int) TimeUnit.HOURS.toMillis(2));
// ... eventually destroy the asynch scope to cancel all alarms
asynchScope.destroy();
CommonJ 範例:
TimerManager timerManager = (TimerManager) new InitialContext().lookup(
"java:comp/env/tm/default");
timerManager.schedule(
new PrimeFinderTimerListener(100),
new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)));
timerManager.schedule(
new PrimeFinderTimerListener(200),
new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(2)));
// ... eventually stop the timer manager
timerManager.stop();
if (!timerManager.isStopped())
timerManager.waitForStop(TIMEOUT_MS);
Concurrency Utilities for Java EE 範例:
ScheduledExecutorService executor =
Executors.newScheduledThreadPool(1, threadFactory);
executor.schedule(new PrimeFinderTask(100), 1, TimeUnit.HOURS);
executor.schedule(new PrimeFinderTask(200), 2, TimeUnit.HOURS);
// .. eventually shut down the executor
executor.shutdown();
if (!executor.isTerminated())
executor.awaitTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS);
建構環境定義 Proxy
「非同步 Bean」和 Concurrency Utilities for Java EE 程式設計模型可讓您建構環境定義 Proxy。執行緒環境定義擷取自建立環境定義 Proxy 的執行緒,並儲存在其內,當對 Proxy 呼叫介面方法時,會將它自動套用至執行的執行緒,之後並從執行的執行緒移除。「非同步 Bean」會透過 EventSource 來提供此項。Concurrency Utilities for Java EE 則是透過 ContextService 來提供此項。
「非同步 Bean」範例:
EventSource eventSource = abWorkManager.createEventSource();
eventSource.addListener(new DBWriterImpl());
DBWriter dbWriter = (DBWriter) eventSource.getEventTrigger(DBWriter.class);
// Can invoke interface methods from any thread...
int numUpdates = dbWriter.exec(
"INSERT INTO AIRPORTS VALUES(?,?)", "AIT", "Aitkin Municipal Airport");
Concurrency Utilities for Java EE 範例:
DBWriter dbWriter = contextService.createContextualProxy(
new DBWriterImpl(), DBWriter.class);
// Can invoke interface methods from any thread...
int numUpdates = dbWriter.exec(
"INSERT INTO AIRPORTS VALUES(?,?)", "AXN", "Alexandria Municipal Airport");
針對在執行之執行緒交易中執行的多個實例,建構環境定義 Proxy
「非同步 Bean」和 Concurrency Utilities for Java EE 都可讓您指定是否在執行之執行緒的交易中執行環境定義 Proxy 的介面方法,或是否在方法期間,暫停現行交易並套用全新的交易環境定義。「非同步 Bean」的另一項功能是容許單一 Proxy 觸發多個接聽器實例。不過,Concurrency Utilities for Java EE 缺乏這項功能,但是可以撰寫一個封套接聽器,並委派給多個實例,來達到類似的行為。在下列範例中,我們將呼叫單一 Proxy 來觸發多個接聽器實例,且全都在呼叫端執行緒的交易之下進行。
「非同步 Bean」範例:
boolean runInSameTran = true;
EventSource eventSource = abWorkManager.createEventSource();
eventSource.addListener(new DBInsertTask("MKT", "Mankato Regional Airport"));
eventSource.addListener(new DBInsertTask("ULM", "New Ulm Municipal Airport"));
eventSource.addListener(new DBInsertTask("OWA", "Owatonna Degner Regional Airport"));
Callable<?> eventTrigger = (Callable<?>) eventSource.getEventTrigger(
Callable.class, runInSameTran);
// Can invoke interface methods from any thread...
tran.begin();
try {
eventTrigger.call();
} finally {
tran.commit();
}
Concurrency Utilities for Java EE 範例:
Callable<?> eventTrigger = contextService.createContextualProxy(
new Callable<Void>() {
@Override
public Void call() throws Exception {
new DBInsertTask("FFM", "Fergus Falls Municipal Airport").call();
new DBInsertTask("ONA", "Winona Municipal Airport").call();
new DBInsertTask("OTG", "Worthington Municipal Airport").call();
return null;
}
},
Collections.singletonMap(ManagedTask.TRANSACTION,
ManagedTask.USE_TRANSACTION_OF_EXECUTION_THREAD),
Callable.class);
// Can invoke interface methods from any thread...
tran.begin();
try {
eventTrigger.call();
} finally {
tran.commit();
}
建立環境定義作業,以便在呼叫端執行緒上延遲執行
「非同步 Bean」可讓您建立 WorkWithExecutionContext,這其實是作業的可序列化環境定義 Proxy,可在之後提交給 WorkManager,以便在建立端執行緒的執行緒環境定義之下執行。如果打算在呼叫端執行緒 (WorkManager 的 doWork 方法)上執行,環境定義 Proxy 可在 Concurrency Utilities for Java EE 中達到類似的行為。
「非同步 Bean」範例:
WorkWithExecutionContext contextualWork = abWorkManager.create(
new DBInsertWorkAB("BJI", "Bemidji Regional Airport"));
// Can run the contextual work on any thread...
abWorkManager.doWork(contextualWork);
DBInsertWorkAB work = (DBInsertWorkAB) contextualWork.getWork();
int numUpdates = work.getResult();
Concurrency Utilities for Java EE 範例:
Callable<Integer> contextualTask = contextService.createContextualProxy(
new DBInsertTask("BRD", "Brainerd Lakes Regional Airport"),
Callable.class);
// Can run the contextual proxy on any thread...
int numUpdates = contextualTask.call();
建立環境定義作業,以便在排入儲存區的執行緒上延遲執行
「非同步 Bean」可讓您建立 WorkWithExecutionContext,這其實是作業的可序列化環境定義 Proxy,可在之後提交給 WorkManager,以便在建立端執行緒的執行緒環境定義之下執行。如果打算在排入儲存區的執行緒 (WorkManager 的 startWork 方法)上執行,在 Concurrency Utilities for Java EE 中,可藉由提交作業的環境定義 Proxy 給受管理執行程式,來達到類似的行為,但由於執行緒環境定義的擷取與傳播重複,其效率較差。
「非同步 Bean」範例:
WorkWithExecutionContext contextualWork = abWorkManager.create(
new DBInsertWorkAB("ELO", "Ely Municipal Airport"));
WorkItem workItem = abWorkManager.startWork(contextualWork);
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS)) {
DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
int numUpdates = work.getResult();
}
Concurrency Utilities for Java EE 範例:
Callable<Integer> contextualTask = contextService.createContextualProxy(
new DBInsertTask("EVM", "Eveleth-Virginia Municipal Airport"),
Callable.class);
Future<Integer> future = executor.submit(contextualTask);
int numUpdates = future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
可讓環境定義作業延遲執行的其他選項
當提交作業以延遲執行時,您可以選擇性地指派接聽器和啟動逾時,並指出預計該作業是否要長期執行。在「非同步 Bean」中,啟動逾時只能以參數形式提供,但是對於 CommonJ 和 Concurrency Utilities for Java EE,啟動逾時可以放在 WorkListener 或 ManagedTaskListener 內實作。
「非同步 Bean」範例:
long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
WorkWithExecutionContext contextualWork = abWorkManager.create(
new DBInsertWorkAB("FRM", "Fairmont Municipal Airport"));
WorkItem workItem = abWorkManager.startWork(
contextualWork, startTimeout, new WorkListenerAB(), isLongRunning);
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, Integer.MAX_VALUE)) {
DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
int numUpdates = work.getResult();
}
Concurrency Utilities for Java EE 範例:
long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
Callable<Integer> contextualTask = contextService.createContextualProxy(
new DBInsertTask("FBL", "Faribault Municipal Airport"),
Callable.class);
contextualTask = ManagedExecutors.managedTask(
contextualTask,
Collections.singletonMap(ManagedTask.LONGRUNNING_HINT,
Boolean.toString(isLongRunning)),
new TaskListener(startTimeout));
Future<Integer> future = executor.submit(contextualTask);
int numUpdates = future.get();
子系統監視
「非同步 Bean」提供 SubsystemMonitor 和 SubsystemMonitorManager,作為應用程式或其他構件之間的協調機制,來監視可用性。Concurrency Utilities for Java EE 不提供任何對等項目。如果需要取代這項功能,您可以實作一個其他所有應用程式都知悉的應用程式,以充當 SubsystemMonitorManager 的對等項目。
事件處理
「非同步 Bean」可讓您能夠針對 AlarmManager、AsynchScope、EventSource、SubsystemMonitor、WorkManager 和 Work 上所發生的各種類型的事件,登錄接聽器。這些事件在 Concurrency Utilities for Java EE 中多半沒有任何直接的對等項目。這將交由應用程式負責,以實作其本身的事件和通知機制。在很多情況下,Concurrency Utilities for Java EE 會提供類似的功能。在某些情況下,您也許可以使用更精細登錄的 ManagedTaskListener(當提交作業時),來取代 AlarmManagerEvents 和 WorkEvents。
在呼叫環境定義 Proxy 期間發生失敗時的事件處理
在「非同步 Bean」中,當使用環境定義 Proxy 來呼叫作業,且作業引發所宣告的異常狀況時,不會將異常狀況回報給呼叫端。反而會將異常狀況提報在 listenerExceptionThrown 事件中,並傳給 EventSourceEvents 接聽器。在 Concurrency Utilities for Java EE 中,呼叫端可以捕捉異常狀況,並加以處理。
「非同步 Bean」範例:
EventSource eventSource = abWorkManager.createEventSource();
eventSource.addListener(new DBWriterImpl());
eventSource.addListener(new EventSourceEvents() {
public void listenerCountChanged(
EventSource es, int oldCount, int newCount) {}
public void listenerExceptionThrown(
EventSource es, Object listener,
String methodName, Throwable exception) {
listenerException.set(exception);
}
public void unexpectedException(
EventSource es, Object runnable, Throwable exception) {}
});
DBWriter dbWriter = (DBWriter) eventSource.getEventTrigger(DBWriter.class);
// Can invoke interface methods from any thread...
try {
dbWriter.exec(
"INSERT INTO AIRPORTS VALUES(?,?)", "KAUM", "Austin Municipal Airport");
} catch (Exception x) {
// expecting this to fail
}
Throwable exception = listenerException.get();
Concurrency Utilities for Java EE 範例:
DBWriter dbWriter = contextService.createContextualProxy(
new DBWriterImpl(), DBWriter.class);
// Can invoke interface methods from any thread...
SQLException exception = null;
try {
dbWriter.exec(
"INSERT INTO AIRPORTS VALUES(?,?)", "KSBU", "Blue Earth Municipal Airport");
} catch (SQLException x) {
exception = x;
}