1 つの記事で Apache Avro データを解析する

1 つの記事で Apache Avro データを解析する

概要: この記事では、Avro データをシリアル化して生成し、FlinkSQL を使用して解析する方法を説明します。

Avro 公式ドキュメント、http://avro.apache.org/docs/current/index.html。

Avroの紹介

Avroはデータシリアル化システムです

提供内容:

  • 豊富なデータ構造
  • コンパクトで高速なバイナリデータ形式
  • 永続的なデータを保存するためのファイル形式
  • リモート プロシージャ コール (RPC) システム
  • 動的言語とのシンプルな対話。データ ファイルの読み取りと書き込み用のコードを生成する必要はなく、RPC プロトコルを使用したり実装したりする必要もありません。コード生成は最適化の一形態ですが、静的言語の場合にのみ意味があります。

技術的背景

インターネットの急速な発展に伴い、クラウドコンピューティング、ビッグデータ、人工知能AI、モノのインターネットなどの最先端技術は、現代の主流のハイテクとなっています。電子商取引サイト、顔認識、自動運転車、スマートホーム、スマートシティなどは、人々の衣食住や交通を促進するだけでなく、さまざまなシステムプラットフォームによって常に大量のデータが収集、整理、分析されています。データの低遅延、高スループット、セキュリティを確保することが特に重要です。Apache Avro自体は、バイナリ転送用のスキーマを通じてシリアル化されており、一方では高速データ転送を、他方ではデータセキュリティを確保しています。Avroは現在、さまざまな業界でますます広く使用されています。Avroデータをどのように処理および解析するかは特に重要です。この記事では、シリアル化によってAvroデータを生成し、FlinkSQLを使用して解析する方法を説明します。

この記事は、Avro 解析のデモです。現在、FlinkSQL は単純な Avro データ解析にのみ適しており、複雑にネストされた Avro データはまだサポートされていません。

シーン紹介

この記事では主に以下の3つのポイントを紹介します。

  • Avro データをシリアル化して生成する方法
  • Avro データをデシリアライズして解析する方法
  • FlinkSQL を使用して Avro データを解析する方法

前提条件

  • Avroの詳細については、Apache Avro公式サイトのクイックスタートガイドを参照してください。
  • Avro アプリケーションのシナリオを理解する

手順

1. 新しいAvro Mavenプロジェクトを作成し、POM依存関係を構成する

pom ファイルの内容は次のとおりです。

<?xml バージョン="1.0" エンコーディング="UTF-8"?>
<プロジェクト xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <モデルバージョン>4.0.0</モデルバージョン>

    <グループID>com.huawei.bigdata</グループID>
    <artifactId>アヴロデモ</artifactId>
    <バージョン>1.0-SNAPSHOT</バージョン>
    <依存関係>
        <依存関係>
            <グループ ID>org.apache.avro</グループ ID>
            <artifactId>アブロ</artifactId>
            <バージョン>1.8.1</バージョン>
        </依存関係>
        <依存関係>
            <groupId>ジュニット</groupId>
            <artifactId>junit</artifactId>
            <バージョン>4.12</バージョン>
        </依存関係>
    </依存関係>

    <ビルド>
        <プラグイン>
            <プラグイン>
                <グループ ID>org.apache.avro</グループ ID>
                <artifactId>Avro-Maven プラグイン</artifactId>
                <バージョン>1.8.1</バージョン>
                <処刑>
                    <実行>
                        <phase>ソースを生成する</phase>
                        <目標>
                            <goal>スキーマ</goal>
                        </目標>
                        <構成>
                            <ソースディレクトリ>${project.basedir}/src/main/avro/</ソースディレクトリ>
                            <出力ディレクトリ>${project.basedir}/src/main/java/</出力ディレクトリ>
                        </構成>
                    </実行>
                </処刑>
            </プラグイン>
            <プラグイン>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <構成>
                    <ソース>1.6</ソース>
                    <target>1.6</target>
                </構成>
            </プラグイン>
        </プラグイン>
    </ビルド>

</プロジェクト>

