> ## Documentation Index
> Fetch the complete documentation index at: https://docs.cloudthinker.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka

> スコープベースの認証情報を使用してConfluent Cloud上のApache KafkaをCloudThinkerに接続し、ストリーム監視と管理を行います

Apache Kafkaクラスターを接続することで、[Alex](/ja/guide/agents/alex)（Cloud Engineer）と[Tony](/ja/guide/agents/tony)（Database Engineer）がトピックの健全性を監視し、コンシューマーラグを分析し、ストリーミングパフォーマンスを最適化できるようになります。

Kafka接続は、スコープごとに別々のAPIキーペアを持つJSON認証情報ファイル（Confluent Cloud）またはブートストラップアドレス（セルフホスト）として送信します。

***

## 対応プラットフォーム

| プラットフォーム            | サポート               |
| ------------------- | ------------------ |
| **Confluent Cloud** | 全プラン               |
| **セルフホスト Kafka**    | 2.8+（KRaftモード）、3.x |

***

## 前提条件

* 少なくとも1つのKafka環境とクラスターを持つ **Confluent Cloud** アカウント、または CloudThinkerから到達可能なセルフホスト **Kafka 2.8+**（KRaftモード）もしくは **3.x** クラスター。
* Confluent Cloudの場合: **confluent.cloud/settings/api-keys** でAPIキーを作成するための管理者アクセス。
* CloudThinkerからKafkaクラスターのブートストラップサーバーおよびRESTエンドポイントへのネットワークアクセス。

***

## セットアップ

Kafkaプラットフォームを選択して、接続手順を確認してください:

