Spark RDD をデータフレームに変換し、それを MySQL に書き込む例

Spark RDD をデータフレームに変換し、それを MySQL に書き込む例

DataframeはSpark 1.3.0で導入された新しいAPIで、Sparkで大規模な構造化データを処理できるようになります。従来のRDD変換方式よりも使いやすく、計算性能も2倍高速化されていると言われています。 Sparkは、オフラインバッチ処理またはリアルタイムコンピューティングでRDDをDataFrameに変換し、簡単なSQLコマンドでデータを操作できます。SQLに精通している人にとっては、変換とフィルタリングのプロセスは非常に便利で、より高レベルのアプリケーションを実現することもできます。たとえば、リアルタイムでは、Kafkaのトピック名とSQLステートメントが渡され、バックグラウンドで構成されたコンテンツフィールドを読み取ってクラスに反映し、入力SQLと出力SQLを使用してリアルタイムデータを計算します。この場合、Spark Streamingを知らない人でも、リアルタイムコンピューティングのメリットを簡単に享受できます。

次の例は、ローカルファイルをRDDに読み込み、それを暗黙的にDataFrameに変換してデータをクエリし、最後にMySQLテーブルに追加形式で書き込むプロセスです。Scalaコードの例は次のとおりです。

java.sql.Timestamp をインポートする
org.apache.spark.sql.{SaveMode, SQLContext} をインポートします。
org.apache.spark.{SparkContext, SparkConf} をインポートします。
オブジェクト DataFrameSql {
 case class memberbase(data_date:Long,memberid:String,createtime:Timestamp,sp:Int)はSerializableを拡張します{
 def toString をオーバーライドします: String="%d\t%s\t%s\t%d".format(data_date,memberid,createtime,sp)
 }
 def main(args:Array[String]): 単位 = {
 val conf = 新しい SparkConf()
 conf.setMaster("local[2]")
// ----------------------
 //パラメータ spark.sql.autoBroadcastJoinThreshold はテーブルをブロードキャストするかどうかを設定します。デフォルトは 10M で、無効にするには -1 に設定します //spark.sql.codegen は SQL を Java バイトコードにプリコンパイルするかどうかを設定します。長い SQL や頻繁な SQL は最適化効果があります //spark.sql.inMemoryColumnarStorage.batchSize は一度に処理される行数です。oom に注意してください
 //spark.sql.inMemoryColumnarStorage.compressed は、メモリ内の列ストレージを圧縮する必要があるかどうかを設定します// ----------------------
 conf.set("spark.sql.shuffle.partitions","20") //デフォルトのパーティションは200です conf.setAppName("dataframe test")
 val sc = 新しい SparkContext(conf)
 val sqc = 新しいSQLContext(sc)
 val ac = sc.accumulator(0,"失敗数")
 val ファイル = sc.textFile("src\\main\\resources\\000000_0")
 val log = file.map(行 => 行.split(" ")).filter(行 =>
  if (line.length != 4) { // 単純なフィルターを実行する ac.add(1)
  間違い
  } それ以外の場合は true)
  .map(行 => memberbase(行(0).toLong, 行(1),Timestamp.valueOf(行(2)), 行(3).toInt))
 // 方法 1: 暗黙的な変換を使用する import sqc.implicits._
 val dftemp = log.toDF() // 変換/*
  方法 2: createDataFrame メソッドを使用して、内部的にリフレクションを使用してフィールドとその型を取得します。val dftemp = sqc.createDataFrame(log)
  */
 val df = dftemp.registerTempTable("memberbaseinfo")
 /*val sqlcommand = "date_format(createtime,'yyyy-MM')をmmとして、count(1)をnumsとして選択" +
  "memberbaseinfo から date_format(createtime,'yyyy-MM') でグループ化" +
  「数字の降順、mm の昇順で並べ替え」*/
 val sqlcommand="memberbaseinfo から * を選択"
 val sel = sqc.sql(sqlコマンド)
 val prop = 新しい java.util.Properties
 prop.setProperty("ユーザー","etl")
 prop.setProperty("パスワード","xxx")
 //DataFrameWriterを呼び出してmysqlにデータを書き込む
 val dataResult = sqc.sql(sqlcommand).write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test","t_spark_dataframe_test",prop) // テーブルが存在しない可能性があります println(ac.name.get+" "+ac.value)
 sc.stop()
 }
}