注: 上記の pom ファイルは、自動的に生成されるクラスのパス、つまり ${project.basedir}/src/main/avro/ と ${project.basedir}/src/main/java/ を構成します。この構成の後、mvn コマンドを実行すると、プラグインはこのディレクトリの avsc スキーマからクラス ファイルを自動的に生成し、後者のディレクトリに配置します。 avro ディレクトリが生成されない場合は、手動で作成します。

2. スキーマを定義する

JSON を使用して Avro のスキーマを定義します。スキーマは、プリミティブ型 (null、boolean、int、long、float、double、bytes、string) と複合型 (record、enum、array、map、union、fixed) で構成されます。たとえば、次の例では、ユーザー スキーマを定義し、メイン ディレクトリの下に avro ディレクトリを作成し、次に avro ディレクトリの下に新しいファイル user.avsc を作成します。

{"名前空間": "lancoo.ecbdc.pre",
 "タイプ": "レコード",
 "名前": "ユーザー",
 「フィールド」: [
     {"name": "名前", "type": "文字列"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"名前": "お気に入りの色", "タイプ": ["文字列", "null"]}
 ]
} 

3. スキーマをコンパイルする

Mavenプロジェクトのコンパイルをクリックしてコンパイルすると、名前空間パスとUserクラスコードが自動的に作成されます。

4. シリアル化

生成されたデータをシリアル化するためのTestUserクラスを作成する

ユーザー user1 = 新しい User();
user1.setName("アリッサ");
user1.setお気に入り番号(256);
// お気に入りの列または null を残す

// 代替コンストラクタ
ユーザー user2 = new User("Ben", 7, "red");

// ビルダー経由で構築
ユーザー user3 = User.newBuilder()
        .setName("チャーリー")
        .setFavoriteColor("青")
        .setお気に入り番号(null)
        。建てる();

