メインコンテンツへスキップ
Apache Kafkaクラスターを接続することで、Alex(Cloud Engineer)とTony(Database Engineer)がトピックの健全性を監視し、コンシューマーラグを分析し、ストリーミングパフォーマンスを最適化できるようになります。 Kafka接続は、スコープごとに別々のAPIキーペアを持つJSON認証情報ファイル(Confluent Cloud)またはブートストラップアドレス(セルフホスト)として送信します。

対応プラットフォーム

プラットフォームサポート
Confluent Cloud全プラン
セルフホスト Kafka2.8+(KRaftモード)、3.x

前提条件

  • 少なくとも1つのKafka環境とクラスターを持つ Confluent Cloud アカウント、または CloudThinkerから到達可能なセルフホスト Kafka 2.8+(KRaftモード)もしくは 3.x クラスター。
  • Confluent Cloudの場合: confluent.cloud/settings/api-keys でAPIキーを作成するための管理者アクセス。
  • CloudThinkerからKafkaクラスターのブートストラップサーバーおよびRESTエンドポイントへのネットワークアクセス。

セットアップ

Kafkaプラットフォームを選択して、接続手順を確認してください:
1

Confluent Cloudを開いて環境を選択

confluent.cloud/home にアクセスし、Environments を開きます。接続したい環境をクリックします。環境IDは選択後のURLに表示されます(例: env-xxxxx)。ナビゲーション例:
  • 環境一覧: https://confluent.cloud/environments
  • 選択した環境のURLパターン: https://confluent.cloud/environments/<env-id>/overview
2

Kafkaクラスターのフィールドを取得

選択した環境内で Clusters を開き、対象クラスター(例: <cluster-name>)をクリックします。以下を収集します:
  • BOOTSTRAP_SERVERS
  • KAFKA_REST_ENDPOINT
  • KAFKA_CLUSTER_ID
KAFKA_ENV_ID はステップ1で取得した環境IDを使用します。
3

スコープ別APIキーとシークレットを作成

confluent.cloud/settings/api-keys にアクセスし、+ Add API Key をクリックします。本番ワークロードには Service Account、開発・テストには My Account を選択します。Confluentのオンボーディングで目的のスコープを選択し、生成されたAPIキーとAPIシークレットのペアを保存します。作成できるスコープ:
  • Kafkaクラスター
  • Schema Registry
  • ksqlDBクラスター
  • Flinkリージョン
  • クラウドリソース管理
  • Tableflow
4

Schema Registryエンドポイントを取得(オプション)

選択した環境で Stream Governance -> Schema Registry を開きます。以下を収集します:
  • SCHEMA_REGISTRY_ENDPOINT
URLパターン例: https://confluent.cloud/environments/<env-id>/stream-governance/schema-registry/overview
5

Flinkフィールドを取得(オプション)

選択した環境で Flink を開きます。Compute pools を開き、必要であれば + Add compute pool でプールを作成します。対象のコンピュートプールをクリックして以下を収集します:
  • FLINK_COMPUTE_POOL_ID
  • FLINK_ENV_ID(URLから取得した環境ID)
URLパターン例: https://confluent.cloud/environments/<env-id>/flink/pools/<compute-pool-id>/overviewFLINK_REST_ENDPOINT はクラウドプロバイダーとリージョン(AWS、Azure、またはGCP。例: <region-code>)から設定します。
6

組織IDを取得(オプション)

confluent.cloud/settings/organizations/edit にアクセスして以下を収集します:
  • FLINK_ORG_ID
7

CloudThinkerで接続を追加

CloudThinkerで Connections → Kafka に移動します。有効にしたスコープのフィールドを含むJSONファイルを作成します(下記の接続フィールドテンプレートを参照)。接続フォームにこのJSONファイルをアップロードします。必須フィールドはプロファイルによって異なります。詳細はプロファイルを参照してください。Connect をクリックします。CloudThinkerが認証情報を検証し、Connected ステータスを表示します。

スコープベースの認証情報モデル

Confluent CloudはスコープベースのAPI認証情報を使用します。各APIキーとシークレットのペアは、特定のリソーススコープへのアクセスを付与します。Kafkaのみのフィールドから始めて、後でSchema Registry、Flink、Cloud API、またはTableflowのフィールドを追加できます。
スコープアンロックされる機能主なフィールド
Kafkaクラスタートピックの管理(一覧表示・作成・削除・設定)、メッセージのプロデュース/コンシューム、クラスターメタデータの表示BOOTSTRAP_SERVERS, KAFKA_API_KEY, KAFKA_API_SECRET, KAFKA_CLUSTER_ID, KAFKA_ENV_ID, KAFKA_REST_ENDPOINT
Schema Registryデータスキーマの一覧表示・検査・削除SCHEMA_REGISTRY_ENDPOINT, SCHEMA_REGISTRY_API_KEY, SCHEMA_REGISTRY_API_SECRET
FlinkリージョンFlink SQLステートメントの作成と管理、カタログ/データベース/テーブルの探索、ヘルスチェックと診断FLINK_REST_ENDPOINT, FLINK_API_KEY, FLINK_API_SECRET, FLINK_COMPUTE_POOL_ID, FLINK_ENV_ID
クラウドリソース管理環境とクラスターの探索、運用メトリクスと課金コストのクエリCONFLUENT_CLOUD_API_KEY, CONFLUENT_CLOUD_API_SECRET
TableflowTableflow対応トピックとカタログ統合の管理(例: AWS Glue)TABLEFLOW_API_KEY, TABLEFLOW_API_SECRET
組織メタデータFlinkリソース管理のための組織レベルのコンテキストFLINK_ORG_ID

