特定の MySQL テーブルの完全データと増分データをメッセージ キューに同期する - ソリューション

特定の MySQL テーブルの完全データと増分データをメッセージ キューに同期する - ソリューション

1. 当初の需要

特定の MySQL ライブラリ内の特定のテーブルの元の完全なデータと増分データをリアルタイムで同期する必要があり、対応する変更と削除も同期する必要があります。

データの同期は邪魔にならないようにする必要があります。つまり、ビジネス手順を変更したり、ビジネス側に過度のパフォーマンス圧力をかけたりしてはなりません。

アプリケーション シナリオ: データ ETL 同期とビジネス サーバーへの負荷の軽減。

2. 解決策

3. 運河の導入と設置

Canal は、純粋な Java で開発された Alibaba のオープンソース プロジェクトです。データベースの増分ログ分析に基づいて、増分データのサブスクリプションと消費を提供し、現在は主に MySQL をサポートしています (mariaDB もサポートしています)。

動作原理: MySQL マスタースレーブレプリケーションの実装

大まかに見ると、レプリケーションは次の 3 つのステップに分かれます。

  1. マスターはバイナリ ログに変更を記録します (これらの記録はバイナリ ログ イベントと呼ばれ、show binlog events で表示できます)。
  2. スレーブはマスターのバイナリ ログ イベントをリレー ログにコピーします。
  3. スレーブはリレー ログ内のイベントをやり直し、データを自身のものに反映するように変更します。

運河の仕組み

原理は比較的単純です。

  1. Canal は MySQL スレーブの対話型プロトコルをシミュレートし、MySQL スレーブのふりをして、ダンプ プロトコルを MySQL マスターに送信します。
  2. MySQL マスターはダンプ要求を受信し、バイナリ ログをスレーブ (チャネル) にプッシュし始めます。
  3. Canalはバイナリログオブジェクト(元々はバイトストリーム)を解析します

建築

例:

  • サーバーは、JVMに対応するチャネル実行インスタンスを表します。
  • インスタンスはデータ キューに対応します (1 つのサーバーは 1..n 個のインスタンスに対応します)

インスタンスモジュール:

  • eventParser (データ ソース アクセス、スレーブ プロトコルとマスターの相互作用のシミュレーション、プロトコル解析)
  • eventSink (パーサーおよびストア コネクタ、データのフィルタリング、処理、および配信を実行)
  • eventStore (データストレージ)
  • metaManager (増分サブスクリプションおよび消費情報マネージャー)

インストール

1. MySQLとKafka環境の準備

2. canal をダウンロード: wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3. 解凍: tar -zxvf canal.deployer-1.1.3.tar.gz

4. confディレクトリ内のファイルパラメータを設定する

canal.properties を設定します。

conf/example に入り、instance.properties を設定します。

5. 起動: bin/startup.sh

6. ログの表示:

4. 検証

1. 対応するKafkaコンシューマーを開発する

パッケージ org.kafka;

java.util.Arrays をインポートします。
java.util.Properties をインポートします。
org.apache.kafka.clients.consumer.ConsumerRecord をインポートします。
org.apache.kafka.clients.consumer.ConsumerRecords をインポートします。
org.apache.kafka.clients.consumer.KafkaConsumer をインポートします。
org.apache.kafka.common.serialization.StringDeserializer をインポートします。


/**
 *
 * タイトル: KafkaConsumerTest
 * 説明:
 * kafka コンシューマーデモ
 * バージョン:1.0.0
 * @著者 パンCM
 * @日付 2018年1月26日 */
パブリッククラス KafkaConsumerTest は Runnable を実装します {

    プライベート最終 KafkaConsumer<String, String> コンシューマー;
    プライベート ConsumerRecords<String, String> msgList;
    プライベート最終文字列トピック;
    プライベート静的最終文字列 GROUPID = "groupA";

    パブリック KafkaConsumerTest(文字列トピック名) {
        プロパティ props = new Properties();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("セッションタイムアウト.ms", "30000");
        props.put("auto.offset.reset", "最新");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = 新しい KafkaConsumer<String, String>(props);
        this.topic = トピック名;
        this.consumer.subscribe(Arrays.asList(トピック));
    }

    @オーバーライド
    パブリックボイド実行() {
        int メッセージ番号 = 1;
        System.out.println("---------消費開始---------");
        試す {
            のために (; ; ) {
                メッセージリスト = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    (ConsumerRecord<String, String> レコード: msgList) {
                        // 100 件のレコードを消費した後に印刷しますが、印刷されたデータはこのパターンに従わない可能性があります System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());


// 文字列 v = decodeUnicode(record.value());

// System.out.println(v);

                        // 1000 メッセージが消費されたら終了 if (messageNo % 1000 == 0) {
                            壊す;
                        }
                        メッセージNo++;
                    }
                } それ以外 {
                    スレッド.sleep(11);
                }
            }
        } キャッチ (InterruptedException e) {
            e.printStackTrace();
        ついに
            コンシューマーを閉じます。
        }
    }

    パブリック静的void main(String args[]) {
        KafkaConsumerTest test1 = 新しい KafkaConsumerTest("サンプルデータ");
        スレッド thread1 = new Thread(test1);
        スレッド1を開始します。
    }


    /*
     * 中国語をユニコードに変換*/
    パブリック静的文字列 gbEncoding(最終的な文字列 gbString) {
        char[] utfBytes = gbString.toCharArray();
        文字列 unicodeBytes = "";
        (int i = 0; i < utfBytes.length; i++) の場合 {
            文字列 hexB = Integer.toHexString(utfBytes[i]);
            (hexB.length() <= 2)の場合{
                hexB = "00" + hexB;
            }
            ユニコードバイト = ユニコードバイト + "\\u" + hexB;
        }
        unicodeBytes を返します。
    }

    /*
     * 中国語への Unicode エンコード */
    パブリック静的文字列decodeUnicode(final String dataStr) {
        開始 = 0;
        int 終了 = 0;
        最終的なStringBufferバッファ = 新しいStringBuffer();
        (開始 > -1) の間 {
            終了 = dataStr.indexOf("\\u", 開始 + 2);
            文字列 charStr = "";
            終了 == -1 の場合 {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } それ以外 {
                charStr = dataStr.substring(開始 + 2、終了);
            }
            char letter = (char) Integer.parseInt(charStr, 16); // 16進整数文字列を解析します。
            buffer.append(新しいCharacter(letter).toString());
            開始 = 終了;
        }
        buffer.toString() を返します。

    }
}

