@@ -19,26 +19,32 @@ const ReverseTreeLevelOffset = 30000
1919
2020const DefaultTreeDate = "1970-02-12"
2121
22+ const (
23+ queryAuto = config .IndexAuto
24+ queryDirect = config .IndexDirect
25+ queryReversed = config .IndexReversed
26+ )
27+
2228type IndexFinder struct {
2329 url string // clickhouse dsn
2430 table string // graphite_tree table
2531 opts clickhouse.Options // timeout, connectTimeout
2632 dailyEnabled bool
27- reverseDepth int
28- revUse []* config.NValue
33+ confReverse uint8
34+ confReverses config.IndexReverses
35+ reverse uint8 // calculated in IndexFinder.useReverse only once
2936 body []byte // clickhouse response body
30- useReverse bool
3137 useDaily bool
3238}
3339
34- func NewIndex (url string , table string , dailyEnabled bool , reverseDepth int , reverseUse [] * config.NValue , opts clickhouse.Options ) Finder {
40+ func NewIndex (url string , table string , dailyEnabled bool , reverse string , reverses config.IndexReverses , opts clickhouse.Options ) Finder {
3541 return & IndexFinder {
3642 url : url ,
3743 table : table ,
3844 opts : opts ,
3945 dailyEnabled : dailyEnabled ,
40- reverseDepth : reverseDepth ,
41- revUse : reverseUse ,
46+ confReverse : config . IndexReverse [ reverse ] ,
47+ confReverses : reverses ,
4248 }
4349}
4450
@@ -53,83 +59,53 @@ func (idx *IndexFinder) where(query string, levelOffset int) *where.Where {
5359 return w
5460}
5561
56- func useReverse (query string ) bool {
57- p := strings .LastIndexByte (query , '.' )
58-
59- if ! where .HasWildcard (query ) || p < 0 || p >= len (query )- 1 || where .HasWildcard (query [p + 1 :]) {
60- return false
61- }
62- return true
63- }
64-
65- func reverseSuffixDepth (query string , defaultReverseDepth int , revUse []* config.NValue ) int {
66- for i := range revUse {
67- if len (revUse [i ].Prefix ) > 0 && ! strings .HasPrefix (query , revUse [i ].Prefix ) {
62+ func (idx * IndexFinder ) checkReverses (query string ) uint8 {
63+ for _ , rule := range idx .confReverses {
64+ if len (rule .Prefix ) > 0 && ! strings .HasPrefix (query , rule .Prefix ) {
6865 continue
6966 }
70- if len (revUse [ i ] .Suffix ) > 0 && ! strings .HasSuffix (query , revUse [ i ] .Suffix ) {
67+ if len (rule .Suffix ) > 0 && ! strings .HasSuffix (query , rule .Suffix ) {
7168 continue
7269 }
73- if revUse [ i ] .Regex != nil && revUse [ i ] .Regex .FindStringIndex (query ) == nil {
70+ if rule .Regex != nil && rule .Regex .FindStringIndex (query ) == nil {
7471 continue
7572 }
76- return revUse [ i ]. Value
73+ return config . IndexReverse [ rule . Reverse ]
7774 }
78- return defaultReverseDepth
75+ return idx . confReverse
7976}
8077
81- func useReverseDepth ( query string , reverseDepth int , revUse [] * config. NValue ) bool {
82- if reverseDepth == - 1 {
78+ func ( idx * IndexFinder ) useReverse ( query string ) bool {
79+ if idx . reverse == queryDirect {
8380 return false
81+ } else if idx .reverse == queryReversed {
82+ return true
8483 }
8584
86- w := where .IndexWildcardOrDot (query )
87- if w == - 1 {
88- return false
89- } else if query [w ] == '.' {
90- reverseDepth = reverseSuffixDepth (query , reverseDepth , revUse )
91- if reverseDepth == 0 {
92- return false
93- } else if reverseDepth == 1 {
94- return useReverse (query )
95- }
96- } else {
97- reverseDepth = 1
85+ if idx .reverse = idx .checkReverses (query ); idx .reverse != queryAuto {
86+ return idx .useReverse (query )
9887 }
9988
100- w = where .IndexReverseWildcard (query )
89+ w : = where .IndexWildcard (query )
10190 if w == - 1 {
102- return false
103- }
104- p := len (query )
105- if w == p - 1 {
106- return false
91+ idx .reverse = queryDirect
92+ return idx .useReverse (query )
10793 }
108- depth := 0
109-
110- for {
111- e := strings .LastIndexByte (query [w :p ], '.' )
112- if e < 0 {
113- break
114- } else if e < len (query )- 1 {
115- if where .HasWildcard (query [w + e + 1 : p ]) {
116- break
117- }
118- depth ++
119- if depth >= reverseDepth {
120- return true
121- }
122- if e == 0 {
123- break
124- }
125- }
126- p = w + e - 1
94+ firstWildcardNode := strings .Count (query [:w ], "." )
95+
96+ w = where .IndexLastWildcard (query )
97+ lastWildcardNode := strings .Count (query [w :], "." )
98+
99+ if firstWildcardNode < lastWildcardNode {
100+ idx .reverse = queryReversed
101+ return idx .useReverse (query )
127102 }
128- return false
103+ idx .reverse = queryDirect
104+ return idx .useReverse (query )
129105}
130106
131107func (idx * IndexFinder ) Execute (ctx context.Context , query string , from int64 , until int64 ) (err error ) {
132- idx .useReverse = useReverseDepth (query , idx . reverseDepth , idx . revUse )
108+ idx .useReverse (query )
133109
134110 if idx .dailyEnabled && from > 0 && until > 0 {
135111 idx .useDaily = true
@@ -139,18 +115,18 @@ func (idx *IndexFinder) Execute(ctx context.Context, query string, from int64, u
139115
140116 var levelOffset int
141117 if idx .useDaily {
142- if idx .useReverse {
118+ if idx .useReverse ( query ) {
143119 levelOffset = ReverseLevelOffset
144120 }
145121 } else {
146- if idx .useReverse {
122+ if idx .useReverse ( query ) {
147123 levelOffset = ReverseTreeLevelOffset
148124 } else {
149125 levelOffset = TreeLevelOffset
150126 }
151127 }
152128
153- if idx .useReverse {
129+ if idx .useReverse ( query ) {
154130 query = ReverseString (query )
155131 }
156132
@@ -205,7 +181,7 @@ func (idx *IndexFinder) makeList(onlySeries bool) [][]byte {
205181
206182 rows = rows [:len (rows )- skip ]
207183
208- if idx .useReverse {
184+ if idx .useReverse ( "" ) {
209185 for i := 0 ; i < len (rows ); i ++ {
210186 rows [i ] = ReverseBytes (rows [i ])
211187 }
0 commit comments