> ## Documentation Index
> Fetch the complete documentation index at: https://docs.cloudthinker.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka

> 스트림 모니터링 및 관리를 위해 Confluent Cloud의 Apache Kafka를 범위 기반 자격 증명으로 CloudThinker에 연결하세요

Apache Kafka 클러스터를 연결하여 [Alex](/ko/guide/agents/alex)(클라우드 엔지니어)와 [Tony](/ko/guide/agents/tony)(데이터베이스 엔지니어)가 토픽 상태를 모니터링하고, 컨슈머 지연을 분석하며, 스트리밍 성능을 최적화할 수 있도록 합니다.

Kafka 연결은 범위별로 별도의 API 키 쌍을 갖는 JSON 자격 증명 파일(Confluent Cloud) 또는 부트스트랩 주소(자체 호스팅)로 제출됩니다.

***

## 지원 플랫폼

| 플랫폼                 | 지원                   |
| ------------------- | -------------------- |
| **Confluent Cloud** | 모든 티어                |
| **자체 호스팅 Kafka**    | 2.8+ (KRaft 모드), 3.x |

***

## 사전 요구 사항

* 하나 이상의 Kafka 환경과 클러스터가 있는 **Confluent Cloud** 계정, 또는 CloudThinker에서 접근 가능한 자체 호스팅 **Kafka 2.8+**(KRaft 모드) 또는 **3.x** 클러스터.
* Confluent Cloud의 경우: **confluent.cloud/settings/api-keys**에서 API 키를 생성할 관리자 접근 권한.
* CloudThinker에서 Kafka 클러스터의 부트스트랩 서버 및 REST 엔드포인트로의 네트워크 접근.

***

## 설정

플랫폼에 맞는 연결 방법을 선택하세요:

