@@ -9,6 +9,13 @@ import (
99 "sync/atomic"
1010)
1111
12+ const (
13+ // Min64BitSigned - Minimum 64 bit value
14+ Min64BitSigned = - 9223372036854775808
15+ // Max64BitSigned - Maximum 64 bit value
16+ Max64BitSigned = 9223372036854775807
17+ )
18+
1219// BSI is at its simplest is an array of bitmaps that represent an encoded
1320// binary value. The advantage of a BSI is that comparisons can be made
1421// across ranges of values whereas a bitmap can only represent the existence
@@ -236,6 +243,10 @@ const (
236243 GT
237244 // RANGE range
238245 RANGE
246+ // MIN find minimum
247+ MIN
248+ // MAX find maximum
249+ MAX
239250)
240251
241252type task struct {
@@ -393,6 +404,123 @@ func compareValue(e *task, batch []uint32, resultsChan chan *roaring.Bitmap, wg
393404 resultsChan <- results
394405}
395406
407+ // MinMax - Find minimum or maximum value.
408+ func (b * BSI ) MinMax (parallelism int , op Operation , foundSet * roaring.Bitmap ) int64 {
409+
410+ var n int = parallelism
411+ if n == 0 {
412+ n = runtime .NumCPU ()
413+ }
414+
415+ resultsChan := make (chan int64 , n )
416+
417+ card := foundSet .GetCardinality ()
418+ x := card / uint64 (n )
419+
420+ remainder := card - (x * uint64 (n ))
421+ var batch []uint32
422+ var wg sync.WaitGroup
423+ iter := foundSet .ManyIterator ()
424+ for i := 0 ; i < n ; i ++ {
425+ if i == n - 1 {
426+ batch = make ([]uint32 , x + remainder )
427+ } else {
428+ batch = make ([]uint32 , x )
429+ }
430+ iter .NextMany (batch )
431+ wg .Add (1 )
432+ go b .minOrMax (op , batch , resultsChan , & wg )
433+ }
434+
435+ wg .Wait ()
436+
437+ close (resultsChan )
438+ var minMax int64
439+ if op == MAX {
440+ minMax = Min64BitSigned
441+ } else {
442+ minMax = Max64BitSigned
443+ }
444+
445+ for val := range resultsChan {
446+ if (op == MAX && val > minMax ) || (op == MIN && val < minMax ) {
447+ minMax = val
448+ }
449+ }
450+ return minMax
451+ }
452+
453+ func (b * BSI ) minOrMax (op Operation , batch []uint32 , resultsChan chan int64 , wg * sync.WaitGroup ) {
454+
455+ defer wg .Done ()
456+
457+ x := b .BitCount ()
458+ var value int64 = Max64BitSigned
459+ if op == MAX {
460+ value = Min64BitSigned
461+ }
462+
463+ for i := 0 ; i < len (batch ); i ++ {
464+ cID := batch [i ]
465+ eq := true
466+ lt , gt := false , false
467+ j := b .BitCount () - 1
468+ var cVal int64
469+ valueIsNegative := uint64 (value )& (1 << uint64 (x - 1 )) > 0 && bits .Len64 (uint64 (value )) == 64
470+ isNegative := false
471+ if x == 64 {
472+ isNegative = b .bA [j ].Contains (cID )
473+ if isNegative {
474+ cVal |= 1 << uint64 (j )
475+ }
476+ j --
477+ }
478+ compValue := value
479+ if isNegative != valueIsNegative {
480+ compValue = ^ value + 1
481+ }
482+ for ; j >= 0 ; j -- {
483+ sliceContainsBit := b .bA [j ].Contains (cID )
484+ if sliceContainsBit {
485+ cVal |= 1 << uint64 (j )
486+ }
487+ if uint64 (compValue )& (1 << uint64 (j )) > 0 {
488+ // BIT in value is SET
489+ if ! sliceContainsBit {
490+ if eq {
491+ eq = false
492+ if op == MAX && valueIsNegative && ! isNegative {
493+ gt = true
494+ break
495+ }
496+ if op == MIN && (! valueIsNegative || (valueIsNegative == isNegative )) {
497+ lt = true
498+ }
499+ }
500+ }
501+ } else {
502+ // BIT in value is CLEAR
503+ if sliceContainsBit {
504+ if eq {
505+ eq = false
506+ if op == MIN && isNegative && ! valueIsNegative {
507+ lt = true
508+ }
509+ if op == MAX && (valueIsNegative || (valueIsNegative == isNegative )) {
510+ gt = true
511+ }
512+ }
513+ }
514+ }
515+ }
516+ if lt || gt {
517+ value = cVal
518+ }
519+ }
520+
521+ resultsChan <- value
522+ }
523+
396524// Sum all values contained within the foundSet. As a convenience, the cardinality of the foundSet
397525// is also returned (for calculating the average).
398526//
0 commit comments