blog

大きなステートを使うときのFlinkのちょっとした最適化

RocksDBの状態ディスクを選択する際のFlinkソースコードの問題。 いくつかの解決策と、各オプションの利点と欠点の分析。 Flinkは、状態が比較的大きい場合、現在唯一の利用可能なオプションの様...

Nov 19, 2020 · 9 min. read
シェア

この記事を通して、あなたは以下の点を得ることができます:

  • Flinkの中で大きな状態を使用する場合の設定方法は?
  • 一般的な負荷分散戦略にはどのようなものがありますか?
  • RocksDBの状態ディスクを選択する際のFlinkソースコードの問題。
  • 多くの解決策が提示され、それぞれの利点と欠点が分析されています。

なぜ最適化するのですか?

RocksDBはLSMツリーの原理に基づいたKVデータベースであり、LSMツリーのリード増幅の問題は非常に深刻であるため、ディスクに高いパフォーマンスが要求されます。しかし、クラスタによってはSSDではなく、普通のメカニカルハードディスクで構成されている場合もあります。 Flinkタスクが比較的大きく、状態アクセスが頻繁に行われる場合、メカニカルハードディスクのディスクIOが性能のボトルネックになる可能性があります。この場合、このボトルネックを解決するにはどうすればよいでしょうか?

複数のハードディスクを使用して負担を共有

RocksDBはデータの保存にメモリとディスクを使用し、状態が大きくなるとディスクのフットプリントも大きくなります。RocksDBへの読み込み要求が頻繁にあると、ディスクIOがFlinkタスクのボトルネックになります。

flink-conf.yamlのstate.backend.rocksdb.localdirパラメータで、RocksDBがディスク上に保存されているディレクトリを指定することを強くお勧めします。1つのTaskManagerに3つのスロットが含まれている場合、1つのサーバー上の3つの並列処理すべてがディスクへの頻繁な読み書きを引き起こし、3つの並列処理間で同じディスクioの競合が発生します。

幸いなことに、Flinkのstate.backend.rocksdb.localdirパラメータは複数のディレクトリを指定することができます。 一般的に、ビッグデータサーバは多くのハードディスクをマウントするので、同じTaskManagerの3つのスロットが異なるハードディスクを使用し、リソースの競合を減らすことが期待できます。具体的なパラメータ構成を以下に示します:

state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb,/data4/flink/rocksdb,/data5/flink/rocksdb,/data6/flink/rocksdb,/data7/flink/rocksdb,/data8/flink/rocksdb,/data9/flink/rocksdb,/data10/flink/rocksdb,/data11/flink/rocksdb,/data12/flink/rocksdb

注:必ず複数の異なるディスクにディレクトリを設定してください。1つのディスクに複数のディレクトリを設定しないでください。

次の図はテスト中のディスクのIO使用率を示していますが、3つの大きなステートオペレータの並列性が3つのディスクに対応していることがわかります。これらの3つのディスクの平均IO使用率は約45%で、最も高いIO使用率はほぼ100%です。このことから、RocksDBをステートフルなバックエンドとして使用し、大きなステートを頻繁に読み書きする場合、ディスクIOのパフォーマンスが大量に消費されることがわかります。

上記は理想的な状況で、複数のRocksDBのローカルディスクディレクトリを設定する場合、Flinkは使用するディレクトリをランダムに選択するので、3つの並列で同じディレクトリを共有することが可能です。

次の図に示すように、2つの並列がsdbディスクを共有し、1つの並列がsdjディスクを使用しています。sdbディスクの平均IO使用率が91.6%に達していることがわかります。この時点で、sdbのディスクIOがFlinkタスク全体のボトルネックになることは確実で、sdbディスクに対応する2つの並列のスループットが大幅に低下し、Flinkタスク全体のスループットが低下します。

サーバーに多数のハードディスクがマウントされている場合、通常はこのような状況は発生しませんが、タスクの再起動後にスループットが低下している場合は、同じディスクを共有する複数の並列度が発生していないかどうかを確認することができます。

Flinkは複数の並列処理で同じディスクを共有すると問題が発生することがあります。

II.よく使われる負荷分散戦略

現象的には、RocksDBに12台のディスクが割り当てられているため、3台のディスクを使用する必要がある並列タスクは3台しかありませんが、2台の並列タスクが同じディスクを共有する可能性もありますし、3台の並列タスクが同じディスクを共有する可能性もあります。このようなFlinkタスクは、ディスクIOがボトルネックになりやすくなります。

ディスクを割り当てるための上記のポリシーは、実際には業界の負荷分散ポリシーです。一般的な負荷分散ポリシーは、ハッシュ、ランダム、ラウンドロビンです。

Hash

タスク自体は、複数のワーカーに圧力を分散させるために、ある種のハッシュポリシーを受けます。上記のシナリオでは、複数のスロットが使用するRocksDBディレクトリの圧力は複数のディスクで共有されます。しかし、ハッシュ競合が発生する可能性があります。つまり、並列性の異なる複数のFlinkがハッシュ後に同じhashCodeを持つか、ハードディスクの数を合計した後に同じハードディスクにhashCodeが割り当てられることになります。

