实现 Concurrency Utilities for Java EE 的 API 任务示例

您可以迁移使用异步 Bean API 和 CommonJ Timer and Work Manager API 的应用程序,以使用 Concurrency Utilities for Java™ EE。

代码示例中使用的资源

此页面上的代码示例假定应用程序已注入或查找下列资源:

@Resource(lookup = "wm/default")
private com.ibm.websphere.asynchbeans.WorkManager abWorkManager;

@Resource(lookup = "wm/default")
private commonj.work.WorkManager cjWorkManager;

@Resource
private ContextService contextService;

@Resource(name = "java:app/env/jdbc/dsRef")
private DataSource dataSource;

@Resource
private ManagedScheduledExecutorService executor;

@Resource
private ManagedThreadFactory threadFactory;

@Resource(name = "java:comp/env/tm/default", lookup = "tm/default", shareable = false)
private TimerManager timerManager;

@Resource
private UserTransaction tran;

基本任务实现

此部分提供本文档的其余部分中其他示例所使用的简单任务实现的一些示例。异步 Bean 需要一个单独的接口 AlarmListener 用于调度为将来运行的任务。CommonJ 需要一个单独的接口 TimerListener 用于调度为将来运行的任务。Concurrency Utilities for Java EE 任务可以是 RunnableCallable,并允许使用任一接口,而无论是提交任务以立即运行还是将任务调度为将来运行。在某些情况下,可以不进行任何更改就将异步 Bean 或 CommonJ Work 作为 Runnable 提交给受管执行程序。Work 的 release 方法将由用于取消和中断正在运行的线程的受管执行程序功能所取代。Work 的 isDaemon 方法将由 LONGRUNNING_HINT 执行属性所取代。

异步 Bean 和 CommonJ 的用于查找下一个素数的 Work 任务示例:

public class PrimeFinderWork implements
    com.ibm.websphere.asynchbeans.Work, commonj.work.Work {
    private long num;
    private volatile boolean released;
    private long result;

    public PrimeFinderWork(long startingValue) {
        num = startingValue;
    }

    public boolean isDaemon() {
        return false;
    }
    public void release() {
        		released = true;
    }

    public void run() {
        while (!isPrime(num))
            if (released || Thread.currentThread().isInterrupted())
                throw new RuntimeException(new InterruptedException());
            else
                num++;
        result = num++;
    }

    public long getPrimeNumber() {
        if (result > 0)
            return result;
        else
            throw new IllegalStateException();
    }
}

异步 Bean 的用于查找下一个素数的 AlarmListener 任务示例

public class PrimeFinderAlarmListener implements AlarmListener {
    private volatile boolean aborted;
    private int count;
    private long num;
    private long result;

    public PrimeFinderAlarmListener(long startingValue) {
        num = startingValue;
    }

    public void abort() {
        aborted = true;
    }

    public void fired(Alarm alarm) {
        while (!isPrime(num))
            if (aborted || Thread.currentThread().isInterrupted())
                throw new RuntimeException(new InterruptedException());
            else
                num++;
        result = num++;
        // optionally reschedule:
        Object delays = alarm.getContext();
        if (delays instanceof Integer)
            alarm.reset((Integer) delays);
        else if (delays instanceof int[] && count < ((int[]) delays).length)
            alarm.reset(((int[]) delays)[count++]);
    }

    public long getPrimeNumber() {
        if (result > 0)
            return result;
        else
            throw new IllegalStateException();
    }

异步 Bean 的用于查找下一个素数的 TimerListener 任务示例

public class PrimeFinderTimerListener implements CancelTimerListener, TimerListener {
    private volatile boolean aborted;
    private int count;
    private final long[] delays;
    private long num;
    private long result;

    public PrimeFinderTimerListener(long startingValue, long... delays) {
        num = startingValue;
        this.delays = delays;
    }

    public void timerCancel(Timer timer) {
        aborted = true;
    }

    public void timerExpired(Timer timer) {
        while (!isPrime(num))
            if (aborted || Thread.currentThread().isInterrupted())
                throw new RuntimeException(new InterruptedException());
            else
                num++;
        result = num++;
        // optionally reschedule:
        if (count < delays.length)
            try {          
                TimerManager timerManager = (TimerManager) new InitialContext().lookup(
                    "java:comp/env/tm/default");
                timerManager.schedule(this, delays[count++]);
            } catch (NamingException x) {
                throw new RuntimeException(x);
            }
    }

    public long getPrimeNumber() {
        if (result > 0)
            return result;
        else
            throw new IllegalStateException();
    }
}

Concurrency Utilities for Java EE 的用于查找下一个素数的 Runnable 任务示例:

public class PrimeFinderRunnable implements Runnable {
    private long num;
    private long result;

    public PrimeFinderRunnable(long startingValue) {
        num = startingValue;
    }

    public void run() {
        while (!isPrime(num))
            if (Thread.currentThread().isInterrupted())
                throw new RuntimeException(new InterruptedException());
            else
                num++;
        result = num++;
    }

