MLSQL スタックでストリームのデバッグを簡単にする方法

MLSQL スタックでストリームのデバッグを簡単にする方法

序文

クラスメートが MLSQL Stack のストリーミング サポートを調査しています。そこで、フローデバッグは実はかなり難しいと言いました。実践を通じて、次の3点を達成したいと考えています。

  • いつでも最新の固定数の Kafka データを表示できる機能
  • デバッグ結果(シンク)はWebコンソールに出力できます
  • ストリーミング プログラムは JSON スキーマを自動的に推測できます (Spark では現時点ではこれができません)

これら 3 つのポイントを実装した後、デバッグがはるかに簡単になったことがわかりました。

プロセス

まず、Kafka へのデータの書き込みを容易にするために kaf_write.mlsql ファイルを作成しました。

abc=''' を設定します
{ "x": 100, "y": 200, "z": 200 ,"dataType":"グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
''';
jsonStr.`abc` を table1 として読み込みます。

table1 から to_json(struct(*)) を値として table2 として選択します。
追加したtable2をkafka.`wow`として保存します。 
kafka.bootstrap.servers="127.0.0.1:9092";

こうすることで、実行するたびにデータを Kafka に書き込むことができます。

次に、書き込みが完了したら、データが実際に書き込まれているかどうか、またそれがどのようになっているかを確認する必要があります。

!kafkaTool sampleData 「127.0.0.1:9092」からの 10 件のレコード、すごい!

この文は、Kafka から 10 個の Kafka データをサンプリングすることを意味します。Kafka のアドレスは 127.0.0.1:9092 で、トピックは wow です。実行結果は次のとおりです。

問題ありません。次に、非常にシンプルなストリーミング プログラムを作成しました。

-- ストリーム名は一意である必要があります。
streamName を「streamExample」に設定します。

-- kafkaTool を使用して kafka からスキーマを推測します
!kafkaTool registerSchema 2 件のレコードが "127.0.0.1:9092" から取得されました。すごいですね。


kafka.`wow` オプションをロードする 
kafka.bootstrap.servers="127.0.0.1:9092"
newkafkatable1 として;


newkafkatable1 から * を選択
表21のように;


-- ターミナルコンソールの代わりに Web コンソールに印刷します。
保存追加テーブル21 
webConsole として。 
オプションモード="追加"
期間="15"
および checkpointLocation="/tmp/s-cpl4";

結果は次のとおりです。

ターミナルでリアルタイムの効果を確認することもできます。

補充する

もちろん、MLSQL スタックにはストリーミングのための優れた機能が他にも 2 つあります。1 つ目は、ストリーミング イベントに HTTP プロトコル コールバックを設定し、バッチ SQL を使用してストリーミング結果を処理し、最終的にデータベースに保存できることです。次のスクリプトを参照してください。

-- ストリーム名は一意である必要があります。
streamName を「streamExample」に設定します。


-- いくつかのデータを模擬します。
データ='''を設定
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":0,"タイムスタンプ":"2008-01-24 18:01:01.001","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":1,"タイムスタンプ":"2008-01-24 18:01:01.002","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":2,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":3,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":4,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":5,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
''';

-- データをテーブルとしてロードする
jsonStr.`data` をデータソースとして読み込みます。

--テーブルをストリームソースとして変換する
mockStream.`datasource` オプションをロードする 
ステップサイズ範囲="0-3"
newkafkatable1 として;

-- 集約 
newkafkatable1 から、cast(value as string) を k として選択します。
表21のように;


!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- 結果をコンソールに出力します。


保存追加テーブル21 
慣習として。 
オプションモード="追加"
期間="15"
そしてsourceTable="jack"
コード='''
jack から count(*) を c として newjack として選択します。
append newjack を parquet.`/tmp/jack` として保存します。 
'''
および checkpointLocation="/tmp/cpl15";

要約する

以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。

以下もご興味があるかもしれません:
  • Mysql LONGBLOB型はバイナリデータを格納します(変更+デバッグ+ソート)
  • Mysql LONGTEXT型は大きなファイル(バイナリも可能)を保存します(変更+デバッグ+ソート)
  • Mysql 中国語の挿入と中国語のクエリ (変更 + デバッグ)
  • 初心者向け PHP デバッグ環境の設定 (IIS+PHP+MYSQL)
  • MySQL UDFデバッグモードdebugviewの関連メソッド
  • MySQL のデバッグと最適化に関する 101 のヒントを共有する
  • GDBデバッグMySQL実戦ソースコードコンパイルとインストール

<<:  Linux でファイルの作成時間を取得する方法と実践的なチュートリアル

>>:  JavaScriptのスリープ関数の使用

推薦する

Linux での screen コマンドの使用方法の詳細な説明

GUNスクリーン:公式サイト: http://www.gnu.org/software/screen...

MySQL 5.7.18 無料インストール版ウィンドウ設定方法

初めてのブログです。データベースの勉強を始めた頃のことを書いています。自分でダウンロードしたのですが...

Docker を使用して Nginx+Flask+Mongo アプリケーションをデプロイする

サーバーにはNginx、データベースサポートにはMongo、Python言語のWebフレームワークに...

Maven+Tomcat 基本イメージを構築する Docker の実装

序文Javaプログラミングでは、ほとんどのアプリケーションはMavenに基づいて構築されており、配信...

Mysql テーブルコメントフィールド取得操作

余計なことは言わないで、コードだけ見てみましょう〜 -- テーブル内のフィールドコメントを表示および...

MySQL InnoDB の重要なコンポーネントの概要

Innodbには以下のコンポーネントが含まれています1. innodb_buffer_pool:これ...

js 日付と時刻のフォーマット方法の例

js 日付時刻形式日付と時刻を指定された形式に変換します。例: YYYY-mm-dd HH:MM は...

ネイティブ JS 音楽プレーヤー

この記事の例では、音楽プレーヤーを実装するためのJSの具体的なコードを参考までに共有しています。具体...

Ubuntu ベースのディストリビューションに Microsoft TrueType フォントをインストールするチュートリアル

Linux 上の LibreOffice で Microsoft ドキュメントを開くと、フォントが少...

VMware で Nginx+KeepAlived クラスタ デュアルアクティブ アーキテクチャを展開する際の問題と解決策

序文負荷分散には nginx を使用します。アーキテクチャのフロントエンドまたは中間層として、トラフ...

Vueは完全な選択機能を実装しています

この記事の例では、完全な選択機能を実装するためのVueの具体的なコードを参考までに共有しています。具...

収集する価値のある Linux ドキュメント編集コマンド 27 個

Linux col コマンドLinux の col コマンドは制御文字をフィルタリングするために使用...

js のプロトタイプ、プロトタイプ オブジェクト、プロトタイプ チェーンの包括的な分析

目次プロトタイプを理解するプロトタイプオブジェクトを理解するインスタンスプロパティとプロトタイププロ...

JavaScript配列の簡単な紹介

目次配列の紹介配列リテラル2次元配列要約する配列の紹介配列- Arrayもオブジェクトですこれは通常...

Dockerを使用してSpringBootプロジェクトをデプロイする方法

Docker テクノロジの開発により、マイクロサービスの実装にさらに便利な環境が提供されます。Doc...