注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。
このチュートリアルでは、Foundry Streaming と Pipeline Builder を使用して、センサーの温度情報を含む単一のデータセットを出力する簡単なパイプラインを作成します。Foundry でストリームを作成し、そのストリームにレコードをプッシュし、Pipeline Builder で変換する方法を学びます。
まず、新しいストリームを作成する必要があります。
次に、ストリームを定義する必要があります。このガイドでは、シンプルな 1 パーティションストリームを作成し、手動でレコードをプッシュします。
Define ページで、スループットを Normal に設定し、基本的なスキーマを sensor_id: String
、temperature: Double
として定義します。
これで、ストリームを接続する準備が整いました。ここで、source を使用したストリーミングデータの取り込みタスクを設定することができます。ただし、このチュートリアルでは、Curl を使用して手動でストリームにレコードをプッシュします。
Test with a personal token を選択し、画面に表示されるプロンプトに従って、短期間有効な個人用トークンを生成します。
個人用トークンは、本番用のパイプラインでは使用しないでください。本番用のパイプラインでは、OAuth トークンワークフロー を使用してください。
数秒以内に、ページ上のストリームビューアにレコードが表示されます。
これで、リアルタイムでストリーミングデータを取り込むことができました。次に、そのデータを変換しましょう。
これにより、入力ストリーム用のパイプラインが作成され、グラフに表示されます。
入力ストリームノードを選択すると、データのプレビューが表示されます。プレビューはストリームのコールドストレージビューで実行されるため、ストリームからのレコードが表示されるまでに遅延があります。
グラフ上の入力ストリームノードをクリックし、入力ノードの隣にある青い T アイコンを選択して、Transform アクションを選択します。
これにより、ストリーム内の行の入力タイプに基づいて、現在サポートされているすべての変換のリストが表示されます。このチュートリアルでは、すべての sensor_ids
を大文字に変換し、余白を削除し、3 度を超える温度でフィルター処理します。
sensor_id
行を選択し、Apply をクリックします。sensor_id
行を選択し、Apply をクリックします。temperature
行を選択し、フィルターを 3
よりも greater than に設定し、Apply を選択します。変更を保存してもデプロイしない場合、パイプラインのロジックは最新の変更に 更新されません。変換ロジックの変更を取り込むには、パイプラインをデプロイする必要があります。
これで、変換の出力ストリームが表示されるストリームプレビューページが開きます。
ストリーミングクラスタの起動には約 1 分かかるため、すぐにレコードが表示されないことがあります。ただし、クラスタが実行されている場合、新しいレコードはリアルタイムで処理されます。
シンプルなストリーミングパイプラインの作成方法がわかったので、デバッグ失敗ストリーム を使ってストリームの管理方法をさらに学びましょう。より高度な変換機能については、Pipeline Builder の詳細をご覧ください。