<Tabs>
  <Tab title="Confluent Cloud">
    <Steps>
      <Step title="Confluent Cloud를 열고 환경 선택">
        [confluent.cloud/home](https://confluent.cloud/home)으로 이동한 후 **Environments**를 엽니다.

        연결할 환경을 클릭합니다.

        환경 ID는 선택 후 URL에 표시됩니다(예: `env-xxxxx`).

        탐색 예시:

        * 환경 목록: `https://confluent.cloud/environments`
        * 선택된 환경 URL 패턴: `https://confluent.cloud/environments/<env-id>/overview`
      </Step>

      <Step title="Kafka 클러스터 필드 가져오기">
        선택된 환경에서 **Clusters**를 열고 대상 클러스터(예: `<cluster-name>`)를 클릭합니다.

        다음 값을 수집합니다:

        * `BOOTSTRAP_SERVERS`
        * `KAFKA_REST_ENDPOINT`
        * `KAFKA_CLUSTER_ID`

        `KAFKA_ENV_ID`는 1단계에서 선택한 환경 ID로 유지합니다.
      </Step>

      <Step title="범위별 API 키 및 시크릿 생성">
        [confluent.cloud/settings/api-keys](https://confluent.cloud/settings/api-keys)로 이동하여 **+ Add API Key**를 클릭합니다.

        프로덕션 워크로드에는 **Service Account**, 개발/테스트에는 **My Account**를 선택합니다.

        Confluent 온보딩에서 원하는 범위를 선택한 후, 생성된 API 키와 API 시크릿 쌍을 저장합니다.

        생성할 수 있는 키 범위:

        * Kafka 클러스터
        * Schema Registry
        * ksqlDB 클러스터
        * Flink 리전
        * 클라우드 리소스 관리
        * Tableflow
      </Step>

      <Step title="Schema Registry 엔드포인트 가져오기 (선택)">
        선택된 환경에서 **Stream Governance -> Schema Registry**를 엽니다.

        다음 값을 수집합니다:

        * `SCHEMA_REGISTRY_ENDPOINT`

        URL 패턴 예시:
        `https://confluent.cloud/environments/<env-id>/stream-governance/schema-registry/overview`
      </Step>

      <Step title="Flink 필드 가져오기 (선택)">
        선택된 환경에서 **Flink**를 엽니다.

        **Compute pools**를 열고 필요한 경우 **+ Add compute pool**로 풀을 생성합니다.

        대상 컴퓨팅 풀을 클릭하여 다음을 수집합니다:

        * `FLINK_COMPUTE_POOL_ID`
        * `FLINK_ENV_ID` (URL의 환경 ID와 동일)

        URL 패턴 예시:
        `https://confluent.cloud/environments/<env-id>/flink/pools/<compute-pool-id>/overview`

        클라우드 공급자 및 리전(AWS, Azure, 또는 GCP; 예: `<region-code>`)에 따라 `FLINK_REST_ENDPOINT`를 설정합니다.
      </Step>

      <Step title="조직 ID 가져오기 (선택)">
        [confluent.cloud/settings/organizations/edit](https://confluent.cloud/settings/organizations/edit)로 이동하여 다음을 수집합니다:

        * `FLINK_ORG_ID`
      </Step>

      <Step title="CloudThinker에 연결 추가">
        CloudThinker에서 **Connections → Kafka**로 이동합니다.

        활성화한 범위에 해당하는 필드로 JSON 파일을 작성합니다(아래 [연결 필드 템플릿](#연결-필드-템플릿) 참고). 연결 양식에서 이 JSON 파일을 업로드합니다.

        필수 필드는 프로필에 따라 다릅니다 — 자세한 내용은 [프로필](#프로필)을 참고하세요.

        **Connect**를 클릭합니다. CloudThinker가 자격 증명을 확인하고 **Connected** 상태를 표시합니다.
      </Step>
    </Steps>

    ### 범위 기반 자격 증명 모델

    Confluent Cloud는 범위 기반 API 자격 증명을 사용합니다. 각 API 키와 시크릿 쌍은 특정 리소스 범위에 대한 접근 권한을 부여합니다.

    Kafka 전용 필드로 시작한 후 Schema Registry, Flink, Cloud API, 또는 Tableflow 필드를 나중에 추가할 수 있습니다.

    | 범위                  | 활성화 기능                                               | 주요 필드                                                                                                               |
    | ------------------- | ---------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------- |
    | **Kafka 클러스터**      | 토픽 관리(목록, 생성, 삭제, 구성), 메시지 생산/소비, 클러스터 메타데이터 조회      | `BOOTSTRAP_SERVERS`, `KAFKA_API_KEY`, `KAFKA_API_SECRET`, `KAFKA_CLUSTER_ID`, `KAFKA_ENV_ID`, `KAFKA_REST_ENDPOINT` |
    | **Schema Registry** | 데이터 스키마 목록 조회, 검사, 삭제                                | `SCHEMA_REGISTRY_ENDPOINT`, `SCHEMA_REGISTRY_API_KEY`, `SCHEMA_REGISTRY_API_SECRET`                                 |
    | **Flink 리전**        | Flink SQL 구문 생성 및 관리, 카탈로그/데이터베이스/테이블 탐색, 상태 확인 및 진단 | `FLINK_REST_ENDPOINT`, `FLINK_API_KEY`, `FLINK_API_SECRET`, `FLINK_COMPUTE_POOL_ID`, `FLINK_ENV_ID`                 |
    | **클라우드 리소스 관리**     | 환경 및 클러스터 검색, 운영 메트릭 및 청구 비용 조회                      | `CONFLUENT_CLOUD_API_KEY`, `CONFLUENT_CLOUD_API_SECRET`                                                             |
    | **Tableflow**       | Tableflow 활성화 토픽 및 카탈로그 통합(예: AWS Glue) 관리           | `TABLEFLOW_API_KEY`, `TABLEFLOW_API_SECRET`                                                                         |
    | **조직 메타데이터**        | Flink 리소스 관리를 위한 조직 수준 컨텍스트                          | `FLINK_ORG_ID`                                                                                                      |

    ### 프로필

    #### 최소 구성 (Kafka 전용)

    필수:

    * `BOOTSTRAP_SERVERS`
    * `KAFKA_API_KEY`
    * `KAFKA_API_SECRET`
    * `KAFKA_CLUSTER_ID`
    * `KAFKA_ENV_ID`

    **가능한 작업:** 토픽 관리(목록, 생성, 삭제, 구성), 메시지 생산 및 소비, 클러스터 메타데이터 및 토픽 구성 조회.

    #### 표준 구성 (Kafka + Schema Registry + 클라우드 관리)

    추가:

    * `SCHEMA_REGISTRY_ENDPOINT`
    * `SCHEMA_REGISTRY_API_KEY`
    * `SCHEMA_REGISTRY_API_SECRET`
    * `CONFLUENT_CLOUD_API_KEY`
    * `CONFLUENT_CLOUD_API_SECRET`

    **가능한 작업:** 최소 구성의 모든 기능에 더해 데이터 스키마 목록 조회 및 검사, 환경 및 클러스터 검색, 운영 메트릭 조회, 청구 비용 확인.

    #### 고급 구성 (Flink / Tableflow)

    필요에 따라 선택적 범위 그룹을 하나 이상 추가합니다:

    * **Flink:** `FLINK_REST_ENDPOINT`, `FLINK_API_KEY`, `FLINK_API_SECRET`, `FLINK_COMPUTE_POOL_ID`, `FLINK_ENV_ID`
    * **Tableflow:** `TABLEFLOW_API_KEY`, `TABLEFLOW_API_SECRET`

    **가능한 작업:** 표준 구성의 모든 기능에 더해 Flink SQL 구문 생성 및 관리, Flink 카탈로그 및 데이터베이스 탐색, 스트리밍 쿼리 상태 확인, 카탈로그 통합(예: AWS Glue)과 함께 Tableflow 활성화 토픽 관리.

    ### 연결 필드 템플릿

    이 템플릿을 사용하여 활성화한 범위에 맞는 값을 채우세요:

    ```json theme={null}
    {
      "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
      "KAFKA_API_KEY": "<kafka-api-key>",
      "KAFKA_API_SECRET": "<kafka-api-secret>",
      "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
      "KAFKA_CLUSTER_ID": "lkc-xxxxx",
      "KAFKA_ENV_ID": "env-xxxxx",

      "SCHEMA_REGISTRY_ENDPOINT": "https://psrc-xxxxx.<region>.<provider>.confluent.cloud",
      "SCHEMA_REGISTRY_API_KEY": "<schema-registry-api-key>",
      "SCHEMA_REGISTRY_API_SECRET": "<schema-registry-api-secret>",

      "FLINK_API_KEY": "<flink-api-key>",
      "FLINK_API_SECRET": "<flink-api-secret>",
      "FLINK_COMPUTE_POOL_ID": "lfcp-xxxxx",
      "FLINK_ENV_ID": "env-xxxxx",
      "FLINK_REST_ENDPOINT": "https://flink.<region>.<provider>.confluent.cloud",
      "FLINK_ORG_ID": "<org-id>",

      "CONFLUENT_CLOUD_API_KEY": "<cloud-api-key>",
      "CONFLUENT_CLOUD_API_SECRET": "<cloud-api-secret>",

      "TABLEFLOW_API_KEY": "<tableflow-api-key>",
      "TABLEFLOW_API_SECRET": "<tableflow-api-secret>"
    }
    ```
  </Tab>

  <Tab title="자체 호스팅 Kafka">
    <Steps>
      <Step title="네트워크 접근 확인">
        CloudThinker 애플리케이션이 `<broker-name>.<your-domain>:9092`의 Kafka 브로커에 도달할 수 있는지 확인합니다.
      </Step>

      <Step title="CloudThinker에 연결 추가">
        CloudThinker에서 **Connections → Kafka**로 이동합니다.

        활성화한 범위에 해당하는 필드로 JSON 파일을 작성합니다(아래 [연결 필드 템플릿](#연결-필드-템플릿-2) 참고). 연결 양식에서 이 JSON 파일을 업로드합니다.

        필수 필드는 프로필에 따라 다릅니다 — 자세한 내용은 아래 [프로필](#프로필-2)을 참고하세요.

        **Connect**를 클릭합니다. CloudThinker가 자격 증명을 확인하고 **Connected** 상태를 표시합니다.
      </Step>
    </Steps>

    ### 범위 기반 자격 증명 모델

    자체 호스팅 Kafka는 범위 기반 구성을 사용합니다. Kafka 전용 필드로 시작한 후 나중에 Schema Registry 필드를 추가할 수 있습니다.

    | 범위                  | 활성화 기능                       | 주요 필드                      |
    | ------------------- | ---------------------------- | -------------------------- |
    | **Kafka 클러스터**      | 토픽 관리(목록, 생성, 삭제), 메시지 생산/소비 | `BOOTSTRAP_SERVERS`        |
    | **Schema Registry** | 데이터 스키마 목록 조회, 검사, 삭제        | `SCHEMA_REGISTRY_ENDPOINT` |

    ### 프로필

    #### 최소 구성 (Kafka 전용)

    필수:

    * `BOOTSTRAP_SERVERS`

    **가능한 작업:** 토픽 관리(목록, 생성, 삭제), 메시지 생산 및 소비.

    #### 표준 구성 (Schema Registry 포함)

    추가:

    * `SCHEMA_REGISTRY_ENDPOINT` (일반적으로 포트 8081)

    **가능한 작업:** 최소 구성의 모든 기능에 더해 데이터 스키마 목록 조회, 검사, 삭제.

    ### 연결 필드 템플릿

    이 템플릿을 사용하여 활성화한 범위에 맞는 값을 채우세요:

    ```json theme={null}
    {
      "BOOTSTRAP_SERVERS": "<broker-name>.<your-domain>:9092",
      "SCHEMA_REGISTRY_ENDPOINT": "http://<schema-registry-host>:8081"
    }
    ```
  </Tab>
</Tabs>

***

## 연결 세부 정보

연결 필드는 JSON 자격 증명 파일로 제출됩니다. 필드는 플랫폼과 활성화된 범위에 따라 다릅니다 — [설정](#설정) 섹션의 전체 템플릿을 참고하세요.

<Note>
  CloudThinker는 단계적 범위 온보딩을 지원합니다 — Kafka 전용 필드로 시작하여 나중에 Schema Registry, Flink, Cloud API, 또는 Tableflow 자격 증명을 추가할 수 있습니다.
</Note>

| 필드                                                       | 플랫폼             | 설명                       |
| -------------------------------------------------------- | --------------- | ------------------------ |
| `BOOTSTRAP_SERVERS`                                      | 공통              | Kafka 클러스터 부트스트랩 주소 (필수) |
| `KAFKA_API_KEY` / `KAFKA_API_SECRET`                     | Confluent Cloud | Kafka 범위 자격 증명           |
| `SCHEMA_REGISTRY_ENDPOINT`                               | 공통              | Schema Registry URL      |
| `SCHEMA_REGISTRY_API_KEY` / `SCHEMA_REGISTRY_API_SECRET` | Confluent Cloud | Schema Registry 자격 증명    |
| `FLINK_REST_ENDPOINT`                                    | Confluent Cloud | Flink 리전 엔드포인트           |
| `CONFLUENT_CLOUD_API_KEY` / `CONFLUENT_CLOUD_API_SECRET` | Confluent Cloud | 클라우드 관리 자격 증명            |
| `TABLEFLOW_API_KEY` / `TABLEFLOW_API_SECRET`             | Confluent Cloud | Tableflow 자격 증명          |

***

## 필요 권한

<Tip>
  Confluent Cloud의 경우, **Service Account**를 사용하고 각 API 키에 필요한 범위만 부여하세요. Kafka 전용 자격 증명으로 시작하여 추가 범위를 점진적으로 추가합니다.
</Tip>

**Confluent Cloud:** 범위별로 별도의 API 키와 시크릿 쌍을 생성합니다. CloudThinker가 필요한 특정 토픽으로 Kafka ACL을 제한합니다. 클라우드 관리 자격 증명은 최소한 MetricsViewer 역할이 필요합니다.

**자체 호스팅 Kafka:** API 키가 필요하지 않습니다. 브로커의 부트스트랩 주소가 포트 9092에서 CloudThinker로부터 네트워크 접근이 가능한지 확인합니다.

***

## 에이전트 기능

연결 후 [Alex](/ko/guide/agents/alex)와 [Tony](/ko/guide/agents/tony)는 다음을 수행할 수 있습니다:

| 기능              | 설명                                   |
| --------------- | ------------------------------------ |
| **컨슈머 지연 모니터링** | 컨슈머 그룹별 지연 추적, 느린 컨슈머 식별             |
| **토픽 상태 분석**    | 파티션 분배, 복제 팩터, 복제 미완료 파티션 확인         |
| **처리량 메트릭**     | 토픽별 바이트 인/아웃, 메시지 속도 모니터링            |
| **브로커 상태**      | 브로커 가용성, ISR(In-Sync Replicas) 상태 추적 |

### 연결 확인

```text theme={null}
@alex list all Kafka topics and check consumer group lag for the active consumer groups
```

### 예시 프롬프트

```text theme={null}
@alex check consumer lag for the orders-service group
@tony analyze message throughput trends for the events topic
@alex identify under-replicated partitions and #report the affected topics
```

***

## 문제 해결

<Accordion title="연결 거부 또는 타임아웃">
  * `<broker-name>.<your-domain>`에서 Kafka 브로커 프로세스가 실행 중인지 확인합니다.
  * 브로커 포트(기본값 9092)가 열려 있고 방화벽에 의해 차단되지 않는지 확인합니다.
  * 부트스트랩 서버 주소 `<broker-name>.<your-domain>:9092`가 올바르고 CloudThinker에서 접근 가능한지 확인합니다.
  * 로컬 개발 환경의 경우, Kafka가 접근 가능한 IP에 바인딩되어 있는지 확인합니다(`127.0.0.1`만 바인딩되지 않도록).
</Accordion>

<Accordion title="Confluent Cloud 부분 범위 필드">
  부분 범위 온보딩 사용 시, 사용하지 않는 범위의 **전체 키-값 쌍을 제거**하세요. 빈 문자열을 남기지 마세요.

  **올바른 예** (Kafka 전용, Schema Registry 완전 제거):

  ```json theme={null}
  {
    "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
    "KAFKA_API_KEY": "<kafka-api-key>",
    "KAFKA_API_SECRET": "<kafka-api-secret>",
    "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
    "KAFKA_CLUSTER_ID": "lkc-xxxxx",
    "KAFKA_ENV_ID": "env-xxxxx"
  }
  ```

  **잘못된 예** (빈 문자열 값은 유효성 검사 오류 발생):

  ```json theme={null}
  {
    "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
    "KAFKA_API_KEY": "<kafka-api-key>",
    "KAFKA_API_SECRET": "<kafka-api-secret>",
    "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
    "KAFKA_CLUSTER_ID": "lkc-xxxxx",
    "KAFKA_ENV_ID": "env-xxxxx",
    "SCHEMA_REGISTRY_ENDPOINT": "",
    "SCHEMA_REGISTRY_API_KEY": ""
  }
  ```
</Accordion>

<Accordion title="Schema Registry 연결 실패">
  `SCHEMA_REGISTRY_ENDPOINT` URL이 올바르고 CloudThinker에서 접근 가능한지 확인합니다. 자체 호스팅의 경우 포트 8081이 열려 있는지 확인합니다. Confluent Cloud의 경우 Schema Registry API 키가 해당 환경에 올바른 권한을 갖고 있는지 확인합니다.
</Accordion>

***

## 보안

* **최소 권한** — 에이전트가 사용 사례에 필요한 권한만 부여하세요. 읽기 전용으로 시작한 후 필요에 따라 확장하세요.
* **기본 읽기 전용** — 에이전트가 이 연결을 통해 변경 작업을 수행하게 할 것이 아니라면 읽기 전용 자격증명을 사용하세요.
* **자격증명 교체** — 정기 일정에 따라 키와 토큰을 교체하세요. 연결을 업데이트하면 CloudThinker가 새 값을 자동으로 반영합니다.
* **오프보딩 시 취소** — 연결을 삭제하거나 팀원이 퇴사할 때 프로바이더에서 자격증명을 제거하세요.

- **범위 제한 API 키** — CloudThinker에 필요한 범위만 부여하고, Kafka 전용으로 시작하여 Schema Registry, Flink, 또는 클라우드 관리 범위를 점진적으로 추가합니다.
- **네트워크 제한** — 보안 그룹 또는 방화벽 규칙을 통해 부트스트랩 및 REST 엔드포인트를 CloudThinker의 이그레스 IP로 제한합니다.

***

## 관련 항목

<CardGroup cols={2}>
  <Card title="Alex 에이전트" icon="server" href="/ko/guide/agents/alex">
    클라우드 인프라 및 스트리밍 최적화 에이전트
  </Card>

  <Card title="AWS 연결" icon="https://mintcdn.com/cloudthinker/aLd-ttc-SCW-aFky/images/icons/aws.svg?fit=max&auto=format&n=aLd-ttc-SCW-aFky&q=85&s=45d526a3e9345214c0345f277da2e829" href="/ko/guide/connections/aws" width="24" height="24" data-path="images/icons/aws.svg">
    AWS 클라우드 리소스 설정 가이드
  </Card>
</CardGroup>
