注: 以下の翻訳の正確性は検証されていません。AIPを利用して英語版の原文から機械的に翻訳されたものです。

Pipeline Builder でストリーミングパイプラインを作成する

このチュートリアルでは、Foundry Streaming と Pipeline Builder を使用して、センサーの温度情報を含む単一のデータセットを出力する簡単なパイプラインを作成します。Foundry でストリームを作成し、そのストリームにレコードをプッシュし、Pipeline Builder で変換する方法を学びます。

パート 1. 初期設定

まず、新しいストリームを作成する必要があります。

  1. Foundry にログインし、Foundry の Project に移動し、右上の + New を選択してから、Stream を選択します。

ストリーム作成ドロップダウンのスクリーンショット

  1. 次に、ストリームを定義する必要があります。このガイドでは、シンプルな 1 パーティションストリームを作成し、手動でレコードをプッシュします。

    Define ページで、スループットを Normal に設定し、基本的なスキーマを sensor_id: Stringtemperature: Double として定義します。

ストリーム定義ページのスクリーンショット

  1. Create stream を選択します。これにより、ストリーミングデータに接続する方法を指定できる Connect ページが開きます。

パート 2. ストリームにレコードをプッシュする

これで、ストリームを接続する準備が整いました。ここで、source を使用したストリーミングデータの取り込みタスクを設定することができます。ただし、このチュートリアルでは、Curl を使用して手動でストリームにレコードをプッシュします。

  1. 最初に、Connect via API セクションの下にある Curl (Bash) を選択して、ストリームの認証を設定します。レコードを送信するために、個人用トークンを使用します。

ストリーム接続ページのスクリーンショット

  1. Test with a personal token を選択し、画面に表示されるプロンプトに従って、短期間有効な個人用トークンを生成します。

    個人用トークンは、本番用のパイプラインでは使用しないでください。本番用のパイプラインでは、OAuth トークンワークフロー を使用してください。

ストリーム接続認証ページのスクリーンショット

  1. 生成したトークンをテキストボックスに貼り付け、Next Step をクリックします。
  2. Curl コマンドをコピーします。Bash を実行できるコンピュータのターミナルを開き、コマンドを貼り付けます。ターミナルでコマンドを実行します。

ストリームプッシュ with curl ページのスクリーンショット

数秒以内に、ページ上のストリームビューアにレコードが表示されます。

ストリームビューレコードタブのスクリーンショット

これで、リアルタイムでストリーミングデータを取り込むことができました。次に、そのデータを変換しましょう。

パート 3. ストリームを変換する

  1. Start pipelining ボタンを選択して、Pipeline Builder で基本的なストリーミング変換を開始します。

ストリームビューレコードタブのスクリーンショット

  1. Create new pipeline モーダルで、Streaming pipeline タイプを選択し、Create Pipeline をクリックします。

ビルダーストリームパイプラインの作成スクリーンショット

これにより、入力ストリーム用のパイプラインが作成され、グラフに表示されます。

入力ストリームノードを選択すると、データのプレビューが表示されます。プレビューはストリームのコールドストレージビューで実行されるため、ストリームからのレコードが表示されるまでに遅延があります。

入力ストリームを含むビルダーグラフのスクリーンショット

  1. グラフ上の入力ストリームノードをクリックし、入力ノードの隣にある青い T アイコンを選択して、Transform アクションを選択します。

    これにより、ストリーム内の行の入力タイプに基づいて、現在サポートされているすべての変換のリストが表示されます。このチュートリアルでは、すべての sensor_ids を大文字に変換し、余白を削除し、3 度を超える温度でフィルター処理します。

ビルダーストリーム変換ドロップダウンのスクリーンショット

  1. Uppercase 変換を選択し、sensor_id 行を選択し、Apply をクリックします。

ビルダーストリーム大文字変換のスクリーンショット

  1. 次に、Trim whitespace 変換を検索し、選択します。再度 sensor_id 行を選択し、Apply をクリックします。

ビルダー空白トリム変換のスクリーンショット

  1. 最後の変換では、まず Filter 変換を検索し、Keep rows を選択します。次に、temperature 行を選択し、フィルターを 3 よりも greater than に設定し、Apply を選択します。

ビルダーフィルター変換のスクリーンショット

  1. 画面右上の Apply all changes をクリックします。次に、Back to graph を選択してパイプラインに戻ります。

変換が含まれるビルダーグラフのスクリーンショット

  1. 作成した Transform path ノードを選択し、New dataset をクリックします。

ビルダーグラフで新しい出力を作成するスクリーンショット

  1. アプリケーションの右上隅で、まず Save をクリックしてパイプラインにすべての新しい変更を適用します。次に、Deploy をクリックし、Deploy pipeline を選択します。

変更を保存してもデプロイしない場合、パイプラインのロジックは最新の変更に 更新されません。変換ロジックの変更を取り込むには、パイプラインをデプロイする必要があります。

ビルダーグラフデプロイドロップダウンのスクリーンショット

  1. 作成した出力ストリームノードを選択し、グラフ下部の Data preview セクションの上にあるストリーム名をクリックします。

デプロイされた出力を持つビルダーグラフのスクリーンショット

これで、変換の出力ストリームが表示されるストリームプレビューページが開きます。

ストリーミングクラスタの起動には約 1 分かかるため、すぐにレコードが表示されないことがあります。ただし、クラスタが実行されている場合、新しいレコードはリアルタイムで処理されます。

出力ストリームのスクリーンショット

次のステップ

シンプルなストリーミングパイプラインの作成方法がわかったので、デバッグ失敗ストリーム を使ってストリームの管理方法をさらに学びましょう。より高度な変換機能については、Pipeline Builder の詳細をご覧ください。