Chuyển đến nội dung chính
Kết nối các cluster Apache Kafka của bạn để cho phép Alex (Kỹ sư Cloud) và Tony (Kỹ sư Cơ sở dữ liệu) giám sát độ lành mạnh của topic, phân tích consumer lag và tối ưu hiệu suất streaming. Kết nối Kafka được gửi dưới dạng tệp JSON chứa thông tin xác thực, với cặp API key riêng cho mỗi phạm vi (Confluent Cloud) hoặc địa chỉ bootstrap (self-hosted).

Nền tảng hỗ trợ

Nền tảngHỗ trợ
Confluent CloudTất cả các gói
Self-hosted Kafka2.8+ (KRaft mode), 3.x

Điều kiện tiên quyết

  • Tài khoản Confluent Cloud với ít nhất một Kafka environment và cluster, HOẶC cluster Kafka 2.8+ (KRaft mode) hoặc 3.x tự lưu trữ có thể truy cập từ CloudThinker.
  • Với Confluent Cloud: quyền admin để tạo API key tại confluent.cloud/settings/api-keys.
  • Quyền truy cập mạng từ CloudThinker đến các bootstrap server và REST endpoint của Kafka cluster.

Thiết lập

Chọn nền tảng Kafka của bạn để xem hướng dẫn kết nối cụ thể:
1

Mở Confluent Cloud và chọn environment

Vào confluent.cloud/home, sau đó mở Environments.Nhấn vào environment bạn muốn kết nối.ID của environment xuất hiện trên URL sau khi bạn chọn (ví dụ: env-xxxxx).Ví dụ điều hướng:
  • Danh sách environment: https://confluent.cloud/environments
  • Mẫu URL khi chọn environment: https://confluent.cloud/environments/<env-id>/overview
2

Lấy thông tin Kafka cluster

Trong environment đã chọn, mở Clusters và nhấn vào cluster mục tiêu (ví dụ: <cluster-name>).Thu thập:
  • BOOTSTRAP_SERVERS
  • KAFKA_REST_ENDPOINT
  • KAFKA_CLUSTER_ID
Giữ KAFKA_ENV_ID là ID environment đã chọn từ Bước 1.
3

Tạo API key và secret theo phạm vi

Vào confluent.cloud/settings/api-keys và nhấn + Add API Key.Chọn Service Account cho môi trường production, hoặc My Account cho development/testing.Chọn phạm vi mong muốn trong quy trình onboarding của Confluent, sau đó lưu cặp API key và API secret được tạo.Các phạm vi bạn có thể tạo key:
  • Kafka cluster
  • Schema Registry
  • ksqlDB cluster
  • Flink region
  • Cloud resource management
  • Tableflow
4

Lấy endpoint Schema Registry (tùy chọn)

Trong environment đã chọn, mở Stream Governance -> Schema Registry.Thu thập:
  • SCHEMA_REGISTRY_ENDPOINT
Ví dụ mẫu URL: https://confluent.cloud/environments/<env-id>/stream-governance/schema-registry/overview
5

Lấy thông tin Flink (tùy chọn)

Trong environment đã chọn, mở Flink.Mở Compute pools và tạo pool mới bằng + Add compute pool nếu cần.Nhấn vào compute pool mục tiêu và thu thập:
  • FLINK_COMPUTE_POOL_ID
  • FLINK_ENV_ID (cùng ID environment từ URL)
Ví dụ mẫu URL: https://confluent.cloud/environments/<env-id>/flink/pools/<compute-pool-id>/overviewĐặt FLINK_REST_ENDPOINT theo nhà cung cấp cloud và region của bạn (AWS, Azure, hoặc GCP; ví dụ: <region-code>).
6

Lấy organization ID (tùy chọn)

Vào confluent.cloud/settings/organizations/edit và thu thập:
  • FLINK_ORG_ID
7

Thêm kết nối trong CloudThinker