    public long getPrimeNumber() {
        if (result > 0)
            return result;
        else
            throw new IllegalStateException();
    }
}

Concurrency Utilities for Java EE 的用于查找下一个素数的 Callable 任务示例:

public class PrimeFinderTask implements Callable<Long> {
    private long num;

    public PrimeFinderTask(long startingValue) {
        num = startingValue;
    }

    public Long call() throws InterruptedException {
        while (!isPrime(num))
            if (Thread.currentThread().isInterrupted())
                throw new InterruptedException();
            else
                num++;
        return num++;
    }
}

异步 Bean 的用于执行基本数据库插入的 Work 任务示例:

public class DBInsertWorkAB implements Work, Serializable {
    private static final long serialVersionUID = 2606824039439594442L;
    private transient Thread executionThread;
    private final String code;
    private final String name;
    private boolean released;
    private volatile int result = -1;

    public DBInsertWorkAB(String code, String name) {
        this.code = code;
        this.name = name;
    }

    public int getResult() {
        return result;
    }

    public synchronized void release() {
        		released = true;
        if (executionThread != null)
            executionThread.interrupt();
    }

    public void run() {
	synchronized (this) {
            if (released)
                throw new RuntimeException("Work was canceled");
            executionThread = Thread.currentThread();
        }
        try {          
            DataSource ds = (DataSource) new InitialContext().lookup(
                "java:app/env/jdbc/dsRef");
            Connection con = ds.getConnection();
            try {          
                PreparedStatement stmt = con.prepareStatement(
                    "INSERT INTO AIRPORTS VALUES(?,?)");
                stmt.setString(1, code);
                stmt.setString(2, name);
                result = stmt.executeUpdate();
            } finally {
                con.close();
            }
        } catch (NamingException x) {
            throw new RuntimeException(x);
        } catch (SQLException x) {
            throw new RuntimeException(x);
        } finally {
            synchronized (this) {
                executionThread = null;
            }
        }
    }
}

异步 Bean 的用于执行基本数据库插入的 AlarmListener 任务示例:

public class DBInsertAlarmListener implements AlarmListener {
    private volatile int result = -1;

    public int getResult() {
        return result;
    }

    public void fired(Alarm alarm) {
        String[] alarmContext = (String[]) alarm.getContext();
        try {          
            DataSource ds = (DataSource) new InitialContext().lookup(
                "java:app/env/jdbc/dsRef");
            Connection con = ds.getConnection();
            try {          
                PreparedStatement stmt = con.prepareStatement(
                    "INSERT INTO AIRPORTS VALUES(?,?)");
                stmt.setString(1, alarmContext[0]);
                stmt.setString(2, alarmContext[1]);
                result = stmt.executeUpdate();
            } finally {
                con.close();
            }
        } catch (NamingException x) {
            throw new RuntimeException(x);
        } catch (SQLException x) {
            throw new RuntimeException(x);
        }
    }
}

CommonJ 的用于执行基本数据库插入的 Work 任务示例:

public class DBInsertWorkCJ implements Work, Serializable {
    private static final long serialVersionUID = -8801347489043041978L;
    private transient Thread executionThread;
    private final String code;
    private final String name;
    private boolean isDaemon;
    private boolean released;
    private volatile int result = -1;

    public DBInsertWorkCJ(String code, String name) {
        this.code = code;
        this.name = name;
    }

    public int getResult() {
        return result;
    }

    public boolean isDaemon() {
        return isDaemon;
    }

    public synchronized void release() {
        		released = true;
        if (executionThread != null)
            executionThread.interrupt();
    }

    public void run() {
	synchronized (this) {
            if (released)
                throw new RuntimeException("Work was canceled");
            executionThread = Thread.currentThread();
        }
        try {          
            DataSource ds = (DataSource) new InitialContext().lookup(
                "java:app/env/jdbc/dsRef");
            Connection con = ds.getConnection();
            try {          
                PreparedStatement stmt = con.prepareStatement(
                    "INSERT INTO AIRPORTS VALUES(?,?)");
                stmt.setString(1, code);
                stmt.setString(2, name);
                result = stmt.executeUpdate();
            } finally {
                con.close();
            }
        } catch (NamingException x) {
            throw new RuntimeException(x);
        } catch (SQLException x) {
            throw new RuntimeException(x);
        } finally {
            synchronized (this) {
                executionThread = null;
            }
        }
    }

CommonJ 的用于执行基本数据库插入的 TimerListener 任务示例:

public class DBInsertTimerListener implements TimerListener {
    private volatile int result = -1;

    private final String code;
    private final String name;

    public DBInsertTimerListener(String code, String name) {
        this.code = code;
        this.name = name;
    }

    public int getResult() {
        return result;
    }

