序文 クラスメートが MLSQL Stack のストリーミング サポートを調査しています。そこで、フローデバッグは実はかなり難しいと言いました。実践を通じて、次の3点を達成したいと考えています。
これら 3 つのポイントを実装した後、デバッグがはるかに簡単になったことがわかりました。 プロセス まず、Kafka へのデータの書き込みを容易にするために kaf_write.mlsql ファイルを作成しました。 abc=''' を設定します { "x": 100, "y": 200, "z": 200 ,"dataType":"グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} { "x": 120, "y": 100, "z": 260 ,"dataType":"B グループ"} '''; jsonStr.`abc` を table1 として読み込みます。 table1 から to_json(struct(*)) を値として table2 として選択します。 追加したtable2をkafka.`wow`として保存します。 kafka.bootstrap.servers="127.0.0.1:9092"; こうすることで、実行するたびにデータを Kafka に書き込むことができます。 次に、書き込みが完了したら、データが実際に書き込まれているかどうか、またそれがどのようになっているかを確認する必要があります。
この文は、Kafka から 10 個の Kafka データをサンプリングすることを意味します。Kafka のアドレスは 127.0.0.1:9092 で、トピックは wow です。実行結果は次のとおりです。 問題ありません。次に、非常にシンプルなストリーミング プログラムを作成しました。 -- ストリーム名は一意である必要があります。 streamName を「streamExample」に設定します。 -- kafkaTool を使用して kafka からスキーマを推測します !kafkaTool registerSchema 2 件のレコードが "127.0.0.1:9092" から取得されました。すごいですね。 kafka.`wow` オプションをロードする kafka.bootstrap.servers="127.0.0.1:9092" newkafkatable1 として; newkafkatable1 から * を選択 表21のように; -- ターミナルコンソールの代わりに Web コンソールに印刷します。 保存追加テーブル21 webConsole として。 オプションモード="追加" 期間="15" および checkpointLocation="/tmp/s-cpl4"; 結果は次のとおりです。 ターミナルでリアルタイムの効果を確認することもできます。 補充する もちろん、MLSQL スタックにはストリーミングのための優れた機能が他にも 2 つあります。1 つ目は、ストリーミング イベントに HTTP プロトコル コールバックを設定し、バッチ SQL を使用してストリーミング結果を処理し、最終的にデータベースに保存できることです。次のスクリプトを参照してください。 -- ストリーム名は一意である必要があります。 streamName を「streamExample」に設定します。 -- いくつかのデータを模擬します。 データ='''を設定 {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":0,"タイムスタンプ":"2008-01-24 18:01:01.001","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":1,"タイムスタンプ":"2008-01-24 18:01:01.002","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":2,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":3,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":4,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} {"key":"はい","値":"いいえ","topic":"テスト","パーティション":0,"オフセット":5,"タイムスタンプ":"2008-01-24 18:01:01.003","タイムスタンプタイプ":0} '''; -- データをテーブルとしてロードする jsonStr.`data` をデータソースとして読み込みます。 --テーブルをストリームソースとして変換する mockStream.`datasource` オプションをロードする ステップサイズ範囲="0-3" newkafkatable1 として; -- 集約 newkafkatable1 から、cast(value as string) を k として選択します。 表21のように; !callback post "http://127.0.0.1:9002/api_v1/test" when "started,progress,terminated"; -- 結果をコンソールに出力します。 保存追加テーブル21 慣習として。 オプションモード="追加" 期間="15" そしてsourceTable="jack" コード=''' jack から count(*) を c として newjack として選択します。 append newjack を parquet.`/tmp/jack` として保存します。 ''' および checkpointLocation="/tmp/cpl15"; 要約する 以上がこの記事の全内容です。この記事の内容が皆様の勉強や仕事に何らかの参考学習価値をもたらすことを願います。123WORDPRESS.COM をご愛顧いただき、誠にありがとうございます。 以下もご興味があるかもしれません:
|
<<: Linux でファイルの作成時間を取得する方法と実践的なチュートリアル
序文以前、ローディングスタイルのコンポーネントを作成しました。コードの再利用性を実現するために、この...
序文nginx はマルチプロセス モデルを使用します。リクエストが届くと、システムはプロセスをロック...
1. はじめにtable_cache は非常に重要な MySQL パフォーマンス パラメータであり、...
多くのプロジェクトでは、中央に灰色の分割線があり、両側に分割線がないグリッド表示の機能を実装する必要...
多くの場合、移行は避けられません。ハードウェアのアップグレード、データ センターの変更、古いオペレー...
10.4.1 フレームセットとフレームの違い まず、フレームセットとフレームの違いについて説明します...
コンポジション継承組み合わせ継承は、疑似古典的継承とも呼ばれます。これは、昨日説明したプロトタイプ ...
目次1 マウントディレクトリとファイルを作成する2 Redisイメージを取得する3 コンテナを作成し...
同じコマンドを複数回実行するさまざまな種類のループについて学習しましょう。 awk スクリプトには、...
序文データ型変換とは何ですか?フォームまたはプロンプトを使用して取得されるデフォルトのデータ型は文字...
nginx をリバース プロキシ tomcat として使用する場合、セッション損失が発生する可能性が...
IE ブラウザで「ホームページとして設定」および「お気に入りに追加」機能を実装する方法解決:指定さ...
図書館管理ライブラリを作成する データベースを作成します [存在しない場合] ライブラリ名;ライブラ...
カスタム タグは XML ファイルと HTML ファイルで自由に使用できますが、いくつか注意すべき点...
まず、効果図の下にコードを添付します <!DOCTYPE html> <html&...