
Incremental sync from a mutable stream creates an append-only target stream and gives incorrect results (sling from Proton to Proton) #28

@jhao0117

Description

  1. The source stream is a mutable stream:
timeplusd :) show create coinbase_ohlc_1m_vkv

SHOW CREATE STREAM coinbase_ohlc_1m_vkv

Query id: 4e15a8d2-e04b-43b2-b90e-754046a5176c

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE MUTABLE STREAM default.coinbase_ohlc_1m_vkv
(
  `time` datetime64(3),
  `symbol` string,
  `open` float32,
  `close` float32,
  `high` float32,
  `low` float32,
  `_tp_time` datetime64(3, 'UTC') DEFAULT now64(3, 'UTC') CODEC(DoubleDelta, LZ4)
)
ENGINE = MutableStream(1, 1)
PRIMARY KEY (time, symbol) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

1 row in set. Elapsed: 0.001 sec.

timeplusd :)
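A mutable stream with PRIMARY KEY (time, symbol) can be thought of as an upsert table: a later row with the same key replaces the earlier one, so the row count grows only for new keys. The sketch below models that with a Python dict. It is a simplified assumption about the semantics, not Timeplus internals; MutableStreamModel, upsert, and the sample values are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, Tuple


# Hypothetical in-memory model of a mutable stream keyed by (time, symbol).
# Assumption: a later insert with the same primary key overwrites the earlier row.
@dataclass
class MutableStreamModel:
    rows: Dict[Tuple[str, str], dict] = field(default_factory=dict)

    def upsert(self, row: dict) -> None:
        key = (row["time"], row["symbol"])  # mirrors PRIMARY KEY (time, symbol)
        self.rows[key] = row  # replaces any existing row with the same key

    def count(self) -> int:
        return len(self.rows)


if __name__ == "__main__":
    s = MutableStreamModel()
    # Illustrative sample rows, not taken from the issue's data.
    s.upsert({"time": "2024-10-08 09:21:00", "symbol": "BTC-USD", "close": 1.0})
    s.upsert({"time": "2024-10-08 09:21:00", "symbol": "BTC-USD", "close": 2.0})
    print(s.count())  # 1: the second insert updated the existing key
```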
  2. Run sling to replicate the data:
(base) jameshao@192 sling % ./sling run --src-conn AWS_PROTON_SERVER_2 --src-stream 'coinbase_ohlc_1m_vkv' --tgt-conn MAC_LOCAL --tgt-object 'coinbase_ohlc_1m_vkv' --mode incremental --update-key '_tp_time'
11:05AM INF connecting to source database (proton)
11:05AM INF connecting to target database (proton)
11:05AM INF getting checkpoint value
11:05AM INF Creating intermediate configuration
11:05AM INF Exporting data from source Proton database
11:05AM INF connecting to source database (proton)
11:05AM INF reading from source database
11:05AM INF writing to target file system (file)
11:05AM INF wrote 51554 rows [18,266 r/s] to /var/folders/10/lvb7c5rn6js2w5jcb1pfk9hh0000gn/T/proton_transfer_915658077.csv
11:05AM INF Checking exported data
11:05AM INF Preparing to import data to target Proton database
11:05AM INF Importing data to target Proton database
11:05AM INF connecting to target database (proton)
11:05AM INF reading from source file system (file)
11:05AM INF writing to target database [mode: incremental]
11:05AM INF streaming data
11:05AM INF created table `default`.`coinbase_ohlc_1m_vkv`
11:05AM INF inserted 51554 rows into default.`coinbase_ohlc_1m_vkv` in 9 secs [5,318 r/s]
11:05AM INF Transferred 51554 rows between Proton databases in 12 secs [5,317 r/s]
11:05AM INF execution succeeded
(base) jameshao@192 sling %
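In --mode incremental with --update-key '_tp_time', the log shows sling "getting checkpoint value" before exporting. A plausible model of that step, assuming the checkpoint is the largest update-key value already in the target and that only source rows past it are exported, is sketched below. This is not sling's implementation: checkpoint and rows_to_export are hypothetical names, and whether the comparison is > or >= is an assumption exposed as the inclusive parameter.

```python
from typing import Iterable, List, Optional


def checkpoint(target_rows: Iterable[dict], update_key: str) -> Optional[str]:
    """Return the largest update-key value already in the target, or None if empty."""
    values = [r[update_key] for r in target_rows]
    return max(values) if values else None


def rows_to_export(source_rows: Iterable[dict], target_rows: Iterable[dict],
                   update_key: str = "_tp_time", inclusive: bool = False) -> List[dict]:
    """Hypothetical incremental selection: keep only source rows past the checkpoint.

    Timestamps are 'YYYY-MM-DD HH:MM:SS.fff' strings, which sort correctly as text.
    """
    cp = checkpoint(target_rows, update_key)
    if cp is None:
        return list(source_rows)  # empty target: full load, as in the first run
    if inclusive:  # assumption: some tools re-read rows equal to the checkpoint
        return [r for r in source_rows if r[update_key] >= cp]
    return [r for r in source_rows if r[update_key] > cp]
```

Under this model the selection looks only at _tp_time, so every source row with a newer _tp_time (whether a brand-new key or an update to an existing one) is shipped as a plain insert; what happens to it afterwards depends on the target engine.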
  3. The target stream created by sling is not a mutable stream but an append-only stream:
timeplusd :) show create coinbase_ohlc_1m_vkv

SHOW CREATE STREAM coinbase_ohlc_1m_vkv

Query id: 2091a5a9-e0c5-4fd8-a3c6-a998d826da5e

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE STREAM default.coinbase_ohlc_1m_vkv
(
  `time` nullable(datetime64(6)),
  `symbol` nullable(string),
  `open` nullable(decimal(25, 6)),
  `close` nullable(decimal(25, 6)),
  `high` nullable(decimal(25, 6)),
  `low` nullable(decimal(25, 6)),
  `_tp_time` datetime64(3, 'UTC') DEFAULT now64(3, 'UTC') CODEC(DoubleDelta, LZ4),
  INDEX _tp_time_index _tp_time TYPE minmax GRANULARITY 256
)
ENGINE = Stream(1, 1, rand())
PARTITION BY to_YYYYMMDD(_tp_time)
ORDER BY to_start_of_hour(_tp_time)
SETTINGS index_granularity = 8192 │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

1 row in set. Elapsed: 0.001 sec.

timeplusd :)
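Because the target was created as a plain append-only Stream, any row the incremental run sends for a (time, symbol) key that already exists in the target is stored as an additional row instead of replacing the old one. The hypothetical replay helper below applies the same batches to an append-only list and to a keyed upsert map to show how the counts diverge; it illustrates the two storage models and is not a reproduction of sling or Timeplus behavior.

```python
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, object]


def replay(batches: Sequence[List[Row]],
           key_cols: Tuple[str, ...] = ("time", "symbol")) -> Tuple[int, int]:
    """Apply batches in order; return (append_only_count, mutable_count)."""
    append_only: List[Row] = []      # models Stream(...): every insert adds a row
    mutable: Dict[tuple, Row] = {}   # models MutableStream(...): same key overwrites
    for batch in batches:
        for row in batch:
            append_only.append(row)
            mutable[tuple(row[c] for c in key_cols)] = row
    return len(append_only), len(mutable)


# Illustrative batches: the key (2, "A") is sent again by the second run.
initial = [{"time": 1, "symbol": "A"}, {"time": 2, "symbol": "A"}]
incremental = [{"time": 2, "symbol": "A"}, {"time": 3, "symbol": "A"}]
print(replay([initial, incremental]))  # (4, 3)
```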
  4. Add some data to the source mutable stream and run the sling incremental sync again:
(base) jameshao@192 sling % ./sling run --src-conn AWS_PROTON_SERVER_2 --src-stream 'coinbase_ohlc_1m_vkv' --tgt-conn MAC_LOCAL --tgt-object 'coinbase_ohlc_1m_vkv' --mode incremental --update-key '_tp_time'
11:26AM INF connecting to source database (proton)
11:26AM INF connecting to target database (proton)
11:26AM INF getting checkpoint value
11:26AM INF Creating intermediate configuration
11:26AM INF Exporting data from source Proton database
11:26AM INF connecting to source database (proton)
11:26AM INF reading from source database
11:26AM INF writing to target file system (file)
11:26AM INF wrote 44 rows [33 r/s] to /var/folders/10/lvb7c5rn6js2w5jcb1pfk9hh0000gn/T/proton_transfer_1789645138.csv
11:26AM INF Checking exported data
11:26AM INF Preparing to import data to target Proton database
11:26AM INF Importing data to target Proton database
11:26AM INF connecting to target database (proton)
11:26AM INF reading from source file system (file)
11:26AM INF writing to target database [mode: incremental]
11:26AM INF streaming data
11:27AM INF inserted 44 rows into default.`coinbase_ohlc_1m_vkv` in 9 secs [5 r/s]
11:27AM INF Transferred 44 rows between Proton databases in 10 secs [5 r/s]
11:27AM INF execution succeeded
(base) jameshao@192 sling %
  5. The result is not correct; the source and target row counts differ:
ubuntu@proton2:~/timeplus/bin$ ./timeplusd client -h 127.0.0.1
timeplusd client version 2.4.7.
Connecting to 127.0.0.1:8463 as user default.
Connected to timeplusd server version 2.4.7 revision 54459.

timeplusd :) select count() from table(coinbase_ohlc_1m_vkv)

SELECT
  count()
FROM
  table(coinbase_ohlc_1m_vkv)

Query id: 045ca5ab-9452-4dad-a6f7-635f2b95270e

┌─count()─┐
│   51594 │
└─────────┘

1 row in set. Elapsed: 0.017 sec.

timeplusd :)

timeplusd :) select count() from table(coinbase_ohlc_1m_vkv)

SELECT
  count()
FROM
  table(coinbase_ohlc_1m_vkv)

Query id: 22eecd1d-d211-4b11-9e95-4e6e321b9c8c

┌─count()─┐
│   51598 │
└─────────┘

1 row in set. Elapsed: 0.002 sec.

timeplusd :)
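The counts can be cross-checked against the sling logs. The sketch below treats the first count() as the source and the second as the target, which matches the order used in the next step; the issue does not label these two queries explicitly, so that mapping is an assumption.

```python
# Figures copied from the sling logs and count() output in this issue.
initial_rows = 51554      # first run: "inserted 51554 rows"
incremental_rows = 44     # second run: "inserted 44 rows"
first_count = 51594       # first count() result (assumed to be the source)
second_count = 51598      # second count() result (assumed to be the target)

target_expected = initial_rows + incremental_rows   # rows an append-only target would hold
new_source_rows = first_count - initial_rows        # net rows added to the source
extra_rows = second_count - first_count             # target surplus over the source

print(target_expected, new_source_rows, extra_rows)  # 51598 40 4
```

So the target holds exactly the rows sling inserted, while the source grew by only 40 rows net. That is consistent with, for example, 4 of the 44 exported rows being updates to existing (time, symbol) keys, absorbed by the mutable source but stored as extra rows in the append-only target; the issue does not confirm which rows they were.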
  6. Check the earliest _tp_time in the source and target streams. They are the same, so the retention policy does not explain the difference.
timeplusd :) select min(_tp_time) from table(coinbase_ohlc_1m_vkv)

SELECT
  min(_tp_time)
FROM
  table(coinbase_ohlc_1m_vkv)

Query id: a4429796-c140-4782-9c3a-6f82f35178a8

┌───────────min(_tp_time)─┐
│ 2024-10-08 09:21:26.779 │
└─────────────────────────┘

1 row in set. Elapsed: 0.021 sec. Processed 51.59 thousand rows, 412.75 KB (2.46 million rows/s., 19.69 MB/s.)

timeplusd :)

timeplusd :) select min(_tp_time) from table(coinbase_ohlc_1m_vkv)

SELECT
  min(_tp_time)
FROM
  table(coinbase_ohlc_1m_vkv)

Query id: 7e2fd33e-51ab-4cef-a7d8-1e4928e3411e

┌───────────min(_tp_time)─┐
│ 2024-10-08 09:21:26.779 │
└─────────────────────────┘

1 row in set. Elapsed: 0.006 sec. Processed 51.60 thousand rows, 412.78 KB (8.36 million rows/s., 66.86 MB/s.)

timeplusd :)
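With retention ruled out, one way to locate the extra rows would be to look for (time, symbol) keys that occur more than once in the append-only target. The sketch below assumes the target rows have already been fetched into Python (the fetching step is omitted); find_duplicate_keys and the sample rows are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def find_duplicate_keys(rows: Iterable[Dict[str, object]],
                        key_cols: Tuple[str, ...] = ("time", "symbol")) -> List[tuple]:
    """Return primary-key tuples that occur more than once, in sorted order."""
    counts = Counter(tuple(r[c] for c in key_cols) for r in rows)
    return sorted(k for k, n in counts.items() if n > 1)


# Illustrative rows, not taken from the issue's data.
rows = [
    {"time": "2024-10-08 11:20:00", "symbol": "BTC-USD"},
    {"time": "2024-10-08 11:20:00", "symbol": "BTC-USD"},  # same key stored twice
    {"time": "2024-10-08 11:21:00", "symbol": "BTC-USD"},
]
print(find_duplicate_keys(rows))  # [('2024-10-08 11:20:00', 'BTC-USD')]
```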

Labels: enhancement (New feature or request)