    public void timerExpired(Timer timer) {
        try {          
            DataSource ds = (DataSource) new InitialContext().lookup(
                "java:app/env/jdbc/dsRef");
            Connection con = ds.getConnection();
            try {          
                PreparedStatement stmt = con.prepareStatement(
                    "INSERT INTO AIRPORTS VALUES(?,?)");
                stmt.setString(1, code);
                stmt.setString(2, name);
                result = stmt.executeUpdate();
            } finally {
                con.close();
            }
        } catch (NamingException x) {
            throw new RuntimeException(x);
        } catch (SQLException x) {
            throw new RuntimeException(x);
        }
    }
}

Concurrency Utilities for Java EE 的用于执行基本数据库插入的 Callable 任务示例:

public class DBInsertTask implements Callable<Integer>, Serializable {
    private static final long serialVersionUID = 5556464104788801400L;
    private final String code;
    private final String name;

    public DBInsertTask(String code, String name) {
        this.code = code;
        this.name = name;
    }

    public Integer call() throws NamingException, SQLException {
        DataSource ds = (DataSource) new InitialContext().lookup(
            "java:app/env/jdbc/dsRef");
        Connection con = ds.getConnection();
        try {          
            PreparedStatement stmt = con.prepareStatement(
                "INSERT INTO AIRPORTS VALUES(?,?)");
            stmt.setString(1, code);
            stmt.setString(2, name);
            return stmt.executeUpdate();
        } finally {
            con.close();
        }
    }
}

提交任务

所有三个编程模型都提供了一种方法,用于提交基本任务以便在合用线程中运行并获取结果。

异步 Bean 示例:

WorkItem workItem = abWorkManager.startWork(
    new DBInsertWorkAB("DLH", "Duluth International Airport"));
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS)) {
    DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
    int numUpdates = work.getResult();
}

CommonJ 示例:

WorkItem workItem = cjWorkManager.schedule(
    new DBInsertWorkCJ("HIB", "Chisholm-Hibbing Airport"));
if (cjWorkManager.waitForAll(Collections.singletonList(workItem), TIMEOUT_MS)) {
    DBInsertWorkCJ work = (DBInsertWorkCJ) workItem.getResult();
    int numUpdates = work.getResult();
}

Concurrency Utilities for Java EE 示例:

Future<Integer> future = executor.submit(
    new DBInsertTask("INL", "Falls International Airport"));
int numUpdates = future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);

提交任务时的更多选项

提交任务时,您可以选择性地指定侦听器和启动超时,并指示是否希望此任务长时间运行。在异步 Bean 中,启动超时仅作为参数可用,而对于 CommonJ 和 Concurrency Utilities for Java EE,可以在 WorkListenerManagedTaskListener 内实现启动超时。

异步 Bean 示例:

long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
WorkItem workItem = abWorkManager.startWork(
    new DBInsertWorkAB("SGS", "South Saint Paul Municipal Airport"),
    startTimeout,
    new WorkListenerAB(),
    isLongRunning);
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, Integer.MAX_VALUE)) {
    DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
    int numUpdates = work.getResult();
}

CommonJ 示例:

long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
DBInsertWorkCJ work = new DBInsertWorkCJ("STP", "Saint Paul Downtown Airport");
work.setDaemon(isLongRunning);
WorkItem workItem = cjWorkManager.schedule(
    work, new WorkListenerCJ(work, startTimeout));
Collection<WorkItem> items = Collections.singleton(workItem);
if (cjWorkManager.waitForAll(items, WorkManager.INDEFINITE)) {
    work = (DBInsertWorkCJ) workItem.getResult();
    int numUpdates = work.getResult();
}

Concurrency Utilities for Java EE 示例:

long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
Callable<Integer> contextualTask = ManagedExecutors.managedTask(
    new DBInsertTask("LVN", "Airlake Airport"),
    Collections.singletonMap(ManagedTask.LONGRUNNING_HINT,
        Boolean.toString(isLongRunning)),
    new TaskListener(startTimeout));
Future<Integer> future = executor.submit(contextualTask);
int numUpdates = future.get();

等待一组任务完成

所有三个编程模型都提供了用于等待一组任务完成的方法。下列示例指定最长等待时间,因为可能会无限期等待,或者因为在 Concurrency Utilities for Java EE 中,可能会通过在各种 Future 中按顺序调用 get 来添加甚至更多的粒度。

ArrayList<WorkItem> items = new ArrayList<WorkItem>(3);
items.add(abWorkManager.startWork(
    new DBInsertWorkAB("COQ", "Cloquet/Carlton County Airport")));
items.add(abWorkManager.startWork(
    new DBInsertWorkAB("CQM", "Cook Municipal Airport")));
items.add(abWorkManager.startWork(
    new DBInsertWorkAB("CKN", "Crookston Municipal Airport")));
boolean allCompleted = abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS);
int numUpdates = 0;
for (WorkItem workItem : items) {
    if (workItem.getStatus() == WorkEvent.WORK_COMPLETED) {
        DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
        numUpdates += work.getResult();
    } else
        ((Work) workItem.getEventTrigger(Work.class)).release();
}

CommonJ 示例:

List<DBInsertWorkCJ> workList = Arrays.asList(
    new DBInsertWorkCJ("DTL", "Detroit Lakes Airport"),
    new DBInsertWorkCJ("TOB", "Dodge Center Airport"),
    new DBInsertWorkCJ("DYT", "Sky Harbor Airport"));
