概要: この記事では、Avro データをシリアル化して生成し、FlinkSQL を使用して解析する方法を説明します。 Avro 公式ドキュメント、http://avro.apache.org/docs/current/index.html。 Avroの紹介Avroはデータシリアル化システムです 提供内容:
技術的背景インターネットの急速な発展に伴い、クラウドコンピューティング、ビッグデータ、人工知能AI、モノのインターネットなどの最先端技術は、現代の主流のハイテクとなっています。電子商取引サイト、顔認識、自動運転車、スマートホーム、スマートシティなどは、人々の衣食住や交通を促進するだけでなく、さまざまなシステムプラットフォームによって常に大量のデータが収集、整理、分析されています。データの低遅延、高スループット、セキュリティを確保することが特に重要です。Apache Avro自体は、バイナリ転送用のスキーマを通じてシリアル化されており、一方では高速データ転送を、他方ではデータセキュリティを確保しています。Avroは現在、さまざまな業界でますます広く使用されています。Avroデータをどのように処理および解析するかは特に重要です。この記事では、シリアル化によってAvroデータを生成し、FlinkSQLを使用して解析する方法を説明します。 この記事は、Avro 解析のデモです。現在、FlinkSQL は単純な Avro データ解析にのみ適しており、複雑にネストされた Avro データはまだサポートされていません。 シーン紹介この記事では主に以下の3つのポイントを紹介します。
前提条件
手順1. 新しいAvro Mavenプロジェクトを作成し、POM依存関係を構成する pom ファイルの内容は次のとおりです。 <?xml バージョン="1.0" エンコーディング="UTF-8"?> <プロジェクト xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <モデルバージョン>4.0.0</モデルバージョン> <グループID>com.huawei.bigdata</グループID> <artifactId>アヴロデモ</artifactId> <バージョン>1.0-SNAPSHOT</バージョン> <依存関係> <依存関係> <グループ ID>org.apache.avro</グループ ID> <artifactId>アブロ</artifactId> <バージョン>1.8.1</バージョン> </依存関係> <依存関係> <groupId>ジュニット</groupId> <artifactId>junit</artifactId> <バージョン>4.12</バージョン> </依存関係> </依存関係> <ビルド> <プラグイン> <プラグイン> <グループ ID>org.apache.avro</グループ ID> <artifactId>Avro-Maven プラグイン</artifactId> <バージョン>1.8.1</バージョン> <処刑> <実行> <phase>ソースを生成する</phase> <目標> <goal>スキーマ</goal> </目標> <構成> <ソースディレクトリ>${project.basedir}/src/main/avro/</ソースディレクトリ> <出力ディレクトリ>${project.basedir}/src/main/java/</出力ディレクトリ> </構成> </実行> </処刑> </プラグイン> <プラグイン> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <構成> <ソース>1.6</ソース> <target>1.6</target> </構成> </プラグイン> </プラグイン> </ビルド> </プロジェクト> 注: 上記の pom ファイルは、自動的に生成されるクラスのパス、つまり ${project.basedir}/src/main/avro/ と ${project.basedir}/src/main/java/ を構成します。この構成の後、mvn コマンドを実行すると、プラグインはこのディレクトリの avsc スキーマからクラス ファイルを自動的に生成し、後者のディレクトリに配置します。 avro ディレクトリが生成されない場合は、手動で作成します。 2. スキーマを定義する JSON を使用して Avro のスキーマを定義します。スキーマは、プリミティブ型 (null、boolean、int、long、float、double、bytes、string) と複合型 (record、enum、array、map、union、fixed) で構成されます。たとえば、次の例では、ユーザー スキーマを定義し、メイン ディレクトリの下に avro ディレクトリを作成し、次に avro ディレクトリの下に新しいファイル user.avsc を作成します。 {"名前空間": "lancoo.ecbdc.pre", "タイプ": "レコード", "名前": "ユーザー", 「フィールド」: [ {"name": "名前", "type": "文字列"}, {"name": "favorite_number", "type": ["int", "null"]}, {"名前": "お気に入りの色", "タイプ": ["文字列", "null"]} ] } 3. スキーマをコンパイルする Mavenプロジェクトのコンパイルをクリックしてコンパイルすると、名前空間パスとUserクラスコードが自動的に作成されます。 4. シリアル化 生成されたデータをシリアル化するためのTestUserクラスを作成する ユーザー user1 = 新しい User(); user1.setName("アリッサ"); user1.setお気に入り番号(256); // お気に入りの列または null を残す // 代替コンストラクタ ユーザー user2 = new User("Ben", 7, "red"); // ビルダー経由で構築 ユーザー user3 = User.newBuilder() .setName("チャーリー") .setFavoriteColor("青") .setお気に入り番号(null) 。建てる(); // user1、user2、user3 をディスクにシリアル化します DatumWriter<User> userDatumWriter = 新しい SpecificDatumWriter<User>(User.class); DataFileWriter<ユーザー> dataFileWriter = 新しい DataFileWriter<ユーザー>(userDatumWriter); dataFileWriter.create(user1.getSchema(), 新しいファイル("user_generic.avro")); dataFileWriter.append(user1); dataFileWriter.append(user2); dataFileWriter.append(user3); データファイルライターを閉じます。 シリアル化プログラムを実行すると、プロジェクトと同じディレクトリにAvroデータが生成されます。 user_generic.avro の内容は次のとおりです。
5. デシリアライゼーション デシリアライゼーションコードによるAvroデータの解析 // ディスクからユーザーをデシリアライズする DatumReader<User> userDatumReader = 新しい SpecificDatumReader<User>(User.class); DataFileReader<ユーザー> dataFileReader = new DataFileReader<ユーザー>(新しいファイル("user_generic.avro"), userDatumReader); ユーザー user = null; (dataFileReader.hasNext()) が実行される間 { // ユーザーオブジェクトをnext()に渡して再利用します。これにより、 // 多くのオブジェクトを割り当ててガベージコレクションするファイル // 多くのアイテム。 ユーザー = dataFileReader.next(ユーザー); System.out.println(ユーザー); } デシリアライズコードを実行してuser_generic.avroを解析します。 Avro データの解析が成功しました。 6. user_generic.avroをhdfsパスにアップロードする hdfs dfs -mkdir -p /tmp/lztest/ hdfs dfs -put user_generic.avro /tmp/lztest/ 7. flinkserverを設定する Avro jarパッケージを準備する flink-sql-avro-*.jar と flink-sql-avro-confluent-registry-*.jar を flinkserver ライブラリに配置し、すべての flinkserver ノードで次のコマンドを実行します。 cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib chmod 500 flink-sql-avro*.jar chown omm:wheel flink-sql-avro*.jar 同時に、FlinkServer インスタンスを再起動し、再起動後に avro パッケージがアップロードされているかどうかを確認します。
8. FlinkSQLを書く テーブルtestHdfsを作成します( 名前文字列、 お気に入り番号 int, favorite_color 文字列 ) と( 'コネクタ' = 'ファイルシステム'、 'パス' = 'hdfs:///tmp/lztest/user_generic.avro', 'フォーマット' = 'avro' );テーブル KafkaTable の作成 ( 名前文字列、 お気に入り番号 int, favorite_color 文字列 ) と ( 'コネクタ' = 'kafka'、 'トピック' = 'testavro'、 'properties.bootstrap.servers' = '96.10.2.1:21005', 'properties.group.id' = 'testGroup'、 'scan.startup.mode' = '最新オフセット'、 'フォーマット' = 'avro' ); 挿入する Kafkaテーブル 選択 * から テストHdfs; タスクを保存して送信 9. 対応するトピックにデータがあるかどうかを確認する FlinkSQL は Avro データを正常に解析しました。 Apache Avro データの解析に関するこの記事はこれで終わりです。Apache Avro データに関するより関連性の高いコンテンツについては、123WORDPRESS.COM の過去の記事を検索するか、以下の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。 以下もご興味があるかもしれません:
|
>>: ホバー画像のポップアウトポップアップ効果を実現するための純粋な CSS のサンプルコード
前回は、Explain 実行プランの表示、インデックスの分析など、MySQL での SQL クエリの...
今日、MySQL の無料インストール版をデプロイしたところ、テーブル 'mysql.plug...
Docker プロセス、メモリ、カップ消費量を表示dockerコンテナを起動し、dockerinsp...
目次仮想DOM仮想DOMとは何か仮想DOMの役割Vue の仮想 DOM vノードvNodeとはvNo...
MySQL 結合クエリ1. 基本概念2 つのテーブルの各行をペアで水平に接続して、すべての行の結果を...
1. DOCTYPE は必須です。ブラウザは宣言した DOCTYPE に基づいてページのレンダリング...
この記事では、例を使用して、MySQL イベントの変更 (ALTER EVENT)、イベントの無効化...
目次序文1. ロックとは何ですか? 2. InnoDBストレージエンジンのロック2.1 ロックの種類...
時間は本当に存在するのでしょうか?時間は人間が考え出した概念に過ぎず、物事の変化を測る基準に過ぎない...
毎日の統計情報を取得するプロジェクトを実行する際、プロジェクト ログを分析する必要があります。要件の...
ここ2日間Javaを復習するつもりなので、練習にdubboを使ってショッピングモールプロジェクトを書...
結合クエリ結合クエリとは、2 つ以上のテーブル間のマッチング クエリを指し、一般的には水平操作と呼ば...
JWT の紹介JWTとは正式名称はJSON Web Tokenで、現在最も人気のあるクロスドメイン認...
ログイン + セッションストレージエフェクト表示ログインに成功すると、ユーザー ID がフロントエン...
目次要件: 進行中のアクティビティ データを照会する次のSQLクエリは、上記の4つの要件を満たし、タ...