@@ -27,7 +27,16 @@ import (
2727)
2828
2929const (
30- maxcomputeService = "maxcompute"
30+ maxcomputeService = "maxcompute"
31+ attributesDataSQL = "sql"
32+ attributesDataMaskingPolicy = "masking_policy"
33+ attributesDataProjectName = "project_name"
34+ attributesDataSchema = "schema"
35+ attributesDataType = "type"
36+ attributesDataResourceURL = "resource_url"
37+ attributesDataPartitionFields = "partition_fields"
38+ attributesDataLabel = "label"
39+ attributesDataResourceType = "resource_type"
3140)
3241
3342type Extractor struct {
@@ -80,7 +89,7 @@ type Client interface {
8089 ListTable (ctx context.Context , schemaName string ) ([]* odps.Table , error )
8190 GetTableSchema (ctx context.Context , table * odps.Table ) (string , * tableschema.TableSchema , error )
8291 GetTablePreview (ctx context.Context , partitionValue string , table * odps.Table , maxRows int ) ([]string , * structpb.ListValue , error )
83- GetMaskingPolicies (table * odps.Table ) ([] string , error )
92+ GetMaskingPolicies (table * odps.Table ) (map [client. Column ][]client. Policy , error )
8493}
8594
8695func New (logger log.Logger , clientFunc NewClientFunc , randFn randFn ) * Extractor {
@@ -217,7 +226,7 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema,
217226
218227 if tableType == config .TableTypeView {
219228 query := tableSchema .ViewText
220- tableAttributesData ["sql" ] = query
229+ tableAttributesData [attributesDataSQL ] = query
221230 if e .config .BuildViewLineage {
222231 upstreamResources := getUpstreamResources (query )
223232 asset .Lineage = & v1beta2.Lineage {
@@ -226,6 +235,11 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema,
226235 }
227236 }
228237
238+ maskingPolicy , err := e .client .GetMaskingPolicies (table )
239+ if err != nil {
240+ e .logger .Warn ("error getting masking policy" , "error" , err )
241+ }
242+
229243 var columns []* v1beta2.Column
230244 for i , col := range tableSchema .Columns {
231245 columnData := & v1beta2.Column {
@@ -236,6 +250,19 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema,
236250 Attributes : utils .TryParseMapToProto (buildColumnAttributesData (& tableSchema .Columns [i ])),
237251 Columns : buildColumns (col .Type ),
238252 }
253+
254+ if policies , found := maskingPolicy [col .Name ]; found {
255+ policyValues := make ([]* structpb.Value , 0 , len (policies ))
256+ for _ , policy := range policies {
257+ policyValues = append (policyValues , structpb .NewStringValue (policy ))
258+ }
259+ columnData .Attributes .Fields [attributesDataMaskingPolicy ] = & structpb.Value {
260+ Kind : & structpb.Value_ListValue {
261+ ListValue : & structpb.ListValue {Values : policyValues },
262+ },
263+ }
264+ }
265+
239266 columns = append (columns , columnData )
240267 }
241268
@@ -302,15 +329,15 @@ func buildColumns(dataType datatype.DataType) []*v1beta2.Column {
302329func (e * Extractor ) buildTableAttributesData (schemaName , tableType string , table * odps.Table , tableInfo * tableschema.TableSchema ) map [string ]interface {} {
303330 attributesData := map [string ]interface {}{}
304331
305- attributesData ["project_name" ] = e .config .ProjectName
306- attributesData ["schema" ] = schemaName
307- attributesData ["type" ] = tableType
332+ attributesData [attributesDataProjectName ] = e .config .ProjectName
333+ attributesData [attributesDataSchema ] = schemaName
334+ attributesData [attributesDataType ] = tableType
308335
309336 rb := common.ResourceBuilder {ProjectName : e .config .ProjectName }
310- attributesData ["resource_url" ] = rb .Table (schemaName , tableInfo .TableName )
337+ attributesData [attributesDataResourceURL ] = rb .Table (schemaName , tableInfo .TableName )
311338
312339 if tableInfo .ViewText != "" {
313- attributesData ["sql" ] = tableInfo .ViewText
340+ attributesData [attributesDataSQL ] = tableInfo .ViewText
314341 }
315342
316343 var partitionNames []interface {}
@@ -319,18 +346,8 @@ func (e *Extractor) buildTableAttributesData(schemaName, tableType string, table
319346 for i , column := range tableInfo .PartitionColumns {
320347 partitionNames [i ] = column .Name
321348 }
322- attributesData ["partition_fields" ] = partitionNames
323- }
324-
325- maskingPolicy , err := e .client .GetMaskingPolicies (table )
326- if err != nil {
327- e .logger .Warn ("error getting masking policy" , "error" , err )
328- }
329- maskingPolicyInterface := make ([]interface {}, len (maskingPolicy ))
330- for i , policy := range maskingPolicy {
331- maskingPolicyInterface [i ] = policy
349+ attributesData [attributesDataPartitionFields ] = partitionNames
332350 }
333- attributesData ["masking_policy" ] = maskingPolicyInterface
334351
335352 return attributesData
336353}
@@ -343,7 +360,7 @@ func buildColumnAttributesData(column *tableschema.Column) map[string]interface{
343360 }
344361
345362 if column .Label != "" {
346- attributesData ["label" ] = column .Label
363+ attributesData [attributesDataLabel ] = column .Label
347364 }
348365
349366 return attributesData
0 commit comments