List<WorkItem> items = new ArrayList<WorkItem>(workList.size());
for (DBInsertWorkCJ work : workList)
    items.add(cjWorkManager.schedule(work));
boolean allCompleted = cjWorkManager.waitForAll(items, TIMEOUT_MS);
int numUpdates = 0;
for (int i = 0; i < items.size(); i++) {
    WorkItem workItem = items.get(i); 
    if (workItem.getStatus() == WorkEvent.WORK_COMPLETED) {
        DBInsertWorkCJ work = (DBInsertWorkCJ) workItem.getResult();
        numUpdates += work.getResult();
    } else
        workList.get(i).release();
}

Concurrency Utilities for Java EE 示例:

List<DBInsertTask> tasks = Arrays.asList(
    new DBInsertTask("CFE", "Buffalo Municipal Airport"),
    new DBInsertTask("CHU", "Caledonia-Houston County Airport"),
    new DBInsertTask("CBG", "Cambridge Municipal Airport"));
int numUpdates = 0;
List<Future<Integer>> futures = executor.invokeAll(tasks, TIMEOUT_MS, TimeUnit.MILLISECONDS);
for (Future<Integer> future : futures)
    numUpdates += future.get();

等待组内的单个任务完成

所有三个编程模型都提供了用于等待组内的单个任务完成的方法。下列示例指定最长等待时间,但也可能会无限期地等待。

异步 Bean 示例:

ArrayList<WorkItem> items = new ArrayList<WorkItem>(3);
items.add(abWorkManager.startWork(new PrimeFinderWork(20)));
items.add(abWorkManager.startWork(new PrimeFinderWork(50)));
items.add(abWorkManager.startWork(new PrimeFinderWork(80)));
boolean anyCompleted = abWorkManager.join(items, WorkManager.JOIN_OR, TIMEOUT_MS);
long prime = -1;
for (WorkItem workItem : items) {
    if (workItem.getStatus() == WorkEvent.WORK_COMPLETED) {
        PrimeFinderWork work = (PrimeFinderWork) workItem.getResult();
        prime = work.getPrimeNumber();
    } else
        ((Work) workItem.getEventTrigger(Work.class)).release();
}

CommonJ 示例:

List<PrimeFinderWork> workList = Arrays.asList(
    new PrimeFinderWork(20),
    new PrimeFinderWork(50),
    new PrimeFinderWork(80));
List<WorkItem> items = new ArrayList<WorkItem>(workList.size());
for (PrimeFinderWork work : workList)
    items.add(cjWorkManager.schedule(work));
Collection<WorkItem> completedItems = cjWorkManager.waitForAny(items, TIMEOUT_MS);
long prime = -1;
for (int i = 0; i < items.size(); i++) {
    WorkItem workItem = items.get(i);
    if (completedItems.contains(workItem)) {
        PrimeFinderWork work = (PrimeFinderWork) workItem.getResult();
        prime = work.getPrimeNumber();
    } else
        workList.get(i).release();
}

Concurrency Utilities for Java EE 示例:

List<PrimeFinderTask> tasks = Arrays.asList(
    new PrimeFinderTask(20),
    new PrimeFinderTask(50),
    new PrimeFinderTask(80));
long prime = executor.invokeAny(tasks, TIMEOUT_MS, TimeUnit.MILLISECONDS);

在任务失败时获取原因异常

所有三个编程模型都提供了用于在任务执行失败时获取原因异常的方法。这可以使用侦听器来实现(在此页面的后面部分中可以找到示例),或者在通过 WorkItemFuture 获取任务结果时实现。引发了 WorkExceptionExecutionException,其中包含原始异常作为原因。

异步 Bean 示例:

boolean isLongRunning = false;
WorkItem workItem = abWorkManager.startWork(
    new DBInsertWorkAB("KADC", "Wadena Municipal Airport"),
    isLongRunning);
Throwable exception = null;
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS))
    try {          
        DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
        int numUpdates = work.getResult();
    } catch (WorkException x) {
        exception = x.getCause();
    }

CommonJ 示例:

boolean isLongRunning = false;
DBInsertWorkCJ work = new DBInsertWorkCJ("KBDH", "Willmar Municipal Airport");
work.setDaemon(isLongRunning);
WorkItem workItem = cjWorkManager.schedule(work);
Throwable exception = null;
if (cjWorkManager.waitForAll(Collections.singleton(workItem), TIMEOUT_MS))
    try {          
        work = (DBInsertWorkCJ) workItem.getResult();
        int numUpdates = work.getResult();
    } catch (WorkException x) {
        exception = x.getCause();
    }

Concurrency Utilities for Java EE 示例:

boolean isLongRunning = false;
Callable<Integer> task = ManagedExecutors.managedTask(
    new DBInsertTask("KACQ", "Waseca Municipal Airport"),
    Collections.singletonMap(ManagedTask.LONGRUNNING_HINT,
        Boolean.toString(isLongRunning)),
    null);
