> ## Documentation Index
> Fetch the complete documentation index at: https://docs.cloudthinker.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka

> Kết nối Apache Kafka trên Confluent Cloud với CloudThinker bằng thông tin xác thực theo phạm vi để giám sát và quản lý stream

Kết nối các cluster Apache Kafka của bạn để cho phép [Alex](/vi/guide/agents/alex) (Kỹ sư Cloud) và [Tony](/vi/guide/agents/tony) (Kỹ sư Cơ sở dữ liệu) giám sát độ lành mạnh của topic, phân tích consumer lag và tối ưu hiệu suất streaming.

Kết nối Kafka được gửi dưới dạng tệp JSON chứa thông tin xác thực, với cặp API key riêng cho mỗi phạm vi (Confluent Cloud) hoặc địa chỉ bootstrap (self-hosted).

***

## Nền tảng hỗ trợ

| Nền tảng              | Hỗ trợ                 |
| --------------------- | ---------------------- |
| **Confluent Cloud**   | Tất cả các gói         |
| **Self-hosted Kafka** | 2.8+ (KRaft mode), 3.x |

***

## Điều kiện tiên quyết

* Tài khoản **Confluent Cloud** với ít nhất một Kafka environment và cluster, HOẶC cluster **Kafka 2.8+** (KRaft mode) hoặc **3.x** tự lưu trữ có thể truy cập từ CloudThinker.
* Với Confluent Cloud: quyền admin để tạo API key tại **confluent.cloud/settings/api-keys**.
* Quyền truy cập mạng từ CloudThinker đến các bootstrap server và REST endpoint của Kafka cluster.

***

## Thiết lập

Chọn nền tảng Kafka của bạn để xem hướng dẫn kết nối cụ thể:

