対応プラットフォーム
| プラットフォーム | サポート |
|---|---|
| Confluent Cloud | 全プラン |
| セルフホスト Kafka | 2.8+(KRaftモード)、3.x |
前提条件
- 少なくとも1つのKafka環境とクラスターを持つ Confluent Cloud アカウント、または CloudThinkerから到達可能なセルフホスト Kafka 2.8+(KRaftモード)もしくは 3.x クラスター。
- Confluent Cloudの場合: confluent.cloud/settings/api-keys でAPIキーを作成するための管理者アクセス。
- CloudThinkerからKafkaクラスターのブートストラップサーバーおよびRESTエンドポイントへのネットワークアクセス。
セットアップ
Kafkaプラットフォームを選択して、接続手順を確認してください:- Confluent Cloud
- セルフホスト Kafka
Confluent Cloudを開いて環境を選択
confluent.cloud/home にアクセスし、Environments を開きます。接続したい環境をクリックします。環境IDは選択後のURLに表示されます(例:
env-xxxxx)。ナビゲーション例:- 環境一覧:
https://confluent.cloud/environments - 選択した環境のURLパターン:
https://confluent.cloud/environments/<env-id>/overview
Kafkaクラスターのフィールドを取得
選択した環境内で Clusters を開き、対象クラスター(例:
<cluster-name>)をクリックします。以下を収集します:BOOTSTRAP_SERVERSKAFKA_REST_ENDPOINTKAFKA_CLUSTER_ID
KAFKA_ENV_ID はステップ1で取得した環境IDを使用します。スコープ別APIキーとシークレットを作成
confluent.cloud/settings/api-keys にアクセスし、+ Add API Key をクリックします。本番ワークロードには Service Account、開発・テストには My Account を選択します。Confluentのオンボーディングで目的のスコープを選択し、生成されたAPIキーとAPIシークレットのペアを保存します。作成できるスコープ:
- Kafkaクラスター
- Schema Registry
- ksqlDBクラスター
- Flinkリージョン
- クラウドリソース管理
- Tableflow
Schema Registryエンドポイントを取得(オプション)
選択した環境で Stream Governance -> Schema Registry を開きます。以下を収集します:
SCHEMA_REGISTRY_ENDPOINT
https://confluent.cloud/environments/<env-id>/stream-governance/schema-registry/overviewFlinkフィールドを取得(オプション)
選択した環境で Flink を開きます。Compute pools を開き、必要であれば + Add compute pool でプールを作成します。対象のコンピュートプールをクリックして以下を収集します:
FLINK_COMPUTE_POOL_IDFLINK_ENV_ID(URLから取得した環境ID)
https://confluent.cloud/environments/<env-id>/flink/pools/<compute-pool-id>/overviewFLINK_REST_ENDPOINT はクラウドプロバイダーとリージョン(AWS、Azure、またはGCP。例: <region-code>)から設定します。組織IDを取得(オプション)
confluent.cloud/settings/organizations/edit にアクセスして以下を収集します:
FLINK_ORG_ID
CloudThinkerで接続を追加
CloudThinkerで Connections → Kafka に移動します。有効にしたスコープのフィールドを含むJSONファイルを作成します(下記の接続フィールドテンプレートを参照)。接続フォームにこのJSONファイルをアップロードします。必須フィールドはプロファイルによって異なります。詳細はプロファイルを参照してください。Connect をクリックします。CloudThinkerが認証情報を検証し、Connected ステータスを表示します。
スコープベースの認証情報モデル
Confluent CloudはスコープベースのAPI認証情報を使用します。各APIキーとシークレットのペアは、特定のリソーススコープへのアクセスを付与します。Kafkaのみのフィールドから始めて、後でSchema Registry、Flink、Cloud API、またはTableflowのフィールドを追加できます。| スコープ | アンロックされる機能 | 主なフィールド |
|---|---|---|
| Kafkaクラスター | トピックの管理(一覧表示・作成・削除・設定)、メッセージのプロデュース/コンシューム、クラスターメタデータの表示 | BOOTSTRAP_SERVERS, KAFKA_API_KEY, KAFKA_API_SECRET, KAFKA_CLUSTER_ID, KAFKA_ENV_ID, KAFKA_REST_ENDPOINT |
| Schema Registry | データスキーマの一覧表示・検査・削除 | SCHEMA_REGISTRY_ENDPOINT, SCHEMA_REGISTRY_API_KEY, SCHEMA_REGISTRY_API_SECRET |
| Flinkリージョン | Flink SQLステートメントの作成と管理、カタログ/データベース/テーブルの探索、ヘルスチェックと診断 | FLINK_REST_ENDPOINT, FLINK_API_KEY, FLINK_API_SECRET, FLINK_COMPUTE_POOL_ID, FLINK_ENV_ID |
| クラウドリソース管理 | 環境とクラスターの探索、運用メトリクスと課金コストのクエリ | CONFLUENT_CLOUD_API_KEY, CONFLUENT_CLOUD_API_SECRET |
| Tableflow | Tableflow対応トピックとカタログ統合の管理(例: AWS Glue) | TABLEFLOW_API_KEY, TABLEFLOW_API_SECRET |
| 組織メタデータ | Flinkリソース管理のための組織レベルのコンテキスト | FLINK_ORG_ID |
プロファイル
最小構成(Kafkaのみ)
必須:BOOTSTRAP_SERVERSKAFKA_API_KEYKAFKA_API_SECRETKAFKA_CLUSTER_IDKAFKA_ENV_ID
標準構成(Kafka + Schema Registry + クラウド管理)
追加:SCHEMA_REGISTRY_ENDPOINTSCHEMA_REGISTRY_API_KEYSCHEMA_REGISTRY_API_SECRETCONFLUENT_CLOUD_API_KEYCONFLUENT_CLOUD_API_SECRET
高度な構成(Flink / Tableflow)
必要に応じて1つ以上のオプションスコープグループを追加:- Flink:
FLINK_REST_ENDPOINT,FLINK_API_KEY,FLINK_API_SECRET,FLINK_COMPUTE_POOL_ID,FLINK_ENV_ID - Tableflow:
TABLEFLOW_API_KEY,TABLEFLOW_API_SECRET
接続フィールドテンプレート
このテンプレートを使用して、有効にしたスコープの値を入力してください:接続の詳細
接続フィールドはJSON認証情報ファイルとして送信します。フィールドはプラットフォームと有効なスコープによって異なります。詳細はセットアップセクションの全テンプレートを参照してください。CloudThinkerは部分的なスコープのオンボーディングをサポートしています。Kafkaのみのフィールドから始めて、後でSchema Registry、Flink、Cloud API、またはTableflowの認証情報を追加できます。
| フィールド | プラットフォーム | 説明 |
|---|---|---|
BOOTSTRAP_SERVERS | 両方 | Kafkaクラスターのブートストラップアドレス(必須) |
KAFKA_API_KEY / KAFKA_API_SECRET | Confluent Cloud | Kafkaスコープの認証情報 |
SCHEMA_REGISTRY_ENDPOINT | 両方 | Schema Registry URL |
SCHEMA_REGISTRY_API_KEY / SCHEMA_REGISTRY_API_SECRET | Confluent Cloud | Schema Registry認証情報 |
FLINK_REST_ENDPOINT | Confluent Cloud | Flinkリージョンエンドポイント |
CONFLUENT_CLOUD_API_KEY / CONFLUENT_CLOUD_API_SECRET | Confluent Cloud | クラウド管理認証情報 |
TABLEFLOW_API_KEY / TABLEFLOW_API_SECRET | Confluent Cloud | Tableflow認証情報 |
必要な権限
Confluent Cloud: スコープごとに別々のAPIキーとシークレットのペアを作成します。Kafka ACLをCloudThinkerが必要とする特定のトピックに制限します。クラウド管理認証情報には最低限MetricsViewerロールが必要です。 セルフホスト Kafka: APIキーは不要です。ブローカーのブートストラップアドレスがCloudThinkerからポート9092でネットワーク到達可能であることを確認してください。エージェント機能
接続後、Alex と Tony は次のことができます:| 機能 | 説明 |
|---|---|
| コンシューマーラグ監視 | コンシューマーグループごとのラグを追跡し、遅いコンシューマーを特定 |
| トピックヘルス分析 | パーティション分散、レプリケーションファクター、レプリカ不足パーティションを確認 |
| スループットメトリクス | トピックごとのバイト入出力とメッセージレートを監視 |
| ブローカーヘルス | ブローカーの可用性とISR(In-Sync Replicas)ステータスを追跡 |
接続を確認
プロンプト例
トラブルシューティング
接続が拒否またはタイムアウト
接続が拒否またはタイムアウト
- Kafkaブローカープロセスが
<broker-name>.<your-domain>で実行中であることを確認してください。 - ブローカーポート(デフォルト9092)が開放されており、ファイアウォールでブロックされていないことを確認してください。
- ブートストラップサーバーアドレス
<broker-name>.<your-domain>:9092が正しく、CloudThinkerから到達可能であることを確認してください。 - ローカル開発の場合、KafkaがアクセスしやすいIP(
127.0.0.1だけでなく)にバインドされていることを確認してください。
Confluent Cloudの部分スコープフィールド
Confluent Cloudの部分スコープフィールド
部分的なスコープのオンボーディングを使用する場合、未使用スコープのキーと値のペア全体を削除してください。空文字列は残さないでください。正しい形式(Kafkaのみ、Schema Registryを完全に削除):誤った形式(空文字列の値はバリデーションエラーを引き起こします):
Schema Registry接続の失敗
Schema Registry接続の失敗
SCHEMA_REGISTRY_ENDPOINT URLが正しく、CloudThinkerから到達可能であることを確認してください。セルフホストの場合、ポート8081が開放されていることを確認してください。Confluent Cloudの場合、Schema Registry APIキーがお使いの環境に対して正しい権限を持っていることを確認してください。セキュリティ
- 最小権限 — エージェントがユースケースに必要な権限のみを付与します。まず読み取り専用から始め、後から拡張してください。
- デフォルトで読み取り専用 — エージェントにこの接続で変更を行わせる場合を除き、読み取り専用の認証情報を使用してください。
- 認証情報のローテーション — 通常のスケジュールに従ってキーとトークンをローテーションしてください。接続を更新すると、CloudThinker が新しい値を自動的に取得します。
- オフボーディング時に失効 — 接続を削除するか、チームメンバーが退職する際には、プロバイダー側で認証情報を無効化してください。
- スコープ限定APIキー — CloudThinkerが必要とするスコープのみを付与し、Kafkaのみから始めてSchema Registry、Flink、またはクラウド管理スコープを段階的に追加してください
- ネットワーク制限 — セキュリティグループまたはファイアウォールルールで、ブートストラップおよびRESTエンドポイントをCloudThinkerのエグレスIPに制限してください
関連情報
Alex エージェント
クラウドインフラストラクチャとストリーミング最適化エージェント
AWS 接続
AWSクラウドリソースのセットアップ手順