レプリカ・プリロード・コントローラーを使用した Loader は、Loader インターフェースに加えて ReplicaPreloadController インターフェースを実装することができます。
ReplicaPreloadController インターフェースは、プライマリー断片になるレプリカが、以前のプライマリー断片がプリロード・プロセスを完了したかどうかを認識する方法を提供するように設計されています。プリロードが部分的に完了している場合は、以前のプライマリーが完了していない部分をピックアップする情報が提供されます。ReplicaPreloadController インターフェースを実装すると、プライマリーとなるレプリカは、以前のプライマリーが完了していないプリロード・プロセスを続行し、プリロード全体が完了するまで続行します。
分散 WebSphere® eXtreme Scale 環境では、マップは、レプリカを含み、初期化中に大容量のデータをプリロードすることも可能です。プリロードは Loader アクティビティーであり、初期化中のプライマリー・マップでのみ行われます。大容量のデータがプリロードされる場合は、プリロードの完了に長時間かかる場合があります。プライマリー・マップでプリロード・データのほとんどが処理されたものの、初期化中に不明な理由でプリロードが停止した場合、レプリカはプライマリーとなります。この場合には、通常、新しいプライマリーが無条件にプリロードを実行するため、以前のプライマリーによってプリロードされたデータは失われます。無条件プリロードにより、新しいプライマリーはプリロード・プロセスを初期状態から開始し、以前にプリロードされたデータは無視されます。以前のプライマリーがプリロード・プロセス中に完了しなかった部分を、新しいプライマリーにピックアップさせるには、ReplicaPreloadController インターフェースを実装した Loader を指定します。詳しくは、API 資料を参照してください。
ローダーについて詳しくは、ローダーを参照してください。通常の Loader プラグインの作成については、ローダーの作成を参照してください。
public interface ReplicaPreloadController
{
public static final class Status
{
static public final Status PRELOADED_ALREADY =
new Status(K_PRELOADED_ALREADY);
static public final Status FULL_PRELOAD_NEEDED =
new Status(K_FULL_PRELOAD_NEEDED);
static public final Status PARTIAL_PRELOAD_NEEDED =
new Status(K_PARTIAL_PRELOAD_NEEDED);
}
Status checkPreloadStatus(Session session,
BackingMap bmap);
}
以下のセクションでは、Loader および ReplicaPreloadController インターフェースのいくつかのメソッドについて説明します。
Loader で ReplicaPreloadController インターフェースが実装されると、マップ初期化中に preloadMap メソッドが呼び出される前に、checkPreloadStatus メソッドが呼び出されます。このメソッドの戻り状況により、preloadMap メソッドが呼び出されるかどうかが決まります。このメソッドによって Status#PRELOADED_ALREADY が返された場合、preload メソッドは呼び出されません。返されない場合は、preload メソッドが実行されます。この動作によって、このメソッドは Loader 初期化メソッドとして機能します。このメソッドで Loader プロパティーを初期化する必要があります。このメソッドにより正しい状況が返される必要があり、返されない場合は、プリロードは予定通りに動作しません。
public Status checkPreloadStatus(Session session,
BackingMap backingMap) {
// When a loader implements ReplicaPreloadController interface,
// this method will be called before preloadMap method during
// map initialization. Whether the preloadMap method will be
// called depends on teh returned status of this method. So, this
// method also serve as Loader's initialization method. This method
// has to return the right staus, otherwise the preload may not
// work as expected.
// Note: must initialize this loader instance here.
ivOptimisticCallback = backingMap.getOptimisticCallback();
ivBackingMapName = backingMap.getName();
ivPartitionId = backingMap.getPartitionId();
ivPartitionManager = backingMap.getPartitionManager();
ivTransformer = backingMap.getObjectTransformer();
preloadStatusKey = ivBackingMapName + "_" + ivPartitionId;
try {
// get the preloadStatusMap to retrieve preload status that
// could be set by other JVMs.
ObjectMap preloadStatusMap = session.getMap(ivPreloadStatusMapName);
// retrieve last recorded preload data chunk index.
Integer lastPreloadedDataChunk = (Integer) preloadStatusMap
.get(preloadStatusKey);
if (lastPreloadedDataChunk == null) {
preloadStatus = Status.FULL_PRELOAD_NEEDED;
} else {
preloadedLastDataChunkIndex = lastPreloadedDataChunk.intValue();
if (preloadedLastDataChunkIndex == preloadCompleteMark) {
preloadStatus = Status.PRELOADED_ALREADY;
} else {
preloadStatus = Status.PARTIAL_PRELOAD_NEEDED;
}
}
System.out.println("TupleHeapCacheWithReplicaPreloadControllerLoader.
checkPreloadStatus()
-> map = " + ivBackingMapName + ", preloadStatusKey = " + preloadStatusKey
+ ", retrieved lastPreloadedDataChunk =" + lastPreloadedDataChunk + ",
determined preloadStatus = "
+ getStatusString(preloadStatus));
} catch (Throwable t) {
t.printStackTrace();
}
return preloadStatus;
}
preloadMap メソッドの実行は、checkPreloadStatus メソッドから返された結果によって異なります。preloadMap メソッドが呼び出されると、このメソッドは、通常、指定されたプリロード状況マップからプリロード状況の情報を取得し、どのようにプリロードを進行するかを決定する必要があります。理想的には、プリロードが部分的に完了されており、どこから開始すればよいかが preloadMap メソッドで正確に認識されている必要があります。データ・プリロード中に、preloadMap メソッドは、指定されたプリロード状況マップでプリロード状況を更新する必要があります。プリロード状況マップに保管されているプリロード状況は、プリロード状況を確認する必要がある場合に、checkPreloadStatus メソッドによって取得されます。
public void preloadMap(Session session, BackingMap backingMap)
throws LoaderException {
EntityMetadata emd = backingMap.getEntityMetadata();
if (emd != null && tupleHeapPreloadData != null) {
// The getPreLoadData method is similar to fetching data
// from database. These data will be push into cache as
// preload process.
ivPreloadData = tupleHeapPreloadData.getPreLoadData(emd);
ivOptimisticCallback = backingMap.getOptimisticCallback();
ivBackingMapName = backingMap.getName();
ivPartitionId = backingMap.getPartitionId();
ivPartitionManager = backingMap.getPartitionManager();
ivTransformer = backingMap.getObjectTransformer();
Map preloadMap;
if (ivPreloadData != null) {
try {
ObjectMap map = session.getMap(ivBackingMapName);
// get the preloadStatusMap to record preload status.
ObjectMap preloadStatusMap = session.
getMap(ivPreloadStatusMapName);
// Note: when this preloadMap method is invoked, the
// checkPreloadStatus has been called, Both preloadStatus
// and preloadedLastDataChunkIndex have been set. And the
// preloadStatus must be either PARTIAL_PRELOAD_NEEDED
// or FULL_PRELOAD_NEEDED that will require a preload again.
// If large amount of data will be preloaded, the data usually
// is divided into few chunks and the preload process will
// process each chunk within its own tran. This sample only
// preload few entries and assuming each entry represent a chunk.
// so that the preload process an entry in a tran to simulate
// chunk preloading.
Set entrySet = ivPreloadData.entrySet();
preloadMap = new HashMap();
ivMap = preloadMap;
// The dataChunkIndex represent the data chunk that is in
// processing
int dataChunkIndex = -1;
boolean shouldRecordPreloadStatus = false;
int numberOfDataChunk = entrySet.size();
System.out.println(" numberOfDataChunk to be preloaded = "
+ numberOfDataChunk);
Iterator it = entrySet.iterator();
int whileCounter = 0;
while (it.hasNext()) {
whileCounter++;
System.out.println("preloadStatusKey = " + preloadStatusKey
+ " ,
whileCounter = " + whileCounter);
dataChunkIndex++;
// if the current dataChunkIndex <= preloadedLastDataChunkIndex
// no need to process, becasue it has been preloaded by
// other JVM before. only need to process dataChunkIndex
// > preloadedLastDataChunkIndex
if (dataChunkIndex <= preloadedLastDataChunkIndex) {
System.out.println("ignore current dataChunkIndex =
" + dataChunkIndex + " that has been previously
preloaded.");
continue;
}
// Note: This sample simulate data chunk as an entry.
// each key represent a data chunk for simplicity.
// If the primary server or shard stopped for unknown
// reason, the preload status that indicates the progress
// of preload should be available in preloadStatusMap. A
// replica that become a primary can get the preload status
// and determine how to preload again.
// Note: recording preload status should be in the same
// tran as putting data into cache; so that if tran
// rollback or error, the recorded preload status is the
// actual status.
Map.Entry entry = (Entry) it.next();
Object key = entry.getKey();
Object value = entry.getValue();
boolean tranActive = false;
System.out.println("processing data chunk. map = " +
this.ivBackingMapName + ", current dataChunkIndex = " +
dataChunkIndex + ", key = " + key);
try {
shouldRecordPreloadStatus = false; // re-set to false
session.beginNoWriteThrough();
tranActive = true;
if (ivPartitionManager.getNumOfPartitions() == 1) {
// if just only 1 partition, no need to deal with
// partition.
// just push data into cache
map.put(key, value);
preloadMap.put(key, value);
shouldRecordPreloadStatus = true;
} else if (ivPartitionManager.getPartition(key) ==
ivPartitionId) {
// if map is partitioned, need to consider the
// partition key only preload data that belongs
// to this partition.
map.put(key, value);
preloadMap.put(key, value);
shouldRecordPreloadStatus = true;
} else {
// ignore this entry, because it does not belong to
// this partition.
}
if (shouldRecordPreloadStatus) {
System.out.println("record preload status. map = " +
this.ivBackingMapName + ", preloadStatusKey = " +
preloadStatusKey + ", current dataChunkIndex ="
+ dataChunkIndex);
if (dataChunkIndex == numberOfDataChunk) {
System.out.println("record preload status. map = " +
this.ivBackingMapName + ", preloadStatusKey = " +
preloadStatusKey + ", mark complete =" +
preloadCompleteMark);
// means we are at the lastest data chunk, if commit
// successfully, record preload complete.
// at this point, the preload is considered to be done
// use -99 as special mark for preload complete status.
preloadStatusMap.get(preloadStatusKey);
// a put follow a get will become update if the get
// return an object, otherwise, it will be insert.
preloadStatusMap.put(preloadStatusKey, new
Integer(preloadCompleteMark));
} else {
// record preloaded current dataChunkIndex into
// preloadStatusMap a put follow a get will become
// update if teh get return an object, otherwise, it
// will be insert.
preloadStatusMap.get(preloadStatusKey);
preloadStatusMap.put(preloadStatusKey, new
Integer(dataChunkIndex));
}
}
session.commit();
tranActive = false;
// to simulate preloading large amount of data
// put this thread into sleep for 30 secs.
// The real app should NOT put this thread to sleep
Thread.sleep(10000);
} catch (Throwable e) {
e.printStackTrace();
throw new LoaderException("preload failed with
exception: " + e, e);
} finally {
if (tranActive && session != null) {
try {
session.rollback();
} catch (Throwable e1) {
// preload ignoring exception from rollback
}
}
}
}
// at this point, the preload is considered to be done for sure
// use -99 as special mark for preload complete status.
// this is a insurance to make sure the complete mark is set.
// besides, when partitioning, each partition does not know when
// is its last data chunk. so the following block serves as the
// overall preload status complete reporting.
System.out.println("Overall preload status complete -> record
preload status. map = " + this.ivBackingMapName + ",
preloadStatusKey = " + preloadStatusKey + ", mark complete =" +
preloadCompleteMark);
session.begin();
preloadStatusMap.get(preloadStatusKey);
// a put follow a get will become update if teh get return an object,
// otherwise, it will be insert.
preloadStatusMap.put(preloadStatusKey, new Integer(preloadCompleteMark));
session.commit();
ivMap = preloadMap;
} catch (Throwable e) {
e.printStackTrace();
throw new LoaderException("preload failed with exception: " + e, e);
}
}
}
}
ReplicaPreloadController インターフェースの実装をサポートするには、プリロード状況マップを使用する必要があります。preloadMap メソッドは、常にプリロード状況マップに保管されているプリロード状況を最初に確認し、データがキャッシュにプッシュされた場合には、プリロード状況マップのプリロード状況を更新する必要があります。checkPreloadStatus メソッドはプリロード状況マップからプリロード状況を取得することができ、プリロード状況を判別して、その状況を呼び出し元に返します。プリロード状況マップは、レプリカ・プリロード・コントローラー Loader を持つ他のマップと同じ mapSet に置かれている必要があります。