Trong CloudThinker, điều hướng đến Connections → Kafka.Tạo tệp JSON với các trường cho phạm vi bạn đã bật (xem Mẫu trường kết nối bên dưới). Tải tệp JSON này lên trong biểu mẫu kết nối.Các trường bắt buộc phụ thuộc vào profile của bạn — xem Profiles để biết chi tiết.Nhấn Connect. CloudThinker xác minh thông tin xác thực và hiển thị trạng thái Connected.

Mô hình thông tin xác thực theo phạm vi

Confluent Cloud sử dụng thông tin xác thực API theo phạm vi. Mỗi cặp API key và secret cấp quyền truy cập vào một phạm vi tài nguyên cụ thể.Bạn có thể bắt đầu chỉ với các trường Kafka, sau đó thêm Schema Registry, Flink, Cloud API, hoặc Tableflow sau.
Phạm viTính năng mở khóaTrường thông thường
Kafka clusterQuản lý topic (liệt kê, tạo, xóa, cấu hình), produce/consume message, xem metadata clusterBOOTSTRAP_SERVERS, KAFKA_API_KEY, KAFKA_API_SECRET, KAFKA_CLUSTER_ID, KAFKA_ENV_ID, KAFKA_REST_ENDPOINT
Schema RegistryLiệt kê, kiểm tra và xóa data schemaSCHEMA_REGISTRY_ENDPOINT, SCHEMA_REGISTRY_API_KEY, SCHEMA_REGISTRY_API_SECRET
Flink regionTạo và quản lý Flink SQL statement, khám phá catalog/database/table, kiểm tra sức khỏe và chẩn đoánFLINK_REST_ENDPOINT, FLINK_API_KEY, FLINK_API_SECRET, FLINK_COMPUTE_POOL_ID, FLINK_ENV_ID
Cloud resource managementKhám phá environment và cluster, truy vấn metric vận hành và chi phí billingCONFLUENT_CLOUD_API_KEY, CONFLUENT_CLOUD_API_SECRET
TableflowQuản lý topic hỗ trợ Tableflow và tích hợp catalog (ví dụ: AWS Glue)TABLEFLOW_API_KEY, TABLEFLOW_API_SECRET
Organization metadataNgữ cảnh cấp tổ chức cho quản lý tài nguyên FlinkFLINK_ORG_ID

Profiles

Tối thiểu (chỉ Kafka)

Bắt buộc:
  • BOOTSTRAP_SERVERS
  • KAFKA_API_KEY
  • KAFKA_API_SECRET
  • KAFKA_CLUSTER_ID
  • KAFKA_ENV_ID
Bạn có thể làm gì: Quản lý topic (liệt kê, tạo, xóa, cấu hình), produce và consume message, xem metadata cluster và cấu hình topic.

Tiêu chuẩn (Kafka + Schema Registry + Cloud Management)

Thêm:
  • SCHEMA_REGISTRY_ENDPOINT
  • SCHEMA_REGISTRY_API_KEY
  • SCHEMA_REGISTRY_API_SECRET
  • CONFLUENT_CLOUD_API_KEY
  • CONFLUENT_CLOUD_API_SECRET
Bạn có thể làm gì: Tất cả những gì trong Tối thiểu, cộng thêm liệt kê và kiểm tra data schema, khám phá environment và cluster, truy vấn metric vận hành và xem chi phí billing.Thêm một hoặc nhiều nhóm phạm vi tùy chọn khi cần:
  • Flink: FLINK_REST_ENDPOINT, FLINK_API_KEY, FLINK_API_SECRET, FLINK_COMPUTE_POOL_ID, FLINK_ENV_ID
  • Tableflow: TABLEFLOW_API_KEY, TABLEFLOW_API_SECRET
Bạn có thể làm gì: Tất cả những gì trong Tiêu chuẩn, cộng thêm tạo và quản lý Flink SQL statement, khám phá catalog và database Flink, chạy kiểm tra sức khỏe trên streaming query, và quản lý topic hỗ trợ Tableflow với tích hợp catalog (ví dụ: AWS Glue).

