Paimon学习笔记(一)
目录
使用
读取:支持以下三种方式消费数据:
- 从历史快照(批处理)
- 从最新偏移量(流处理)
- 混合模式
写入:支持CDC获取变更的流式写入和离线数据的批量写入
核心特性
- 统一批处理和流处理
- 数据湖能力
- 合并引擎
- 支持变更日志的生成,ods表的变更会不会传递到dwd层去,再往后传递下去
- 丰富表类型,支持只追加的表,支持有序的流读取替代kafka
- 假如我们在mysql中对数据表结构变更,我们会不会在paimon中也将这个业务表结构直接进行修改和变更
基本概念
- 快照:访问表的最新状态和之前的状态。
- 分区
- 分桶:如果一共有10个桶,就可以并行度为10
- 一致性保证:两阶段提交
文件布局
paimon的文件用分层方式组织,下面是文件布局的说明
文件布局说明
snapshot Files
所有的快照文件都存储在快照目录中
快照文件是一个json文件,里面是快照的信息:
- 正在使用的schema文件
- 包含这个快照的所有更改的清单列表
Manifest Files
包括清单列表和清单文件
清单列表是清单文件名的列表
清单文件是关于LSM数据文件和更改日志文件的文件信息,比如对应的快照中创建了哪个LSM数据文件,删除了哪个文件
Data Files
数据文件按照分区和分桶来创建的目录,每个分桶的目录下面包括一个LSM树和变更日志文件
LSM Trees
采用LSM树(日志结构合并树)来作为文件存储的数据结构
Sorted Runs
LSM树是把文件分成多个的Sorted Runs,每一个Sortsed Runs都是由若干个数据文件组成的
数据文件中的记录是按照主键排序的,不同的Sorted Run可能会有重叠的主键范围,而且值不一样,因为插入内容的时间不一样,查询的时候必须合并所有的Sorted Runs,不同的Sorted Run之间如果主键相同但是值不同的话,要根据时间戳取到主键最新的值
这里就是存储的文件格式
写入LSM的数据会先缓存到内存中,当内存缓冲区满了,再将数据刷写到磁盘中
Compaction
合并
当非常多的记录写入到LSM树中。sorted Run的数量会增加,LSM树会将所有的Sorted Run合并,合并一个大的Sorted Run,但是这个合并也会消耗cpu和io,所以不能频繁的进行合并,会导致写入变慢
默认情况,Paimon会将记录追加到LSM树的时候会根据需要执行合并,用户可以选择在”专用Compaction”作业中独立执行所有的合并
集成Flink
CataLog
Paimon的Catalog可以持久化数据,支持文本系统和hive
文件系统(默认):将元数据和表文件存储在文件系统中。
1
2
3
4
5
6
7CREATE CATALOG fs_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://hadoop102:8020/paimon/fs'
);
USE CATALOG fs_catalog;hive:在 hive metastore中存储元数据。用户可以直接从 Hive 访问表。
1
2
3
4
5
6
7
8
9
10CREATE CATALOG hive_catalog WITH (
'type' = 'paimon',
'metastore' = 'hive',
'uri' = 'thrift://hadoop102:9083',
'hive-conf-dir' = '/opt/module/hive/conf',//这个可写可不写
'warehouse' = 'hdfs://hadoop102:8020/paimon/hive'
);
USE CATALOG hive_catalog;
sql初始化
vim conf/sql-client-init.sql
1 | CREATE CATALOG fs_catalog WITH ( |
启动的时候指定这个sql的初始化文件
bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql
这样每次启动的时候都是已经创建好catalog并且指定了我们创建的catalog了
DDL
管理表
这个管理表的概念和hive中的内部表类似,hive分为内部表和外部表,如果是外部表的话,那么删除这个表的话,表的元数据其实并没有被删除,内部表的话的删除就是既要删除表的数据也要删除这个表的元数据了
创建表
1
2
3
4
5
6
7
8CREATE TABLE test (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);创建分区表
1
2
3
4
5
6
7
8CREATE TABLE test_p (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);这里创建的分区表必须是主键的子集,否则会创建失败
设置分区字段有三种情况可以考虑:
- 创建时间:这条数据的创建时间create_time
- 事件事件:对于cdc的数据同步过来的时候会有一个更新的事件,可能是update_time,可以根据这个更新时间来进行分区
- CDC的其他字段,一般不使用这种字段作为分区的字段
CTAS
1
CREATE TABLE table_b AS SELECT id, name FORM table_a,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23CREATE TABLE test1(
user_id BIGINT,
item_id BIGINT
);
CREATE TABLE test2 AS SELECT * FROM test1;
-- 指定分区
CREATE TABLE test2_p WITH ('partition' = 'dt') AS SELECT * FROM test_p;
-- 指定配置
CREATE TABLE test3(
user_id BIGINT,
item_id BIGINT
) WITH ('file.format' = 'orc');
CREATE TABLE test3_op WITH ('file.format' = 'parquet') AS SELECT * FROM test3;
-- 指定主键
CREATE TABLE test_pk WITH ('primary-key' = 'dt,hh') AS SELECT * FROM test;
-- 指定主键和分区
CREATE TABLE test_all WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM test_p;表属性
1
2
3
4
5
6
7
8
9
10
11
12CREATE TABLE tbl(
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
WITH (
'bucket' = '2',
'bucket-key' = 'user_id'
);可以指定表属性来提高paimon的性能
外部表
删除外部表,表文件不会被删除
1 | CREATE TABLE ex ( |
临时表
临时表是当flink sql会话关闭的时候临时表就会被删除
1 | USE CATALOG hive_catalog; |
修改表
这里暂时省略,比较简单,现查现用
DML
插入数据
insert into
flink不支持使用null,需要将插入的null转为其他的实际的数据类型的值,比如CAST (NULL AS STRING)
这个写法解决null的情况
不能将另一个表的可为空列插入到一个表的非空列中,可以使用coalesce函数来处理
1 | INSERT INTO test VALUES(1,1,'order','2023-07-01','1'), (2,2,'pay','2023-07-01','2'); |
还有一种可以使用流/批进行处理的写法
1 | INSERT INTO test_p SELECT * from test; |
这种写法区分流/批,如果是流的话,会一直运行,有数据就插入,如果是批的话,就会插入一次后结束。流模式下flink的任务一直是running
覆盖数据
insert overwrite
insert overwrite只支持batch,批模式
需要先进行下面的配置
1 | RESET 'execution.checkpointing.interval'; |
先把模式改为批模式
覆盖未分区的表
1 | INSERT OVERWRITE test VALUES(3,3,'pay','2023-07-01','2'); |
覆盖分区表
paimon默认的分区是动态的分区覆盖
1 | INSERT OVERWRITE test_p SELECT * from test; |
这种方式会动态的分区覆盖这个分区表
也可以覆盖指定的分区
1 | INSERT OVERWRITE test_p PARTITION (dt = '2023-07-01', hh = '2') SELECT user_id,item_id,behavior from test; |
更新数据
update的使用需要三种情况都满足才行:
- 主键表
- flink是1.17之后的版本
- 批处理模式
1 | UPDATE test SET item_id = 4, behavior = 'pv' WHERE user_id = 3; |
删除数据
需要满足flink的版本为1.17之后的版本并且表为主键表
1 | DELETE FROM test WHERE user_id = 3; |
Merge Into
比纯粹的update更加强大的功能:更新+插入的功能
只有主键表才支持这种功能,这个操作不会产生更新之前的旧纪录
merge into的操作使用的是upsert,该行如果存在就执行更新,如果没有存在就执行插入
这个功能和join什么的不同的是这个功能是匹配到的内容不进行查询直接进行操作,比如更新,删除,插入等操作
语法
这里的使用其实主要是把这个功能封装在了一个jar包中,调用这个jar来执行这个merge into的操作
1 | <FLINK_HOME>/bin/flink run \ |
/bin/flink run 运行 Flink 作业的命令。
/path/to/paimon-flink-action-0.5-SNAPSHOT.jar
Paimon Flink Action 的 JAR 包路径。
merge-into
指定要执行的操作是 merge-into。
–warehouse
数据仓库的路径。用于指定存储数据的位置。
–database
数据库名称。指定目标表所在的数据库。
–table
目标表的名称。进行合并操作的表。
[–target-as
] 目标表的别名。可以为目标表指定一个别名。
–source-table
源表的名称。合并操作的数据来源表。
[–source-sql
…] 源 SQL 查询。可以指定多个 SQL 查询来过滤或处理源表的数据。
–on
合并条件。指定目标表和源表之间的匹配条件。
–merge-actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete>
合并操作的类型。包括匹配时的更新和删除、不匹配时的插入、源表不匹配时的更新和删除。
match的说明
match的说明也就是中间的,matched-xxx-condition
这个的内容说明
- matched:更改的行来自目标表,也就是
--table <target-table>
这个里面的内容就是目标表,如果数据内容需要来自这张表就是要matched。也就是a去与b关联,去把关联上的a数据中满足条件的内容进行一些操作 - not-matched:更改的行来自源表,也就是
--source-table <source-table-name>
,这个里面的内容就是源表。也就是a去与b关联,a没有关联上的b内容去做一些操作 - not-matched-by-source:更改的行来自目标表,也就是
--table <target-table>
,这里的内容就是目标表,也就是a去与b关联,a没有关联上的b内容去对a做一些操作
案例
准备:
1 | use catalog hive_catalog; |
案例一:matched的使用
ws_t与ws1匹配id,将ws_t中ts>2的vc改为10,ts<=2的删除
1
2
3
4
5
6
7
8
9
10
11
12bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
merge-into \
--warehouse hdfs://hadoop102:8020/paimon/hive \
--database test \
--table ws_t \
--source-table test.ws1 \
--on "ws_t.id = ws1.id" \
--merge-actions matched-upsert,matched-delete \
--matched-upsert-condition "ws_t.ts > 2" \
--matched-upsert-set "vc = 10" \
--matched-delete-condition "ws_t.ts <= 2"案例二:not-matched的使用
ws_t与ws1匹配id,匹配上的将ws_t中vc加10,ws1中没匹配上的插入ws_t中
1
2
3
4
5
6
7
8
9
10
11bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
merge-into \
--warehouse hdfs://hadoop102:8020/paimon/hive \
--database test \
--table ws_t \
--source-table test.ws1 \
--on "ws_t.id = ws1.id" \
--merge-actions matched-upsert,not-matched-insert \
--matched-upsert-set "vc = ws_t.vc + 10" \
--not-matched-insert-values "*"案例三:not-matched-by-source的使用
ws_t与ws1匹配id,ws_t中没匹配上的,ts大于4则vc加20,ts=4则删除
1
2
3
4
5
6
7
8
9
10
11
12bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
merge-into \
--warehouse hdfs://hadoop102:8020/paimon/hive \
--database test \
--table ws_t \
--source-table test.ws1 \
--on "ws_t.id = ws1.id" \
--merge-actions not-matched-by-source-upsert,not-matched-by-source-delete \
--not-matched-by-source-upsert-condition "ws_t.ts > 4" \
--not-matched-by-source-upsert-set "vc = ws_t.vc + 20" \
--not-matched-by-source-delete-condition " ws_t.ts = 4"案例四:source-sql的使用
使用–source-sql创建新catalog下的源表,匹配ws_t的id,没匹配上的插入ws_t
1
2
3
4
5
6
7
8
9
10
11
12
13bin/flink run \
/opt/module/flink-1.17.0/opt/paimon-flink-action-0.5-20230703.002437-53.jar \
merge-into \
--warehouse hdfs://hadoop102:8020/paimon/hive \
--database test \
--table ws_t \
--source-sql "CREATE CATALOG fs2 WITH ('type' = 'paimon','warehouse' = 'hdfs://hadoop102:8020/paimon/fs2')" \
--source-sql "CREATE DATABASE IF NOT EXISTS fs2.test" \
--source-sql "CREATE TEMPORARY VIEW fs2.test.ws2 AS SELECT id+10 as id,ts,vc FROM test.ws1" \
--source-table fs2.test.ws2 \
--on "ws_t.id = ws2. id" \
--merge-actions not-matched-insert\
--not-matched-insert-values "*"
DQL查询表
批量查询
时间旅行
我们可以读取一个表的快照信息
我们之前每做一次的操作就会保存一个快照,我们可以查看快照,快照中保存的不是之前的数据,而是变更的数据信息,没有变更数据信息在快照中是没有的
查看表的快照
1
SELECT * FROM ws_t&snapshots;
查看这个表的快照后,会发现有一个snapshot_id的字段和一个commit_timestamep的字段,这个commit_timestamp就是快照的时间
读取指定id的快照
1
2
3SELECT * FROM ws_t /*+ OPTIONS('scan.snapshot-id' = '1') */;
SELECT * FROM ws_t /*+ OPTIONS('scan.snapshot-id' = '2') */;读取指定时间戳的快照
这里我们先把上面查看到的着commit_timestamp的格式转为时间戳然后
1
SELECT * FROM ws_t /*+ OPTIONS('scan.timestamp-millis' = '1688369660841') */;
读取指定标签,这里我们没有给快照打上标签,所以,这里的内容下次再写
增量查询
读取快照的时候我们可以查询两个快照之间的修改,比如我们要查询一下快照3和快照5之间的更改就
1 | SELECT * FROM ws_t /*+ OPTIONS('incremental-between' = '3,5') */; |