Random

ランダム戦略は、各Flinkタスクに乱数を生成し、ワーカーに圧力をランダムに割り当てる、つまり、あるディスクに圧力をランダムに割り当てるというものです。しかし、乱数には衝突の可能性もあります。

Round Robin

回転戦略を理解するのは比較的簡単ですが、複数の労働者がデータを受信する順番にすることができます、RocksDBのディレクトリを使用するディレクトリ1、2番目のアプリケーションディレクトリを使用するディレクトリ2を適用するための最初の時間のためのFlinkのタスクは、することができます。この戦略は、この戦略を使用する場合は、すべてのハードディスクは、1のタスク数の最大差に割り当てられていることを確認され、タスク数の最も均等な分布です。

最小負荷ポリシー / 最小応答時間ポリシー

Workerがタスクを割り当てるレスポンスタイムを見ると、レスポンスタイムが短いということは、負荷能力が高く、より多くのタスクを割り当てる必要があります。上記のシナリオに対応して、各ディスクのIO使用率をチェックします。使用率が低いということは、ディスクIOが比較的空いていて、より多くのタスクを割り当てる必要があります。

ウェイトポリシーの指定

各作業員に異なるウェイト値を割り当て、ウェイト値が高いタスクほど多くのタスクを割り当て、一般的に割り当てタスク数はウェイト値に比例します。

例えば、Worker0のウェイトが2、Worker1のウェイトが1の場合、タスクを割り当てる際、Worker0はWorker1の2倍のタスクを割り当てようとします。RocksDBのローカルディレクトリにSSDとHDDの両方が含まれていない限り、一般的に同じサーバー上の各ハードドライブの負荷容量はあまり変わりません。

第三に、ソースコードの中でディスクはどのように割り当てられているのでしょうか?

オンラインでFlinkバージョン1.8.1を使用していますが、複数の並列が割り当てられているドライブもあれば、並列が1つも割り当てられていないドライブもあります。ほとんどの場合、各ハードディスクには1つのタスクしか割り当てられておらず、複数のタスクが割り当てられている可能性は低いため、ソースコードでハッシュやランダムを使用している確率は比較的高いと大胆に推測できます。

ラウンドロビン戦略を使用する場合、各ドライブに並列度が割り当てられた後にのみ、1つのドライブに2つのタスクが割り当てられるようになります。また、ラウンドロビン戦略では、割り当てられたドライブが連続していることが保証されます。

RocksDBStateBackendクラスのソースコードを直接見てみましょう:

/** Base paths for RocksDB directory, as initialized.
こちらが上記で設定した12 rocksdbのローカルdirだ。 */
private transient File[] initializedDbBasePaths;
/** The index of the next directory to be used from {@link #initializedDbBasePaths}.
次にdirのインデックスを使用したい場合、もしnextDirectory = 2,
initialisedDbBasePathsに2が含まれるディレクトリを、rocksdbのストレージ・ディレクトリとして使用する場合、initialisedDbBasePathsに2が含まれるディレクトリを、rocksdbのストレージ・ディレクトリとして使用することができる。 */
private transient int nextDirectory;
// lazyInitializeForJob メソッドの中で、この行は次に使うdirのインデックスを決定する。,
// initialisedDbBasePathsによると.length 乱数を生成する,
// もしinitialisedDbBasePathsが.length = 12,0-11 の範囲で乱数を生成する
nextDirectory = new Random().nextInt(initializedDbBasePaths.length);

単純なソースコードを分析した結果、ソースコードはdirをランダムに割り当てる戦略を使っていることがわかりました。ランダムな割り当ては衝突の確率が小さい。

IV.どの戦略を使うのがより理にかなっていますか?

ランダムやハッシュの方針は、タスク数が比較的多い場合、各ワーカーに基本的に同じ量のタスクを確実に割り当てることができますが、タスク数が比較的少ない場合、例えばランダムアルゴリズムで20個のタスクを10人のワーカーに割り当てた場合、一部のワーカーはタスクを割り当てられず、一部のワーカーは3、4個のタスクに割り当てられる可能性があります。では、ランダム戦略やハッシュ戦略では、rocksdbによるディスクの不均等な割り当てというペインポイントを解決できないので、ラウンドロビン戦略や最低負荷戦略ではどうでしょうか?

ローテーションポリシー

回転戦略は、上記の問題を次のように解決します:

// RocksDBStateBackend クラスは
private static final AtomicInteger DIR_INDEX = new AtomicInteger(0);
// nextDirectory DIRの割り当て戦略を以下のコードに変更した。_INDEX + 1,次に、dirの総数のバランスをとる。
nextDirectory = DIR_INDEX.getAndIncrement() % initializedDbBasePaths.length;

上記を使えば、ディスク0から順番にディスクを要求し、一度に次のディスクを使用するラウンドロビン・ポリシーを実装することができます。

問題点

