
# Kafka

> Connect Apache Kafka on Confluent Cloud or self-hosted clusters to CloudThinker with scope-based credentials for stream monitoring and management

Connect your Apache Kafka clusters to enable [Alex](/guide/agents/alex) (Cloud Engineer) and [Tony](/guide/agents/tony) (Database Engineer) to monitor topic health, analyze consumer lag, and optimize streaming performance.

***

## Supported Platforms

| Platform              | Support                |
| --------------------- | ---------------------- |
| **Confluent Cloud**   | All tiers              |
| **Self-hosted Kafka** | 2.8+ (KRaft mode), 3.x |

***

## Setup

Select your Kafka platform for specific connection instructions:

<Tabs>
  <Tab title="Confluent Cloud">
    <Steps>
      <Step title="Open Confluent Cloud and pick your environment">
        Go to [confluent.cloud/home](https://confluent.cloud/home), then open **Environments**.

        Click the environment you want to connect.

        The environment ID appears in the URL after you select it (for example, `env-xxxxx`).

        Example navigation:

        * Environment list: `https://confluent.cloud/environments`
        * Selected environment URL pattern: `https://confluent.cloud/environments/<env-id>/overview`
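
        If you copy the environment URL from the browser, a short helper can pull the ID out of it. This is a minimal sketch that assumes environment IDs are `env-` followed by letters and digits, as in the URL pattern above. `env_id_from_url` is a hypothetical name, not part of CloudThinker or Confluent tooling.

        ```python theme={null}
        import re
        from typing import Optional

        # Matches the environment segment of a console URL such as
        # https://confluent.cloud/environments/env-abc123/overview
        ENV_ID_PATTERN = re.compile(r"/environments/(env-[A-Za-z0-9]+)")


        def env_id_from_url(url: str) -> Optional[str]:
            """Return the environment ID from a Confluent Cloud console URL, or None."""
            match = ENV_ID_PATTERN.search(url)
            return match.group(1) if match else None


        print(env_id_from_url("https://confluent.cloud/environments/env-abc123/overview"))
        # env-abc123
        ```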
      </Step>

      <Step title="Get Kafka cluster fields">
        Inside the selected environment, open **Clusters** and click the cluster you want to connect.

        Collect:

        * `BOOTSTRAP_SERVERS`
        * `KAFKA_REST_ENDPOINT`
        * `KAFKA_CLUSTER_ID`

        Set `KAFKA_ENV_ID` to the environment ID from step 1.
      </Step>

      <Step title="Create scoped API keys and secrets">
        Go to [confluent.cloud/settings/api-keys](https://confluent.cloud/settings/api-keys) and click **+ Add API Key**.

        Choose **Service Account** for production workloads, or **My Account** for development/testing.

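        Select the resource scope for the key, then save the generated API key and secret. Confluent Cloud shows the secret only when the key is created, so store it securely right away. Repeat this for each scope you plan to enable.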

        Scopes you may create keys for:

        * Kafka cluster
        * Schema Registry
        * ksqlDB cluster
        * Flink region
        * Cloud resource management
        * Tableflow
      </Step>

      <Step title="Get Schema Registry endpoint (optional)">
        In the selected environment, open **Stream Governance → Schema Registry**.

        Collect:

        * `SCHEMA_REGISTRY_ENDPOINT`

        URL pattern example:
        `https://confluent.cloud/environments/<env-id>/stream-governance/schema-registry/overview`
      </Step>

      <Step title="Get Flink fields (optional)">
        In the selected environment, open **Flink**.

        Open **Compute pools** and create a pool with **+ Add compute pool** if needed.

        Click the target compute pool and collect:

        * `FLINK_COMPUTE_POOL_ID`
        * `FLINK_ENV_ID` (the same environment ID shown in the URL)

        URL pattern example:
        `https://confluent.cloud/environments/<env-id>/flink/pools/<compute-pool-id>/overview`

        Set `FLINK_REST_ENDPOINT` for the cloud provider (AWS, Azure, or GCP) and region where your compute pool runs. It follows the pattern `https://flink.<region>.<provider>.confluent.cloud`.
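
        A minimal sketch for assembling `FLINK_REST_ENDPOINT`, assuming it follows the `https://flink.<region>.<provider>.confluent.cloud` pattern used in the connection template on this page. `flink_rest_endpoint` is a hypothetical helper; confirm the result against the endpoint Confluent Cloud shows for your compute pool.

        ```python theme={null}
        # Providers named in this step; the URL pattern comes from the connection template.
        SUPPORTED_PROVIDERS = {"aws", "azure", "gcp"}


        def flink_rest_endpoint(provider: str, region: str) -> str:
            """Build a Flink REST endpoint from a cloud provider and region code."""
            provider = provider.strip().lower()
            region = region.strip().lower()
            if provider not in SUPPORTED_PROVIDERS:
                raise ValueError(f"unsupported cloud provider: {provider!r}")
            if not region:
                raise ValueError("region must not be empty")
            return f"https://flink.{region}.{provider}.confluent.cloud"


        print(flink_rest_endpoint("AWS", "us-east-1"))
        # https://flink.us-east-1.aws.confluent.cloud
        ```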
      </Step>

      <Step title="Get organization ID (optional)">
        Go to [confluent.cloud/settings/organizations/edit](https://confluent.cloud/settings/organizations/edit) and collect:

        * `FLINK_ORG_ID`
      </Step>

      <Step title="Add connection in CloudThinker">
        In CloudThinker, navigate to **Connections → Kafka**.

        Create a JSON file with the fields for the scopes you enabled (see [Connection Field Template](#connection-field-template) below). Upload this JSON file in the connection form.

        Required fields depend on your profile; see [Profiles](#profiles) for details.

        You can omit optional scope fields entirely and add them later. Do not include them with empty values.
      </Step>
    </Steps>

    ## Scope-Based Credential Model

    Confluent Cloud uses scope-based API credentials. Each API key and secret pair grants access to a specific resource scope.

    You can start with Kafka-only fields, then add Schema Registry, Flink, Cloud API, or Tableflow fields later.

    | Scope                         | What It Unlocks                                                                                          | Typical Fields                                                                                                      |
    | ----------------------------- | -------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------- |
    | **Kafka cluster**             | Manage topics (list, create, delete, configure), produce/consume messages, view cluster metadata         | `BOOTSTRAP_SERVERS`, `KAFKA_API_KEY`, `KAFKA_API_SECRET`, `KAFKA_CLUSTER_ID`, `KAFKA_ENV_ID`, `KAFKA_REST_ENDPOINT` |
    | **Schema Registry**           | List, inspect, and delete data schemas                                                                   | `SCHEMA_REGISTRY_ENDPOINT`, `SCHEMA_REGISTRY_API_KEY`, `SCHEMA_REGISTRY_API_SECRET`                                 |
    | **Flink region**              | Create and manage Flink SQL statements, explore catalogs/databases/tables, health checks and diagnostics | `FLINK_REST_ENDPOINT`, `FLINK_API_KEY`, `FLINK_API_SECRET`, `FLINK_COMPUTE_POOL_ID`, `FLINK_ENV_ID`                 |
    | **Cloud resource management** | Discover environments and clusters, query operational metrics and billing costs                          | `CONFLUENT_CLOUD_API_KEY`, `CONFLUENT_CLOUD_API_SECRET`                                                             |
    | **Tableflow**                 | Manage Tableflow-enabled topics and catalog integrations (e.g., AWS Glue)                                | `TABLEFLOW_API_KEY`, `TABLEFLOW_API_SECRET`                                                                         |
    | **Organization metadata**     | Organization-level context for Flink resource management                                                 | `FLINK_ORG_ID`                                                                                                      |

    ## Profiles

    ### Minimal (Kafka-only)

    Required:

    * `BOOTSTRAP_SERVERS`
    * `KAFKA_API_KEY`
    * `KAFKA_API_SECRET`
    * `KAFKA_CLUSTER_ID`
    * `KAFKA_ENV_ID`

    **What you can do:** Manage topics (list, create, delete, configure), produce and consume messages, view cluster metadata and topic configurations.
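
    To check the Kafka-scope key before uploading, you can fetch cluster metadata from your own machine. This sketch assumes the `confluent-kafka` Python package and the SASL_SSL/PLAIN client settings Confluent Cloud uses for API-key authentication. The helper names are hypothetical, and a successful run only shows that the key works from your network, not from CloudThinker's.

    ```python theme={null}
    from typing import Dict, List


    def kafka_client_config(conn: Dict[str, str]) -> Dict[str, str]:
        """librdkafka settings for Confluent Cloud API-key (SASL/PLAIN over TLS) auth."""
        return {
            "bootstrap.servers": conn["BOOTSTRAP_SERVERS"],
            "security.protocol": "SASL_SSL",
            "sasl.mechanisms": "PLAIN",
            "sasl.username": conn["KAFKA_API_KEY"],
            "sasl.password": conn["KAFKA_API_SECRET"],
        }


    def list_topic_names(conn: Dict[str, str], timeout: float = 10.0) -> List[str]:
        """Fetch cluster metadata with the Kafka-scope key and return topic names."""
        # Imported lazily so kafka_client_config works without the package installed.
        from confluent_kafka.admin import AdminClient

        admin = AdminClient(kafka_client_config(conn))
        metadata = admin.list_topics(timeout=timeout)
        return sorted(metadata.topics)
    ```

    Call `list_topic_names(conn)` with the same dictionary you plan to upload. An authentication or network problem typically surfaces as a `KafkaException` once the timeout expires.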

    ### Standard (Kafka + Schema Registry + Cloud Management)

    Add:

    * `SCHEMA_REGISTRY_ENDPOINT`
    * `SCHEMA_REGISTRY_API_KEY`
    * `SCHEMA_REGISTRY_API_SECRET`
    * `CONFLUENT_CLOUD_API_KEY`
    * `CONFLUENT_CLOUD_API_SECRET`

    **What you can do:** Everything in Minimal, plus list and inspect data schemas, discover environments and clusters, query operational metrics, and view billing costs.
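
    The Standard profile adds a Schema Registry key pair and a Cloud resource management key pair. A minimal sketch for testing both with Python's standard library, assuming HTTP Basic authentication with each key and secret, the Schema Registry `GET /subjects` endpoint, and the Confluent Cloud `GET /org/v2/environments` endpoint. The helper names are hypothetical.

    ```python theme={null}
    import base64
    import urllib.error
    import urllib.request
    from typing import Dict

    CLOUD_ENVIRONMENTS_URL = "https://api.confluent.cloud/org/v2/environments"


    def basic_auth_request(url: str, key: str, secret: str) -> urllib.request.Request:
        """Build a GET request authenticated with an API key and secret (HTTP Basic)."""
        token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("ascii")
        return urllib.request.Request(url, headers={"Authorization": f"Basic {token}"})


    def standard_profile_requests(conn: Dict[str, str]) -> Dict[str, urllib.request.Request]:
        """One lightweight read-only request per Standard-profile key pair."""
        return {
            # Schema Registry: list registered subjects.
            "schema_registry": basic_auth_request(
                conn["SCHEMA_REGISTRY_ENDPOINT"].rstrip("/") + "/subjects",
                conn["SCHEMA_REGISTRY_API_KEY"],
                conn["SCHEMA_REGISTRY_API_SECRET"],
            ),
            # Cloud resource management: list environments the key can see.
            "cloud": basic_auth_request(
                CLOUD_ENVIRONMENTS_URL,
                conn["CONFLUENT_CLOUD_API_KEY"],
                conn["CONFLUENT_CLOUD_API_SECRET"],
            ),
        }


    def check_standard_keys(conn: Dict[str, str], timeout: float = 10.0) -> Dict[str, int]:
        """Send each request and return its HTTP status code (needs network access)."""
        statuses: Dict[str, int] = {}
        for name, request in standard_profile_requests(conn).items():
            try:
                with urllib.request.urlopen(request, timeout=timeout) as response:
                    statuses[name] = response.status
            except urllib.error.HTTPError as err:
                # 401/403 usually mean a wrong key, wrong scope, or missing permissions.
                statuses[name] = err.code
        return statuses
    ```

    A `200` for both entries means the keys authenticate. Network failures raise `urllib.error.URLError` instead of returning a status.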

    ### Advanced (Flink / Tableflow)

    Add one or more optional scope groups as needed:

    * **Flink:** `FLINK_REST_ENDPOINT`, `FLINK_API_KEY`, `FLINK_API_SECRET`, `FLINK_COMPUTE_POOL_ID`, `FLINK_ENV_ID`
    * **Tableflow:** `TABLEFLOW_API_KEY`, `TABLEFLOW_API_SECRET`

    **What you can do:** Everything in Standard, plus create and manage Flink SQL statements, explore Flink catalogs and databases, run health checks on streaming queries, and manage Tableflow-enabled topics with catalog integrations (e.g., AWS Glue).
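
    Assuming a scope needs every field its profile lists, a quick local check can flag a half-filled scope group before upload. This hypothetical sketch is built from the field lists above; it treats `KAFKA_REST_ENDPOINT` and `FLINK_ORG_ID` as optional because no profile lists them, and CloudThinker's own validation may differ.

    ```python theme={null}
    from typing import Dict, List

    # Field groups taken from the Profiles lists above. KAFKA_REST_ENDPOINT and
    # FLINK_ORG_ID are not listed there, so this check treats them as optional.
    SCOPE_FIELDS: Dict[str, List[str]] = {
        "kafka": ["BOOTSTRAP_SERVERS", "KAFKA_API_KEY", "KAFKA_API_SECRET",
                  "KAFKA_CLUSTER_ID", "KAFKA_ENV_ID"],
        "schema_registry": ["SCHEMA_REGISTRY_ENDPOINT", "SCHEMA_REGISTRY_API_KEY",
                            "SCHEMA_REGISTRY_API_SECRET"],
        "cloud": ["CONFLUENT_CLOUD_API_KEY", "CONFLUENT_CLOUD_API_SECRET"],
        "flink": ["FLINK_REST_ENDPOINT", "FLINK_API_KEY", "FLINK_API_SECRET",
                  "FLINK_COMPUTE_POOL_ID", "FLINK_ENV_ID"],
        "tableflow": ["TABLEFLOW_API_KEY", "TABLEFLOW_API_SECRET"],
    }


    def incomplete_scopes(conn: Dict[str, str]) -> Dict[str, List[str]]:
        """Return {scope: missing fields} for the Kafka scope and any partly filled scope."""
        report: Dict[str, List[str]] = {}
        for scope, fields in SCOPE_FIELDS.items():
            missing = [field for field in fields if not conn.get(field)]
            started = len(missing) < len(fields)
            # Kafka fields are always required; optional scopes are flagged only once started.
            if missing and (scope == "kafka" or started):
                report[scope] = missing
        return report
    ```

    An empty result means the Kafka fields are present and every optional scope you started filling in is complete.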

    ## Connection Field Template

    Use this template and fill in the values for the scopes you enable, removing the key-value pairs for any scope you do not use:

    ```json theme={null}
    {
      "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
      "KAFKA_API_KEY": "<kafka-api-key>",
      "KAFKA_API_SECRET": "<kafka-api-secret>",
      "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
      "KAFKA_CLUSTER_ID": "lkc-xxxxx",
      "KAFKA_ENV_ID": "env-xxxxx",

      "SCHEMA_REGISTRY_ENDPOINT": "https://psrc-xxxxx.<region>.<provider>.confluent.cloud",
      "SCHEMA_REGISTRY_API_KEY": "<schema-registry-api-key>",
      "SCHEMA_REGISTRY_API_SECRET": "<schema-registry-api-secret>",

      "FLINK_API_KEY": "<flink-api-key>",
      "FLINK_API_SECRET": "<flink-api-secret>",
      "FLINK_COMPUTE_POOL_ID": "lfcp-xxxxx",
      "FLINK_ENV_ID": "env-xxxxx",
      "FLINK_REST_ENDPOINT": "https://flink.<region>.<provider>.confluent.cloud",
      "FLINK_ORG_ID": "<org-id>",

      "CONFLUENT_CLOUD_API_KEY": "<cloud-api-key>",
      "CONFLUENT_CLOUD_API_SECRET": "<cloud-api-secret>",

      "TABLEFLOW_API_KEY": "<tableflow-api-key>",
      "TABLEFLOW_API_SECRET": "<tableflow-api-secret>"
    }
    ```
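
    Rather than typing secrets into the template by hand, you can generate the file from environment variables, so unset scopes are left out instead of written as empty strings. A minimal sketch, assuming the variables are named after the template keys; `build_connection`, `write_connection_file`, and the output path are hypothetical.

    ```python theme={null}
    import json
    import os
    from typing import Dict, Mapping, Optional

    # Keys from the connection field template above.
    TEMPLATE_KEYS = (
        "BOOTSTRAP_SERVERS", "KAFKA_API_KEY", "KAFKA_API_SECRET",
        "KAFKA_REST_ENDPOINT", "KAFKA_CLUSTER_ID", "KAFKA_ENV_ID",
        "SCHEMA_REGISTRY_ENDPOINT", "SCHEMA_REGISTRY_API_KEY", "SCHEMA_REGISTRY_API_SECRET",
        "FLINK_API_KEY", "FLINK_API_SECRET", "FLINK_COMPUTE_POOL_ID",
        "FLINK_ENV_ID", "FLINK_REST_ENDPOINT", "FLINK_ORG_ID",
        "CONFLUENT_CLOUD_API_KEY", "CONFLUENT_CLOUD_API_SECRET",
        "TABLEFLOW_API_KEY", "TABLEFLOW_API_SECRET",
    )

    # Fields required by the Minimal (Kafka-only) profile.
    MINIMAL_KEYS = ("BOOTSTRAP_SERVERS", "KAFKA_API_KEY", "KAFKA_API_SECRET",
                    "KAFKA_CLUSTER_ID", "KAFKA_ENV_ID")


    def build_connection(env: Mapping[str, str]) -> Dict[str, str]:
        """Keep only template keys with non-blank values; unset scopes are omitted."""
        conn = {key: env[key].strip() for key in TEMPLATE_KEYS if env.get(key, "").strip()}
        missing = [key for key in MINIMAL_KEYS if key not in conn]
        if missing:
            raise ValueError(f"missing required Kafka fields: {', '.join(missing)}")
        return conn


    def write_connection_file(path: str, env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
        """Write the connection JSON to `path`, reading os.environ by default."""
        conn = build_connection(os.environ if env is None else env)
        with open(path, "w", encoding="utf-8") as fh:
            json.dump(conn, fh, indent=2)
        return conn
    ```

    For example, export the variables and call `write_connection_file("kafka-connection.json")`. The file contains secrets, so delete it after uploading.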
  </Tab>

  <Tab title="Self-hosted Kafka">
    <Steps>
      <Step title="Ensure Network Access">
        Ensure the CloudThinker application can reach your Kafka broker at `<broker-name>.<your-domain>:9092`.
      </Step>

      <Step title="Add Connection in CloudThinker">
        In CloudThinker, navigate to **Connections → Kafka**.

        Create a JSON file with the fields for the scopes you enabled (see [Connection Field Template](#connection-field-template-2) below). Upload this JSON file in the connection form.

        Required fields depend on your profile — see [Profiles](#profiles-2) below for details.
      </Step>
    </Steps>

    ## Scope-Based Credential Model

    Self-hosted Kafka uses scope-based configuration. You can start with Kafka-only fields, then add Schema Registry fields later.

    | Scope               | What It Unlocks                                                | Typical Fields             |
    | ------------------- | -------------------------------------------------------------- | -------------------------- |
    | **Kafka cluster**   | Manage topics (list, create, delete), produce/consume messages | `BOOTSTRAP_SERVERS`        |
    | **Schema Registry** | List, inspect, and delete data schemas                         | `SCHEMA_REGISTRY_ENDPOINT` |

    ## Profiles

    ### Minimal (Kafka-only)

    Required:

    * `BOOTSTRAP_SERVERS`

    **What you can do:** Manage topics (list, create, delete), produce and consume messages.

    ### Standard (With Schema Registry)

    Add:

    * `SCHEMA_REGISTRY_ENDPOINT` (typically port 8081)

    **What you can do:** Everything in Minimal, plus list, inspect, and delete data schemas.

    ## Connection Field Template

    Use this template and fill in the values for the scopes you enable:

    ```json theme={null}
    {
      "BOOTSTRAP_SERVERS": "<broker-name>.<your-domain>:9092",
      "SCHEMA_REGISTRY_ENDPOINT": "http://<schema-registry-host>:8081"
    }
    ```
  </Tab>
</Tabs>

***

## Agent Capabilities

Once connected, [Alex](/guide/agents/alex) and [Tony](/guide/agents/tony) can:

| Capability                  | Description                                                                   |
| --------------------------- | ----------------------------------------------------------------------------- |
| **Consumer Lag Monitoring** | Track lag per consumer group, identify slow consumers                         |
| **Topic Health Analysis**   | Check partition distribution, replication factor, under-replicated partitions |
| **Throughput Metrics**      | Monitor bytes in/out, message rates per topic                                 |
| **Broker Health**           | Track broker availability, ISR (In-Sync Replicas) status                      |
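
Consumer lag for a partition is the difference between its log-end offset and the offset the consumer group has committed. The sketch below illustrates that calculation on plain dictionaries with hypothetical offsets; it is not how Alex or Tony compute lag internally.

```python theme={null}
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]  # (topic name, partition number)


def consumer_lag(end_offsets: Dict[TopicPartition, int],
                 committed: Dict[TopicPartition, int]) -> Dict[TopicPartition, int]:
    """Per-partition lag = log-end offset - committed offset.

    Partitions with no committed offset are skipped, since their position depends
    on the consumer's reset policy rather than on a stored offset.
    """
    lag = {}
    for tp, end in end_offsets.items():
        if tp in committed:
            # Clamp at zero in case offsets were read at slightly different times.
            lag[tp] = max(end - committed[tp], 0)
    return lag


# Hypothetical offsets for an "orders" topic with three partitions.
end = {("orders", 0): 1200, ("orders", 1): 950, ("orders", 2): 400}
done = {("orders", 0): 1180, ("orders", 1): 950}
lag = consumer_lag(end, done)
print(lag)                    # {('orders', 0): 20, ('orders', 1): 0}
print(sum(lag.values()))      # 20
print(max(lag, key=lag.get))  # ('orders', 0)
```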

### Example Prompts

```bash theme={null}
@alex check consumer lag for the orders-service group
@alex identify under-replicated partitions
@tony analyze message throughput trends for the events topic
@tony check data retention policies across all topics
```

***

## Troubleshooting

<Accordion title="Connection refused or timeout">
  * Verify the Kafka broker process is running on `<broker-name>.<your-domain>`.
  * Check that the broker port (default 9092) is open and not blocked by a firewall.
  * Verify that the bootstrap server address `<broker-name>.<your-domain>:9092` is correct and reachable from CloudThinker.
  * For local development, ensure Kafka listens on an accessible IP (not just `127.0.0.1`) and that `advertised.listeners` returns an address CloudThinker can reach.
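
  To separate network problems from Kafka configuration problems, you can test raw TCP reachability from a host on the same network path CloudThinker uses. A minimal sketch using only Python's standard library; `check_bootstrap` is a hypothetical helper and does not handle bracketed IPv6 addresses.

  ```python theme={null}
  import socket
  from typing import Dict


  def check_bootstrap(servers: str, timeout: float = 5.0) -> Dict[str, bool]:
      """Try a TCP connection to each host:port in a comma-separated bootstrap list."""
      results: Dict[str, bool] = {}
      for server in servers.split(","):
          server = server.strip()
          host, _, port = server.rpartition(":")
          try:
              # Succeeds only if the TCP handshake completes before the timeout.
              with socket.create_connection((host, int(port)), timeout=timeout):
                  results[server] = True
          except (OSError, ValueError):
              # OSError covers refused connections, DNS failures, and timeouts;
              # ValueError covers a missing or non-numeric port.
              results[server] = False
      return results
  ```

  For example, `check_bootstrap("<broker-name>.<your-domain>:9092")`. A `True` result only shows that the port accepts TCP connections, not that the addresses the broker advertises to clients are reachable.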
</Accordion>

<Accordion title="Confluent Cloud partial scope fields">
  When using partial scope onboarding, **remove the entire key-value pair** for unused scopes. Do not leave empty strings.

  **Correct** (Kafka-only, Schema Registry removed entirely):

  ```json theme={null}
  {
    "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
    "KAFKA_API_KEY": "<kafka-api-key>",
    "KAFKA_API_SECRET": "<kafka-api-secret>",
    "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
    "KAFKA_CLUSTER_ID": "lkc-xxxxx",
    "KAFKA_ENV_ID": "env-xxxxx"
  }
  ```

  **Incorrect** (empty string values cause validation errors):

  ```json theme={null}
  {
    "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
    "KAFKA_API_KEY": "<kafka-api-key>",
    "KAFKA_API_SECRET": "<kafka-api-secret>",
    "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
    "KAFKA_CLUSTER_ID": "lkc-xxxxx",
    "KAFKA_ENV_ID": "env-xxxxx",
    "SCHEMA_REGISTRY_ENDPOINT": "",
    "SCHEMA_REGISTRY_API_KEY": ""
  }
  ```
</Accordion>

***

## Security Best Practices

### For Confluent Cloud

* **Network restrictions** - Restrict Kafka access to CloudThinker IPs via security groups.
* **Secure credentials** - Store secrets in a secrets manager and rotate keys regularly.

### For Self-hosted Kafka

* **Network restrictions** - Restrict broker access to CloudThinker IPs via firewalls.
* **Private networks** - Keep brokers in private subnets, not exposed to the public internet.

<Note>
  CloudThinker supports partial scope onboarding. If you only provide Kafka scope fields first, you can still create the connection and add Schema Registry, Flink, Cloud API, or Tableflow credentials later.
</Note>

***

## Related

<CardGroup cols={2}>
  <Card title="Alex Agent" icon="server" href="/guide/agents/alex">
    Cloud infrastructure and streaming optimization agent
  </Card>

  <Card title="AWS Connection" icon="https://mintcdn.com/cloudthinker/aLd-ttc-SCW-aFky/images/icons/aws.svg?fit=max&auto=format&n=aLd-ttc-SCW-aFky&q=85&s=45d526a3e9345214c0345f277da2e829" href="/guide/connections/aws" width="24" height="24" data-path="images/icons/aws.svg">
    Setup instructions for AWS cloud resources
  </Card>
</CardGroup>