Future<Integer> future = executor.submit(task);
Throwable exception = null;
try {          
    int numUpdates = future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (ExecutionException x) {
    exception = x.getCause();
}

将一次性任务调度为在特定时间间隔后运行

所有三个编程模型都提供了一种方法,用于将基本任务调度为在将来某个时间点在合用线程中运行并获取结果。

异步 Bean 示例:

AsynchScope asynchScope = abWorkManager.findOrCreateAsynchScope("MyScope");
AlarmManager alarmManager = asynchScope.getAlarmManager();
Alarm alarm = alarmManager.create(
    new DBInsertAlarmListener(),
    new String[] { "MSP", "Minneapolis-Saint Paul International Airport"},
    (int) TimeUnit.SECONDS.toMillis(1));
DBInsertAlarmListener alarmListener = (DBInsertAlarmListener) alarm.getAlarmListener();
// Poll for result to appear
for (long start = System.nanoTime();
    alarmListener.getResult() < 0 && System.nanoTime() - start < TIMEOUT_NS;
    Thread.sleep(200)) ;
int numUpdates = alarmListener.getResult();

CommonJ 示例:

Timer timer = timerManager.schedule(
    new DBInsertTimerListener("STC", "Saint Cloud Regional Airport"),
    TimeUnit.SECONDS.toMillis(1));
DBInsertTimerListener timerListener = (DBInsertTimerListener) timer.getTimerListener();
// Poll for result to appear
for (long start = System.nanoTime();
    timerListener.getResult() < 0 && System.nanoTime() - start < TIMEOUT_NS;
    Thread.sleep(200)) ;
int numUpdates = timerListener.getResult();

Concurrency Utilities for Java EE 示例:

ScheduledFuture<Integer> future = executor.schedule(
    new DBInsertTask("RST", "Rochester International Airport"),
    1,
    TimeUnit.SECONDS);
int numUpdates = future.get(TIMEOUT_NS, TimeUnit.NANOSECONDS);

将重复任务调度为按固定频率运行,并查询下一次执行之前的时间间隔

CommonJ 和 Concurrency Utilities for Java EE 编程模型提供了一种方法,用于将重复任务调度为按固定频率(例如,每小时开始时)运行。不保证实时调度。任务可以在此时间后的任何时间点开始,但不得早于此时间。CommonJ 和 Concurrency Utilities for Java EE 编程模型还提供了一种方便的方法,用于计算下一次执行之前的延迟。

CommonJ 示例:

Timer timer = timerManager.scheduleAtFixedRate(
    new PrimeFinderTimerListener(120),
    TimeUnit.MINUTES.toMillis(90),
    TimeUnit.MINUTES.toMillis(30));
long nextExecTime = timer.getScheduledExecutionTime();
long delay = TimeUnit.MILLISECONDS.toSeconds(
    nextExecTime - System.currentTimeMillis());

Concurrency Utilities for Java EE 示例:

ScheduledFuture<?> future = executor.scheduleAtFixedRate(
    new PrimeFinderRunnable(120), 90, 30, TimeUnit.MINUTES);
long delay = future.getDelay(TimeUnit.SECONDS);

将重复任务调度为在两次执行之间具有固定延迟以及取消此任务

CommonJ 和 Concurrency Utilities for Java EE 编程模型提供了一种方法,用于将重复任务调度为在一次执行结束与下一次执行开始之间间隔固定时长来运行。不保证实时调度。任务可以在此时间后的任何时间点开始,但不得早于此时间。异步 Bean 在 Alarm 中提供了 reset 方法,可用于实现此相同的行为。所有三个编程模型都提供了一种方法,用于取消启动所调度任务的进一步执行。

异步 Bean 示例:

AsynchScope asynchScope = abWorkManager.findOrCreateAsynchScope("MyScope");
AlarmManager alarmManager = asynchScope.getAlarmManager();
Alarm alarm = alarmManager.create(
    new PrimeFinderAlarmListener(90), 50, 10);
// ... eventually cancel the alarm
alarm.cancel();

CommonJ 示例:

Timer timer = timerManager.schedule(
    new PrimeFinderTimerListener(90), 50, 50);
// ... eventually cancel the timer
timer.cancel();

Concurrency Utilities for Java EE 示例:

ScheduledFuture<?> future = executor.scheduleWithFixedDelay(
    new PrimeFinderRunnable(90), 50, 50, TimeUnit.MILLISECONDS);
// ... eventually cancel the task
future.cancel(false);

将重复任务调度为按不同的时间间隔运行

在所有三个编程模型中,都可以使用每次执行计算重复任务下一次重复之前所间隔的时间。异步 Bean 为 alarm 提供了 reset 方法。Concurrency Utilities for Java EE 允许您插入用于计算下一次执行时间的 Trigger。CommonJ 不提供上述任一方法,但在 CommonJ 和其他编程模型中,可以在上一次执行完成时重复调度任务。在下列示例中,任务调度为正好运行四次,在每次执行之前都具有不同的延迟。您可以在此部分中包含的 Trigger 内以及基本任务实现部分下包含的 AlarmListenerTimerListener 实现内找到用于重置、重新调度或计算下一次执行的代码。

异步 Bean 示例:

int initialDelay = 50;
int[] subsequentDelays = new int[] { 40, 80, 70 };
AsynchScope asynchScope = abWorkManager.findOrCreateAsynchScope("MyScope");
AlarmManager alarmManager = asynchScope.getAlarmManager();
Alarm alarm = alarmManager.create(
    new PrimeFinderAlarmListener(60),
    subsequentDelays,
    initialDelay);
Thread.sleep(5000);
PrimeFinderAlarmListener alarmListener =
    (PrimeFinderAlarmListener) alarm.getAlarmListener();
long prime = alarmListener.getPrimeNumber();

CommonJ 示例:

long initialDelay = 50;
long [] subsequentDelays = new long[] { 40, 80, 70 };
Timer timer = timerManager.schedule(
    new PrimeFinderTimerListener(60, subsequentDelays),
    initialDelay);
Thread.sleep(5000);
PrimeFinderTimerListener timerListener = (PrimeFinderTimerListener) timer.getTimerListener();
long prime = timerListener.getPrimeNumber();

Concurrency Utilities for Java EE 示例:

ScheduledFuture<Long> future = executor.schedule(
    new PrimeFinderTask(60),
    new DelayTrigger(50, 40, 80, 70));
Thread.sleep(5000);
long prime = future.get();

public class DelayTrigger implements Trigger {
    private int count;
    private long[] delays;
    volatile boolean isSuspended;

    public DelayTrigger(long... delays) {
        this.delays = delays;
    }

    public Date getNextRunTime(LastExecution previousExecution, Date taskScheduledTime) {
        if (delays.length > count)
            return new Date(System.currentTimeMillis() + delays[count++]);
        else
            return null;
    }

    public boolean skipRun(LastExecution previousExecution, Date scheduledRunTime) {
        return isSuspended;
    }
}

暂挂和恢复任务的执行

CommonJ TimerManager 提供了用于暂挂和恢复任务的执行的接口。Concurrency Utilities for Java EE 通过可插入 Trigger 机制的 skipRun 方法更细粒度地提供此功能。可以将单个 Trigger 实例提供给任意数目的任务,前提是 Trigger 已实现为支持这样做。在以下示例中,Trigger 编写为用于调度单个任务。

CommonJ 示例:

Timer timer = timerManager.schedule(
    new PrimeFinderTimerListener(100),
    new Date(System.currentTimeMillis() + TimeUnit.DAYS.toMillis(5)));
timerManager.suspend();
// ... resume at a later point
if (timerManager.isSuspending() || timerManager.isSuspended())
    timerManager.resume();

Concurrency Utilities for Java EE 示例:

DelayTrigger trigger = new DelayTrigger(
    System.currentTimeMillis() + TimeUnit.DAYS.toMillis(5));
ScheduledFuture<Long> future = executor.schedule(
    new PrimeFinderTask(100), trigger);
trigger.isSuspended = true;
// ... resume at a later point
if (trigger.isSuspended)
    trigger.isSuspended = false;

停止任务的进一步执行

异步 Bean 提供了一种方法来销毁 AsynchScope,这将取消该作用域中的 AlarmManager 创建的所有警报。CommonJ TimerManager 提供了多个接口,用于阻止启动进一步执行并等待所有正在运行的任务停止。这是可行的,因为 TimerManager 的每一次查找都会生成一个新的实例,此实例可以独立于其他实例停止。在 Concurrency Utilities for Java EE 中,将在查找之间共享同一 ManagedScheduledExecutorService,并且根据规范不允许执行 shutdownisTerminatedawaitTermination 之类的生命周期操作。然而,使用 Concurrency Utilities for Java EE 时,您可以通过向非受管 ScheduledExecutorService 提供 ManagedThreadFactory 来获得类似的行为。

异步 Bean 示例:

alarmManager.create(
    new PrimeFinderAlarmListener(100),
    null,
    (int) TimeUnit.HOURS.toMillis(1));
alarmManager.create(
    new PrimeFinderAlarmListener(200),
    null,
    (int) TimeUnit.HOURS.toMillis(2));
// ... eventually destroy the asynch scope to cancel all alarms
asynchScope.destroy();

CommonJ 示例:

TimerManager timerManager = (TimerManager) new InitialContext().lookup(
    "java:comp/env/tm/default");
timerManager.schedule(
    new PrimeFinderTimerListener(100),
    new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)));