Mẫu trường kết nối

Sử dụng mẫu này và điền giá trị cho các phạm vi bạn đã bật:
{
  "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
  "KAFKA_API_KEY": "<kafka-api-key>",
  "KAFKA_API_SECRET": "<kafka-api-secret>",
  "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
  "KAFKA_CLUSTER_ID": "lkc-xxxxx",
  "KAFKA_ENV_ID": "env-xxxxx",

  "SCHEMA_REGISTRY_ENDPOINT": "https://psrc-xxxxx.<region>.<provider>.confluent.cloud",
  "SCHEMA_REGISTRY_API_KEY": "<schema-registry-api-key>",
  "SCHEMA_REGISTRY_API_SECRET": "<schema-registry-api-secret>",

  "FLINK_API_KEY": "<flink-api-key>",
  "FLINK_API_SECRET": "<flink-api-secret>",
  "FLINK_COMPUTE_POOL_ID": "lfcp-xxxxx",
  "FLINK_ENV_ID": "env-xxxxx",
  "FLINK_REST_ENDPOINT": "https://flink.<region>.<provider>.confluent.cloud",
  "FLINK_ORG_ID": "<org-id>",

  "CONFLUENT_CLOUD_API_KEY": "<cloud-api-key>",
  "CONFLUENT_CLOUD_API_SECRET": "<cloud-api-secret>",

  "TABLEFLOW_API_KEY": "<tableflow-api-key>",
  "TABLEFLOW_API_SECRET": "<tableflow-api-secret>"
}

Chi tiết kết nối

Các trường kết nối được gửi dưới dạng tệp JSON chứa thông tin xác thực. Các trường khác nhau tùy theo nền tảng và phạm vi đã bật — xem mẫu đầy đủ trong phần Thiết lập.
CloudThinker hỗ trợ onboarding theo phạm vi một phần — bạn có thể bắt đầu chỉ với các trường Kafka và thêm thông tin xác thực Schema Registry, Flink, Cloud API, hoặc Tableflow sau.
TrườngNền tảngMô tả
BOOTSTRAP_SERVERSCả haiĐịa chỉ bootstrap của Kafka cluster (bắt buộc)
KAFKA_API_KEY / KAFKA_API_SECRETConfluent CloudThông tin xác thực phạm vi Kafka
SCHEMA_REGISTRY_ENDPOINTCả haiURL Schema Registry
SCHEMA_REGISTRY_API_KEY / SCHEMA_REGISTRY_API_SECRETConfluent CloudThông tin xác thực Schema Registry
FLINK_REST_ENDPOINTConfluent CloudEndpoint vùng Flink
CONFLUENT_CLOUD_API_KEY / CONFLUENT_CLOUD_API_SECRETConfluent CloudThông tin xác thực quản lý Cloud
TABLEFLOW_API_KEY / TABLEFLOW_API_SECRETConfluent CloudThông tin xác thực Tableflow

Quyền yêu cầu

Với Confluent Cloud, sử dụng Service Account và chỉ cấp cho mỗi API key phạm vi nó cần. Bắt đầu với thông tin xác thực chỉ Kafka và thêm các phạm vi bổ sung dần dần.
Confluent Cloud: Tạo cặp API key và secret riêng cho từng phạm vi. Hạn chế Kafka ACL chỉ cho các topic mà CloudThinker cần. Thông tin xác thực Cloud Management yêu cầu tối thiểu role MetricsViewer. Self-hosted Kafka: Không cần API key. Đảm bảo địa chỉ bootstrap của broker có thể truy cập từ CloudThinker trên port 9092.

Khả năng của agent

Sau khi kết nối, AlexTony có thể:
Khả năngMô tả
Giám sát consumer lagTheo dõi lag theo consumer group, xác định consumer chậm
Phân tích sức khỏe topicKiểm tra phân phối partition, replication factor, các partition under-replicated
Metric throughputGiám sát bytes vào/ra, tốc độ message theo topic
Sức khỏe brokerTheo dõi tính khả dụng của broker, trạng thái ISR (In-Sync Replicas)

