Stormは分散ストリーム処理システムであり、すべてのタプルが正常に処理されることを保証するためにアンカーとackメカニズムを使用しています。タプルにエラーが発生した場合、再送信することができますが、タプルのエラーが一度だけ処理されるようにするにはどうすればよいのでしょうか?Stormは、この問題を解決するために使用されるトランザクションコンポーネントのトランザクショントポロジーのセットを提供します。
I. 一貫性サービスの設計
嵐どのようにそのタプル並列処理を実現するだけでなく、トランザクションを確保するためです。このセクションでは、単純なトランザクションの実装方法から始まり、徐々にトランザクショントポロジーの原則につながります。
1.シンプルなデザインI:強力なシーケンシャル・フロー
タプルが一度だけ処理されることを保証する最も簡単な方法は、タプル・ストリームを強く順次的なものにして、一度に1つのタプルのみを処理し、1から始めて各タプルにidを順次追加することです。タプルを処理するとき、処理に成功したタプルのidと計算結果がデータベースに格納されます。次のタプルが到着したら、そのidとデータベースのidを比較します。もし同じなら、そのタプルは正常に処理され、無視されたことを意味します。もし異なるなら、強い順次性に従って、そのタプルは処理されていないことを意味し、そのidと計算結果はデータベースに更新されます。
メッセージの総数をカウントする例を見てみましょう。各タプルについて、データベースに保存されているidが現在のタプルidと異なる場合、データベース内のメッセージの総数が1増加し、データベース内の現在のタプルidの値が更新されます。図に示すように
しかし、このメカニズムではシステムは一度に1つのタプルのみを処理することになり、分散コンピューティングを実現することはできません。
2、シンプルな設計II:強力な注文バッチフロー
バッチ内のタプルは並列処理できます。
バッチが一度だけ処理されるようにするため、バッチIDがデータベースに格納されることを除けば、メカニズムは前節と同様。バッチIDがデータベースに格納される以外は、まずバッチの中間計算結果がローカル変数に格納され、バッチ内のすべてのタプルが処理された後にバッチIDが決定され、中間計算結果がデータベースIDと異なる場合はデータベースに更新されます。
バッチ内のすべてのタプルが処理されたことを確認するには?図のように、Stormが提供するCoordinateBoltを利用できます:
しかし、強力な逐次バッチフローにも限界があり、一度に処理できるバッチは1つだけで、バッチを並列化することはできません。真の分散トランザクション処理を実現するには、stormが提供するTransactional Topologyを使用することができます。ここではまず、CoordinateBoltの原理を詳しく説明します。
3、座標ボルトの原理
CoordinateBoltの正確な原理は以下の通りです:
- 実際に計算を行うボルトは、外側でCoordinateBoltをカプセル化します。実際にタスクを実行するボルトは、リアルボルトと呼ばれます。
- それぞれのCoordinateBoltは2つの値を記録します:どのタスクが私にタプルを送ってきたか、そしてどのタプルに情報を送りたいか。
- すべてのタプルが送信された後、CoordinateBoltは、emitDirectと呼ばれる特別なストリームによって、このタスクにタプルを送信したすべてのタスクに、このタスクに送信したタプルの数を伝えます。 下流のタスクは、この数と受信したタプルの数を比較し、等しければ、すべてのタプルを処理したことになります。等しい場合、すべてのタプルが処理されたことになります。
- 下流のCoordinateBoltは、上記の手順を繰り返して下流に通知します。
プロセス全体を図に示します:
CoordinateBoltは主に2つのシナリオで使用されます:
- Transactional Topology
- トランザクション・トポロジー
4.トランザクショナルトポロジー
Stormが提供するTransactional Topologyでは、バッチ計算をprocessとcommitの2つのフェーズに分割します。processフェーズでは、連続性を保証せずに複数のバッチを同時に処理することができます。バッチが正常に投入されると、2番目のバッチは投入できません。
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA,new Fields("word"), PARTITION_TAKE_PER_BATCH)PARTITION_TAKE_PER_BATCH
TransactionalTopologyBuilder builder = newTransactionalTopologyBuilder("global-count","spout", spout, 3);
builder.setBolt("sum", newUpdateGlobalCount()).globalGrouping("partial-count");
TransactionalTopologyBuilder は、合計 4 つのパラメータを受け取ります。
- Spoutこのトポロジーの id
- TransactionalSpoutTridentの並列性
- TransactionalSpoutの並列性。
以下はBatchCountの定義です:
public static class BatchCount extends BaseBatchBolt {
Object _id;
BatchOutputCollector _collector;
int _count = 0;
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_count++;
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(“id“, “count“));
}
}
transactionAttemptには、transaction idとattempt idの2つの値が含まれます。transaction idは、前述のようにバッチ内の各タプルに一意であり、バッチが何回リプレイされても同じです。attempt idは各バッチに一意ですが、リプレイされた後の同じバッチでは、attempt idはリプレイとは異なります。idは各バッチに固有のidですが、同じバッチでは、リプレイ後のattempt idはリプレイと同じではありません。attempt idをreplay-timesと理解すると、stormはこのidを使用して、バッチで生成されたタプルの異なるバージョンを区別します。
executeメソッドはバッチ内の各タプルに対して1回実行されるので、バッチの計算状態をローカル変数に保持しておく必要があります。この例では、executeメソッドでタプルの数をインクリメントしています。
最後に、finishBatchメソッドは、ボルトが特定のバッチのすべてのタプルを受信したときに呼び出されます。この例のBatchCountクラスは、この時点でローカルカウントを出力ストリームに出力します。
以下は UpdateGlobalCount クラスの定義です:
public static class UpdateGlobalCount extends BaseTransactionalBolt
implements ICommitter {
TransactionAttempt _attempt;
BatchOutputCollector _collector;
int _sum = 0;
@Override
public void prepare(Map conf, TopologyContext context,
BatchOutputCollector collector, TransactionAttempt attempt) {
_collector = collector;
_attempt = attempt;
}
@Override
public void execute(Tuple tuple) {
_sum+=tuple.getInteger(1);
}
@Override
public void finishBatch() {
Value val = DATABASE.get(GLOBAL_COUNT_KEY);
Value newval;
if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
newnewval = new Value();
newval.txid = _attempt.getTransactionId();
if(val==null) {
newval.count = _sum;
} else {
newval.count = _sum + val.count;
}
DATABASE.put(GLOBAL_COUNT_KEY, newval);
} else {
newval = val;
}
_collector.emit(new Values(_attempt, newval.count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(“id“, “sum“));
}
}
UpdateGlobalCount の finishBatch メソッドでは、現在のトランザクション ID がデータベースに保存されている ID と比較されます。同じ場合は、バッチは無視され、異なる場合は、このバッチの計算結果が合計結果に追加され、データベースが更新されます。
Transactional Topolgyは以下のように実行されます:
トランザクショナルトポロジーの特徴を以下にまとめます:
- BatchBoltは、バッチ化されたタプルを処理し、各タプルに対してexecuteメソッドを呼び出し、バッチ全体が処理されるとfinishBatchメソッドを呼び出します。
- BatchBolt が Committer としてマークされている場合、finishBolt メソッドを呼び出せるのはコミットフェーズの間だけです。バッチのコミットフェーズは、前のバッチが正常にコミットされた後にのみ実行されることがストームによって保証されています。そして、トポロジー内のすべてのボルトがコミットされるまで再試行されます。
トライデントの紹介
Tridentはタプルを処理のためにバッチにストリーミングし、APIはこれらのバッチの処理をカプセル化し、タプルが一度だけ処理されるようにします。バッチ処理の結果はTridentStateオブジェクトに格納されます。
Tridentの取引原則はここでは詳述しませんので、関心のある読者は各自の情報を参照してください。
/-----!
///-ール