<Tabs>
  <Tab title="Confluent Cloud">
    <Steps>
      <Step title="Mở Confluent Cloud và chọn environment">
        Vào [confluent.cloud/home](https://confluent.cloud/home), sau đó mở **Environments**.

        Nhấn vào environment bạn muốn kết nối.

        ID của environment xuất hiện trên URL sau khi bạn chọn (ví dụ: `env-xxxxx`).

        Ví dụ điều hướng:

        * Danh sách environment: `https://confluent.cloud/environments`
        * Mẫu URL khi chọn environment: `https://confluent.cloud/environments/<env-id>/overview`
      </Step>

      <Step title="Lấy thông tin Kafka cluster">
        Trong environment đã chọn, mở **Clusters** và nhấn vào cluster mục tiêu (ví dụ: `<cluster-name>`).

        Thu thập:

        * `BOOTSTRAP_SERVERS`
        * `KAFKA_REST_ENDPOINT`
        * `KAFKA_CLUSTER_ID`

        Giữ `KAFKA_ENV_ID` là ID environment đã chọn từ Bước 1.
      </Step>

      <Step title="Tạo API key và secret theo phạm vi">
        Vào [confluent.cloud/settings/api-keys](https://confluent.cloud/settings/api-keys) và nhấn **+ Add API Key**.

        Chọn **Service Account** cho môi trường production, hoặc **My Account** cho development/testing.

        Chọn phạm vi mong muốn trong quy trình onboarding của Confluent, sau đó lưu cặp API key và API secret được tạo.

        Các phạm vi bạn có thể tạo key:

        * Kafka cluster
        * Schema Registry
        * ksqlDB cluster
        * Flink region
        * Cloud resource management
        * Tableflow
      </Step>

      <Step title="Lấy endpoint Schema Registry (tùy chọn)">
        Trong environment đã chọn, mở **Stream Governance -> Schema Registry**.

        Thu thập:

        * `SCHEMA_REGISTRY_ENDPOINT`

        Ví dụ mẫu URL:
        `https://confluent.cloud/environments/<env-id>/stream-governance/schema-registry/overview`
      </Step>

      <Step title="Lấy thông tin Flink (tùy chọn)">
        Trong environment đã chọn, mở **Flink**.

        Mở **Compute pools** và tạo pool mới bằng **+ Add compute pool** nếu cần.

        Nhấn vào compute pool mục tiêu và thu thập:

        * `FLINK_COMPUTE_POOL_ID`
        * `FLINK_ENV_ID` (cùng ID environment từ URL)

        Ví dụ mẫu URL:
        `https://confluent.cloud/environments/<env-id>/flink/pools/<compute-pool-id>/overview`

        Đặt `FLINK_REST_ENDPOINT` theo nhà cung cấp cloud và region của bạn (AWS, Azure, hoặc GCP; ví dụ: `<region-code>`).
      </Step>

      <Step title="Lấy organization ID (tùy chọn)">
        Vào [confluent.cloud/settings/organizations/edit](https://confluent.cloud/settings/organizations/edit) và thu thập:

        * `FLINK_ORG_ID`
      </Step>

      <Step title="Thêm kết nối trong CloudThinker">
        Trong CloudThinker, điều hướng đến **Connections → Kafka**.

        Tạo tệp JSON với các trường cho phạm vi bạn đã bật (xem [Mẫu trường kết nối](#connection-field-template) bên dưới). Tải tệp JSON này lên trong biểu mẫu kết nối.

        Các trường bắt buộc phụ thuộc vào profile của bạn — xem [Profiles](#profiles) để biết chi tiết.

        Nhấn **Connect**. CloudThinker xác minh thông tin xác thực và hiển thị trạng thái **Connected**.
      </Step>
    </Steps>

    ### Mô hình thông tin xác thực theo phạm vi

    Confluent Cloud sử dụng thông tin xác thực API theo phạm vi. Mỗi cặp API key và secret cấp quyền truy cập vào một phạm vi tài nguyên cụ thể.

    Bạn có thể bắt đầu chỉ với các trường Kafka, sau đó thêm Schema Registry, Flink, Cloud API, hoặc Tableflow sau.

    | Phạm vi                       | Tính năng mở khóa                                                                                   | Trường thông thường                                                                                                 |
    | ----------------------------- | --------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------- |
    | **Kafka cluster**             | Quản lý topic (liệt kê, tạo, xóa, cấu hình), produce/consume message, xem metadata cluster          | `BOOTSTRAP_SERVERS`, `KAFKA_API_KEY`, `KAFKA_API_SECRET`, `KAFKA_CLUSTER_ID`, `KAFKA_ENV_ID`, `KAFKA_REST_ENDPOINT` |
    | **Schema Registry**           | Liệt kê, kiểm tra và xóa data schema                                                                | `SCHEMA_REGISTRY_ENDPOINT`, `SCHEMA_REGISTRY_API_KEY`, `SCHEMA_REGISTRY_API_SECRET`                                 |
    | **Flink region**              | Tạo và quản lý Flink SQL statement, khám phá catalog/database/table, kiểm tra sức khỏe và chẩn đoán | `FLINK_REST_ENDPOINT`, `FLINK_API_KEY`, `FLINK_API_SECRET`, `FLINK_COMPUTE_POOL_ID`, `FLINK_ENV_ID`                 |
    | **Cloud resource management** | Khám phá environment và cluster, truy vấn metric vận hành và chi phí billing                        | `CONFLUENT_CLOUD_API_KEY`, `CONFLUENT_CLOUD_API_SECRET`                                                             |
    | **Tableflow**                 | Quản lý topic hỗ trợ Tableflow và tích hợp catalog (ví dụ: AWS Glue)                                | `TABLEFLOW_API_KEY`, `TABLEFLOW_API_SECRET`                                                                         |
    | **Organization metadata**     | Ngữ cảnh cấp tổ chức cho quản lý tài nguyên Flink                                                   | `FLINK_ORG_ID`                                                                                                      |

    ### Profiles

    #### Tối thiểu (chỉ Kafka)

    Bắt buộc:

    * `BOOTSTRAP_SERVERS`
    * `KAFKA_API_KEY`
    * `KAFKA_API_SECRET`
    * `KAFKA_CLUSTER_ID`
    * `KAFKA_ENV_ID`

    **Bạn có thể làm gì:** Quản lý topic (liệt kê, tạo, xóa, cấu hình), produce và consume message, xem metadata cluster và cấu hình topic.

    #### Tiêu chuẩn (Kafka + Schema Registry + Cloud Management)

    Thêm:

    * `SCHEMA_REGISTRY_ENDPOINT`
    * `SCHEMA_REGISTRY_API_KEY`
    * `SCHEMA_REGISTRY_API_SECRET`
    * `CONFLUENT_CLOUD_API_KEY`
    * `CONFLUENT_CLOUD_API_SECRET`

    **Bạn có thể làm gì:** Tất cả những gì trong Tối thiểu, cộng thêm liệt kê và kiểm tra data schema, khám phá environment và cluster, truy vấn metric vận hành và xem chi phí billing.

    #### Nâng cao (Flink / Tableflow)

    Thêm một hoặc nhiều nhóm phạm vi tùy chọn khi cần:

    * **Flink:** `FLINK_REST_ENDPOINT`, `FLINK_API_KEY`, `FLINK_API_SECRET`, `FLINK_COMPUTE_POOL_ID`, `FLINK_ENV_ID`
    * **Tableflow:** `TABLEFLOW_API_KEY`, `TABLEFLOW_API_SECRET`

    **Bạn có thể làm gì:** Tất cả những gì trong Tiêu chuẩn, cộng thêm tạo và quản lý Flink SQL statement, khám phá catalog và database Flink, chạy kiểm tra sức khỏe trên streaming query, và quản lý topic hỗ trợ Tableflow với tích hợp catalog (ví dụ: AWS Glue).

    ### Mẫu trường kết nối

    Sử dụng mẫu này và điền giá trị cho các phạm vi bạn đã bật:

    ```json theme={null}
    {
      "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
      "KAFKA_API_KEY": "<kafka-api-key>",
      "KAFKA_API_SECRET": "<kafka-api-secret>",
      "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
      "KAFKA_CLUSTER_ID": "lkc-xxxxx",
      "KAFKA_ENV_ID": "env-xxxxx",

      "SCHEMA_REGISTRY_ENDPOINT": "https://psrc-xxxxx.<region>.<provider>.confluent.cloud",
      "SCHEMA_REGISTRY_API_KEY": "<schema-registry-api-key>",
      "SCHEMA_REGISTRY_API_SECRET": "<schema-registry-api-secret>",

      "FLINK_API_KEY": "<flink-api-key>",
      "FLINK_API_SECRET": "<flink-api-secret>",
      "FLINK_COMPUTE_POOL_ID": "lfcp-xxxxx",
      "FLINK_ENV_ID": "env-xxxxx",
      "FLINK_REST_ENDPOINT": "https://flink.<region>.<provider>.confluent.cloud",
      "FLINK_ORG_ID": "<org-id>",

      "CONFLUENT_CLOUD_API_KEY": "<cloud-api-key>",
      "CONFLUENT_CLOUD_API_SECRET": "<cloud-api-secret>",

      "TABLEFLOW_API_KEY": "<tableflow-api-key>",
      "TABLEFLOW_API_SECRET": "<tableflow-api-secret>"
    }
    ```
  </Tab>

  <Tab title="Self-hosted Kafka">
    <Steps>
      <Step title="Đảm bảo quyền truy cập mạng">
        Đảm bảo ứng dụng CloudThinker có thể kết nối đến Kafka broker của bạn tại `<broker-name>.<your-domain>:9092`.
      </Step>

      <Step title="Thêm kết nối trong CloudThinker">
        Trong CloudThinker, điều hướng đến **Connections → Kafka**.

        Tạo tệp JSON với các trường cho phạm vi bạn đã bật (xem [Mẫu trường kết nối](#connection-field-template-2) bên dưới). Tải tệp JSON này lên trong biểu mẫu kết nối.

        Các trường bắt buộc phụ thuộc vào profile của bạn — xem [Profiles](#profiles-2) bên dưới để biết chi tiết.

        Nhấn **Connect**. CloudThinker xác minh thông tin xác thực và hiển thị trạng thái **Connected**.
      </Step>
    </Steps>

    ### Mô hình thông tin xác thực theo phạm vi

    Self-hosted Kafka sử dụng cấu hình theo phạm vi. Bạn có thể bắt đầu chỉ với các trường Kafka, sau đó thêm trường Schema Registry sau.

    | Phạm vi             | Tính năng mở khóa                                          | Trường thông thường        |
    | ------------------- | ---------------------------------------------------------- | -------------------------- |
    | **Kafka cluster**   | Quản lý topic (liệt kê, tạo, xóa), produce/consume message | `BOOTSTRAP_SERVERS`        |
    | **Schema Registry** | Liệt kê, kiểm tra và xóa data schema                       | `SCHEMA_REGISTRY_ENDPOINT` |

    ### Profiles

    #### Tối thiểu (chỉ Kafka)

    Bắt buộc:

    * `BOOTSTRAP_SERVERS`

    **Bạn có thể làm gì:** Quản lý topic (liệt kê, tạo, xóa), produce và consume message.

    #### Tiêu chuẩn (với Schema Registry)

    Thêm:

    * `SCHEMA_REGISTRY_ENDPOINT` (thường là port 8081)

    **Bạn có thể làm gì:** Tất cả những gì trong Tối thiểu, cộng thêm liệt kê, kiểm tra và xóa data schema.

    ### Mẫu trường kết nối

    Sử dụng mẫu này và điền giá trị cho các phạm vi bạn đã bật:

    ```json theme={null}
    {
      "BOOTSTRAP_SERVERS": "<broker-name>.<your-domain>:9092",
      "SCHEMA_REGISTRY_ENDPOINT": "http://<schema-registry-host>:8081"
    }
    ```
  </Tab>
</Tabs>

***

## Chi tiết kết nối

Các trường kết nối được gửi dưới dạng tệp JSON chứa thông tin xác thực. Các trường khác nhau tùy theo nền tảng và phạm vi đã bật — xem mẫu đầy đủ trong phần [Thiết lập](#setup).

<Note>
  CloudThinker hỗ trợ onboarding theo phạm vi một phần — bạn có thể bắt đầu chỉ với các trường Kafka và thêm thông tin xác thực Schema Registry, Flink, Cloud API, hoặc Tableflow sau.
</Note>

| Trường                                                   | Nền tảng        | Mô tả                                          |
| -------------------------------------------------------- | --------------- | ---------------------------------------------- |
| `BOOTSTRAP_SERVERS`                                      | Cả hai          | Địa chỉ bootstrap của Kafka cluster (bắt buộc) |
| `KAFKA_API_KEY` / `KAFKA_API_SECRET`                     | Confluent Cloud | Thông tin xác thực phạm vi Kafka               |
| `SCHEMA_REGISTRY_ENDPOINT`                               | Cả hai          | URL Schema Registry                            |
| `SCHEMA_REGISTRY_API_KEY` / `SCHEMA_REGISTRY_API_SECRET` | Confluent Cloud | Thông tin xác thực Schema Registry             |
| `FLINK_REST_ENDPOINT`                                    | Confluent Cloud | Endpoint vùng Flink                            |
| `CONFLUENT_CLOUD_API_KEY` / `CONFLUENT_CLOUD_API_SECRET` | Confluent Cloud | Thông tin xác thực quản lý Cloud               |
| `TABLEFLOW_API_KEY` / `TABLEFLOW_API_SECRET`             | Confluent Cloud | Thông tin xác thực Tableflow                   |

***

## Quyền yêu cầu

<Tip>
  Với Confluent Cloud, sử dụng **Service Account** và chỉ cấp cho mỗi API key phạm vi nó cần. Bắt đầu với thông tin xác thực chỉ Kafka và thêm các phạm vi bổ sung dần dần.
</Tip>

**Confluent Cloud:** Tạo cặp API key và secret riêng cho từng phạm vi. Hạn chế Kafka ACL chỉ cho các topic mà CloudThinker cần. Thông tin xác thực Cloud Management yêu cầu tối thiểu role MetricsViewer.

**Self-hosted Kafka:** Không cần API key. Đảm bảo địa chỉ bootstrap của broker có thể truy cập từ CloudThinker trên port 9092.

***

## Khả năng của agent

Sau khi kết nối, [Alex](/vi/guide/agents/alex) và [Tony](/vi/guide/agents/tony) có thể:

| Khả năng                     | Mô tả                                                                            |
| ---------------------------- | -------------------------------------------------------------------------------- |
| **Giám sát consumer lag**    | Theo dõi lag theo consumer group, xác định consumer chậm                         |
| **Phân tích sức khỏe topic** | Kiểm tra phân phối partition, replication factor, các partition under-replicated |
| **Metric throughput**        | Giám sát bytes vào/ra, tốc độ message theo topic                                 |
| **Sức khỏe broker**          | Theo dõi tính khả dụng của broker, trạng thái ISR (In-Sync Replicas)             |

### Xác minh kết nối

```text theme={null}
@alex list all Kafka topics and check consumer group lag for the active consumer groups
```

### Ví dụ prompt

```text theme={null}
@alex check consumer lag for the orders-service group
@tony analyze message throughput trends for the events topic
@alex identify under-replicated partitions and #report the affected topics
```

***

## Xử lý sự cố

<Accordion title="Kết nối bị từ chối hoặc timeout">
  * Xác minh tiến trình Kafka broker đang chạy trên `<broker-name>.<your-domain>`.
  * Kiểm tra rằng port của broker (mặc định 9092) đang mở và không bị chặn bởi firewall.
  * Xác minh địa chỉ bootstrap server `<broker-name>.<your-domain>:9092` chính xác và có thể truy cập từ CloudThinker.
  * Đối với development nội bộ, đảm bảo Kafka được bind vào IP có thể truy cập (không chỉ `127.0.0.1`).
</Accordion>

<Accordion title="Trường phạm vi một phần trên Confluent Cloud">
  Khi sử dụng onboarding theo phạm vi một phần, **hãy xóa toàn bộ cặp key-value** cho các phạm vi không dùng. Không để chuỗi rỗng.

  **Đúng** (chỉ Kafka, Schema Registry đã xóa hoàn toàn):

  ```json theme={null}
  {
    "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
    "KAFKA_API_KEY": "<kafka-api-key>",
    "KAFKA_API_SECRET": "<kafka-api-secret>",
    "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
    "KAFKA_CLUSTER_ID": "lkc-xxxxx",
    "KAFKA_ENV_ID": "env-xxxxx"
  }
  ```

  **Sai** (giá trị chuỗi rỗng gây lỗi validation):

  ```json theme={null}
  {
    "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
    "KAFKA_API_KEY": "<kafka-api-key>",
    "KAFKA_API_SECRET": "<kafka-api-secret>",
    "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
    "KAFKA_CLUSTER_ID": "lkc-xxxxx",
    "KAFKA_ENV_ID": "env-xxxxx",
    "SCHEMA_REGISTRY_ENDPOINT": "",
    "SCHEMA_REGISTRY_API_KEY": ""
  }
  ```
</Accordion>

<Accordion title="Kết nối Schema Registry thất bại">
  Xác minh URL `SCHEMA_REGISTRY_ENDPOINT` chính xác và có thể truy cập từ CloudThinker. Với self-hosted, đảm bảo port 8081 đang mở. Với Confluent Cloud, xác nhận API key của Schema Registry có đúng quyền cho environment của bạn.
</Accordion>

***

## Bảo mật

* **Quyền tối thiểu** — chỉ cấp các quyền mà agent cần cho trường hợp sử dụng của bạn; bắt đầu với quyền chỉ đọc và mở rộng sau.
* **Chỉ đọc theo mặc định** — sử dụng thông tin xác thực chỉ đọc trừ khi bạn muốn agent thực hiện thay đổi qua kết nối này.
* **Xoay vòng thông tin xác thực** — xoay vòng khóa và token theo lịch trình thông thường của bạn; CloudThinker sẽ lấy giá trị mới khi bạn cập nhật kết nối.
* **Thu hồi khi bàn giao** — xóa thông tin xác thực tại nhà cung cấp khi bạn xóa một kết nối hoặc khi đồng nghiệp rời nhóm.

- **API key giới hạn phạm vi** — chỉ cấp các phạm vi mà CloudThinker cần; bắt đầu chỉ với Kafka và thêm Schema Registry, Flink, hoặc Cloud Management dần dần
- **Hạn chế mạng** — giới hạn bootstrap và REST endpoint chỉ cho IP egress của CloudThinker qua security group hoặc quy tắc firewall

***

## Liên quan

<CardGroup cols={2}>
  <Card title="Agent Alex" icon="server" href="/vi/guide/agents/alex">
    Agent tối ưu hạ tầng cloud và streaming
  </Card>

  <Card title="Kết nối AWS" icon="https://mintcdn.com/cloudthinker/aLd-ttc-SCW-aFky/images/icons/aws.svg?fit=max&auto=format&n=aLd-ttc-SCW-aFky&q=85&s=45d526a3e9345214c0345f277da2e829" href="/vi/guide/connections/aws" width="24" height="24" data-path="images/icons/aws.svg">
    Hướng dẫn thiết lập tài nguyên AWS cloud
  </Card>
</CardGroup>