// user1、user2、user3 をディスクにシリアル化します
DatumWriter<User> userDatumWriter = 新しい SpecificDatumWriter<User>(User.class);
DataFileWriter<ユーザー> dataFileWriter = 新しい DataFileWriter<ユーザー>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), 新しいファイル("user_generic.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
データファイルライターを閉じます。

シリアル化プログラムを実行すると、プロジェクトと同じディレクトリにAvroデータが生成されます。

user_generic.avro の内容は次のとおりです。

Objavro.schema�{"type":"record","name":"User","namespace":"lancoo.ecbdc.pre","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}

5. デシリアライゼーション

デシリアライゼーションコードによるAvroデータの解析

// ディスクからユーザーをデシリアライズする
DatumReader<User> userDatumReader = 新しい SpecificDatumReader<User>(User.class);
DataFileReader<ユーザー> dataFileReader = new DataFileReader<ユーザー>(新しいファイル("user_generic.avro"), userDatumReader);
ユーザー user = null;
(dataFileReader.hasNext()) が実行される間 {
    // ユーザーオブジェクトをnext()に渡して再利用します。これにより、
    // 多くのオブジェクトを割り当ててガベージコレクションするファイル
    // 多くのアイテム。
    ユーザー = dataFileReader.next(ユーザー);
    System.out.println(ユーザー);
}

デシリアライズコードを実行してuser_generic.avroを解析します。

Avro データの解析が成功しました。

6. user_generic.avroをhdfsパスにアップロードする

hdfs dfs -mkdir -p /tmp/lztest/
hdfs dfs -put user_generic.avro /tmp/lztest/ 

7. flinkserverを設定する

Avro jarパッケージを準備する

flink-sql-avro-*.jar と flink-sql-avro-confluent-registry-*.jar を flinkserver ライブラリに配置し、すべての flinkserver ノードで次のコマンドを実行します。

cp /opt/huawei/Bigdata/FusionInsight_Flink_8.1.2/install/FusionInsight-Flink-1.12.2/flink/opt/flink-sql-avro*.jar /opt/huawei/Bigdata/FusionInsight_Flink_8.1.3/install/FusionInsight-Flink-1.12.2/flink/lib
chmod 500 flink-sql-avro*.jar
chown omm:wheel flink-sql-avro*.jar 

同時に、FlinkServer インスタンスを再起動し、再起動後に avro パッケージがアップロードされているかどうかを確認します。

hdfs dfs -ls /FusionInsight_FlinkServer/8.1.2-312005/lib

8. FlinkSQLを書く

テーブルtestHdfsを作成します(
  名前文字列、
  お気に入り番号 int,
  favorite_color 文字列
) と(
  'コネクタ' = 'ファイルシステム'、
  'パス' = 'hdfs:///tmp/lztest/user_generic.avro',
  'フォーマット' = 'avro'
);テーブル KafkaTable の作成 (
  名前文字列、
  お気に入り番号 int,
  favorite_color 文字列
) と (
  'コネクタ' = 'kafka'、
  'トピック' = 'testavro'、
  'properties.bootstrap.servers' = '96.10.2.1:21005',
  'properties.group.id' = 'testGroup'、
  'scan.startup.mode' = '最新オフセット'、
  'フォーマット' = 'avro'
);
挿入する
  Kafkaテーブル
選択
  *
から
  テストHdfs; 

タスクを保存して送信

9. 対応するトピックにデータがあるかどうかを確認する

FlinkSQL は Avro データを正常に解析しました。

Apache Avro データの解析に関するこの記事はこれで終わりです。Apache Avro データに関するより関連性の高いコンテンツについては、123WORDPRESS.COM の過去の記事を検索するか、以下の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。

以下もご興味があるかもしれません:
  • Apache Superset を使用して ClickHouse データを視覚化する 2 つの方法

<<:  SQL重複排除方法の概要

>>:  ホバー画像のポップアウトポップアップ効果を実現するための純粋な CSS のサンプルコード

推薦する

MySQL テーブルの読み取り、書き込み、インデックス作成、その他の操作の SQL ステートメントの効率最適化の問題を分析します。

前回は、Explain 実行プランの表示、インデックスの分析など、MySQL での SQL クエリの...

MySQL をデプロイするときに発生する「テーブル mysql.plugin が存在しません」という問題の解決方法

今日、MySQL の無料インストール版をデプロイしたところ、テーブル 'mysql.plug...

Docker View プロセス、メモリ、カップ消費量

Docker プロセス、メモリ、カップ消費量を表示dockerコンテナを起動し、dockerinsp...

Vue 仮想 DOM クイックスタート

目次仮想DOM仮想DOMとは何か仮想DOMの役割Vue の仮想 DOM vノードvNodeとはvNo...

MySQL 結合クエリの原則の知識ポイント

MySQL 結合クエリ1. 基本概念2 つのテーブルの各行をペアで水平に接続して、すべての行の結果を...

HTML ページ出力で従うべきいくつかの原則の要約

1. DOCTYPE は必須です。ブラウザは宣言した DOCTYPE に基づいてページのレンダリング...

MySQLテクノロジーにおけるInnoDBロックの詳細な説明

目次序文1. ロックとは何ですか? 2. InnoDBストレージエンジンのロック2.1 ロックの種類...

MySQL の時間設定に関する考慮事項の詳細な要約

時間は本当に存在するのでしょうか?時間は人間が考え出した概念に過ぎず、物事の変化を測る基準に過ぎない...

一定期間の日ごと、時間ごとの統計データを取得するMySQLの詳しい説明

毎日の統計情報を取得するプロジェクトを実行する際、プロジェクト ログを分析する必要があります。要件の...

dubbo での Zookeeper リクエストのタイムアウト問題: mysql8.0.15 に接続する mybatis+spring の構成

ここ2日間Javaを復習するつもりなので、練習にdubboを使ってショッピングモールプロジェクトを書...

MySQL 接続とコレクションの簡単な分析

結合クエリ結合クエリとは、2 つ以上のテーブル間のマッチング クエリを指し、一般的には水平操作と呼ば...

Nodeはkoa2を使用してシンプルなJWT認証方式を実装します

JWT の紹介JWTとは正式名称はJSON Web Tokenで、現在最も人気のあるクロスドメイン認...

Vue での ElementUI の使用に関する詳細な説明

ログイン + セッションストレージエフェクト表示ログインに成功すると、ユーザー ID がフロントエン...

mysqlは、現在の時刻が開始時刻と終了時刻の間にあるかどうかを判断し、開始時刻と終了時刻が空であることが許可されます。

目次要件: 進行中のアクティビティ データを照会する次のSQLクエリは、上記の4つの要件を満たし、タ...