Xác minh kết nối

@alex list all Kafka topics and check consumer group lag for the active consumer groups

Ví dụ prompt

@alex check consumer lag for the orders-service group
@tony analyze message throughput trends for the events topic
@alex identify under-replicated partitions and #report the affected topics

Xử lý sự cố

  • Xác minh tiến trình Kafka broker đang chạy trên <broker-name>.<your-domain>.
  • Kiểm tra rằng port của broker (mặc định 9092) đang mở và không bị chặn bởi firewall.
  • Xác minh địa chỉ bootstrap server <broker-name>.<your-domain>:9092 chính xác và có thể truy cập từ CloudThinker.
  • Đối với development nội bộ, đảm bảo Kafka được bind vào IP có thể truy cập (không chỉ 127.0.0.1).
Khi sử dụng onboarding theo phạm vi một phần, hãy xóa toàn bộ cặp key-value cho các phạm vi không dùng. Không để chuỗi rỗng.Đúng (chỉ Kafka, Schema Registry đã xóa hoàn toàn):
{
  "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
  "KAFKA_API_KEY": "<kafka-api-key>",
  "KAFKA_API_SECRET": "<kafka-api-secret>",
  "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
  "KAFKA_CLUSTER_ID": "lkc-xxxxx",
  "KAFKA_ENV_ID": "env-xxxxx"
}
Sai (giá trị chuỗi rỗng gây lỗi validation):
{
  "BOOTSTRAP_SERVERS": "pkc-xxxxx.<region>.<provider>.confluent.cloud:9092",
  "KAFKA_API_KEY": "<kafka-api-key>",
  "KAFKA_API_SECRET": "<kafka-api-secret>",
  "KAFKA_REST_ENDPOINT": "https://pkc-xxxxx.<region>.<provider>.confluent.cloud:443",
  "KAFKA_CLUSTER_ID": "lkc-xxxxx",
  "KAFKA_ENV_ID": "env-xxxxx",
  "SCHEMA_REGISTRY_ENDPOINT": "",
  "SCHEMA_REGISTRY_API_KEY": ""
}
Xác minh URL SCHEMA_REGISTRY_ENDPOINT chính xác và có thể truy cập từ CloudThinker. Với self-hosted, đảm bảo port 8081 đang mở. Với Confluent Cloud, xác nhận API key của Schema Registry có đúng quyền cho environment của bạn.

Bảo mật

  • Quyền tối thiểu — chỉ cấp các quyền mà agent cần cho trường hợp sử dụng của bạn; bắt đầu với quyền chỉ đọc và mở rộng sau.
  • Chỉ đọc theo mặc định — sử dụng thông tin xác thực chỉ đọc trừ khi bạn muốn agent thực hiện thay đổi qua kết nối này.
  • Xoay vòng thông tin xác thực — xoay vòng khóa và token theo lịch trình thông thường của bạn; CloudThinker sẽ lấy giá trị mới khi bạn cập nhật kết nối.
  • Thu hồi khi bàn giao — xóa thông tin xác thực tại nhà cung cấp khi bạn xóa một kết nối hoặc khi đồng nghiệp rời nhóm.
  • API key giới hạn phạm vi — chỉ cấp các phạm vi mà CloudThinker cần; bắt đầu chỉ với Kafka và thêm Schema Registry, Flink, hoặc Cloud Management dần dần
  • Hạn chế mạng — giới hạn bootstrap và REST endpoint chỉ cho IP egress của CloudThinker qua security group hoặc quy tắc firewall

Liên quan

Agent Alex

Agent tối ưu hạ tầng cloud và streaming
https://mintcdn.com/cloudthinker/aLd-ttc-SCW-aFky/images/icons/aws.svg?fit=max&auto=format&n=aLd-ttc-SCW-aFky&q=85&s=45d526a3e9345214c0345f277da2e829

Kết nối AWS

Hướng dẫn thiết lập tài nguyên AWS cloud