timerManager.schedule(
    new PrimeFinderTimerListener(200),
    new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(2)));
// ... eventually stop the timer manager
timerManager.stop();
if (!timerManager.isStopped())
    timerManager.waitForStop(TIMEOUT_MS);

Concurrency Utilities for Java EE 示例:

ScheduledExecutorService executor =
    Executors.newScheduledThreadPool(1, threadFactory);
executor.schedule(new PrimeFinderTask(100), 1, TimeUnit.HOURS);
executor.schedule(new PrimeFinderTask(200), 2, TimeUnit.HOURS);
// .. eventually shut down the executor
executor.shutdown();
if (!executor.isTerminated())
    executor.awaitTermination(TIMEOUT_MS, TimeUnit.MILLISECONDS);

构造上下文代理

异步 Bean 和 Concurrency Utilities for Java EE 编程模型允许您构造上下文代理。线程上下文是从创建上下文代理的线程捕获的并存储在该线程中,在代理中调用接口方法时会自动应用于执行线程,并在此后从执行线程中除去。异步 Bean 通过 EventSource 提供此功能。Concurrency Utilities for Java EE 通过 ContextService 提供此功能。

异步 Bean 示例:

EventSource eventSource = abWorkManager.createEventSource();
eventSource.addListener(new DBWriterImpl());
DBWriter dbWriter = (DBWriter) eventSource.getEventTrigger(DBWriter.class);
// Can invoke interface methods from any thread...
int numUpdates = dbWriter.exec(
    "INSERT INTO AIRPORTS VALUES(?,?)", "AIT", "Aitkin Municipal Airport");

