最近开始着手Citus7.4到Citus 9.3的升级,因此比较全面地浏览了这期间的Citus变动。 从Citus7.4到Citus 9.3不少方面的改进,本文只列出一些比较重要的部分。html
如下用到了一些示例,示例的验证环境以下node
软件git
- PostreSQL 12
- Citus 9.3
集群成员github
- CN
- 127.0.0.1:9000
- Worker
- 127.0.0.1:9001
- 127.0.0.1:9002
SQL支持加强类
1.支持非分区列的count distinct
这个Citus 7.4应该已经支持了,不知道是Citus的Changelog更新延误,仍是Citus 7.5支持得更完善了。算法
表定义sql
create table tb1(id int,c1 int); select create_distributed_table('tb1','id');
非分区列的count distinct的执行计划数据库
postgres=# explain select count(distinct c1) from tb1; QUERY PLAN ------------------------------------------------------------------------------------------ Aggregate (cost=250.00..250.01 rows=1 width=8) -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=4) Task Count: 32 Tasks Shown: One of 32 -> Task Node: host=127.0.0.1 port=9001 dbname=postgres -> HashAggregate (cost=38.25..40.25 rows=200 width=4) Group Key: c1 -> Seq Scan on tb1_102339 tb1 (cost=0.00..32.60 rows=2260 width=4) (9 rows)
2.支持UPSERT
支持UPSERT,即支持INSERT INTO SELECT..ON CONFLICT/RETURNING
json
表定义api
create table tb1(id int, c1 int); select create_distributed_table('tb1','id'); create table tb2(id int primary key, c1 int); select create_distributed_table('tb2','id');
UPSERT SQL执行缓存
postgres=# INSERT INTO tb2 SELECT * from tb1 ON CONFLICT(id) DO UPDATE SET c1 = EXCLUDED.c1; INSERT 0 1
3.支持GENERATED ALWAYS AS STORED
使用示例以下:
create table tbgenstore(id int, c1 int GENERATE ALWAYS AS (id+1)STORED); select create_distributed_table('tbgenstore','id');
4.支持用户定义的分布式函数
支持用户自定义分布式函数。Citus会把分布式函数(包括聚合函数)以及依赖的对象定义下发到全部Worker上。 后续在执行SQL的时候也能够合理的把分布式函数的执行下推到Worker。
分布式函数还能够和某个分布表绑定"亲和"关系,这一个特性的使用场景以下:
在多租户类型的业务中,把单个租户的一个事务中的多个SQL打包成一个“分布式函数”下发到Worker上。 CN只须要下推一次分布式函数的调用,分布式函数内部的多个SQL的执行所有在Worker节点内部完成。 避免CN和Worker之间来回交互,能够大大提高OLTP的性能(利用这个特性去跑TPCC,简直太溜了!)。
下面看下手册里的例子。
-- an example function which updates a hypothetical -- event_responses table which itself is distributed by event_id CREATE OR REPLACE FUNCTION register_for_event(p_event_id int, p_user_id int) RETURNS void LANGUAGE plpgsql AS $fn$ BEGIN INSERT INTO event_responses VALUES ($1, $2, 'yes') ON CONFLICT (event_id, user_id) DO UPDATE SET response = EXCLUDED.response; END; $fn$; -- distribute the function to workers, using the p_event_id argument -- to determine which shard each invocation affects, and explicitly -- colocating with event_responses which the function updates SELECT create_distributed_function( 'register_for_event(int, int)', 'p_event_id', colocate_with := 'event_responses' );
5.彻底支持聚合函数
Citus中对聚合函数有3种不一样的执行方式
- 按照分片字段分组的聚合,直接下推到Worker执行聚合
- 对部分Citus可以识别的聚合函数,Citus执行两阶段聚合,如今Worker执行部分聚合,再把结果汇总到CN上进行最终聚合。
- 对其余的聚合函数,Citus把数据拉到CN上,在CN上执行聚合。
显然第3种方式性能会比较差,对不按分片字段分组的聚合,怎么让它按第2种方式执行呢?
Citus中预约义了一部分聚合函数能够按第2中方式执行。
citus-9.3.0/src/include/distributed/multi_logical_optimizer.h
:
static const char *const AggregateNames[] = { "invalid", "avg", "min", "max", "sum", "count", "array_agg", "jsonb_agg", "jsonb_object_agg", "json_agg", "json_object_agg", "bit_and", "bit_or", "bool_and", "bool_or", "every", "hll_add_agg", "hll_union_agg", "topn_add_agg", "topn_union_agg", "any_value" };
对不在上面白名单的聚合函数,好比用户自定义的聚合函数,能够经过create_distributed_function()
添加。 示例以下:
citus-9.3.0/src/test/regress/expected/aggregate_support.out
:
create function sum2_sfunc(state int, x int) returns int immutable language plpgsql as $$ begin return state + x; end; $$; create function sum2_finalfunc(state int) returns int immutable language plpgsql as $$ begin return state * 2; end; $$; create aggregate sum2 (int) ( sfunc = sum2_sfunc, stype = int, finalfunc = sum2_finalfunc, combinefunc = sum2_sfunc, initcond = '0' ); select create_distributed_function('sum2(int)');
执行这个自定义的聚合函数的执行计划以下
postgres=# explain select sum2(c1) from tb1; QUERY PLAN ------------------------------------------------------------------------------------------ Aggregate (cost=250.00..250.01 rows=1 width=4) -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=32) Task Count: 32 Tasks Shown: One of 32 -> Task Node: host=127.0.0.1 port=9001 dbname=postgres -> Aggregate (cost=38.25..38.26 rows=1 width=32) -> Seq Scan on tb1_102339 tb1 (cost=0.00..32.60 rows=2260 width=4) (8 rows)
可是当前这种方式不支持stype = internal
的自定义聚合函数。 Citus社区已经在对应这个问题,详细参考https://github.com/citusdata/citus/issues/3916
6.彻底支持窗口函数
对不按分片字段分组的聚合函数,Citus支持把数据拉到CN上再执行,和聚合函数类型。 须要注意这种执行方式对性能的影响,特别是包含多个不一样分组字段的窗口函数时, Worker拉到CN上结果集是这些字段组合的笛卡尔积。
7.支持在事务块中传播LOCAL参数
当在CN的事务块中设置LOCAL参数时,能够把这个参数传播到Worker节点。
前提条件是citus.propagate_set_commands
参数必须为local
set citus.propagate_set_commands TO local;
事务块中设置LOCAL参数
postgres=# begin; BEGIN postgres=*# set local enable_hashagg to off; SET postgres=*# SELECT current_setting('enable_hashagg') FROM tb1 WHERE id = 3; current_setting ----------------- off (1 row)
8. 支持本地表和参考表Join
若是一个数据库须要用到本地表,而本地表和以参考表的形式部署的维表又有Join的需求,改如何处理?
原来咱们只能在CN上再建立一套本地的维表,而后由应用或者经过触发器维护两套维表之间的数据同步。
如今能够用更简单的方式实现。 具体就是把CN节点也能够做为一个Worker加到Citus集群里,groupid必定要设置为0。
SELECT master_add_node('127.0.0.1', 9001, groupid => 0);
这样CN上也就和其余Worker同样拥有了参考表的一个副本,本地表和参考表Join的时候就直接在本地执行了。
DDL支持加强
9.支持把SCHEMA的赋权广播到Worker上
GRANT USAGE ON SCHEMA dist_schema TO role1;
10.支持修改表SCHEMA广播到Worker上
ALTER TABLE ... SET SCHEMA
11.支持建立索引时指定INCLUDE选项
create index tb1_idx_id on tb1(id) include (c1);
12. 支持使用CONCURRENTLY
选项建立索引
create index CONCURRENTLY tb1_idx_id2 on tb1(id);
13. 支持传播REINDEX到worker节点上
以前版本reindex不能传播到Worker节点,还须要到每一个worker分别执行reindex。 新版的Citus支持了。
reindex index tb1_idx_id;
Citus MX功能加强
14.支持在MX 节点上对参考表执行DML
表定义
create table tbref(id int, c1 int); select create_refence_table('tbref');
在MX worker(即扩展worker)上修改参考表
postgres=# insert into tbref values(1,1),(2,2); INSERT 0 2 postgres=# update tbref set c1=10; UPDATE 2 postgres=# delete from tbref where id=1; DELETE 1 postgres=# select * from tbref; id | c1 ----+---- 2 | 10 (1 row)
15.支持在MX节点上执行TRUNCATE
以前MX节点上是不支持对分布表和参考表执行truncate操做的。如今也支持了
postgres=# truncate tb1; TRUNCATE TABLE postgres=# truncate tbref; TRUNCATE TABLE
16.支持在Citus MX架构下使用serial和smallserial
以前在Citus MX(即多CN部署)环境下,自增序列只能使用bigserial类型,如今也能够支持serial和smallserial了。
表定义
create table tbserial(id int,c1 int); select create_distributed_table('tbserial','id');
Citus中,自增字段经过CN和MX节点上逻辑表上的序列对象实现。
postgres=# \d tbserial Table "public.tbserial" Column | Type | Collation | Nullable | Default --------+---------+-----------+----------+-------------------------------------- id | integer | | not null | nextval('tbserial_id_seq'::regclass) c1 | integer | | |
为了防止多个MX节点产生的序列冲突。在Citus MX环境下,序列值的开头部分是产生序列的节点的groupid,后面才是顺序累加的值。 这等于按groupid把序列值分红了不一样的范围,互不重叠。
即:
全局序列值 = groupid,节点内的顺序递增值
对不一样serial的数据类型,groupid占的位数是不同的。具体以下
- bigserial:16bit
- serial:4bit
- smallserial:4bit
根据上groupid占的长度,咱们须要注意
- 单个节点(CN或扩展Worker)上,能产生的序列值的数量变少了,要防止溢出。
- 若是使用了serial或smallserial,最多部署7个扩展Worker节点。
序列对象的定义
上面提到的全局序列的实现具体体现为:在不一样节点上,序列对象定义的范围不同。以下
CN节点上的序列对象定义(CN节点的groupid固定为0)
postgres=# \d tbserial_id_seq Sequence "public.tbserial_id_seq" Type | Start | Minimum | Maximum | Increment | Cycles? | Cache ---------+-------+---------+------------+-----------+---------+------- integer | 1 | 1 | 2147483647 | 1 | no | 1 Owned by: public.tbserial.id
MX Worker节点上的序列对象定义(groupid=1)
postgres=# \d tbserial_id_seq Sequence "public.tbserial_id_seq" Type | Start | Minimum | Maximum | Increment | Cycles? | Cache --------+-----------+-----------+-----------+-----------+---------+------- bigint | 268435457 | 268435457 | 536870913 | 1 | no | 1
如何知道每一个Worker节点的groupid?
每一个Worker节点的groupid能够从pg_dist_node
获取。
postgres=# select * from pg_dist_node; nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards --------+---------+-----------+----------+----------+-------------+----------+----------+-------------+----------------+------------------ 2 | 2 | 127.0.0.1 | 9002 | default | t | t | primary | default | t | t 1 | 1 | 127.0.0.1 | 9001 | default | t | t | primary | default | t | t (2 rows)
也能够在每一个节点本地查询pg_dist_local_group
得到本节点的groupid。
postgres=# select * from pg_dist_local_group; groupid --------- 1 (1 row)
CN节点和普通的Worker节点(非MX Worker)的pg_dist_local_group
中查询到的groupid都为0.
17.在Citus MX经过本地执行提高性能
以前测试Citus MX架构的时候发现,当Citus MX节点上放分片时,性能比不放分片差一倍。 新版的Citus在这方面作了优化,当在Citus MX节点上访问本节点上的分片时,再也不走新建一个到本地的数据库链接再读写分片的常规执行方式。 而是直接用当前链接访问分片。根据下面的测试数据,性能能够提高一倍。
https://github.com/citusdata/citus/pull/2938
- Test 1: HammerDB test with 250 users, 1,000,000 transactions per. 8 Node Citus MX - (a) With local execution: `System achieved 116473 PostgreSQL TPM at 160355 NOPM` - (b) without local execution: ` System achieved 61392 PostgreSQL TPM at 100503 NOPM` - Test 2: HammerDB test with 250 users, 10,000,000 transactions per. 8 Node Citus MX - (a) With local execution: `System achieved 91921 PostgreSQL TPM at 174557 NOPM` - (b) without local execution: ` System achieved 84186 PostgreSQL TPM at 98408 NOPM` - Test 3: Pgbench, 1 worker node, -c64 -c256 -T 120 - (a) Local execution enabled (tps): `select-only`: 56202 `simple-update`: 11771 `tpcb-like`: 7796 - (a) Local execution disabled (tps): `select-only`: 24524 `simple-update`: 5077 `tpcb-like`: 3510 (some connection errors for tpcb-like)
在我司的多CN部署方式下,扩展Worker上是不放分片的。因此这个优化和咱们无关。
性能加强
18.替换real-time
为新的执行器Adaptive Executor
Adaptive Executor
是一个新的执行器,它和real-time
的差别主要体如今能够经过参数对CN到worker的链接数进行控制。具体以下:
-
citus.max_shared_pool_size
能够经过citus.max_shared_pool_size
控制CN(或MX Worker)在单个Worker上可同时创建的最大链接数,默认值等于CN的max_connections
。 达到链接数使用上限后,新的SQL请求可能等待,有些操做不受限制,好比COPY和重分区的Join。 Citus MX架构下,单个Worker上同时接受到链接数最大多是max_shared_pool_size * (1 + MX Worker节点数)
-
citus.max_adaptive_executor_pool_size
能够经过citus.max_adaptive_executor_pool_size
控制CN(或MX Worker)上的单个会话在单个Worker上可同时创建的最大链接数,默认值等于16。 -
citus.max_cached_conns_per_worker
能够经过citus.max_cached_conns_per_worker
控制CN(或MX Worker)上的单个会话在事务结束后对每一个Worker缓存的链接数,默认值等于1。 -
citus.executor_slow_start_interval
对于执行时间很短的多shard的SQL,并发开多个链接,不只频繁建立销毁链接的消耗很高,也极大的消耗了worker上有限的链接资源。 adaptive执行器,在执行多shard的SQL时,不是一次就建立出全部须要的链接数,而是先建立一部分,隔一段时间再建立一部分。 中途若是有shard的任务提早完成了,它的链接能够被复用,就能够减小对新建链接的需求。 所以执行多shard的SQL最少只须要一个链接,最多不超过max_adaptive_executor_pool_size
,固然也不会超过目标worker上的shard数。这个算法叫"慢启动",慢启动的间隔由参数
citus.executor_slow_start_interval
控制,默认值为10ms。 初始建立的链接数是:max(1,citus.max_cached_conns_per_worker
),以后每批新建的链接数都在前一批的基础上加1。 即默认状况下,每批新建的链接数依次为1,2,3,4,5,6..."慢启动"主要优化了短查询,对长查询(手册上给的标准是大于500ms),会增长必定的响应时间。
下面看几个例子
citus.max_shared_pool_size
的使用示例
postgres=# alter system set citus.max_shared_pool_size to 4; ALTER SYSTEM postgres=# select pg_reload_conf(); pg_reload_conf ---------------- t (1 row) postgres=# begin; BEGIN postgres=*# update tb1 set c1=11; UPDATE 1 postgres=*# select * from citus_remote_connection_stats(); hostname | port | database_name | connection_count_to_node -----------+------+---------------+-------------------------- 127.0.0.1 | 9002 | postgres | 4 127.0.0.1 | 9001 | postgres | 4 (2 rows)
citus.executor_slow_start_interval
的使用示例
tb1总共有32个分片,每一个worker上有16个分片。 初始每一个worker上保持2个链接
postgres=# select * from citus_remote_connection_stats(); hostname | port | database_name | connection_count_to_node -----------+------+---------------+-------------------------- 127.0.0.1 | 9002 | postgres | 2 127.0.0.1 | 9001 | postgres | 2 (2 rows)
citus.executor_slow_start_interval = '10ms'
时,执行一个空表的update,只额外建立了2个新链接。
postgres=# set citus.executor_slow_start_interval='10ms'; SET postgres=# begin; BEGIN postgres=*# update tb1 set c1=100; UPDATE 0 postgres=*# select * from citus_remote_connection_stats(); hostname | port | database_name | connection_count_to_node -----------+------+---------------+-------------------------- 127.0.0.1 | 9002 | postgres | 4 127.0.0.1 | 9001 | postgres | 4 (2 rows)
citus.executor_slow_start_interval = '500ms'
时,没有建立新的链接,都复用了一个缓存的链接
postgres=# set citus.executor_slow_start_interval='500ms'; SET postgres=# begin; BEGIN postgres=*# update tb1 set c1=100; UPDATE 0 postgres=*# select * from citus_remote_connection_stats(); hostname | port | database_name | connection_count_to_node -----------+------+---------------+-------------------------- 127.0.0.1 | 9002 | postgres | 2 127.0.0.1 | 9001 | postgres | 2 (2 rows)
citus.executor_slow_start_interval = '0ms'
时,建立了比较多的新链接。
postgres=# set citus.executor_slow_start_interval = '0ms'; SET postgres=# begin; BEGIN postgres=*# update tb1 set c1=100; UPDATE 0 postgres=*# select * from citus_remote_connection_stats(); hostname | port | database_name | connection_count_to_node -----------+------+---------------+-------------------------- 127.0.0.1 | 9002 | postgres | 5 127.0.0.1 | 9001 | postgres | 14 (2 rows)
参考
adaptive执行器链接建立"慢启动"的代码参考:
citus-9.3.0/src/backend/distributed/executor/adaptive_executor.c:
static void ManageWorkerPool(WorkerPool *workerPool) { ... /* cannot open more than targetPoolSize connections */ int maxNewConnectionCount = targetPoolSize - initiatedConnectionCount;//targetPoolSize的值为max(1,`citus.max_cached_conns_per_worker`) /* total number of connections that are (almost) available for tasks */ int usableConnectionCount = UsableConnectionCount(workerPool); /* * Number of additional connections we would need to run all ready tasks in * parallel. */ int newConnectionsForReadyTasks = readyTaskCount - usableConnectionCount; /* * Open enough connections to handle all tasks that are ready, but no more * than the target pool size. */ newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount); if (newConnectionCount > 0 && ExecutorSlowStartInterval != SLOW_START_DISABLED) { if (MillisecondsPassedSince(workerPool->lastConnectionOpenTime) >= ExecutorSlowStartInterval) { newConnectionCount = Min(newConnectionCount, workerPool->maxNewConnectionsPerCycle); /* increase the open rate every cycle (like TCP slow start) */ workerPool->maxNewConnectionsPerCycle += 1; } else { /* wait a bit until opening more connections */ return; } }
19.经过adaptive执行器执行重分布的Join
当citus.enable_repartition_joins=on
时,Citus支持经过数据重分布的方式执行非亲和Inner Join, 以前版本Citus会自动切换到task-tracker
执行器执行重分布的Join,可是使用task-tracker
执行器须要CN节点给Worker下发任务再不断检查任务完成状态,其额外消耗很大,响应时间很是长。
新版Citus改进后,能够经过adaptive执行器执行重分布的Join。
根据官网博客,1000w如下数据的重分布Join,性能提高了10倍。 详细参考:https://www.citusdata.com/blog/2020/03/02/citus-9-2-speeds-up-large-scale-htap/
咱们本身的简单测试中,2张空表的重分布Join,以前须要16秒,如今只须要2秒。
20.支持重分布的方式执行INSERT...SELECT
表定义
create table tb1(id int, c1 int); select create_distributed_table('tb1','id'); set citus.shard_count to 16; create table tb2(id int primary key, c1 int); select create_distributed_table('tb2','id');
tb1和tb2的分片数不同,即它们不是亲和的。 此前,Citus必须把数据全拉到CN节点上中转。 新版Citus能够经过重分布的方式执行这个SQL,各个Worker之间直接互相传送数据,CN节点只执行工具函数驱动任务执行,性能可大幅提高。
postgres=# explain INSERT INTO tb2 SELECT * from tb1 ON CONFLICT(id) DO UPDATE SET c1 = EXCLUDED.c1; QUERY PLAN ------------------------------------------------------------------------------------ Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0) INSERT/SELECT method: repartition -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=8) Task Count: 32 Tasks Shown: One of 32 -> Task Node: host=127.0.0.1 port=9001 dbname=postgres -> Seq Scan on tb1_102339 tb1 (cost=0.00..32.60 rows=2260 width=8) (8 rows)
根据官网博客,这项优化使性能提高了5倍。 详细参考:https://www.citusdata.com/blog/2020/03/02/citus-9-2-speeds-up-large-scale-htap/
须要注意的是,若是插入时,须要在目标表上自动生成自增字段,Citus会退回到原来的执行方式,数据都会通过CN中转一下。
21.支持以轮询的方式访问参考表的多个副本
以前Citus查询参考表时,始终只访问参考表的第一个副本,新版Citus能够经过参数设置,在参考表多个副本轮询访问,均衡负载。
postgres=# set citus.task_assignment_policy TO "round-robin"; SET postgres=# explain select * from tbref; QUERY PLAN ---------------------------------------------------------------------------------- Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) Task Count: 1 Tasks Shown: All -> Task Node: host=127.0.0.1 port=9001 dbname=postgres -> Seq Scan on tbref_102371 tbref (cost=0.00..32.60 rows=2260 width=8) (6 rows) postgres=# explain select * from tbref; QUERY PLAN ---------------------------------------------------------------------------------- Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) Task Count: 1 Tasks Shown: All -> Task Node: host=127.0.0.1 port=9002 dbname=postgres -> Seq Scan on tbref_102371 tbref (cost=0.00..32.60 rows=2260 width=8) (6 rows)
citus.task_assignment_policy
的默认值是greedy。greedy比较适合多副本的分布表。 对于涉及多个shard的SQL,每一个shard都有多个可选的副本,在greedy策略下, Citus会尽可能确保每一个worker分配到任务数相同。
具体实现时Citus一次轮询全部Worker,直到把全部shard任务都分配完。 所以对参考表这种只有一个shard的场景,greedy会致使其始终把任务分配给第一个worker。 详细能够参考GreedyAssignTaskList()
函数的代码。
22.表数据导出优化
Citus导出数据时,中间结果会写到在CN上,并且CN从Worker拉数据是并行拉的,不过Worker仍是CN负载都会很高。 新版Citus优化了COPY导出处理,依次从每一个Worker上抽出数返回给客户端,中途数据不落盘。
可是这一优化只适用于下面这种固定形式的全表COPY到STDOUT的场景
COPY table tb1 to STDOUT
这能够大大优化pg_dump
,延迟更低,内存使用更少。
集群管理加强
23.支持控制worker不分配shard
能够经过设置节点的shouldhaveshards属性控制某个节点不放分片。
SELECT master_set_node_property('127.0.0.1', 9002, 'shouldhaveshards', false);
shouldhaveshards属性会对后续建立新的分布表和参考表生效。 也会对后续执行的企业版Citus的rebalance功能生效,社区版不支持rebalance,但若是自研Citus部署和维护工具也能够利用这个参数。
- 扩展Worker的实现逻辑改成使用这个参数,简化处理逻辑,不用先建好分布表后再挪分片。
- 扩缩容脚本也可使用这个参数决定Worker上是否放置分片,不须要区分是否是
所有是扩展Worker
的部署架构
24.支持使用master_update_node
实施failover
采用主备流复制实现Worker高可用时,通常CN经过VIP访问Worker,worker主备切换时只须要漂移vip到新的主节点便可。 新版Citus提供了一个新的可选方案,经过master_update_node()
函数修改某个worker的IP和Port。 这提供了一种新的不依赖VIP的Worker HA实现方案。
postgres=# \df master_update_node List of functions -[ RECORD 1 ]-------+----------------------------------------------------------------------------------------------------------------------------- Schema | pg_catalog Name | master_update_node Result data type | void Argument data types | node_id integer, new_node_name text, new_node_port integer, force boolean DEFAULT false, lock_cooldown integer DEFAULT 10000 Type | func
25.支持变动亲和定义
新版Citus能够在分布表建立后,修改亲和关系。
表定义
create table tba(id int,c1 int); select create_distributed_table('tba','id'); create table tbb(id int,c1 int); select create_distributed_table('tbb','id'); create table tbc(id text,c1 int); select create_distributed_table('tbc','id');
tba和tbb这两个表是亲和的
postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass); logicalrelid | partmethod | partkey | colocationid | repmodel --------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+---------- tba | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 3 | s tbb | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 3 | s (2 rows)
将tbb设置为新的亲和ID,打破它们的亲和关系
postgres=# SELECT update_distributed_table_colocation('tbb', colocate_with => 'none'); update_distributed_table_colocation ------------------------------------- (1 row) postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass); logicalrelid | partmethod | partkey | colocationid | repmodel --------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+---------- tba | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 3 | s tbb | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 14 | s (2 rows)
从新设置它们亲和
postgres=# SELECT update_distributed_table_colocation('tbb', colocate_with => 'tba'); update_distributed_table_colocation ------------------------------------- (1 row) postgres=# select * from pg_dist_partition where logicalrelid in ('tba'::regclass,'tbb'::regclass); logicalrelid | partmethod | partkey | colocationid | repmodel --------------+------------+------------------------------------------------------------------------------------------------------------------------+--------------+---------- tba | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 3 | s tbb | h | {VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 3 | s (2 rows)
也能够用批量将一组表设置为和某一个表亲和
postgres=# SELECT mark_tables_colocated('tba', ARRAY['tbb', 'tbc']); ERROR: cannot colocate tables tba and tbc DETAIL: Distribution column types don't match for tba and tbc.
tbc的分片字段类型不一致,不能亲和,去掉tbc再次执行成功。
postgres=# SELECT mark_tables_colocated('tba', ARRAY['tbb']); mark_tables_colocated ----------------------- (1 row)
26.支持truncate分布表的本地数据
把一个原来就有数据的本地表建立成分布表,会把原来的数据拷贝到各个shard上,但原始本地表上的数据不会删除,只是对用户不可见。
原来没有直接的办法删掉这些不须要的本地数据(能够经过临时篡改元数据的方式删),如今能够用一个函数实现。
SELECT truncate_local_data_after_distributing_table('tb1');
27. 延迟复制参考表副本
当新的Worker节点添加到Citus集群的时候,会同步参考表的副本到上面。 若是集群中存在比较大参考表,会致使添加Worker节点的时间不可控。 这可能使得用户不敢在业务高峰期扩容节点。
如今Citus能够支持把参考表的同步延迟到下次建立分片的的时候。 方法就是设置下面这个参数为off,它的默认值为on。
citus.replicate_reference_tables_on_activate = off
这样咱们能够在白天扩容,夜里在后台同步数据。
28.建立集群范围一致的恢复点
以前咱们备份Citus集群的时候,都是各个节点各自备份恢复,真发生故障,没办法恢复到一个集群范围的一致点。
如今可使用下面的函数,建立一个全局的恢复点实行全局一致性备份。 使用方法相似于PG的pg_create_restore_point(),详细可参考手册。
select citus_create_restore_point('foo');
29.支持设置Citus集群节点间互联的链接选项
能够经过citus.node_conninfo
参数设置Citus内节点间互连的一些非敏感的链接选项。 支持链接选项下面的libpq的一个子集。
- application_name
- connect_timeout
- gsslib
- keepalives
- keepalives_count
- keepalives_idle
- keepalives_interval
- krbsrvname
- sslcompression
- sslcrl
- sslmode (defaults to “require” as of Citus 8.1)
- sslrootcert
Citus 8.1之后,在支持SSL的PostgreSQL上,citus.node_conninfo
的默认值为'sslmode=require'。 即默认开启了SSL。这是Citus出于安全的考虑,可是启用SSL后部署和维护会比较麻烦。 所以咱们的部署环境下,须要将其修改成sslmode=prefer
。
postgres=# show citus.node_conninfo; citus.node_conninfo --------------------- sslmode=prefer (1 row)
30.默认关闭Citus统计收集
以前Citus的守护进程默认会收集Citus集群的一些元数据信息上报到CitusData公司的服务上(明显有安全问题)。 新版本把这个功能默认关闭了。固然更完全的作法是在编译Citus的时候就把这个功能屏蔽掉。
postgres=# show citus.enable_statistics_collection; citus.enable_statistics_collection ------------------------------------ off (1 row)
31. 增长查看集群范围活动的函数和视图
新版Citus提供了几个函数和视图,能够在CN上很是方便的查看总体Citus的当前活动情况
citus_remote_connection_stats()
查看全部worker上的来自CN节点和MX Worker节点的远程链接数。
postgres=# select * from citus_remote_connection_stats(); hostname | port | database_name | connection_count_to_node -----------+------+---------------+-------------------------- 127.0.0.1 | 9002 | postgres | 3 127.0.0.1 | 9001 | postgres | 3 (2 rows)
citus_dist_stat_activity
查看从本CN节点或MX worker上发起的活动。这个视图在pg_stat_activity
上附加了一些Citus相关的信息。
postgres=# select * from citus_dist_stat_activity; -[ RECORD 1 ]----------+------------------------------ query_hostname | coordinator_host query_hostport | 9000 master_query_host_name | coordinator_host master_query_host_port | 9000 transaction_number | 57 transaction_stamp | 2020-06-19 15:05:22.142242+08 datid | 13593 datname | postgres pid | 2574 usesysid | 10 usename | postgres application_name | psql client_addr | client_hostname | client_port | -1 backend_start | 2020-06-19 10:57:58.472994+08 xact_start | 2020-06-19 15:05:17.45487+08 query_start | 2020-06-19 15:05:22.140954+08 state_change | 2020-06-19 15:05:22.140957+08 wait_event_type | Client wait_event | ClientRead state | active backend_xid | backend_xmin | 5114 query | select * from tb1; backend_type | client backend
注意上面的transaction_number
,它表明一个事务号。 涉及更新的SQL,事务块中查询和push-pull
方式执行的查询都会分配一个非0的事务号。 经过这个事务号,咱们能够很容易地识别出全部worker上来自同一SQL(或事务)的活动。
详细参考下面的注释。(这段注释应该写错了,下面2类SQL的区别不是是否能被'show',而是transaction_number
是否非0) citus-9.3.0/src/backend/distributed/transaction/citus_dist_stat_activity.c
* An important note on this views is that they only show the activity * that are inside distributed transactions. Distributed transactions * cover the following: * - All multi-shard modifications (DDLs, COPY, UPDATE, DELETE, INSERT .. SELECT) * - All multi-shard queries with CTEs (modifying CTEs, read-only CTEs) * - All recursively planned subqueries * - All queries within transaction blocks (BEGIN; query; COMMMIT;) * * In other words, the following types of queries won't be observed in these * views: * - Single-shard queries that are not inside transaction blocks * - Multi-shard select queries that are not inside transaction blocks * - Task-tracker queries
citus_worker_stat_activity
查看全部worker上的活动。排除非citus会话,即不通过CN或MX worker直连worker的会话。
咱们能够指定transaction_number
查看特定SQL在worker上的活动。
postgres=# select * from citus_worker_stat_activity where transaction_number = 57; -[ RECORD 1 ]----------+--------------------------------------------- query_hostname | 127.0.0.1 query_hostport | 9001 master_query_host_name | coordinator_host master_query_host_port | 9000 transaction_number | 57 transaction_stamp | 2020-06-19 15:05:22.142242+08 datid | 13593 datname | postgres pid | 4108 usesysid | 10 usename | postgres application_name | citus client_addr | 127.0.0.1 client_hostname | client_port | 33676 backend_start | 2020-06-19 15:05:22.162829+08 xact_start | 2020-06-19 15:05:22.168811+08 query_start | 2020-06-19 15:05:22.171398+08 state_change | 2020-06-19 15:05:22.172237+08 wait_event_type | Client wait_event | ClientRead state | idle in transaction backend_xid | backend_xmin | query | SELECT id, c1 FROM tb1_102369 tb1 WHERE true backend_type | client backend ...
citus_lock_waits
查看Citus集群内的被阻塞的查询。下面引用Ciuts手册上的例子
表定义
CREATE TABLE numbers AS SELECT i, 0 AS j FROM generate_series(1,10) AS i; SELECT create_distributed_table('numbers', 'i');
使用2个会话终端,顺序执行下面的SQL。
-- session 1 -- session 2 ------------------------------------- ------------------------------------- BEGIN; UPDATE numbers SET j = 2 WHERE i = 1; BEGIN; UPDATE numbers SET j = 3 WHERE i = 1; -- (this blocks)
经过citus_lock_waits
能够看到,这2个查询是阻塞状态。
SELECT * FROM citus_lock_waits; -[ RECORD 1 ]-------------------------+---------------------------------------- waiting_pid | 88624 blocking_pid | 88615 blocked_statement | UPDATE numbers SET j = 3 WHERE i = 1; current_statement_in_blocking_process | UPDATE numbers SET j = 2 WHERE i = 1; waiting_node_id | 0 blocking_node_id | 0 waiting_node_name | coordinator_host blocking_node_name | coordinator_host waiting_node_port | 5432 blocking_node_port | 5432
这个视图只能在CN节点查看,MX worker节点查不到数据。可是并不要求阻塞所涉及的SQL必须从CN节点发起。
32. 增长查看表元数据的函数和视图
master_get_table_metadata()
查看分布表的元数据
postgres=# select * from master_get_table_metadata('tb1'); -[ RECORD 1 ]---------+----------- logical_relid | 17148 part_storage_type | t part_method | h part_key | id part_replica_count | 1 part_max_size | 1073741824 part_placement_policy | 2
get_shard_id_for_distribution_column()
查看某个分布列值对应的shardid
postgres=# SELECT get_shard_id_for_distribution_column('tb1', 4); get_shard_id_for_distribution_column -------------------------------------- 102347 (1 row)
其余
33. 容许在CN备库执行简单的DML
经过设置citus.writable_standby_coordinator
参数为on,能够在CN的备库上执行部分简单的DML。 看下下面的例子
表定义
create table tbl(id int,c1 int); select create_distributed_table('tbserial','id');
在CN备节点上能够执行带分片字段的DML
postgres=# insert into tb1 values(3,3); ERROR: writing to worker nodes is not currently allowed DETAIL: the database is in recovery mode postgres=# set citus.writable_standby_coordinator TO ON; SET postgres=# insert into tb1 values(3,3); INSERT 0 1 postgres=# update tb1 set c1=20 where id=3; UPDATE 1 postgres=# delete from tb1 where id=3; DELETE 1
不支持不带分片字段的UPDATE和DELETE
postgres=# update tb1 set c1=20; ERROR: cannot assign TransactionIds during recovery postgres=# delete from tb1 where c1=20; ERROR: cannot assign TransactionIds during recovery
也不支持跨节点的事务
postgres=# begin; BEGIN postgres=*# insert into tb1 values(3,3); INSERT 0 1 postgres=*# insert into tb1 values(4,4); INSERT 0 1 postgres=*# commit; ERROR: cannot assign TransactionIds during recovery
对于2pc的分布式事务,Citus须要将事务信息记录到事务表pg_dist_transaction
中。 因此,Citus也没法在CN备节点上支持2pc的分布式事务。
可是若是切换成1pc提交模式,仍是能够支持跨节点事务的。
postgres=# set citus.multi_shard_commit_protocol TO '1pc'; SET postgres=# begin; BEGIN postgres=*# insert into tb1 values(4,4); INSERT 0 1 postgres=*# insert into tb1 values(5,5); INSERT 0 1 postgres=*# commit;
而且在1pc提交模式下,跨多个分片的SQL也是支持的。
postgres=# set citus.multi_shard_commit_protocol TO '1pc'; SET postgres=# update tb1 set c1=10; UPDATE 3