flink-streaming-platform-web/docs/catalog.md

155 lines
3.5 KiB
Markdown
Raw Permalink Normal View History

2025-03-28 17:52:09 +08:00
**Catalog 操作说明**
## 依赖 jar 参考官方文档
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/hive/
不同版本的 Hive 所需的依赖 jar 不同，请按实际使用的 Hive 版本选择。
官方 Catalog 连接配置说明：
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/hive/#%E8%BF%9E%E6%8E%A5%E5%88%B0hive
> 可以把依赖 jar 放到 lib 目录下，也可以放到 HTTP 服务器上，然后在使用时添加对应的 HTTP 地址。
## demo1
~~~~ sql
CREATE CATALOG testmyhive WITH (
  'type' = 'hive',
  'default-database' = 'zhp',
  'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);
USE CATALOG testmyhive;
CREATE TABLE source_table_01 (
  f0 INT,
  f1 INT,
  f2 STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5'
);
CREATE TABLE print_table_01 (
  f0 INT,
  f1 INT,
  f2 STRING
) WITH (
  'connector' = 'print'
);
INSERT INTO print_table_01
SELECT f0, f1, f2
FROM source_table_01;
SHOW TABLES;
SHOW FUNCTIONS;
SHOW CATALOGS;
SHOW DATABASES;
~~~~
## demo2
如果已经执行过 demo1（表已经创建并保存在 Hive Catalog 中），无需再次建表，可以直接执行：
~~~~ sql
CREATE CATALOG testmyhive WITH (
  'type' = 'hive',
  'default-database' = 'zhp',
  'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);
USE CATALOG testmyhive;
INSERT INTO print_table_01
SELECT f0, f1, f2
FROM source_table_01;
~~~~
## demo3
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/hive/hive_read_write.html
将流数据结果 sink 到 Hive。
**注意：写入 Hive 必须开启 checkpoint（流式写入的文件只有在 checkpoint 完成时才会提交）。**
~~~~ sql
CREATE CATALOG testmyhive WITH (
  'type' = 'hive',
  'default-database' = 'zhp',
  'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);
USE CATALOG testmyhive;
DROP TABLE IF EXISTS item_test;
DROP TABLE IF EXISTS hive_flink_table;
CREATE TABLE item_test (
  itemId BIGINT,
  price BIGINT,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'flink-catalog-v1',
  'properties.bootstrap.servers' = '127.0.0.1:9092',
  'properties.group.id' = 'test-1',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);
SET table.sql-dialect=hive;
CREATE TABLE hive_flink_table (
  itemId BIGINT,
  price BIGINT,
  ups STRING
) TBLPROPERTIES (
  'sink.rolling-policy.rollover-interval' = '1min',
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
SET table.sql-dialect=default;
INSERT INTO hive_flink_table
SELECT itemId, price, 'XXXXaaa' AS ups
FROM item_test;
~~~~
## Kafka 生产者 demo（用于生成测试数据）
~~~java
/**
 * Test-data producer for demo3: sends one JSON record per second to the Kafka topic
 * that the {@code item_test} table reads from.
 *
 * <p>Each record is a JSON object with {@code itemId = i} and {@code price = i + 1},
 * matching the {@code itemId BIGINT, price BIGINT} columns of {@code item_test}.
 *
 * <p>Requires kafka-clients on the classpath, plus a JSON library providing
 * {@code JSON.toJSONString} (presumably Alibaba fastjson, {@code com.alibaba.fastjson.JSON}
 * — confirm against your project's dependencies).
 */
public class KafkaSend {

    /** Must match {@code 'topic'} in the {@code item_test} DDL. */
    private static final String TOPIC = "flink-catalog-v1";

    /** Must match {@code 'properties.bootstrap.servers'} in the {@code item_test} DDL. */
    private static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";

    /** Number of records to send (one per second). */
    private static final long RECORD_COUNT = 10000L;

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer even if send/flush/sleep throws
        // (e.g. InterruptedException), instead of leaking it as the happy-path-only close did.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Reused across iterations: it is serialized to a String before the next overwrite.
            Map<String, Long> record = new HashMap<>();
            for (long i = 0; i < RECORD_COUNT; i++) {
                record.put("itemId", i);
                record.put("price", i + 1);
                producer.send(new ProducerRecord<String, String>(TOPIC, null, JSON.toJSONString(record)));
                // Push each record out immediately so the Flink job sees a steady one-per-second stream.
                producer.flush();
                Thread.sleep(1000L);
            }
        }
    }
}
~~~