EXPLAIN ANALYZE on COUNT() pushdown

In a distributed database, some operations are offloaded to the storage layer to avoid shipping through the network a large number of rows that would be filtered out or aggregated later. I say "offloaded" because that is the term you know if you have heard about Oracle Exadata, but it can be confused with offloading reads to replicas. YugabyteDB first aimed to support almost all SQL (PostgreSQL) features, and then improves some of them by pushing them down to the storage layer (DocDB) when the need for optimization arises. As it is used mostly for OLTP workloads, aggregation over a large number of rows is not a critical use case. However, many applications count the rows of a table, for good reasons or not. So this top-level COUNT aggregation is one of the first to have been pushed down.

And here is how to see it from the execution plan, which is, for the moment, the PostgreSQL one without specific information about pushdowns.

I create a table with one million rows:

yugabyte=# create table demo (id bigint primary key);
CREATE TABLE
yugabyte=# insert into demo select generate_series id from generate_series(1,1000000);
INSERT 0 1000000

I count the rows, with EXPLAIN ANALYZE:

yugabyte=# explain (analyze, summary false) select count(*)from demo;
                                                 QUERY PLAN
-------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=102.50..102.51 rows=1 width=8) (actual time=604.684..604.685 rows=1 loops=1)
   ->  Seq Scan on demo  (cost=0.00..100.00 rows=1000 width=0) (actual time=604.670..604.676 rows=6 loops=1)

Scroll to the right to get the "actual" values rather than the estimated ones. The actual number of rows returned by the Seq Scan is rows=6. This is because a per-tablet count was already executed on the storage layer as we will see later.
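
To make the idea concrete, here is a minimal sketch that runs on plain PostgreSQL. It is only an emulation: the six "tablets" are faked with a modulus on id, which is an assumption for illustration and not how YugabyteDB actually distributes rows. It shows why the upper layer can receive only 6 rows and still return the right count.

```sql
-- Emulation only: id % 6 stands in for the real sharding function (assumption).
with partial_counts as (
  -- what each "tablet" would compute locally
  select id % 6 as tablet, count(*) as partial_count
  from generate_series(1, 1000000) as id
  group by id % 6
)
-- what the SQL layer does with the partial results it receives
select count(*) as rows_received, sum(partial_count) as total_count
from partial_counts;
```

This returns rows_received = 6 and total_count = 1000000: six partial counts cross the "network" instead of one million rows, and the final Aggregate only has to sum them.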

PostgreSQL

The same in PostgreSQL would have displayed the million rows from Seq Scan, aggregated to 1 row (the count result) by the Aggregate in the parent operation:

postgres=# show max_parallel_workers_per_gather;
 max_parallel_workers_per_gather
---------------------------------
 2

postgres=# set max_parallel_workers_per_gather=0;
SET
postgres=# explain (analyze, summary false) select count(*)from demo;
                                                      QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=16925.00..16925.01 rows=1 width=8) (actual time=321.357..321.358 rows=1 loops=1)
   ->  Seq Scan on demo  (cost=0.00..14425.00 rows=1000000 width=0) (actual time=0.014..155.062 rows=1000000 loops=1)

I have disabled parallel query because, with parallelism, the row count that EXPLAIN ANALYZE shows for the scan is not the total but a per-process figure, so about one third of the rows with two workers plus the leader:

postgres=# show max_parallel_workers_per_gather;
 max_parallel_workers_per_gather
--------------------------------------
 2

postgres=# explain (analyze, summary false) select count(*)from demo;
                                                              QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=10633.81..10633.82 rows=1 width=8) (actual time=124.288..125.051 rows=1 loops=1)
   ->  Gather  (cost=10633.59..10633.81 rows=2 width=8) (actual time=124.126..125.041 rows=3 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=9633.59..9633.60 rows=1 width=8) (actual time=119.838..119.839 rows=1 loops=3)
               ->  Parallel Seq Scan on demo  (cost=0.00..8591.88 rows=416688 width=0) (actual time=0.024..73.334 rows=333333 loops=3)

This is a bit misleading because the loops=3 are for leader plus workers but the rows=333333 is the mean for the 3 loops (Thanks @michristofides 😉). All doubts are cleared with the VERBOSE option:

postgres=# explain (verbose,analyze, summary false) select count(*)from demo;
                                                                  QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=10633.81..10633.82 rows=1 width=8) (actual time=84.420..85.632 rows=1 loops=1)
   Output: count(*)
   ->  Gather  (cost=10633.59..10633.81 rows=2 width=8) (actual time=84.225..85.621 rows=3 loops=1)
         Output: (PARTIAL count(*))
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=9633.59..9633.60 rows=1 width=8) (actual time=82.101..82.102 rows=1 loops=3)
               Output: PARTIAL count(*)
               Worker 0:  actual time=81.174..81.175 rows=1 loops=1
               Worker 1:  actual time=81.191..81.192 rows=1 loops=1
               ->  Parallel Seq Scan on public.demo  (cost=0.00..8591.88 rows=416688 width=0) (actual time=0.014..46.421 rows=333333 loops=3)
                     Output: id
                     Worker 0:  actual time=0.012..45.723 rows=335610 loops=1
                     Worker 1:  actual time=0.013..46.258 rows=324034 loops=1

The leader processed 3*333333-335610-324034=340355 plus 1 that was lost in the average rounding.
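
The arithmetic can be checked in any psql session. This is just the calculation above, using the per-worker numbers from the VERBOSE plan and the 1000000 rows that were inserted:

```sql
select 3*333333                   as total_from_rounded_average,
       3*333333 - 335610 - 324034 as leader_rows_from_average,
       1000000 - 335610 - 324034  as leader_rows_exact;
```

The three columns are 999999, 340355 and 340356. The one-row difference between the last two is the row lost when the average over 3 loops was rounded to 333333.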

YugabyteDB

But currently not all functions are implemented with pushdown. Let's do the same on a one-node cluster with yb_num_shards_per_tserver=1. I leave my credentials so you can test, but CPU is limited there so it can take longer than usual.

$ psql postgres://franck:[email protected]:5433/yb_demo_northwind

yb_demo_northwind=> create table demo (id bigint primary key, value int);
CREATE TABLE
yb_demo_northwind=> insert into demo select generate_series id from generate_series(1,1000000);
INSERT 0 1000000
yb_demo_northwind=> explain (analyze, summary false) select count(*)from demo;
                                                   QUERY PLAN
----------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=102.50..102.51 rows=1 width=8) (actual time=26197.534..26197.534 rows=1 loops=1)
   ->  Seq Scan on demo  (cost=0.00..100.00 rows=1000 width=0) (actual time=26197.519..26197.522 rows=1 loops=1)

You can see rows=1 in the actual statistics of the Seq Scan because the count has been pushed down and my small table is stored in a single tablet here.

The same for COUNT(column) even when the column is nullable:

yb_demo_northwind=> explain (analyze, summary false) select count(value)from demo;
                                                   QUERY PLAN
----------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=102.50..102.51 rows=1 width=8) (actual time=14680.306..14680.306 rows=1 loops=1)
   ->  Seq Scan on demo  (cost=0.00..100.00 rows=1000 width=4) (actual time=14680.295..14680.298 rows=1 loops=1)

Currently, the DocDB layer cannot offload operations on columns, so with a WHERE clause the filtering happens in the PostgreSQL layer:

yb_demo_northwind=> explain (analyze, summary false) select count(*)from demo where id<42;
                                                   QUERY PLAN
---------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=105.00..105.01 rows=1 width=8) (actual time=22545.719..22545.719 rows=1 loops=1)
   ->  Seq Scan on demo  (cost=0.00..102.50 rows=1000 width=0) (actual time=132.771..22545.647 rows=41 loops=1)
         Filter: (id < 42)
         Rows Removed by Filter: 999959

Now, is it a problem? In a distributed database, SQL or NoSQL, the sharding method is the most important decision. By default, a table is hash partitioned on the primary key. But if you need to scan a range of values, as with this id<42 WHERE clause, range partitioning is the better choice. I redefine the table with id as an ascending sort key:

yb_demo_northwind=> create table demo (id bigint, value int, primary key(id asc));
CREATE TABLE
yb_demo_northwind=> insert into demo select generate_series id from generate_series(1,1000000);
INSERT 0 1000000

Now, with range partitioning, we can benefit from Predicate Pushdown (Read more about pushdowns on this blog post):

yb_demo_northwind=> explain (analyze, summary false) select count(*)from demo where id<42;
                                                        QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=4.12..4.12 rows=1 width=8) (actual time=123.770..123.770 rows=1 loops=1)
   ->  Index Scan using demo_pkey on demo  (cost=0.00..4.11 rows=1 width=0) (actual time=123.737..123.761 rows=41 loops=1)
         Index Cond: (id < 42)

When the primary key was hash partitioned, a scan was seen as a Seq Scan by the PostgreSQL layer as it has to read all tablets. Now that it is range partitioned, the same scan is seen as an Index Scan with a condition.
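
To compare the two sharding methods side by side rather than redefining the same table, something like the following could be used. It is a sketch only: demo_hash and demo_range are hypothetical table names, and it relies on the YSQL HASH and ASC keywords in the primary key definition.

```sql
-- Hypothetical tables for comparison (names are not from the demo above).
create table demo_hash  (id bigint, value int, primary key (id hash));
create table demo_range (id bigint, value int, primary key (id asc));

insert into demo_hash  (id) select generate_series(1, 1000000);
insert into demo_range (id) select generate_series(1, 1000000);

-- Same predicate on both tables: compare the scan node and its actual rows.
explain (analyze, summary false) select count(*) from demo_hash  where id < 42;
explain (analyze, summary false) select count(*) from demo_range where id < 42;
```

Going by the plans shown above, the first query should appear as a Seq Scan with a Filter and the second as an Index Scan with an Index Cond, though the exact output depends on the version you run.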

Finally, I used COUNT, but MIN, MAX, and SUM are also pushed down when possible:

yb_demo_northwind=> \d orders
                           Table "public.orders"
      Column      |         Type          | Collation | Nullable | Default
------------------+-----------------------+-----------+----------+---------
 order_id         | smallint              |           | not null |
 customer_id      | bpchar                |           |          |
 employee_id      | smallint              |           |          |
 order_date       | date                  |           |          |
 required_date    | date                  |           |          |
 shipped_date     | date                  |           |          |
 ship_via         | smallint              |           |          |
 freight          | real                  |           |          |
 ship_name        | character varying(40) |           |          |
 ship_address     | character varying(60) |           |          |
 ship_city        | character varying(15) |           |          |
 ship_region      | character varying(15) |           |          |
 ship_postal_code | character varying(10) |           |          |
 ship_country     | character varying(15) |           |          |
...

yb_demo_northwind=> explain (analyze, summary false) select count(*), max(shipped_date), min(freight) from orders;
                                                 QUERY PLAN
------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=107.50..107.51 rows=1 width=16) (actual time=11.498..11.498 rows=1 loops=1)
   ->  Seq Scan on orders  (cost=0.00..100.00 rows=1000 width=8) (actual time=11.475..11.478 rows=1 loops=1)

So aggregations on integers and reals are pushed down. However, text datatypes require specific database logic which is not yet available in DocDB:

yb_demo_northwind=> explain (analyze, summary false) select min(ship_postal_code), count(*), max(shipped_date), min(freight) from orders;
                                                   QUERY PLAN
---------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=110.00..110.01 rows=1 width=48) (actual time=14.601..14.602 rows=1 loops=1)
   ->  Seq Scan on orders  (cost=0.00..100.00 rows=1000 width=46) (actual time=13.990..14.354 rows=830 loops=1)
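
One way to find out which aggregate prevents the pushdown is to run each one alone and look at the actual rows of the Seq Scan. A minimal sketch against the same orders table:

```sql
-- Aggregate on a real column
explain (analyze, summary false) select min(freight) from orders;
-- Aggregate on a text column
explain (analyze, summary false) select min(ship_postal_code) from orders;
```

Going by the plans above, the first should report rows=1 on the Seq Scan and the second rows=830, but that is an expectation rather than captured output.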

In summary, with YugabyteDB as with PostgreSQL, the EXPLAIN plan, especially with the ANALYZE execution statistics, helps to understand what happens. But for a distributed database you also need to understand the partitioning scheme. Don't worry, it is well documented, open source and there's a community to help.
