Skip to content

Commit d9bb0b9

Browse files
committed
feat: support previous schema hash for smooth rolling upgrades
Add previousHash tracking to accept events from not-yet-upgraded nodes during rolling upgrades when multiple nodes have publish=true. Changes: - SchemaCache tracks previousHash when schema changes - Schema validation accepts events matching current OR previous hash - SchemaRegistry publishes both hashes for cluster visibility - Update documentation to reflect auto-resume and previousHash behavior
1 parent 2c8d26e commit d9bb0b9

14 files changed

+390
-74
lines changed

db/schema_cache.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
type SchemaCache struct {
1111
mu sync.RWMutex
1212
schemaHash string
13+
previousHash string // Hash before last schema change (for rolling upgrades)
1314
schemaManager *SchemaManager
1415
tables []string
1516
}
@@ -41,8 +42,17 @@ func (sc *SchemaCache) GetSchemaHash() string {
4142
return sc.schemaHash
4243
}
4344

45+
// GetPreviousHash returns the previous schema hash (O(1))
46+
// Used during rolling upgrades to accept events from nodes not yet upgraded
47+
func (sc *SchemaCache) GetPreviousHash() string {
48+
sc.mu.RLock()
49+
defer sc.mu.RUnlock()
50+
return sc.previousHash
51+
}
52+
4453
// Recompute recalculates the schema hash from the database
4554
// Called during pause state to detect if local DDL has been applied
55+
// When schema changes, the old hash is preserved as previousHash
4656
func (sc *SchemaCache) Recompute(ctx context.Context) (string, error) {
4757
sc.mu.Lock()
4858
defer sc.mu.Unlock()
@@ -51,6 +61,11 @@ func (sc *SchemaCache) Recompute(ctx context.Context) (string, error) {
5161
if err != nil {
5262
return "", fmt.Errorf("recomputing schema hash: %w", err)
5363
}
64+
65+
// Preserve old hash as previous when schema changes
66+
if hash != sc.schemaHash && sc.schemaHash != "" {
67+
sc.previousHash = sc.schemaHash
68+
}
5469
sc.schemaHash = hash
5570
return hash, nil
5671
}

db/schema_cache_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package db
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"testing"
7+
8+
_ "github.com/mattn/go-sqlite3"
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
)
12+
13+
func TestSchemaCache_PreviousHash(t *testing.T) {
14+
// Create a test database
15+
db, err := sql.Open("sqlite3", ":memory:")
16+
require.NoError(t, err)
17+
defer db.Close()
18+
19+
// Create initial schema
20+
_, err = db.Exec(`CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`)
21+
require.NoError(t, err)
22+
23+
// Create schema manager
24+
sm, err := NewSchemaManager(db)
25+
require.NoError(t, err)
26+
27+
// Initialize schema cache
28+
sc := NewSchemaCache()
29+
ctx := context.Background()
30+
err = sc.Initialize(ctx, sm, []string{"users"})
31+
require.NoError(t, err)
32+
33+
// Get initial hash
34+
initialHash := sc.GetSchemaHash()
35+
assert.NotEmpty(t, initialHash)
36+
37+
// Previous hash should be empty initially
38+
assert.Empty(t, sc.GetPreviousHash())
39+
40+
// Modify the schema
41+
_, err = db.Exec(`ALTER TABLE users ADD COLUMN email TEXT`)
42+
require.NoError(t, err)
43+
44+
// Recompute the hash
45+
newHash, err := sc.Recompute(ctx)
46+
require.NoError(t, err)
47+
48+
// New hash should be different
49+
assert.NotEqual(t, initialHash, newHash)
50+
51+
// Previous hash should now be the initial hash
52+
assert.Equal(t, initialHash, sc.GetPreviousHash())
53+
54+
// Current hash should be the new hash
55+
assert.Equal(t, newHash, sc.GetSchemaHash())
56+
}
57+
58+
func TestSchemaCache_PreviousHashUnchangedWhenSameSchema(t *testing.T) {
59+
// Create a test database
60+
db, err := sql.Open("sqlite3", ":memory:")
61+
require.NoError(t, err)
62+
defer db.Close()
63+
64+
// Create initial schema
65+
_, err = db.Exec(`CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`)
66+
require.NoError(t, err)
67+
68+
// Create schema manager
69+
sm, err := NewSchemaManager(db)
70+
require.NoError(t, err)
71+
72+
// Initialize schema cache
73+
sc := NewSchemaCache()
74+
ctx := context.Background()
75+
err = sc.Initialize(ctx, sm, []string{"users"})
76+
require.NoError(t, err)
77+
78+
initialHash := sc.GetSchemaHash()
79+
80+
// Recompute without schema change
81+
newHash, err := sc.Recompute(ctx)
82+
require.NoError(t, err)
83+
84+
// Hash should be the same
85+
assert.Equal(t, initialHash, newHash)
86+
87+
// Previous hash should still be empty (no change occurred)
88+
assert.Empty(t, sc.GetPreviousHash())
89+
}
90+
91+
func TestSchemaCache_MultipleSchemaChanges(t *testing.T) {
92+
// Create a test database
93+
db, err := sql.Open("sqlite3", ":memory:")
94+
require.NoError(t, err)
95+
defer db.Close()
96+
97+
// Create initial schema
98+
_, err = db.Exec(`CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)`)
99+
require.NoError(t, err)
100+
101+
// Create schema manager
102+
sm, err := NewSchemaManager(db)
103+
require.NoError(t, err)
104+
105+
// Initialize schema cache
106+
sc := NewSchemaCache()
107+
ctx := context.Background()
108+
err = sc.Initialize(ctx, sm, []string{"users"})
109+
require.NoError(t, err)
110+
111+
hash1 := sc.GetSchemaHash()
112+
113+
// First schema change
114+
_, err = db.Exec(`ALTER TABLE users ADD COLUMN email TEXT`)
115+
require.NoError(t, err)
116+
117+
hash2, err := sc.Recompute(ctx)
118+
require.NoError(t, err)
119+
assert.Equal(t, hash1, sc.GetPreviousHash())
120+
121+
// Second schema change
122+
_, err = db.Exec(`ALTER TABLE users ADD COLUMN phone TEXT`)
123+
require.NoError(t, err)
124+
125+
hash3, err := sc.Recompute(ctx)
126+
require.NoError(t, err)
127+
128+
// Previous hash should now be hash2 (only tracks one version back)
129+
assert.Equal(t, hash2, sc.GetPreviousHash())
130+
assert.Equal(t, hash3, sc.GetSchemaHash())
131+
132+
// Verify all three hashes are different
133+
assert.NotEqual(t, hash1, hash2)
134+
assert.NotEqual(t, hash2, hash3)
135+
assert.NotEqual(t, hash1, hash3)
136+
}

