使用Flink将StarRocks数据导出至iceberg

发布时间:2026/5/15 19:13:18

-- Export StarRocks data to Iceberg using a Flink batch job.
-- (Article lead-in: "使用Flink将StarRocks数据导出至iceberg / 使用Flink批任务导出")

-- Source table backed by the StarRocks connector.
-- NOTE: `user` and `type` are reserved words in Flink SQL and must be
-- backtick-quoted; connector options must be quoted 'key' = 'value' pairs
-- (the scraped original had the quotes and '=' stripped).
CREATE TABLE IF NOT EXISTS source_metric (
    logtime       STRING NOT NULL,
    log_id        STRING NOT NULL,
    `user`        STRING NOT NULL,
    department_id STRING NOT NULL,
    mcp_name      STRING NULL,
    tool_name     STRING NULL,
    tool_id       STRING NULL,
    toolkit_id    STRING NULL,
    request_time  STRING NULL,
    request       STRING NULL,
    response      STRING NULL,
    cost_time     BIGINT NULL,
    status        BIGINT NULL,
    info          STRING NULL,
    env           STRING NULL,
    app_name      STRING NULL,
    label         BIGINT NULL,
    `type`        BIGINT NULL,
    trace_id      STRING NULL,
    pt            DATE   NULL,
    version       STRING NULL,
    -- Flink cannot enforce keys on external tables, hence NOT ENFORCED.
    PRIMARY KEY (pt) NOT ENFORCED
) WITH (
    'connector'     = 'starrocks',
    'jdbc-url'      = 'jdbc:mysql:loadbalance://fe-query-lvs-inner.cn:9030',
    -- scan-url points at the FE HTTP port (8030); the original had the key
    -- and value fused together ("scan-urlfe-lvs-inner.cn:8030").
    'scan-url'      = 'fe-lvs-inner.cn:8030',
    'database-name' = 'starrocks_test',
    'table-name'    = 'source_metric',
    'username'      = 'chat',
    'password'      = '12345'
);

-- Create the Iceberg catalog backed by a Hive Metastore.
CREATE CATALOG iceberg_hive_catalog WITH (
    'type'             = 'iceberg',
    'default-database' = 'default',
    'catalog-type'     = 'hive',
    'uri'              = 'thrift://PRO:9083',
    'clients'          = '5',
    'property-version' = '1',
    'hive-conf-dir'    = '/app/hive/conf'
);

USE CATALOG iceberg_hive_catalog;

-- Export a single partition (pt = 2025-11-10) from StarRocks into Iceberg.
-- The partition column is written last, cast to STRING for the target table.
INSERT INTO iceberg_hive_catalog.app_dev.test_aibrain_ai_mcp_metric
SELECT
    logtime,
    log_id,
    `user`,
    department_id,
    mcp_name,
    tool_name,
    tool_id,
    toolkit_id,
    request_time,
    request,
    response,
    cost_time,
    status,
    info,
    env,
    app_name,
    label,
    `type`,
    trace_id,
    version,
    CAST(pt AS STRING)
FROM default_catalog.default_database.source_metric
-- Original was "where pt TO_DATE(20251110, yyyyMMdd)": the '=' operator was
-- missing and TO_DATE's arguments must be string literals in Flink SQL.
WHERE pt = TO_DATE('20251110', 'yyyyMMdd');

相关新闻