MLSQL スタックでストリームのデバッグを簡単にする方法

MLSQL スタックでストリームのデバッグを簡単にする方法

序文

クラスメートが MLSQL Stack のストリーミング サポートを調査しています。そこで、フローデバッグは実はかなり難しいと言いました。実践を通じて、次の3点を達成したいと考えています。

  • いつでも最新の固定数の Kafka データを表示できる機能
  • デバッグ結果(シンク)はWebコンソールに出力できます
  • ストリーミング プログラムは JSON スキーマを自動的に推測できます (Spark では現時点ではこれができません)

これら 3 つのポイントを実装した後、デバッグがはるかに簡単になったことがわかりました。

プロセス

まず、Kafka へのデータの書き込みを容易にするために kaf_write.mlsql ファイルを作成しました。

abc=''' を設定します
{ "x": 100, "y": 200, "z": 200 ,"dataType":"グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
{ "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"}
''';
jsonStr.`abc` を table1 として読み込みます。

table1 から to_json(struct(*)) を値として table2 として選択します。
追加したtable2をkafka.`wow`として保存します。 
kafka.bootstrap.servers="127.0.0.1:9092";

こうすることで、実行するたびにデータを Kafka に書き込むことができます。

次に、書き込みが完了したら、データが実際に書き込まれているかどうか、またそれがどのようになっているかを確認する必要があります。

!kafkaTool sampleData 「127.0.0.1:9092」からの 10 件のレコード、すごい!

この文は、Kafka から 10 個の Kafka データをサンプリングすることを意味します。Kafka のアドレスは 127.0.0.1:9092 で、トピックは wow です。実行結果は次のとおりです。

問題ありません。次に、非常にシンプルなストリーミング プログラムを作成しました。

-- ストリーム名は一意である必要があります。
streamName を「streamExample」に設定します。

-- kafkaTool を使用して kafka からスキーマを推測します
!kafkaTool registerSchema 2 件のレコードが "127.0.0.1:9092" から取得されました。すごいですね。


kafka.`wow` オプションをロードする 
kafka.bootstrap.servers="127.0.0.1:9092"
newkafkatable1 として;


newkafkatable1 から * を選択
表21のように;


-- ターミナルコンソールの代わりに Web コンソールに印刷します。
保存追加テーブル21 
webConsole として。 
オプションモード="追加"
期間="15"
および checkpointLocation="/tmp/s-cpl4";

結果は次のとおりです。

ターミナルでリアルタイムの効果を確認することもできます。

補充する

もちろん、MLSQL スタックにはストリーミングのための優れた機能が他にも 2 つあります。1 つ目は、ストリーミング イベントに HTTP プロトコル コールバックを設定し、バッチ SQL を使用してストリーミング結果を処理し、最終的にデータベースに保存できることです。次のスクリプトを参照してください。

-- ストリーム名は一意である必要があります。
streamName を「streamExample」に設定します。


-- いくつかのデータを模擬します。
データ='''を設定
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":0,"タイムスタンプ":"2008-01-24 18:01:01.001","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":1,"タイムスタンプ":"2008-01-24 18:01:01.002","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":2,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":3,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":4,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
{"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":5,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0}
''';

-- データをテーブルとしてロードする
jsonStr.`data` をデータソースとして読み込みます。

--テーブルをストリームソースとして変換する
mockStream.`datasource` オプションをロードする 
ステップサイズ範囲="0-3"
newkafkatable1 として;

-- 集約 
newkafkatable1 から、cast(value as string) を k として選択します。
表21のように;


!callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated";
-- 結果をコンソールに出力します。


保存追加テーブル21 
慣習として。 
オプションモード="追加"
期間="15"
そしてsourceTable="jack"
コード='''
jack から count(*) を c として newjack として選択します。
append newjack を parquet.`/tmp/jack` として保存します。 
'''
および checkpointLocation="/tmp/cpl15";

要約する

以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。

以下もご興味があるかもしれません:
  • Mysql LONGBLOB型はバイナリデータを格納します(変更+デバッグ+ソート)
  • Mysql LONGTEXT型は大きなファイル(バイナリも可能)を保存します(変更+デバッグ+ソート)
  • Mysql 中国語の挿入と中国語のクエリ (変更 + デバッグ)
  • 初心者向け PHP デバッグ環境の設定 (IIS+PHP+MYSQL)
  • MySQL UDFデバッグモードdebugviewの関連メソッド
  • MySQL のデバッグと最適化に関する 101 のヒントを共有する
  • GDBデバッグMySQL実戦ソースコードコンパイルとインストール

<<:  Linux でファイルの作成時間を取得する方法と実践的なチュートリアル

>>:  JavaScriptのスリープ関数の使用

推薦する

webpack と rollup を使用してコンポーネント ライブラリをパッケージ化する方法

序文以前、ローディングスタイルのコンポーネントを作成しました。コードの再利用性を実現するために、この...

Nginx における accept lock の仕組みと実装の詳細な説明

序文nginx はマルチプロセス モデルを使用します。リクエストが届くと、システムはプロセスをロック...

MySQL 5.6 での table_open_cache パラメータの最適化と適切な構成の詳細な説明

1. はじめにtable_cache は非常に重要な MySQL パフォーマンス パラメータであり、...

CSS3 天子グリッドリストのスタイルの書き方

多くのプロジェクトでは、中央に灰色の分割線があり、両側に分割線がないグリッド表示の機能を実装する必要...

Docker コンテナを他のサーバーに移行する 5 つの方法

多くの場合、移行は避けられません。ハードウェアのアップグレード、データ センターの変更、古いオペレー...

HTML フレーム、Iframe、フレームセットの違い

10.4.1 フレームセットとフレームの違い まず、フレームセットとフレームの違いについて説明します...

JavaScript の寄生的構成継承についての簡単な説明

コンポジション継承組み合わせ継承は、疑似古典的継承とも呼ばれます。これは、昨日説明したプロトタイプ ...

Docker ケース分析: Redis サービスの構築

目次1 マウントディレクトリとファイルを作成する2 Redisイメージを取得する3 コンテナを作成し...

awk でのループの使用

同じコマンドを複数回実行するさまざまな種類のループについて学習しましょう。 awk スクリプトには、...

JavaScript データ型変換の例 (他の型を文字列、数値型、ブール型に変換する)

序文データ型変換とは何ですか?フォームまたはプロンプトを使用して取得されるデフォルトのデータ型は文字...

Nginx セッション損失問題の解決策

nginx をリバース プロキシ tomcat として使用する場合、セッション損失が発生する可能性が...

HTML をホームページとして設定し、お気に入りに追加_Powernode Java Academy

IE ブラウザで「ホームページとして設定」および「お気に入りに追加」機能を実装する方法解決:指定さ...

一般的なMysql DDL操作の概要

図書館管理ライブラリを作成する データベースを作成します [存在しない場合] ライブラリ名;ライブラ...

HTMLでカスタムタグを使用する方法

カスタム タグは XML ファイルと HTML ファイルで自由に使用できますが、いくつか注意すべき点...