Apache Spark 2.0ジョブは完了するまでに長い時間がかかります

Apache Spark 2.0ジョブは完了するまでに長い時間がかかります

現象

Apache Spark 2.x を使用すると、Spark ジョブがすべて完了しているにもかかわらず、プログラムがまだ実行されているという現象が発生することがあります。たとえば、Spark SQL を使用していくつかの SQL を実行すると、最終的に大量のファイルが生成されます。次に、この SQL のすべての Spark ジョブが実際に完了まで実行されているが、このクエリ ステートメントはまだ実行中であることがわかります。ログから、ドライバーノードがタスクによって生成されたファイルを 1 つずつ最終テーブルのディレクトリに移動していることがわかります。この現象は、ジョブが大量のファイルを生成する場合に発生しやすくなります。この記事では、この問題を解決する方法を紹介します。

なぜこのような現象が起こるのでしょうか?

Spark 2.x は Hadoop 2.x を使用します。生成されたファイルを HDFS に保存すると、次のように、FileOutputCommitter を使用する saveAsHadoopFile が最終的に呼び出されます。

問題は、Hadoop 2.x の FileOutputCommitter 実装にあります。FileOutputCommitter には、commitTask と commitJob という注目すべき 2 つのメソッドがあります。 Hadoop 2.x の FileOutputCommitter 実装では、mapreduce.fileoutputcommitter.algorithm.version パラメータによって commitTask と commitJob の動作が制御されます。具体的なコードは次のとおりです (便宜上、無関係な記述は削除しました。完全なコードは FileOutputCommitter.java にあります)。

ご覧のとおり、commitTask メソッドには、条件判断 algorithmVersion == 1 があります。これは、mapreduce.fileoutputcommitter.algorithm.version パラメータの値で、デフォルトは 1 です。このパラメータが 1 の場合、Task が完了すると、Task によって一時的に生成されたデータは、タスクの対応するディレクトリに移動され、その後、commitJob が呼び出されたときに最終ジョブ出力ディレクトリに移動されます。Hadoop 2.x でのこのパラメータのデフォルト値は 1 です。このため、ジョブは完了しているように見えますが、プログラムはまだデータを移動しているため、ジョブ全体が完了していません。最終的に、commitJob 関数は Spark のドライバーによって実行されるため、実行が遅くなる理由があります。

また、 mapreduce.fileoutputcommitter.algorithm.versionパラメータの値を 2 に設定すると、commitTask の実行時に mergePaths メソッドが呼び出され、タスクによって生成されたデータがタスクの一時ディレクトリからプログラムによって最終的に生成されたディレクトリに直接移動されることがわかります。 commitJob を実行する場合、データを直接移動する必要がないため、当然デフォルト値よりもはるかに高速になります。

Hadoop 2.7.0 より前のバージョンでは、プログラムがこの値を 2 に制限しないため、mapreduce.fileoutputcommitter.algorithm.version パラメータを 1 以外の値に設定することでこれを実現できることに注意してください。ただし、Hadoop 2.7.0 以降では、mapreduce.fileoutputcommitter.algorithm.version パラメータの値は 1 または 2 にする必要があります。詳細については、MAPREDUCE-4815 を参照してください。

Sparkでこのパラメータを設定する方法

問題が見つかりました。プログラムで解決できます。いくつかの方法があります:

  • conf/spark-defaults.confspark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2直接設定します。これはグローバルに影響します。
  • Spark プログラムで直接設定します (spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2"))。これはジョブ レベルです。
  • Dataset API を使用して HDFS にデータを書き込む場合は、 dataset.write.option("mapreduce.fileoutputcommitter.algorithm.version", "2") を設定できます。

ただし、Hadoop バージョンが 3.x の場合、mapreduce.fileoutputcommitter.algorithm.version パラメータのデフォルト値はすでに 2 に設定されています。詳細については、MAPREDUCE-6336 および MAPREDUCE-6406 を参照してください。

このパラメータはパフォーマンスに多少の影響を与えるため、Spark 2.2.0 では、このパラメータは Spark 構成ドキュメントconfiguration.htmlに記録されています。詳細については、SPARK-20107 を参照してください。

要約する

以上、Apache Spark 2.0についてご紹介しました。お役に立てれば幸いです。

以下もご興味があるかもしれません:
  • Spark と Scala を使用して Apache アクセス ログを分析する方法
  • 2018 年にリリースされる Apache Spark 2.4 の新機能は何ですか?

<<:  Mysql5.7 のグループ連結関数を使用するときにデータが切り捨てられる問題に対する完璧な解決策

>>:  JSでES6クラスの使い方をすぐにマスター

推薦する

ドラッグ効果を実現するための js オブジェクト指向メソッド

この記事では、ドラッグアンドドロップをJSオブジェクト指向で実装するための具体的なコードを参考までに...

Mysql5.7でのスケジュールバックアップの実装

1. MySQL インストール パス D:\xxx\MYSQL\MySQL Workbench CE...

MySQLの文字列インターセプト関連関数の概要

この記事では、MySQL の文字列インターセプト関連の機能を紹介します。具体的な内容は以下のとおりで...

MySQL スロークエリ pt-query-digest スロークエリログの分析

1. はじめにpt-query-digest は、MySQL のスロー クエリを分析するためのツール...

Vue カスタム箇条書きボックス効果 (確認ボックス、プロンプトボックス)

この記事の例では、参考のためにVueカスタムポップアップ効果の具体的なコードを共有しています。具体的...

Vueコンポーネントの基本のまとめ

コンポーネントの基本1 コンポーネントの再利用コンポーネントは再利用可能な Vue インスタンスです...

MySQL シャーディング入門ガイド

序文リレーショナル データベースは、システムのボトルネックになる可能性が高くなります。単一のマシンの...

MySQLでインデックスエラーが発生する状況について簡単に説明します

以下に、トレーニング機関からのヒントと私自身の要約をいくつか示します。以下のインデックスの内容を説明...

最小限のルートファイルシステムを構築するためにbusyboxを移植するための詳細な手順

Busybox: 小さなコマンドが詰まったスイスアーミーナイフ。ステップ1: ディレクトリ構造を作成...

変換を使用して純粋な CSS ポップアップ メニューを実装するためのサンプル コード

序文トップメニューを作成する場合、ポップアップのセカンダリメニューを作成する必要があります。 以前の...

リアクトルーティングガード(ルーティングインターセプション)の実装

React は Vue とは異なります。ルートにメタ文字を設定することでルートインターセプションを実...

Nginx ドメイン転送の使用シナリオ コード例

シナリオ 1: サーバーの制限により、外部に開かれているポートは 1 つだけですが、別の外部ネットワ...

JavaScript セレクター関数 querySelector および querySelectorAll

目次1. querySelectorは単一の要素を照会する1. ドキュメントインスタンスの呼び出し2...

VueはElement el-uploadコンポーネントを使用してピットに足を踏み入れます

目次1. 基本的な使い方2. 画像量の制御3. 画像形式の制限/複数の画像を選択可能補足: vueプ...

反応ルーティングでパラメータを渡すいくつかの方法についての簡単な説明

最初のパラメータ渡し方法は、動的ルーティングパラメータ渡しです。リンクのパス属性を設定することで、ル...