Concurrency Utilities for Java EE 示例:

DBWriter dbWriter = contextService.createContextualProxy(
    new DBWriterImpl(), DBWriter.class);
// Can invoke interface methods from any thread...
int numUpdates = dbWriter.exec(
    "INSERT INTO AIRPORTS VALUES(?,?)", "AXN", "Alexandria Municipal Airport");

为在执行线程的事务中运行的多个实例构造上下文代理

异步 Bean 和 Concurrency Utilities for Java EE 都允许您指定上下文代理的接口方法是否在执行线程的事务中运行,或者指定当前事务是否处于暂挂状态以及在此方法期间是否应用干净的事务上下文。异步 Bean 的另一项功能是允许单个代理触发多个侦听器实例。然而,Concurrency Utilities for Java EE 缺少此功能,您可以通过编写委派给多个实例的包装器侦听器来实现类似的行为。在以下示例中,我们调用单个代理,此代理触发多个全部位于调用线程的事务下的侦听器实例。

异步 Bean 示例:

boolean runInSameTran = true;
EventSource eventSource = abWorkManager.createEventSource();
eventSource.addListener(new DBInsertTask("MKT", "Mankato Regional Airport"));
eventSource.addListener(new DBInsertTask("ULM", "New Ulm Municipal Airport"));
eventSource.addListener(new DBInsertTask("OWA", "Owatonna Degner Regional Airport"));
Callable<?> eventTrigger = (Callable<?>) eventSource.getEventTrigger(
    Callable.class, runInSameTran);
// Can invoke interface methods from any thread...
tran.begin();
try {          
    eventTrigger.call();
} finally {
    tran.commit();
}

Concurrency Utilities for Java EE 示例:

Callable<?> eventTrigger = contextService.createContextualProxy(
    new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            new DBInsertTask("FFM", "Fergus Falls Municipal Airport").call();
            new DBInsertTask("ONA", "Winona Municipal Airport").call();
            new DBInsertTask("OTG", "Worthington Municipal Airport").call();
            return null;
        }
    },
    Collections.singletonMap(ManagedTask.TRANSACTION,
        ManagedTask.USE_TRANSACTION_OF_EXECUTION_THREAD),
    Callable.class);
// Can invoke interface methods from any thread...
tran.begin();
try {          
    eventTrigger.call();
} finally {
    tran.commit();
}

在调用线程中创建延迟执行上下文任务

异步 Bean 允许您创建 WorkWithExecutionContext,此方法本质上是任务的可序列化上下文代理,该代理随后可以提交给 WorkManager 以便在创建线程的线程上下文下运行。如果意图是在调用线程(WorkManager 的 doWork 方法)中运行,那么上下文代理可以在 Concurrency Utilities for Java EE 中实现类似的行为。

异步 Bean 示例:

WorkWithExecutionContext contextualWork = abWorkManager.create(
    new DBInsertWorkAB("BJI", "Bemidji Regional Airport"));
// Can run the contextual work on any thread...
abWorkManager.doWork(contextualWork);
DBInsertWorkAB work = (DBInsertWorkAB) contextualWork.getWork();
int numUpdates = work.getResult();

Concurrency Utilities for Java EE 示例:

Callable<Integer> contextualTask = contextService.createContextualProxy(
    new DBInsertTask("BRD", "Brainerd Lakes Regional Airport"),
    Callable.class);
// Can run the contextual proxy on any thread...
int numUpdates = contextualTask.call();

在合用线程中创建延迟执行上下文任务

异步 Bean 允许您创建 WorkWithExecutionContext,此方法本质上是任务的可序列化上下文代理,该代理随后可以提交给 WorkManager 以便在创建线程的线程上下文下运行。如果意图是在合用线程(WorkManager 的 startWork 方法)中运行,那么在 Concurrency Utilities for Java EE 中,可以通过将任务的上下文代理提交给受管执行程序来实现类似的行为,由于线程上下文捕获和传播重复,此方法效率较低。

异步 Bean 示例:

WorkWithExecutionContext contextualWork = abWorkManager.create(
    new DBInsertWorkAB("ELO", "Ely Municipal Airport"));