db/sqlite.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,14 @@ func (conn *SqliteStreamDB) GetSchemaHash() string {
386386
return conn.schemaCache.GetSchemaHash()
387387
}
388388

389+
// GetPreviousHash returns the previous schema hash (for rolling upgrades)
390+
func (conn *SqliteStreamDB) GetPreviousHash() string {
391+
if conn.schemaCache == nil {
392+
return ""
393+
}
394+
return conn.schemaCache.GetPreviousHash()
395+
}
396+
389397
// GetSchemaCache returns the schema cache for direct access
390398
func (conn *SqliteStreamDB) GetSchemaCache() *SchemaCache {
391399
return conn.schemaCache

docs/docs/architecture.md

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -373,11 +373,14 @@ HarmonyLite maintains a NATS KeyValue registry (`harmonylite-schema-registry`) t
373373
{
374374
"node_id": 1,
375375
"schema_hash": "a1b2c3d4e5f6...",
376+
"previous_hash": "x9y8z7w6v5u4...",
376377
"harmonylite_version": "1.2.0",
377378
"updated_at": "2025-01-20T10:30:00Z"
378379
}
379380
```
380381

382+
The `previous_hash` field enables smooth rolling upgrades: a node accepts events matching either its current hash or its previous hash. This prevents unnecessary pauses when upgrading nodes one at a time in a multi-publisher cluster.
383+
381384
This enables operators to monitor schema rollout progress across the cluster.
382385

383386
### Rolling Upgrade Flow
@@ -389,21 +392,30 @@ sequenceDiagram
389392
participant N2 as Node 2 (Old Schema)
390393
participant N3 as Node 3 (Old Schema)
391394
392-
Note over N1: Schema upgraded, new hash computed
393-
N1->>NATS: Publish change (hash: abc123)
395+
Note over N1: Schema upgraded, new hash H2 computed
396+
Note over N1: Previous hash H1 preserved
397+
N1->>NATS: Publish change (hash: H2)
394398
395-
NATS->>N2: Push message (hash: abc123)
396-
Note over N2: Local hash: xyz789abc123
399+
NATS->>N2: Push message (hash: H2)
400+
Note over N2: Local hash: H1H2, no previous hash
397401
N2->>NATS: NAK with delay (pause)
398402
399-
Note over N2: Operator upgrades schema
400-
Note over N2: Restart HarmonyLite
403+
Note over N2: Operator applies DDL (no restart needed)
404+
Note over N2: HarmonyLite auto-detects within 5 min
405+
Note over N2: New hash: H2, previous: H1
401406
402-
NATS->>N2: Redeliver message (hash: abc123)
403-
Note over N2: Local hash: abc123 = abc123
407+
NATS->>N2: Redeliver message (hash: H2)
408+
Note over N2: Local hash: H2 = H2
404409
N2->>N2: Apply change
405410
N2->>NATS: ACK
406411
412+
Note over N1,N2: Meanwhile, N2 still publishes with H1
413+
N2->>NATS: Publish change (hash: H1)
414+
NATS->>N1: Push message (hash: H1)
415+
Note over N1: H1 = previous_hash, accept!
416+
N1->>N1: Apply change
417+
N1->>NATS: ACK
418+
407419
Note over N3: Same process repeats
408420
```
409421

0 commit comments

Comments
 (0)