Using DuckDB With CockroachDB

Motivation

CockroachDB has native support for change data capture. It supports object storage sinks across all major cloud providers. At the time of writing, there are a couple of supported formats available like Avro and Newline Delimited JSON. Up until now, I’ve been avoiding Newline Delimited JSON because I don’t find it easy to use. Today, I’d like to look at DuckDB as a viable tool to parse the CDC-generated output in newline-delimited format.

High-Level Steps

  • Start a CockroachDB cluster
  • Parse CockroachDB newline-delimited changefeed output using DuckDB
  • Query CockroachDB tables using DuckDB
  • Conclusion

Step-By-Step Instructions

Start a CockroachDB Cluster

I am using a CockroachDB serverless instance, which has enterprise changefeeds enabled by default. You can sign up for a free instance.

Parse CockroachDB Newline-Delimited Changefeed Output Using DuckDB

We’re going to follow the example to send sample data to an S3 bucket. DuckDB supports reading from S3 directly but today I’m going to download files to my machine and parse them locally.
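For reference, here is a rough sketch of what reading directly from S3 could look like with DuckDB’s httpfs extension; the region and credentials below are placeholders, and the bucket path matches the changefeed destination used later in this post:

-- sketch: query the changefeed files in place on S3 (placeholder region and credentials)
INSTALL httpfs;
LOAD httpfs;
SET s3_region='us-east-1';
SET s3_access_key_id='YOUR_ACCESS_KEY_ID';
SET s3_secret_access_key='YOUR_SECRET_ACCESS_KEY';
SELECT * FROM read_ndjson_objects('s3://artemawsbucket/tpcc/history/*.ndjson') LIMIT 5;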

I’m using the tpcc workload to generate changefeed data, but you can use the example from the doc above.

Initialize:

cockroach workload init tpcc \
  --warehouses 100 $DATABASE_URL

Execute the workload:

cockroach workload run tpcc \
  --duration=120m \
  --concurrency=3 \
  --max-rate=1000 \
  --tolerate-errors \
  --warehouses=10 \
  --conns 60 \
  --ramp=1m \
  --workers=100 \
  $DATABASE_URL

Create a changefeed job:

CREATE CHANGEFEED FOR TABLE history INTO 's3://artemawsbucket/tpcc/history?AWS_ACCESS_KEY_ID=&AWS_SECRET_ACCESS_KEY=' with updated;

Then, navigate to your S3 bucket and find the files there.


Copy data from S3 to your filesystem.

aws s3 cp s3://artemawsbucket/tpcc/history . --recursive

Install DuckDB, then navigate to the directory with the JSON files and start the duckdb shell.
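Before diving in, it can be handy to see which JSON functions this DuckDB build exposes. A small sketch using DuckDB’s built-in duckdb_functions() table function:

-- list the JSON-related functions available in this DuckDB build
SELECT DISTINCT function_name
FROM duckdb_functions()
WHERE function_name LIKE '%json%'
ORDER BY function_name;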

Looking at the available JSON functions, the standard read_json_objects function works on a single file.

SELECT * FROM read_json_objects('202305161404194891609990000000000-fb5d1ff7b5a47331-2-15-00000000-history-a.ndjson');

│ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2404, "h_c_w_id": 1, "h_d_id": 8, "h_data": "9v3L5bOacQHehuVoJHJ2vp…  │ │ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2431, "h_c_w_id": 1, "h_d_id": 8, "h_data": "ljve8BmeEvbQ5dJWLgvcp"…  │ │ {"after": {"h_amount": 10.00, "h_c_d_id": 8, "h_c_id": 2382, "h_c_w_id": 1, "h_d_id": 8, "h_data": "ve8BmeEvbQ5dJWLgvcp", …  │

Similarly, there’s a newline-delimited function read_ndjson_objects. This time we’re going to use globbing instead of individual files. We’re also going to limit the output as my entire dataset is 3 million rows.

SELECT * FROM read_ndjson_objects('*.ndjson') LIMIT 5;

┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                             json                                                              │
│                                                             json                                                              │
├──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1166, "h_c_w_id": 25, "h_d_id": 10, "h_data": "Z5x9v3L5bOacQHehuVo…   │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1181, "h_c_w_id": 25, "h_d_id": 10, "h_data": "3L5bOacQHehuVoJHJ2v…   │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1171, "h_c_w_id": 25, "h_d_id": 10, "h_data": "L5bOacQHehuVoJHJ2vp…   │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1188, "h_c_w_id": 25, "h_d_id": 10, "h_data": "cQHehuVoJHJ2vp", "h…   │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1184, "h_c_w_id": 25, "h_d_id": 10, "h_data": "VzccrxcAzZ5x9v3L5b"…   │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

We can create a DuckDB table out of the JSON files.

CREATE TABLE history AS SELECT * FROM read_ndjson_objects('*.ndjson');

┌─────────┐
│  name   │
│ varchar │
├─────────┤
│ history │
└─────────┘

select json as col from history limit 5;

┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                             col                                                               │
│                                                             json                                                              │
├──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1166, "h_c_w_id": 25, "h_d_id": 10, "h_data": "Z5x9v3L5bOacQHehuVo…   │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1181, "h_c_w_id": 25, "h_d_id": 10, "h_data": "3L5bOacQHehuVoJHJ2v…   │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1171, "h_c_w_id": 25, "h_d_id": 10, "h_data": "L5bOacQHehuVoJHJ2vp…   │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1188, "h_c_w_id": 25, "h_d_id": 10, "h_data": "cQHehuVoJHJ2vp", "h…   │
│ {"after": {"h_amount": 10.00, "h_c_d_id": 10, "h_c_id": 1184, "h_c_w_id": 25, "h_d_id": 10, "h_data": "VzccrxcAzZ5x9v3L5b"…   │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

We can query the individual columns.

select json->'after'->'h_amount' from history limit 1;

┌─────────────────────────────────┐
│ "json" -> 'after' -> 'h_amount' │
│              json               │
├─────────────────────────────────┤
│ 10.0                            │
└─────────────────────────────────┘

We can cast, too.

select json->'after'->'h_data', cast (json->'after'->'h_c_id' as integer) as c_id from history where c_id > 2000 limit 5;

┌───────────────────────────────┬───────┐
│ "json" -> 'after' -> 'h_data' │ c_id  │
│             json              │ int32 │
├───────────────────────────────┼───────┤
│ "7xrljve8BmeEvbQ5dJW"         │  2002 │
│ "AzZ5x9v3L5bOac"              │  2001 │
│ "x9v3L5bOacQHehuVoJ"          │  2024 │
│ "2vp7xrljve8Bme"              │  2006 │
│ "UtEdpJzCGyo91sT"             │  2029 │
└───────────────────────────────┴───────┘

We can use ->> notation to output values as varchar instead of JSON.

SELECT distinct(cast (json->>'after'->>'h_amount' as float)) FROM history LIMIT 5;

┌──────────────────────────────────────────────────────┐
│ CAST((("json" ->> 'after') ->> 'h_amount') AS FLOAT) │
│                        float                         │
├──────────────────────────────────────────────────────┤
│                                                 10.0 │
│                                              2612.12 │
│                                              3986.51 │
│                                              2836.18 │
│                                                359.5 │
└──────────────────────────────────────────────────────┘

Another useful JSON function is read_json_auto. It infers column types automatically.

SELECT * FROM read_json_auto('*.ndjson');

┌──────────────────────────────────────────────┬──────────────────────────────────────────────┬────────────────────────────────┐
│                    after                     │                     key                      │            updated             │
│ struct(h_amount double, h_c_d_id ubigint, …  │                    json[]                    │            varchar             │
├──────────────────────────────────────────────┼──────────────────────────────────────────────┼────────────────────────────────┤
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "42674618-a16f-4000-8000-0000000bdfb5"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "426799fb-7793-4c00-8000-0000000bdfc4"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "4267620e-e8d1-4000-8000-0000000bdfba"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "4267c121-0eb5-4800-8000-0000000bdfcb"] │ 1684245859489160999.0000000000 │
│ {'h_amount': 10.0, 'h_c_d_id': 10, 'h_c_id…  │ [25, "4267aac2-6f34-4400-8000-0000000bdfc7"] │ 1684245859489160999.0000000000 │
└──────────────────────────────────────────────┴──────────────────────────────────────────────┴────────────────────────────────┘
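Because the changefeed was created with the updated option, each row carries an MVCC timestamp whose integer part is nanoseconds since the Unix epoch. Here is a small sketch, assuming the decimal string format shown above, that converts it into a readable timestamp:

-- sketch: convert the 'updated' HLC value (nanoseconds.logical) into a timestamp
SELECT
  key,
  to_timestamp(CAST(split_part(updated, '.', 1) AS BIGINT) / 1e9) AS updated_at
FROM read_json_auto('*.ndjson')
LIMIT 5;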

We can drill down to the individual array index level.

SELECT CAST (key->0 AS INTEGER) AS hkey FROM read_json_auto('*.ndjson') WHERE hkey = 25 LIMIT 5;

┌───────┐
│ hkey  │
│ int32 │
├───────┤
│    25 │
│    25 │
│    25 │
│    25 │
│    25 │
└───────┘

This has a lot of promise, and I will be watching closely as DuckDB grows in popularity. It will definitely help in analyzing CDC output.
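As a taste of that kind of analysis, here is a sketch that aggregates the parsed changefeed rows by warehouse; the column names come from the tpcc history table shown above:

-- sketch: count change events and total amount per warehouse
SELECT
  "after".h_c_w_id      AS warehouse,
  count(*)              AS changes,
  sum("after".h_amount) AS total_amount
FROM read_json_auto('*.ndjson')
GROUP BY "after".h_c_w_id
ORDER BY changes DESC
LIMIT 10;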

Query CockroachDB Tables Using DuckDB

DuckDB supports querying PostgreSQL directly using the postgres_scanner extension, and today I’d like to see if the same works against CockroachDB.

INSTALL postgres_scanner;
LOAD postgres_scanner;

CREATE SCHEMA abc;

CALL postgres_attach('dbname=defaultdb user=artem host=hostname port=26257 password=password sslmode=verify-full sslrootcert=certlocation', source_schema='public', sink_schema='abc');

┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ 0 rows  │
└─────────┘

SELECT table_schema, table_name, table_type FROM information_schema.tables;

┌──────────────┬──────────────────┬────────────┐
│ table_schema │    table_name    │ table_type │
│   varchar    │     varchar      │  varchar   │
├──────────────┼──────────────────┼────────────┤
│ abc          │ pgbench_tellers  │ VIEW       │
│ abc          │ pgbench_history  │ VIEW       │
│ abc          │ pgbench_branches │ VIEW       │
│ abc          │ pgbench_accounts │ VIEW       │
│ abc          │ example          │ VIEW       │
└──────────────┴──────────────────┴────────────┘

┌──────────────────┐
│       name       │
│     varchar      │
├──────────────────┤
│ example          │
│ pgbench_accounts │
│ pgbench_branches │
│ pgbench_history  │
│ pgbench_tellers  │
└──────────────────┘

Query the tables directly, and make sure to specify the abc schema.

SELECT * FROM abc.pgbench_history LIMIT 5;

Error: Invalid Error: IO Error: Unable to query Postgres: ERROR:  at or near "(": syntax error
DETAIL:  source SQL:
COPY (SELECT "tid", "bid", "tbalance", "filler" FROM "public"."pgbench_tellers" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid ) TO STDOUT (FORMAT binary)
     ^
ERROR:  at or near "(": syntax error
DETAIL:  source SQL:
COPY (SELECT "tid", "bid", "tbalance", "filler" FROM "public"."pgbench_tellers" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid ) TO STDOUT (FORMAT binary)

This is where it starts to break. The problem stems from DuckDB needing the result returned with FORMAT binary. In CockroachDB 23.1, the COPY command supports only the text and csv formats. I’ve filed issues 1, 2, and 3 to add support for binary, json, and parquet.

demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT csv);
1
2
3
4
5
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT text);
1
2
3
4
5
demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT binary);
ERROR: unimplemented: binary format for COPY TO not implemented
SQLSTATE: 0A000
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/97180/v23.1

demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT json);
invalid syntax: statement ignored: at or near "json": syntax error: unimplemented: this syntax
SQLSTATE: 0A000
DETAIL: source SQL:
COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT json)
                                                    ^
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/96590/v23.1

demo@127.0.0.1:26257/defaultdb> COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT parquet);
invalid syntax: statement ignored: at or near "parquet": syntax error: unimplemented: this syntax
SQLSTATE: 0A000
DETAIL: source SQL:
COPY (SELECT * FROM test LIMIT 5) TO STDOUT (FORMAT parquet)
                                                    ^
HINT: You have attempted to use a feature that is not yet implemented.
See: https://go.crdb.dev/issue-v/96590/v23.1

Unfortunately, the postgres_scanner does not work with text or csv, or at least I haven’t found a way.

D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT csv);
Error: Invalid Error: IO Error: Unable to query Postgres: SSL SYSCALL error: EOF detected
SSL SYSCALL error: EOF detected

D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT text);
Error: Catalog Error: Copy Function with name text does not exist!
Did you mean "parquet"?

D COPY (SELECT * FROM abc.test) TO STDOUT (FORMAT parquet);
Error: Invalid Error: IO Error: Unable to query Postgres: SSL SYSCALL error: EOF detected
SSL SYSCALL error: EOF detected

Conclusion

Your mileage may vary. This was a fun experiment, and I will be paying close attention as this project matures.