WorkItem workItem = abWorkManager.startWork(contextualWork);
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, TIMEOUT_MS)) {
    DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
    int numUpdates = work.getResult();
}

Concurrency Utilities for Java EE 示例:

Callable<Integer> contextualTask = contextService.createContextualProxy(
    new DBInsertTask("EVM", "Eveleth-Virginia Municipal Airport"),
    Callable.class);
Future<Integer> future = executor.submit(contextualTask);
int numUpdates = future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);

延迟执行上下文任务的更多选项

提交延迟执行任务时,您可以选择性地指定侦听器和启动超时,并指示是否希望此任务长时间运行。在异步 Bean 中,启动超时仅作为参数可用,而对于 CommonJ 和 Concurrency Utilities for Java EE,可以在 WorkListenerManagedTaskListener 内实现启动超时。

异步 Bean 示例:

long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
WorkWithExecutionContext contextualWork = abWorkManager.create(
    new DBInsertWorkAB("FRM", "Fairmont Municipal Airport"));
WorkItem workItem = abWorkManager.startWork(
    contextualWork, startTimeout, new WorkListenerAB(), isLongRunning);
ArrayList<WorkItem> items = new ArrayList<WorkItem>(1);
items.add(workItem);
if (abWorkManager.join(items, WorkManager.JOIN_AND, Integer.MAX_VALUE)) {
    DBInsertWorkAB work = (DBInsertWorkAB) workItem.getResult();
    int numUpdates = work.getResult();
}

Concurrency Utilities for Java EE 示例:

long startTimeout = TIMEOUT_MS;
boolean isLongRunning = true;
Callable<Integer> contextualTask = contextService.createContextualProxy(
    new DBInsertTask("FBL", "Faribault Municipal Airport"),
    Callable.class);
contextualTask = ManagedExecutors.managedTask(
    contextualTask,
    Collections.singletonMap(ManagedTask.LONGRUNNING_HINT,
        Boolean.toString(isLongRunning)),
    new TaskListener(startTimeout));
Future<Integer> future = executor.submit(contextualTask);
int numUpdates = future.get();

子系统监视

异步 Bean 提供了 SubsystemMonitorSubsystemMonitorManager,作为一种跨应用程序或其他工件进行协调以监视可用性的机制。Concurrency Utilities for Java EE 未提供任何等价功能。如果需要此功能的替代功能,您可以按如下所示进行操作:实现一个所有其他应用程序都已知的应用程序,并使其充当 SubsystemMonitorManager 的等价项。

事件处理

异步 Bean 提供了针对 AlarmManagerAsynchScopeEventSourceSubsystemMonitorWorkManagerWork 中发生的各种类型的事件注册侦听器的功能。大多数这些事件在 Concurrency Utilities for Java EE 中没有任何直接的等价事件。应用程序应负责针对事件和通知实现自己的机制。在少数情况下,Concurrency Utilities for Java EE 提供类似的功能。在某些情况下,您可能能够使用 ManagedTaskListener,此方法取代 AlarmManagerEventsWorkEvents 更细粒度地进行注册(在提交任务时)。

对调用上下文代理期间发生的故障进行事件处理

在异步 Bean 中,如果使用上下文代理来调用某项操作,并且该操作引发已声明的异常,那么该异常不会报告回调用程序。而是,该异常会在 listenerExceptionThrown 事件中报告给 EventSourceEvents 侦听器。在 Concurrency Utilities for Java EE 中,调用程序可以捕获该异常并进行处理。

异步 Bean 示例:

EventSource eventSource = abWorkManager.createEventSource();
eventSource.addListener(new DBWriterImpl());
eventSource.addListener(new EventSourceEvents() {
    public void listenerCountChanged(
        EventSource es, int oldCount, int newCount) {}

    public void listenerExceptionThrown(
        EventSource es, Object listener,
        String methodName, Throwable exception) {
        listenerException.set(exception);
    }

    public void unexpectedException(
        EventSource es, Object runnable, Throwable exception) {}
});
DBWriter dbWriter = (DBWriter) eventSource.getEventTrigger(DBWriter.class);
// Can invoke interface methods from any thread...
try {          
    dbWriter.exec(
        "INSERT INTO AIRPORTS VALUES(?,?)", "KAUM", "Austin Municipal Airport");
} catch (Exception x) {
    // expecting this to fail
}
Throwable exception = listenerException.get();

Concurrency Utilities for Java EE 示例:

DBWriter dbWriter = contextService.createContextualProxy(
    new DBWriterImpl(), DBWriter.class);
// Can invoke interface methods from any thread...
SQLException exception = null;
try {          
    dbWriter.exec(
        "INSERT INTO AIRPORTS VALUES(?,?)", "KSBU", "Blue Earth Municipal Airport");
} catch (SQLException x) {
    exception = x;
}

用于指示主题类型的图标 参考主题



时间戳记图标 最近一次更新时间: Tuesday, 6 December 2016
http://www14.software.ibm.com/webapp/wsbroker/redirect?version=cord&product=was-nd-mp&topic=rwlp_migrate_to_eeconcurrency
文件名:rwlp_migrate_to_eeconcurrency.html