上記コードの textFile 内のサンプル データは次のとおりです。データは Hive から取得されます。フィールド情報は、パーティション番号、ユーザー ID、登録時間、およびサードパーティ番号です。

20160309 45386477 2012-06-12 20:13:15 901438
20160309 45390977 2012-06-12 22:38:06 901036
20160309 45446677 2012-06-14 21:57:39 901438
20160309 45464977 2012-06-15 13:42:55 901438
20160309 45572377 2012-06-18 14:55:03 902606
20160309 45620577 2012-06-20 00:21:09 902606
20160309 45628377 2012-06-20 10:48:05 901181
20160309 45628877 2012-06-20 11:10:15 902606
20160309 45667777 2012-06-21 18:58:34 902524
20160309 45680177 2012-06-22 01:49:55 
20160309 45687077 2012-06-22 11:23:22 902607

ここでのフィールド タイプのマッピング、つまり、公式 Web サイトのスクリーンショットに示されているように、ケース クラスからデータフレームへのマッピングに注意してください。

詳細については、公式ドキュメント「Spark SQLおよびDataFrameガイド」を参照してください。

Spark RDD をデータフレームに変換して MySQL に書き込む上記の例が、私が皆さんと共有したいことのすべてです。これが皆さんの参考になれば幸いです。また、123WORDPRESS.COM をサポートしていただければ幸いです。

以下もご興味があるかもしれません:
  • SparkSQLはIDEAを使用して、DataFrameとDataSetをすぐに使い始めることができます。
  • DataFrame: SparkSql を介して Scala クラスを DataFrame に変換する方法
  • pyspark.sql.DataFrame と pandas.DataFrame 間の変換例
  • DataFrameとSparkSqlの値の誤解についての簡単な説明
  • Spark SQL 2.4.8 データフレームを操作する2つの方法

<<:  DockerでSpring Bootアプリケーションを実行する方法

>>:  Vueフィルターの使い方

推薦する

nginx ベースのブラウザネゴシエーションキャッシュプロセスの詳細な説明

この記事は主に、nginx に基づいてブラウザネゴシエーションキャッシュを設定する詳細なプロセスを紹...

JavaScript ドキュメント オブジェクト モデル DOM

目次1. JavaScriptはページ内のすべてのHTML要素を変更できる1. IDでHTML要素を...

フォームの読み取り専用属性と無効な属性についての簡単な説明

フォーム内の読み取り専用および無効な属性1. 読み取り専用:サーバーは、ユーザーがデータを変更するこ...

CSS3は子供のころの紙飛行機を実現する

今日は折り紙飛行機(飛べる飛行機)を作ります基本的にすべてCSSで実装されており、JSはごく一部に過...

Mac で docker と kubectl の自動補完コマンドを追加する方法

kubectl の紹介kubectl は、k8s クラスターを操作するためのコマンドライン ツールで...

Webpackプラグインを書いてnpmに公開するための80行のコード

1. はじめに最近、 Webpackの原理を勉強しています。これまでは Webpack の設定方法し...

nginx が複数のプロキシ層を通過して実際の送信元 IP を取得するプロセスの詳細な説明

質問Nginx は $remote_addr を実際の IP アドレスとして受け取りますが、実際には...

フロントエンドセキュリティの詳細な説明: JavaScript の http ハイジャック対策と XSS

目次HTTP ハイジャック、DNS ハイジャック、XSS HTTPハイジャックDNSハイジャックXS...

MySQLビューの原理と使用法の詳細な説明

この記事では、例を使用して MySQL ビューの原理と使用方法を説明します。ご参考までに、詳細は以下...

MySQLでの少し複雑な使用例コード

序文MySQL の構文は誰にとっても難しいものではないと思いますが、この記事では主に MySQL の...

Linux でファイルを削除するさまざまな方法の効率の比較

Linux で大量のファイルを削除する効率をテストします。まず500,000個のファイルを作成する$...

JavaScript の構成と継承の説明

目次1. はじめに2. プロトタイプチェーン継承3. コンストラクタの継承4. 組み合わせ継承1. ...

MySQL 最適化のケーススタディ

1. 背景Youzan の各 OLTP データベース インスタンスには、実行時間が特定のしきい値を超...

Linux で開いているファイルが多すぎる問題を解決する方法

原因は、プロセスが特定の時点でシステム制限を超える数のファイルと通信リンクを開くことです。 システム...