DataframeはSpark 1.3.0で導入された新しいAPIで、Sparkで大規模な構造化データを処理できるようになります。従来のRDD変換方式よりも使いやすく、計算性能も2倍高速化されていると言われています。 Sparkは、オフラインバッチ処理またはリアルタイムコンピューティングでRDDをDataFrameに変換し、簡単なSQLコマンドでデータを操作できます。SQLに精通している人にとっては、変換とフィルタリングのプロセスは非常に便利で、より高レベルのアプリケーションを実現することもできます。たとえば、リアルタイムでは、Kafkaのトピック名とSQLステートメントが渡され、バックグラウンドで構成されたコンテンツフィールドを読み取ってクラスに反映し、入力SQLと出力SQLを使用してリアルタイムデータを計算します。この場合、Spark Streamingを知らない人でも、リアルタイムコンピューティングのメリットを簡単に享受できます。 次の例は、ローカルファイルをRDDに読み込み、それを暗黙的にDataFrameに変換してデータをクエリし、最後にMySQLテーブルに追加形式で書き込むプロセスです。Scalaコードの例は次のとおりです。 java.sql.Timestamp をインポートする org.apache.spark.sql.{SaveMode, SQLContext} をインポートします。 org.apache.spark.{SparkContext, SparkConf} をインポートします。 オブジェクト DataFrameSql { case class memberbase(data_date:Long,memberid:String,createtime:Timestamp,sp:Int)はSerializableを拡張します{ def toString をオーバーライドします: String="%d\t%s\t%s\t%d".format(data_date,memberid,createtime,sp) } def main(args:Array[String]): 単位 = { val conf = 新しい SparkConf() conf.setMaster("local[2]") // ---------------------- //パラメータ spark.sql.autoBroadcastJoinThreshold はテーブルをブロードキャストするかどうかを設定します。デフォルトは 10M で、無効にするには -1 に設定します //spark.sql.codegen は SQL を Java バイトコードにプリコンパイルするかどうかを設定します。長い SQL や頻繁な SQL は最適化効果があります //spark.sql.inMemoryColumnarStorage.batchSize は一度に処理される行数です。oom に注意してください //spark.sql.inMemoryColumnarStorage.compressed は、メモリ内の列ストレージを圧縮する必要があるかどうかを設定します// ---------------------- conf.set("spark.sql.shuffle.partitions","20") //デフォルトのパーティションは200です conf.setAppName("dataframe test") val sc = 新しい SparkContext(conf) val sqc = 新しいSQLContext(sc) val ac = sc.accumulator(0,"失敗数") val ファイル = sc.textFile("src\\main\\resources\\000000_0") val log = file.map(行 => 行.split(" ")).filter(行 => if (line.length != 4) { // 単純なフィルターを実行する ac.add(1) 間違い } それ以外の場合は true) .map(行 => memberbase(行(0).toLong, 行(1),Timestamp.valueOf(行(2)), 行(3).toInt)) // 方法 1: 暗黙的な変換を使用する import sqc.implicits._ val dftemp = log.toDF() // 変換/* 方法 2: createDataFrame メソッドを使用して、内部的にリフレクションを使用してフィールドとその型を取得します。val dftemp = sqc.createDataFrame(log) */ val df = dftemp.registerTempTable("memberbaseinfo") /*val sqlcommand = "date_format(createtime,'yyyy-MM')をmmとして、count(1)をnumsとして選択" + "memberbaseinfo から date_format(createtime,'yyyy-MM') でグループ化" + 「数字の降順、mm の昇順で並べ替え」*/ val sqlcommand="memberbaseinfo から * を選択" val sel = sqc.sql(sqlコマンド) val prop = 新しい java.util.Properties prop.setProperty("ユーザー","etl") prop.setProperty("パスワード","xxx") //DataFrameWriterを呼び出してmysqlにデータを書き込む val dataResult = sqc.sql(sqlcommand).write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test","t_spark_dataframe_test",prop) // テーブルが存在しない可能性があります println(ac.name.get+" "+ac.value) sc.stop() } } 上記コードの textFile 内のサンプル データは次のとおりです。データは Hive から取得されます。フィールド情報は、パーティション番号、ユーザー ID、登録時間、およびサードパーティ番号です。 20160309 45386477 2012-06-12 20:13:15 901438 20160309 45390977 2012-06-12 22:38:06 901036 20160309 45446677 2012-06-14 21:57:39 901438 20160309 45464977 2012-06-15 13:42:55 901438 20160309 45572377 2012-06-18 14:55:03 902606 20160309 45620577 2012-06-20 00:21:09 902606 20160309 45628377 2012-06-20 10:48:05 901181 20160309 45628877 2012-06-20 11:10:15 902606 20160309 45667777 2012-06-21 18:58:34 902524 20160309 45680177 2012-06-22 01:49:55 20160309 45687077 2012-06-22 11:23:22 902607 ここでのフィールド タイプのマッピング、つまり、公式 Web サイトのスクリーンショットに示されているように、ケース クラスからデータフレームへのマッピングに注意してください。 詳細については、公式ドキュメント「Spark SQLおよびDataFrameガイド」を参照してください。 Spark RDD をデータフレームに変換して MySQL に書き込む上記の例が、私が皆さんと共有したいことのすべてです。これが皆さんの参考になれば幸いです。また、123WORDPRESS.COM をサポートしていただければ幸いです。 以下もご興味があるかもしれません:
|
<<: DockerでSpring Bootアプリケーションを実行する方法
フロントエンドの担当者であれば、面接でも仕事中でも、「CSS を使用して中央揃えにする」という効果に...
前回のキャンバス ゲーム シリーズへようこそ: 《VUEがFlappy Birdを実装しました〜〜〜...
私はかつて、ウェブサイトを一度も構築したことのない人々が、初心者向けのウェブサイト構築方法に関する私...
目次イベントとは簡単な例イベントをバインドする方法フレームワーク内のイベントイベントオブジェクトイベ...
最近、同社はitpubを皮切りに、コーポレートウェブサイト傘下の全サイトの評価を開始した。そのために...
MySQL 一時テーブルは、一時的なデータを保存する必要がある場合に非常に便利です。一時テーブルは現...
みなさんこんにちは。私と同じように混乱している方はいらっしゃいませんか。CSS は簡単に始められます...
目次導入なぜわざわざ?落とし穴のあるコミュニティソリューション(Vue を例に挙げる)現時点では良い...
序文ご存知のとおり、bash (Bourne-Gain Shell) は、ほとんどの Linux デ...
1. 何ですかreactアプリケーションでは、イベント名はキャメルケース形式で記述されます。たとえ...
最初にsudo suコマンドを使用して root アカウントに切り替えることをお勧めします。そうしな...
目次背景解決策1: 古いデータをバックアップするオプション2: テーブルを分割する解決策3: tid...
昨日プロジェクトを書いていた時に、MySQL の派生版である Percona を使う必要があったので...
Centos7 の yum ソースには、mysql の代わりに mariaDB が使用されているため...
LinuxリモートMySQLデータベースの展開、参考までに、具体的な内容は次のとおりです。 1.0 ...