プロファイル

最小構成(Kafkaのみ)

必須:
  • BOOTSTRAP_SERVERS
  • KAFKA_API_KEY
  • KAFKA_API_SECRET
  • KAFKA_CLUSTER_ID
  • KAFKA_ENV_ID
できること: トピックの管理(一覧表示・作成・削除・設定)、メッセージのプロデュース/コンシューム、クラスターメタデータとトピック設定の表示。

標準構成(Kafka + Schema Registry + クラウド管理)

追加:
  • SCHEMA_REGISTRY_ENDPOINT
  • SCHEMA_REGISTRY_API_KEY
  • SCHEMA_REGISTRY_API_SECRET
  • CONFLUENT_CLOUD_API_KEY
  • CONFLUENT_CLOUD_API_SECRET
できること: 最小構成のすべてに加え、データスキーマの一覧表示と検査、環境とクラスターの探索、運用メトリクスのクエリ、課金コストの表示。必要に応じて1つ以上のオプションスコープグループを追加:
  • Flink: FLINK_REST_ENDPOINT, FLINK_API_KEY, FLINK_API_SECRET, FLINK_COMPUTE_POOL_ID, FLINK_ENV_ID
  • Tableflow: TABLEFLOW_API_KEY, TABLEFLOW_API_SECRET
できること: 標準構成のすべてに加え、Flink SQLステートメントの作成と管理、FlinkカタログとデータベースSlice探索、ストリーミングクエリのヘルスチェック、カタログ統合(例: AWS Glue)を含むTableflow対応トピックの管理。

接続フィールドテンプレート

このテンプレートを使用して、有効にしたスコープの値を入力してください:
{
  "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
  "KAFKA_API_KEY": "<kafka-api-key>",
  "KAFKA_API_SECRET": "<kafka-api-secret>",
  "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
  "KAFKA_CLUSTER_ID": "lkc-xxxxx",
  "KAFKA_ENV_ID": "env-xxxxx",

  "SCHEMA_REGISTRY_ENDPOINT": "https://psrc-xxxxx.<region>.<provider>.confluent.cloud",
  "SCHEMA_REGISTRY_API_KEY": "<schema-registry-api-key>",
  "SCHEMA_REGISTRY_API_SECRET": "<schema-registry-api-secret>",

  "FLINK_API_KEY": "<flink-api-key>",
  "FLINK_API_SECRET": "<flink-api-secret>",
  "FLINK_COMPUTE_POOL_ID": "lfcp-xxxxx",
  "FLINK_ENV_ID": "env-xxxxx",
  "FLINK_REST_ENDPOINT": "https://flink.<region>.<provider>.confluent.cloud",
  "FLINK_ORG_ID": "<org-id>",

  "CONFLUENT_CLOUD_API_KEY": "<cloud-api-key>",
  "CONFLUENT_CLOUD_API_SECRET": "<cloud-api-secret>",

  "TABLEFLOW_API_KEY": "<tableflow-api-key>",
  "TABLEFLOW_API_SECRET": "<tableflow-api-secret>"
}

接続の詳細

接続フィールドはJSON認証情報ファイルとして送信します。フィールドはプラットフォームと有効なスコープによって異なります。詳細はセットアップセクションの全テンプレートを参照してください。
CloudThinkerは部分的なスコープのオンボーディングをサポートしています。Kafkaのみのフィールドから始めて、後でSchema Registry、Flink、Cloud API、またはTableflowの認証情報を追加できます。
フィールドプラットフォーム説明
BOOTSTRAP_SERVERS両方Kafkaクラスターのブートストラップアドレス(必須)
KAFKA_API_KEY / KAFKA_API_SECRETConfluent CloudKafkaスコープの認証情報
SCHEMA_REGISTRY_ENDPOINT両方Schema Registry URL
SCHEMA_REGISTRY_API_KEY / SCHEMA_REGISTRY_API_SECRETConfluent CloudSchema Registry認証情報
FLINK_REST_ENDPOINTConfluent CloudFlinkリージョンエンドポイント
CONFLUENT_CLOUD_API_KEY / CONFLUENT_CLOUD_API_SECRETConfluent Cloudクラウド管理認証情報
TABLEFLOW_API_KEY / TABLEFLOW_API_SECRETConfluent CloudTableflow認証情報

