Skip to content

Commit 8c6e5f8

Browse files
*: use add field to avoid field mismatch (#887)
Signed-off-by: huanghaoyuanhhy <[email protected]>
1 parent a01849e commit 8c6e5f8

File tree

4 files changed

+212
-9
lines changed

4 files changed

+212
-9
lines changed

core/restore/collection_ddl_task.go

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package restore
33
import (
44
"context"
55
"fmt"
6+
"sort"
67
"strings"
78

89
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -89,36 +90,96 @@ func (ddlt *collectionDDLTask) dropExistedColl(ctx context.Context) error {
8990
return nil
9091
}
9192

93+
type sortableFields []*schemapb.FieldSchema
94+
95+
func (s sortableFields) Len() int { return len(s) }
96+
func (s sortableFields) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
97+
func (s sortableFields) Less(i, j int) bool { return s[i].GetFieldID() < s[j].GetFieldID() }
98+
99+
// fields returns two types of fields:
100+
// 1. fields that can be created in CreateCollection API
101+
// 2. fields that need to be created by addField API
102+
//
103+
// If the original collection had addField operations, the $meta field might not be at the end of the fields list.
104+
// However, the CreateCollection API always appends the $meta field to the end.
105+
// This discrepancy in field order between the new collection and the backup file can cause restore errors.
106+
// Therefore, we stop processing fields if the field IDs are not continuous.
107+
// The remaining fields will be imported via addField.
108+
func (ddlt *collectionDDLTask) fields() ([]*schemapb.FieldSchema, []*schemapb.FieldSchema, error) {
109+
fields, err := ddlt.convFields(ddlt.collBackup.GetSchema().GetFields())
110+
if err != nil {
111+
return nil, nil, fmt.Errorf("collection: get fields: %w", err)
112+
}
113+
114+
if !ddlt.collBackup.GetSchema().GetEnableDynamicField() {
115+
return fields, nil, nil
116+
}
117+
118+
sort.Sort(sortableFields(fields))
119+
120+
var createFields, addFields []*schemapb.FieldSchema
121+
if len(fields) == 0 {
122+
return createFields, addFields, nil
123+
}
124+
125+
createFields = append(createFields, fields[0])
126+
isContinuous := true
127+
for i := 1; i < len(fields); i++ {
128+
prevField := fields[i-1]
129+
currField := fields[i]
130+
131+
if isContinuous && currField.GetFieldID() != prevField.GetFieldID()+1 {
132+
isContinuous = false
133+
}
134+
135+
if isContinuous {
136+
createFields = append(createFields, currField)
137+
} else {
138+
addFields = append(addFields, currField)
139+
}
140+
}
141+
142+
ddlt.logger.Info("fields", zap.Any("create_fields", createFields), zap.Any("add_fields", addFields))
143+
return createFields, addFields, nil
144+
}
145+
146+
func (ddlt *collectionDDLTask) addFields(ctx context.Context, fields []*schemapb.FieldSchema) error {
147+
// add fields
148+
for _, field := range fields {
149+
if err := ddlt.grpcCli.AddField(ctx, ddlt.targetNS.DBName(), ddlt.targetNS.CollName(), field); err != nil {
150+
return fmt.Errorf("restore: add field %s id %d: %w", field.GetName(), field.GetFieldID(), err)
151+
}
152+
}
153+
154+
return nil
155+
}
156+
92157
func (ddlt *collectionDDLTask) createColl(ctx context.Context) error {
93158
if ddlt.option.SkipCreateCollection {
94159
ddlt.logger.Info("skip create collection")
95160
return nil
96161
}
97162

98-
fields, err := ddlt.convFields(ddlt.collBackup.GetSchema().GetFields())
163+
createFields, addFields, err := ddlt.fields()
99164
if err != nil {
100165
return fmt.Errorf("collection: get fields: %w", err)
101166
}
102-
ddlt.logger.Info("restore collection fields", zap.Any("fields", fields))
103-
104167
functions := conv.Functions(ddlt.collBackup.GetSchema().GetFunctions())
105-
ddlt.logger.Info("restore collection functions", zap.Any("functions", functions))
106-
107168
structArrayFields, err := ddlt.structArrayFields()
108169
if err != nil {
109170
return fmt.Errorf("restore: conv struct array fields: %w", err)
110171
}
111-
112172
schema := &schemapb.CollectionSchema{
113173
Name: ddlt.targetNS.CollName(),
114174
Description: ddlt.collBackup.GetSchema().GetDescription(),
115175
AutoID: ddlt.collBackup.GetSchema().GetAutoID(),
116176
Functions: functions,
117-
Fields: fields,
177+
Fields: createFields,
118178
EnableDynamicField: ddlt.collBackup.GetSchema().GetEnableDynamicField(),
119179
Properties: pbconv.BakKVToMilvusKV(ddlt.collBackup.GetSchema().GetProperties()),
120180
StructArrayFields: structArrayFields,
121181
}
182+
ddlt.logger.Info("create collection", zap.Any("schema", schema))
122183

123184
opt := milvus.CreateCollectionInput{
124185
DB: ddlt.targetNS.DBName(),
@@ -132,6 +193,10 @@ func (ddlt *collectionDDLTask) createColl(ctx context.Context) error {
132193
return fmt.Errorf("restore: call create collection api after retry: %w", err)
133194
}
134195

196+
if err := ddlt.addFields(ctx, addFields); err != nil {
197+
return fmt.Errorf("restore: add fields: %w", err)
198+
}
199+
135200
return nil
136201
}
137202

@@ -141,7 +206,7 @@ func (ddlt *collectionDDLTask) convFields(bakFields []*backuppb.FieldSchema) ([]
141206
for _, bakField := range bakFields {
142207
defaultValue, err := conv.DefaultValue(bakField)
143208
if err != nil {
144-
return nil, fmt.Errorf("restore: failed to get default value: %w", err)
209+
return nil, fmt.Errorf("restore: get default value: %w", err)
145210
}
146211

147212
fieldRestore := &schemapb.FieldSchema{
@@ -173,7 +238,7 @@ func (ddlt *collectionDDLTask) structArrayFields() ([]*schemapb.StructArrayField
173238
for _, bakField := range bakFields {
174239
fields, err := ddlt.convFields(bakField.GetFields())
175240
if err != nil {
176-
return nil, fmt.Errorf("restore: failed to convert struct array fields: %w", err)
241+
return nil, fmt.Errorf("restore: convert struct array fields: %w", err)
177242
}
178243

179244
structArrayField := &schemapb.StructArrayFieldSchema{

core/restore/collection_ddl_task_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,56 @@ func TestCollectionDDLTask_shardNum(t *testing.T) {
3434
})
3535
}
3636

37+
func TestCollectionDDLTask_fields(t *testing.T) {
38+
t.Run("WithDynamicField", func(t *testing.T) {
39+
ddlt := newTestCollectionDDLTask()
40+
41+
ddlt.collBackup = &backuppb.CollectionBackupInfo{
42+
Schema: &backuppb.CollectionSchema{
43+
Fields: []*backuppb.FieldSchema{
44+
{Name: "field1", FieldID: 1},
45+
{Name: "field2", FieldID: 2},
46+
{Name: "field3", FieldID: 3},
47+
// $meta field's field id is 4
48+
{Name: "field5", FieldID: 5},
49+
{Name: "field6", FieldID: 6},
50+
},
51+
EnableDynamicField: true,
52+
},
53+
}
54+
55+
createFields, addFields, err := ddlt.fields()
56+
assert.NoError(t, err)
57+
assert.Len(t, createFields, 3)
58+
assert.Len(t, addFields, 2)
59+
60+
assert.Equal(t, "field1", createFields[0].GetName())
61+
assert.Equal(t, "field2", createFields[1].GetName())
62+
assert.Equal(t, "field3", createFields[2].GetName())
63+
assert.Equal(t, "field5", addFields[0].GetName())
64+
assert.Equal(t, "field6", addFields[1].GetName())
65+
})
66+
67+
t.Run("WithoutDynamicField", func(t *testing.T) {
68+
ddlt := newTestCollectionDDLTask()
69+
70+
ddlt.collBackup = &backuppb.CollectionBackupInfo{
71+
Schema: &backuppb.CollectionSchema{
72+
Fields: []*backuppb.FieldSchema{{Name: "field1", FieldID: 1}, {Name: "field2", FieldID: 2}},
73+
EnableDynamicField: false,
74+
},
75+
}
76+
77+
createFields, addFields, err := ddlt.fields()
78+
assert.NoError(t, err)
79+
assert.Len(t, createFields, 2)
80+
assert.Empty(t, addFields)
81+
82+
assert.Equal(t, "field1", createFields[0].GetName())
83+
assert.Equal(t, "field2", createFields[1].GetName())
84+
})
85+
}
86+
3787
func TestCollectionTask_createColl(t *testing.T) {
3888
t.Run("Skip", func(t *testing.T) {
3989
ct := newTestCollectionDDLTask()

internal/client/milvus/grpc.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ type Grpc interface {
119119
CreateCollection(ctx context.Context, input CreateCollectionInput) error
120120
CreatePartition(ctx context.Context, db, collName, partitionName string) error
121121
HasPartition(ctx context.Context, db, collName, partitionName string) (bool, error)
122+
AddField(ctx context.Context, db, collName string, field *schemapb.FieldSchema) error
122123
CreateIndex(ctx context.Context, input CreateIndexInput) error
123124
DropIndex(ctx context.Context, db, collName, indexName string) error
124125
BackupRBAC(ctx context.Context) (*milvuspb.BackupRBACMetaResponse, error)
@@ -886,6 +887,23 @@ func (g *GrpcClient) HasPartition(ctx context.Context, db, collName string, part
886887
return resp.GetValue(), nil
887888
}
888889

890+
func (g *GrpcClient) AddField(ctx context.Context, db, collName string, field *schemapb.FieldSchema) error {
891+
ctx = g.newCtxWithDB(ctx, db)
892+
893+
bytes, err := proto.Marshal(field)
894+
if err != nil {
895+
return fmt.Errorf("client: add field marshal proto: %w", err)
896+
}
897+
898+
in := &milvuspb.AddCollectionFieldRequest{CollectionName: collName, Schema: bytes}
899+
resp, err := g.srv.AddCollectionField(ctx, in)
900+
if err := checkResponse(resp, err); err != nil {
901+
return fmt.Errorf("client: add field failed: %w", err)
902+
}
903+
904+
return nil
905+
}
906+
889907
func mapKvPairs(m map[string]string) []*commonpb.KeyValuePair {
890908
pairs := make([]*commonpb.KeyValuePair, 0, len(m))
891909
for k, v := range m {

internal/client/milvus/grpc_mock.go

Lines changed: 70 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)