Javaの静的変数はJVMレベルであり、各TaskManagerは別々のJVMに属しているため、TaskManagerは内部的にローテーション・ポリシーを保証しています。複数のTaskManagerが同じサーバー上で実行されている場合、それらはすべてインデックス0のディスクから開始するので、小さいインデックスのディスクはより頻繁に使用され、大きいインデックスのディスクはそれほど頻繁に使用されないかもしれません。

解決策 1:

DIR_INDEXが初期化されるとき、毎回0に初期化する代わりに乱数を生成することができます。これにより、以下の実装コードに示すように、インデックスが小さいディスクが毎回使用されることがなくなります:

// RocksDBStateBackend クラスは
private static final AtomicInteger DIR_INDEX = new AtomicInteger(new Random().nextInt(100));

しかし、上記のソリューションでは、ディスク競合の問題を完全に解決することはできません。 12個のディスクを持つ同じマシン上で、TaskManager0はインデックス0、1、2の3つのディスクを使用し、TaskManager1はインデックス1、2、3の3つのディスクを使用する可能性があります。その結果、TaskManagerは内部的にラウンドロビン・ポリシーを実装して負荷分散を確保しますが、グローバルには負荷が分散されません。

解決策 2:

グローバルなロードバランシングのためには、絶対的なロードバランシングを達成するために複数のTaskManagerが互いに通信する必要があります。これはサードパーティのストレージの助けを借りて行うことができ、例えばZookeeperでは、各サーバにznodeを生成します。 znodeの名前はhostでもipでもかまいません。 CuratorのDistributedAtomicInteger を使用して、現在のサーバに対応する znode に格納されている DIR_INDEX 変数を保持します。 どの TaskManager がディスクを要求しても、DistributedAtomicInteger を使用して、現在のサーバに対応する DIR_INDEX に 1 を追加することで、グローバルローテーションを実現できます。グローバルローテーションポリシーを実装することができます。

AtomicIntegerはTaskManager内でのラウンドロビンを保証するのみで、グローバルラウンドロビンを保証することはできません。グローバルラウンドロビンを実装したい場合は、Zookeeperなどを使って実装する必要があります。ローテーションポリシーの要件がより厳しい場合は、Zookeeperベースのローテーションポリシーを使用することができ、外部コンポーネントに依存したくない場合は、AtomicIntegerのみを使用して実現することができます。

最小負荷ポリシー

このアイデアは、TaskManagerが起動したときに、rocksdbローカルdirに対応するすべてのディスクの過去1分または5分の平均IO使用量を監視し、IO使用量の多いディスクをふるい落とし、平均IO使用量の少ないディスクを優先し、同時に、平均IO使用量の少ないディスクの場合は、まだローテーションポリシーを使用して実装する必要があります。

問題点

  • Flinkタスクが起動するとき、ディスクの現在のIO使用率しか取得できませんが、これは瞬間的な値です。
  • IO使用量を1分間収集するまで、Flinkタスクの開始を待つことはできません。
  • このIO使用量を得るために外部の監視システムに頼りたくないなら、一般性を考えてください。
  • すべてのドライブの直前のIO使用量がわかったとして、どのように判断すればいいでしょうか?
  • 平均IO使用率が低いディスクの場合、ローテーション・ポリシーを使用してこれを達成する必要があります。
  • IOの平均利用率は低く、10%の差を低いと見るか、20%、30%と見るかは簡単には判断できません。
  • また、新しいタスクによってディスク使用量に対する要件が異なるため、判断が難しいのです。

新しいアイデア

起動フェーズで使用される DistributedAtomicInteger は、ハードディスクの負荷圧力を捕捉せず、基本的に各ハードディスクで負荷がバランスされるようにします。ただし、タスクの開始後しばらくして、Flink タスクによってディスクの平均 IO 使用率が他のディスクに比べて非常に高くなった場合は、負荷の高いディスクから負荷の低いディスクにデータを移行することを選択できます。高負荷ドライブから低負荷ドライブへのデータ移行を選択できます。

まとめ

この論文では、Flinkが大きな状態を使用する際に発生する現在の問題を分析し、様々な解決策を示します。

現在のところ、ランダム、TaskManagerベースのラウンドロビン、Zookeeperベースのグローバルラウンドロビンの3つの戦略を実装しており、本番環境に適用する場合は、flink-conf.yamlファイルで直接戦略を設定することができます。これまでのところ、Zookeeperベースのグローバルラウンドロビン戦略は非常に優れています。後でコミュニティに還元するようにします。

Read next

「インターネット・アーキテクチャ」 ソフトウェア・アーキテクチャ - ソフトウェア・システム設計

このキーボードを持って、とにかくやってみてください。 コーディングを始める前に、深く考え、複数の関係者を考慮しましょう。 要件の中に文章があります。 ビジネスの目的を達成するために、ユーザーのニーズを満たすこと。代わりに、開発者自身の曲がった、高レベルの設計者は、ソフトウェアのユーザーのニーズを満たすために設計するだけでなく、すべてのコストではなく、最先端のソフトウェアから設計をコード化することです、そこには最高の、唯一の最も適切な。...

Nov 19, 2020 · 6 min read