必要な権限

Confluent Cloudでは、Service Account を使用し、各APIキーに必要なスコープのみを付与してください。Kafkaのみの認証情報から始めて、追加スコープを段階的に追加してください。
Confluent Cloud: スコープごとに別々のAPIキーとシークレットのペアを作成します。Kafka ACLをCloudThinkerが必要とする特定のトピックに制限します。クラウド管理認証情報には最低限MetricsViewerロールが必要です。 セルフホスト Kafka: APIキーは不要です。ブローカーのブートストラップアドレスがCloudThinkerからポート9092でネットワーク到達可能であることを確認してください。

エージェント機能

接続後、AlexTony は次のことができます:
機能説明
コンシューマーラグ監視コンシューマーグループごとのラグを追跡し、遅いコンシューマーを特定
トピックヘルス分析パーティション分散、レプリケーションファクター、レプリカ不足パーティションを確認
スループットメトリクストピックごとのバイト入出力とメッセージレートを監視
ブローカーヘルスブローカーの可用性とISR(In-Sync Replicas)ステータスを追跡

接続を確認

@alex list all Kafka topics and check consumer group lag for the active consumer groups

プロンプト例

@alex check consumer lag for the orders-service group
@tony analyze message throughput trends for the events topic
@alex identify under-replicated partitions and #report the affected topics

トラブルシューティング

  • Kafkaブローカープロセスが <broker-name>.<your-domain> で実行中であることを確認してください。
  • ブローカーポート(デフォルト9092)が開放されており、ファイアウォールでブロックされていないことを確認してください。
  • ブートストラップサーバーアドレス <broker-name>.<your-domain>:9092 が正しく、CloudThinkerから到達可能であることを確認してください。
  • ローカル開発の場合、KafkaがアクセスしやすいIP(127.0.0.1 だけでなく)にバインドされていることを確認してください。
部分的なスコープのオンボーディングを使用する場合、未使用スコープのキーと値のペア全体を削除してください。空文字列は残さないでください。正しい形式(Kafkaのみ、Schema Registryを完全に削除):
{
  "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
  "KAFKA_API_KEY": "<kafka-api-key>",
  "KAFKA_API_SECRET": "<kafka-api-secret>",
  "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
  "KAFKA_CLUSTER_ID": "lkc-xxxxx",
  "KAFKA_ENV_ID": "env-xxxxx"
}
誤った形式(空文字列の値はバリデーションエラーを引き起こします):
{
  "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
  "KAFKA_API_KEY": "<kafka-api-key>",
  "KAFKA_API_SECRET": "<kafka-api-secret>",
  "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
  "KAFKA_CLUSTER_ID": "lkc-xxxxx",
  "KAFKA_ENV_ID": "env-xxxxx",
  "SCHEMA_REGISTRY_ENDPOINT": "",
  "SCHEMA_REGISTRY_API_KEY": ""
}
SCHEMA_REGISTRY_ENDPOINT URLが正しく、CloudThinkerから到達可能であることを確認してください。セルフホストの場合、ポート8081が開放されていることを確認してください。Confluent Cloudの場合、Schema Registry APIキーがお使いの環境に対して正しい権限を持っていることを確認してください。

セキュリティ

  • 最小権限 — エージェントがユースケースに必要な権限のみを付与します。まず読み取り専用から始め、後から拡張してください。
  • デフォルトで読み取り専用 — エージェントにこの接続で変更を行わせる場合を除き、読み取り専用の認証情報を使用してください。
  • 認証情報のローテーション — 通常のスケジュールに従ってキーとトークンをローテーションしてください。接続を更新すると、CloudThinker が新しい値を自動的に取得します。
  • オフボーディング時に失効 — 接続を削除するか、チームメンバーが退職する際には、プロバイダー側で認証情報を無効化してください。
  • スコープ限定APIキー — CloudThinkerが必要とするスコープのみを付与し、Kafkaのみから始めてSchema Registry、Flink、またはクラウド管理スコープを段階的に追加してください
  • ネットワーク制限 — セキュリティグループまたはファイアウォールルールで、ブートストラップおよびRESTエンドポイントをCloudThinkerのエグレスIPに制限してください

関連情報

Alex エージェント

クラウドインフラストラクチャとストリーミング最適化エージェント
https://mintcdn.com/cloudthinker/aLd-ttc-SCW-aFky/images/icons/aws.svg?fit=max&auto=format&n=aLd-ttc-SCW-aFky&q=85&s=45d526a3e9345214c0345f277da2e829

AWS 接続

AWSクラウドリソースのセットアップ手順