2. テーブルbak1にデータを追加する

テーブル `bak1` を作成します (
  `vin` varchar(20) NOT NULL,
  `p1` ダブルデフォルト NULL、
  `p2` ダブルデフォルト NULL、
  `p3` ダブルデフォルト NULL、
  `p4` ダブルデフォルト NULL、
  `p5` ダブルデフォルト NULL、
  `p6` ダブルデフォルト NULL、
  `p7` ダブルデフォルト NULL、
  `p8` ダブルデフォルト NULL、
  `p9` ダブルデフォルト NULL、
  `p0` ダブル デフォルト NULL
) エンジン=InnoDB デフォルト文字セット=utf8mb4

表示テーブル bak1 を作成します。

insert into bak1 select '李雷abcv',
  `p1`、
  `p2`、
  `p3`、
  `p4`、
  `p5`、
  `p6`、
  `p7`、
  `p8`、
  `p9`、
  moci 制限 10 からの `p0`

3. 出力結果を表示します。

これで、特定の MySQL テーブルの完全データと増分データをメッセージ キューに同期するソリューションに関するこの記事は終了です。特定の MySQL テーブルでデータを同期する方法の詳細については、123WORDPRESS.COM の以前の記事を検索するか、次の関連記事を引き続き参照してください。今後とも 123WORDPRESS.COM をよろしくお願いいたします。

以下もご興味があるかもしれません:
  • MySQLからElasticsearchにデータを同期する方法の詳細な説明
  • Python を使用して MySQL データを ElasticSearch に同期する方法のチュートリアル
  • node.js を使用して MongoDB データを MySQL に同期する手順
  • MySQL5.6 マスタースレーブレプリケーション (mysql データ同期構成)
  • MySQLマスタースレーブデータ同期遅延の削減の詳細な説明
  • 2つのテーブル間でデータを同期するためのmysqlトリガー
  • MySQLデータ同期におけるSlave_IO_Running:Noの問題の解決方法のまとめ
  • MySQLのバックアップと移行データの同期方法
  • MYSQL5 マスタースレーブデータ同期設定方法
  • MySQLデータを同期する方法

<<:  Tencent Cloud Server での Jenkins の設定方法の詳細

>>:  メッセージ ボタンに数量バッジを追加する HTML コード

推薦する

vue3を使用して人間と猫のコミュニケーションアプレットを実装する

目次序文プロジェクトを初期化するデザインコードの実装オンデマンドロードオーディオを再生録音長押しイベ...

要素UIテーブルはドロップダウンフィルタリング機能を実現します

この記事の例では、要素UIテーブルにドロップダウンフィルタリングを実装するための具体的なコードを参考...

HTMLの行間設定方法と問題点

<p></p> の行間隔を設定するには、style="line-h...

広告を閉じるための JavaScript カウントダウン

広告を閉じるまでのカウントダウンを実装するために JavaScript を使用するまだフロントエンド...

MySQL データベースの show processlist コマンドの使用の分析

実際のプロジェクト開発では、多数のクエリや挿入、特にマルチスレッド挿入など、データベースに大きな負荷...

html の img src="" で js 関数または js 変数を呼び出して、画像パスを動的に指定します。

この問題に関して、オンライン リソースをたくさん見つけました。ここにいくつかの方法を示します。コード...

自己終了XHTMLタグを書くときに注意すべきこと

XHTML の img タグはいわゆる自己終了タグであり、XML では完全に合法です。 XHTMLの...

MySQL がエラーを報告: ファイルが見つかりません: './mysql/plugin.frm' 解決策

問題を見つける最近、仕事中に問題が見つかりました。問題は、MySQL ディスクがいっぱいだったことで...

Linux calコマンドの使用

1. コマンドの紹介cal (カレンダー) コマンドは、現在の日付または指定された日付のグレゴリオ暦...

ドキュメントの場所の比較

<br />2 年前に PPK が投稿した素晴らしいブログ記事では、contains()...

CSS3 でテキストマーキーを実装するためのサンプルコード

背景何が起こったかというと、Luzhu は偶然、宇宙で最高の外部スピーカーを備えた携帯電話について知...

CSS -webkit-box-orient: コンパイル後に垂直プロパティが失われる

1. 原因要件は 2 行を表示することであり、余分なテキストは 3 つのドットに置き換えられるため、...

HTMLで下線を設定するには?HTMLでテキストに下線を付ける方法

HTML で下線を引くには、以前はテキストを <u></u> タグで囲む必要...

MySQL ジョイントテーブル更新デー​​タの詳細な例

1.MySQL UPDATE JOIN構文MySQL では、UPDATE ステートメントでJOIN句...

uniapp パッケージ化されたアプレット レーダー チャート コンポーネントの完全なコード

効果画像:実装コードは以下のとおりですビュー <canvas id="radar-c...