<Tabs>
  <Tab title="Confluent Cloud">
    <Steps>
      <Step title="Confluent Cloudを開いて環境を選択">
        [confluent.cloud/home](https://confluent.cloud/home) にアクセスし、**Environments** を開きます。

        接続したい環境をクリックします。

        環境IDは選択後のURLに表示されます（例: `env-xxxxx`）。

        ナビゲーション例:

        * 環境一覧: `https://confluent.cloud/environments`
        * 選択した環境のURLパターン: `https://confluent.cloud/environments/<env-id>/overview`
      </Step>

      <Step title="Kafkaクラスターのフィールドを取得">
        選択した環境内で **Clusters** を開き、対象クラスター（例: `<cluster-name>`）をクリックします。

        以下を収集します:

        * `BOOTSTRAP_SERVERS`
        * `KAFKA_REST_ENDPOINT`
        * `KAFKA_CLUSTER_ID`

        `KAFKA_ENV_ID` はステップ1で取得した環境IDを使用します。
      </Step>

      <Step title="スコープ別APIキーとシークレットを作成">
        [confluent.cloud/settings/api-keys](https://confluent.cloud/settings/api-keys) にアクセスし、**+ Add API Key** をクリックします。

        本番ワークロードには **Service Account**、開発・テストには **My Account** を選択します。

        Confluentのオンボーディングで目的のスコープを選択し、生成されたAPIキーとAPIシークレットのペアを保存します。

        作成できるスコープ:

        * Kafkaクラスター
        * Schema Registry
        * ksqlDBクラスター
        * Flinkリージョン
        * クラウドリソース管理
        * Tableflow
      </Step>

      <Step title="Schema Registryエンドポイントを取得（オプション）">
        選択した環境で **Stream Governance -> Schema Registry** を開きます。

        以下を収集します:

        * `SCHEMA_REGISTRY_ENDPOINT`

        URLパターン例:
        `https://confluent.cloud/environments/<env-id>/stream-governance/schema-registry/overview`
      </Step>

      <Step title="Flinkフィールドを取得（オプション）">
        選択した環境で **Flink** を開きます。

        **Compute pools** を開き、必要であれば **+ Add compute pool** でプールを作成します。

        対象のコンピュートプールをクリックして以下を収集します:

        * `FLINK_COMPUTE_POOL_ID`
        * `FLINK_ENV_ID`（URLから取得した環境ID）

        URLパターン例:
        `https://confluent.cloud/environments/<env-id>/flink/pools/<compute-pool-id>/overview`

        `FLINK_REST_ENDPOINT` はクラウドプロバイダーとリージョン（AWS、Azure、またはGCP。例: `<region-code>`）から設定します。
      </Step>

      <Step title="組織IDを取得（オプション）">
        [confluent.cloud/settings/organizations/edit](https://confluent.cloud/settings/organizations/edit) にアクセスして以下を収集します:

        * `FLINK_ORG_ID`
      </Step>

      <Step title="CloudThinkerで接続を追加">
        CloudThinkerで **Connections → Kafka** に移動します。

        有効にしたスコープのフィールドを含むJSONファイルを作成します（下記の[接続フィールドテンプレート](#接続フィールドテンプレート)を参照）。接続フォームにこのJSONファイルをアップロードします。

        必須フィールドはプロファイルによって異なります。詳細は[プロファイル](#プロファイル)を参照してください。

        **Connect** をクリックします。CloudThinkerが認証情報を検証し、**Connected** ステータスを表示します。
      </Step>
    </Steps>

    ### スコープベースの認証情報モデル

    Confluent CloudはスコープベースのAPI認証情報を使用します。各APIキーとシークレットのペアは、特定のリソーススコープへのアクセスを付与します。

    Kafkaのみのフィールドから始めて、後でSchema Registry、Flink、Cloud API、またはTableflowのフィールドを追加できます。

    | スコープ                | アンロックされる機能                                               | 主なフィールド                                                                                                             |
    | ------------------- | -------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------- |
    | **Kafkaクラスター**      | トピックの管理（一覧表示・作成・削除・設定）、メッセージのプロデュース/コンシューム、クラスターメタデータの表示 | `BOOTSTRAP_SERVERS`, `KAFKA_API_KEY`, `KAFKA_API_SECRET`, `KAFKA_CLUSTER_ID`, `KAFKA_ENV_ID`, `KAFKA_REST_ENDPOINT` |
    | **Schema Registry** | データスキーマの一覧表示・検査・削除                                       | `SCHEMA_REGISTRY_ENDPOINT`, `SCHEMA_REGISTRY_API_KEY`, `SCHEMA_REGISTRY_API_SECRET`                                 |
    | **Flinkリージョン**      | Flink SQLステートメントの作成と管理、カタログ/データベース/テーブルの探索、ヘルスチェックと診断    | `FLINK_REST_ENDPOINT`, `FLINK_API_KEY`, `FLINK_API_SECRET`, `FLINK_COMPUTE_POOL_ID`, `FLINK_ENV_ID`                 |
    | **クラウドリソース管理**      | 環境とクラスターの探索、運用メトリクスと課金コストのクエリ                            | `CONFLUENT_CLOUD_API_KEY`, `CONFLUENT_CLOUD_API_SECRET`                                                             |
    | **Tableflow**       | Tableflow対応トピックとカタログ統合の管理（例: AWS Glue）                   | `TABLEFLOW_API_KEY`, `TABLEFLOW_API_SECRET`                                                                         |
    | **組織メタデータ**         | Flinkリソース管理のための組織レベルのコンテキスト                              | `FLINK_ORG_ID`                                                                                                      |

    ### プロファイル

    #### 最小構成（Kafkaのみ）

    必須:

    * `BOOTSTRAP_SERVERS`
    * `KAFKA_API_KEY`
    * `KAFKA_API_SECRET`
    * `KAFKA_CLUSTER_ID`
    * `KAFKA_ENV_ID`

    **できること:** トピックの管理（一覧表示・作成・削除・設定）、メッセージのプロデュース/コンシューム、クラスターメタデータとトピック設定の表示。

    #### 標準構成（Kafka + Schema Registry + クラウド管理）

    追加:

    * `SCHEMA_REGISTRY_ENDPOINT`
    * `SCHEMA_REGISTRY_API_KEY`
    * `SCHEMA_REGISTRY_API_SECRET`
    * `CONFLUENT_CLOUD_API_KEY`
    * `CONFLUENT_CLOUD_API_SECRET`

    **できること:** 最小構成のすべてに加え、データスキーマの一覧表示と検査、環境とクラスターの探索、運用メトリクスのクエリ、課金コストの表示。

    #### 高度な構成（Flink / Tableflow）

    必要に応じて1つ以上のオプションスコープグループを追加:

    * **Flink:** `FLINK_REST_ENDPOINT`, `FLINK_API_KEY`, `FLINK_API_SECRET`, `FLINK_COMPUTE_POOL_ID`, `FLINK_ENV_ID`
    * **Tableflow:** `TABLEFLOW_API_KEY`, `TABLEFLOW_API_SECRET`

    **できること:** 標準構成のすべてに加え、Flink SQLステートメントの作成と管理、FlinkカタログとデータベースSlice探索、ストリーミングクエリのヘルスチェック、カタログ統合（例: AWS Glue）を含むTableflow対応トピックの管理。

    ### 接続フィールドテンプレート

    このテンプレートを使用して、有効にしたスコープの値を入力してください:

    ```json theme={null}
    {
      "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
      "KAFKA_API_KEY": "<kafka-api-key>",
      "KAFKA_API_SECRET": "<kafka-api-secret>",
      "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
      "KAFKA_CLUSTER_ID": "lkc-xxxxx",
      "KAFKA_ENV_ID": "env-xxxxx",

      "SCHEMA_REGISTRY_ENDPOINT": "https://psrc-xxxxx.<region>.<provider>.confluent.cloud",
      "SCHEMA_REGISTRY_API_KEY": "<schema-registry-api-key>",
      "SCHEMA_REGISTRY_API_SECRET": "<schema-registry-api-secret>",

      "FLINK_API_KEY": "<flink-api-key>",
      "FLINK_API_SECRET": "<flink-api-secret>",
      "FLINK_COMPUTE_POOL_ID": "lfcp-xxxxx",
      "FLINK_ENV_ID": "env-xxxxx",
      "FLINK_REST_ENDPOINT": "https://flink.<region>.<provider>.confluent.cloud",
      "FLINK_ORG_ID": "<org-id>",

      "CONFLUENT_CLOUD_API_KEY": "<cloud-api-key>",
      "CONFLUENT_CLOUD_API_SECRET": "<cloud-api-secret>",

      "TABLEFLOW_API_KEY": "<tableflow-api-key>",
      "TABLEFLOW_API_SECRET": "<tableflow-api-secret>"
    }
    ```
  </Tab>

  <Tab title="セルフホスト Kafka">
    <Steps>
      <Step title="ネットワークアクセスを確認">
        CloudThinkerアプリケーションが `<broker-name>.<your-domain>:9092` でKafkaブローカーに到達できることを確認します。
      </Step>

      <Step title="CloudThinkerで接続を追加">
        CloudThinkerで **Connections → Kafka** に移動します。

        有効にしたスコープのフィールドを含むJSONファイルを作成します（下記の[接続フィールドテンプレート](#接続フィールドテンプレート-2)を参照）。接続フォームにこのJSONファイルをアップロードします。

        必須フィールドはプロファイルによって異なります。詳細は下記の[プロファイル](#プロファイル-2)を参照してください。

        **Connect** をクリックします。CloudThinkerが認証情報を検証し、**Connected** ステータスを表示します。
      </Step>
    </Steps>

    ### スコープベースの認証情報モデル

    セルフホストKafkaはスコープベースの設定を使用します。Kafkaのみのフィールドから始めて、後でSchema Registryのフィールドを追加できます。

    | スコープ                | アンロックされる機能                              | 主なフィールド                    |
    | ------------------- | --------------------------------------- | -------------------------- |
    | **Kafkaクラスター**      | トピックの管理（一覧表示・作成・削除）、メッセージのプロデュース/コンシューム | `BOOTSTRAP_SERVERS`        |
    | **Schema Registry** | データスキーマの一覧表示・検査・削除                      | `SCHEMA_REGISTRY_ENDPOINT` |

    ### プロファイル

    #### 最小構成（Kafkaのみ）

    必須:

    * `BOOTSTRAP_SERVERS`

    **できること:** トピックの管理（一覧表示・作成・削除）、メッセージのプロデュース/コンシューム。

    #### 標準構成（Schema Registry付き）

    追加:

    * `SCHEMA_REGISTRY_ENDPOINT`（通常ポート8081）

    **できること:** 最小構成のすべてに加え、データスキーマの一覧表示・検査・削除。

    ### 接続フィールドテンプレート

    このテンプレートを使用して、有効にしたスコープの値を入力してください:

    ```json theme={null}
    {
      "BOOTSTRAP_SERVERS": "<broker-name>.<your-domain>:9092",
      "SCHEMA_REGISTRY_ENDPOINT": "http://<schema-registry-host>:8081"
    }
    ```
  </Tab>
</Tabs>

***

## 接続の詳細

接続フィールドはJSON認証情報ファイルとして送信します。フィールドはプラットフォームと有効なスコープによって異なります。詳細は[セットアップ](#セットアップ)セクションの全テンプレートを参照してください。

<Note>
  CloudThinkerは部分的なスコープのオンボーディングをサポートしています。Kafkaのみのフィールドから始めて、後でSchema Registry、Flink、Cloud API、またはTableflowの認証情報を追加できます。
</Note>

| フィールド                                                    | プラットフォーム        | 説明                          |
| -------------------------------------------------------- | --------------- | --------------------------- |
| `BOOTSTRAP_SERVERS`                                      | 両方              | Kafkaクラスターのブートストラップアドレス（必須） |
| `KAFKA_API_KEY` / `KAFKA_API_SECRET`                     | Confluent Cloud | Kafkaスコープの認証情報              |
| `SCHEMA_REGISTRY_ENDPOINT`                               | 両方              | Schema Registry URL         |
| `SCHEMA_REGISTRY_API_KEY` / `SCHEMA_REGISTRY_API_SECRET` | Confluent Cloud | Schema Registry認証情報         |
| `FLINK_REST_ENDPOINT`                                    | Confluent Cloud | Flinkリージョンエンドポイント           |
| `CONFLUENT_CLOUD_API_KEY` / `CONFLUENT_CLOUD_API_SECRET` | Confluent Cloud | クラウド管理認証情報                  |
| `TABLEFLOW_API_KEY` / `TABLEFLOW_API_SECRET`             | Confluent Cloud | Tableflow認証情報               |

***

## 必要な権限

<Tip>
  Confluent Cloudでは、**Service Account** を使用し、各APIキーに必要なスコープのみを付与してください。Kafkaのみの認証情報から始めて、追加スコープを段階的に追加してください。
</Tip>

**Confluent Cloud:** スコープごとに別々のAPIキーとシークレットのペアを作成します。Kafka ACLをCloudThinkerが必要とする特定のトピックに制限します。クラウド管理認証情報には最低限MetricsViewerロールが必要です。

**セルフホスト Kafka:** APIキーは不要です。ブローカーのブートストラップアドレスがCloudThinkerからポート9092でネットワーク到達可能であることを確認してください。

***

## エージェント機能

接続後、[Alex](/ja/guide/agents/alex) と [Tony](/ja/guide/agents/tony) は次のことができます:

| 機能              | 説明                                       |
| --------------- | ---------------------------------------- |
| **コンシューマーラグ監視** | コンシューマーグループごとのラグを追跡し、遅いコンシューマーを特定        |
| **トピックヘルス分析**   | パーティション分散、レプリケーションファクター、レプリカ不足パーティションを確認 |
| **スループットメトリクス** | トピックごとのバイト入出力とメッセージレートを監視                |
| **ブローカーヘルス**    | ブローカーの可用性とISR（In-Sync Replicas）ステータスを追跡  |

### 接続を確認

```text theme={null}
@alex list all Kafka topics and check consumer group lag for the active consumer groups
```

### プロンプト例

```text theme={null}
@alex check consumer lag for the orders-service group
@tony analyze message throughput trends for the events topic
@alex identify under-replicated partitions and #report the affected topics
```

***

## トラブルシューティング

<Accordion title="接続が拒否またはタイムアウト">
  * Kafkaブローカープロセスが `<broker-name>.<your-domain>` で実行中であることを確認してください。
  * ブローカーポート（デフォルト9092）が開放されており、ファイアウォールでブロックされていないことを確認してください。
  * ブートストラップサーバーアドレス `<broker-name>.<your-domain>:9092` が正しく、CloudThinkerから到達可能であることを確認してください。
  * ローカル開発の場合、KafkaがアクセスしやすいIP（`127.0.0.1` だけでなく）にバインドされていることを確認してください。
</Accordion>

<Accordion title="Confluent Cloudの部分スコープフィールド">
  部分的なスコープのオンボーディングを使用する場合、未使用スコープの**キーと値のペア全体を削除**してください。空文字列は残さないでください。

  **正しい形式**（Kafkaのみ、Schema Registryを完全に削除）:

  ```json theme={null}
  {
    "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
    "KAFKA_API_KEY": "<kafka-api-key>",
    "KAFKA_API_SECRET": "<kafka-api-secret>",
    "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
    "KAFKA_CLUSTER_ID": "lkc-xxxxx",
    "KAFKA_ENV_ID": "env-xxxxx"
  }
  ```

  **誤った形式**（空文字列の値はバリデーションエラーを引き起こします）:

  ```json theme={null}
  {
    "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
    "KAFKA_API_KEY": "<kafka-api-key>",
    "KAFKA_API_SECRET": "<kafka-api-secret>",
    "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
    "KAFKA_CLUSTER_ID": "lkc-xxxxx",
    "KAFKA_ENV_ID": "env-xxxxx",
    "SCHEMA_REGISTRY_ENDPOINT": "",
    "SCHEMA_REGISTRY_API_KEY": ""
  }
  ```
</Accordion>

<Accordion title="Schema Registry接続の失敗">
  `SCHEMA_REGISTRY_ENDPOINT` URLが正しく、CloudThinkerから到達可能であることを確認してください。セルフホストの場合、ポート8081が開放されていることを確認してください。Confluent Cloudの場合、Schema Registry APIキーがお使いの環境に対して正しい権限を持っていることを確認してください。
</Accordion>

***

## セキュリティ

* **最小権限** — エージェントがユースケースに必要な権限のみを付与します。まず読み取り専用から始め、後から拡張してください。
* **デフォルトで読み取り専用** — エージェントにこの接続で変更を行わせる場合を除き、読み取り専用の認証情報を使用してください。
* **認証情報のローテーション** — 通常のスケジュールに従ってキーとトークンをローテーションしてください。接続を更新すると、CloudThinker が新しい値を自動的に取得します。
* **オフボーディング時に失効** — 接続を削除するか、チームメンバーが退職する際には、プロバイダー側で認証情報を無効化してください。

- **スコープ限定APIキー** — CloudThinkerが必要とするスコープのみを付与し、Kafkaのみから始めてSchema Registry、Flink、またはクラウド管理スコープを段階的に追加してください
- **ネットワーク制限** — セキュリティグループまたはファイアウォールルールで、ブートストラップおよびRESTエンドポイントをCloudThinkerのエグレスIPに制限してください

***

## 関連情報

<CardGroup cols={2}>
  <Card title="Alex エージェント" icon="server" href="/ja/guide/agents/alex">
    クラウドインフラストラクチャとストリーミング最適化エージェント
  </Card>

  <Card title="AWS 接続" icon="https://mintcdn.com/cloudthinker/aLd-ttc-SCW-aFky/images/icons/aws.svg?fit=max&auto=format&n=aLd-ttc-SCW-aFky&q=85&s=45d526a3e9345214c0345f277da2e829" href="/ja/guide/connections/aws" width="24" height="24" data-path="images/icons/aws.svg">
    AWSクラウドリソースのセットアップ手順
  </Card>
</CardGroup>
