Update dependencies

This commit is contained in:
Ken-Håvard Lieng 2020-05-24 07:21:47 +02:00
parent e97c7f2ada
commit 71b2136a9e
138 changed files with 7864 additions and 968 deletions

View file

@ -1,10 +1,13 @@
# ![bleve](docs/bleve.png) bleve
[![Build Status](https://travis-ci.org/blevesearch/bleve.svg?branch=master)](https://travis-ci.org/blevesearch/bleve) [![Coverage Status](https://coveralls.io/repos/github/blevesearch/bleve/badge.svg?branch=master)](https://coveralls.io/github/blevesearch/bleve?branch=master) [![GoDoc](https://godoc.org/github.com/blevesearch/bleve?status.svg)](https://godoc.org/github.com/blevesearch/bleve)
[![Tests](https://github.com/blevesearch/bleve/workflows/Tests/badge.svg?branch=master&event=push)](https://github.com/blevesearch/bleve/actions?query=workflow%3ATests+event%3Apush+branch%3Amaster)
[![Coverage Status](https://coveralls.io/repos/github/blevesearch/bleve/badge.svg?branch=master)](https://coveralls.io/github/blevesearch/bleve?branch=master)
[![GoDoc](https://godoc.org/github.com/blevesearch/bleve?status.svg)](https://godoc.org/github.com/blevesearch/bleve)
[![Join the chat at https://gitter.im/blevesearch/bleve](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/blevesearch/bleve?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![codebeat](https://codebeat.co/badges/38a7cbc9-9cf5-41c0-a315-0746178230f4)](https://codebeat.co/projects/github-com-blevesearch-bleve)
[![Go Report Card](https://goreportcard.com/badge/blevesearch/bleve)](https://goreportcard.com/report/blevesearch/bleve)
[![Sourcegraph](https://sourcegraph.com/github.com/blevesearch/bleve/-/badge.svg)](https://sourcegraph.com/github.com/blevesearch/bleve?badge) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Sourcegraph](https://sourcegraph.com/github.com/blevesearch/bleve/-/badge.svg)](https://sourcegraph.com/github.com/blevesearch/bleve?badge)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
modern text indexing in go - [blevesearch.com](http://www.blevesearch.com/)

94
vendor/github.com/blevesearch/bleve/builder.go generated vendored Normal file
View file

@ -0,0 +1,94 @@
// Copyright (c) 2019 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package bleve
import (
"encoding/json"
"fmt"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch"
"github.com/blevesearch/bleve/mapping"
)
type builderImpl struct {
b index.IndexBuilder
m mapping.IndexMapping
}
func (b *builderImpl) Index(id string, data interface{}) error {
if id == "" {
return ErrorEmptyID
}
doc := document.NewDocument(id)
err := b.m.MapDocument(doc, data)
if err != nil {
return err
}
err = b.b.Index(doc)
return err
}
func (b *builderImpl) Close() error {
return b.b.Close()
}
func newBuilder(path string, mapping mapping.IndexMapping, config map[string]interface{}) (Builder, error) {
if path == "" {
return nil, fmt.Errorf("builder requires path")
}
err := mapping.Validate()
if err != nil {
return nil, err
}
if config == nil {
config = map[string]interface{}{}
}
// the builder does not have an API to interact with internal storage
// however we can pass k/v pairs through the config
mappingBytes, err := json.Marshal(mapping)
if err != nil {
return nil, err
}
config["internal"] = map[string][]byte{
string(mappingInternalKey): mappingBytes,
}
// do not use real config, as these are options for the builder,
// not the resulting index
meta := newIndexMeta(scorch.Name, scorch.Name, map[string]interface{}{})
err = meta.Save(path)
if err != nil {
return nil, err
}
config["path"] = indexStorePath(path)
b, err := scorch.NewBuilder(config)
if err != nil {
return nil, err
}
rv := &builderImpl{
b: b,
m: mapping,
}
return rv, nil
}

View file

@ -8,8 +8,9 @@ require (
github.com/blevesearch/go-porterstemmer v1.0.3
github.com/blevesearch/segment v0.9.0
github.com/blevesearch/snowballstem v0.9.0
github.com/blevesearch/zap/v11 v11.0.7
github.com/blevesearch/zap/v12 v12.0.7
github.com/blevesearch/zap/v11 v11.0.8
github.com/blevesearch/zap/v12 v12.0.8
github.com/blevesearch/zap/v13 v13.0.0
github.com/couchbase/ghistogram v0.1.0 // indirect
github.com/couchbase/moss v0.1.0
github.com/couchbase/vellum v1.0.1

View file

@ -293,3 +293,17 @@ func Open(path string) (Index, error) {
func OpenUsing(path string, runtimeConfig map[string]interface{}) (Index, error) {
return openIndexUsing(path, runtimeConfig)
}
// Builder is a limited interface, used to build indexes in an offline mode.
// Items cannot be updated or deleted, and the caller MUST ensure a document is
// indexed only once.
type Builder interface {
Index(id string, data interface{}) error
Close() error
}
// NewBuilder creates a builder, which will build an index at the specified path,
// using the specified mapping and options.
func NewBuilder(path string, mapping mapping.IndexMapping, config map[string]interface{}) (Builder, error) {
return newBuilder(path, mapping, config)
}

View file

@ -367,3 +367,10 @@ type OptimizableContext interface {
type DocValueReader interface {
VisitDocValues(id IndexInternalID, visitor DocumentFieldTermVisitor) error
}
// IndexBuilder is an interface supported by some index schemes
// to allow direct write-only index building
type IndexBuilder interface {
Index(doc *document.Document) error
Close() error
}

View file

@ -0,0 +1,333 @@
// Copyright (c) 2019 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scorch
import (
"fmt"
"io/ioutil"
"os"
"sync"
"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/document"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
bolt "go.etcd.io/bbolt"
)
const DefaultBuilderBatchSize = 1000
const DefaultBuilderMergeMax = 10
type Builder struct {
m sync.Mutex
segCount uint64
path string
buildPath string
segPaths []string
batchSize int
mergeMax int
batch *index.Batch
internal map[string][]byte
segPlugin segment.Plugin
}
func NewBuilder(config map[string]interface{}) (*Builder, error) {
path, ok := config["path"].(string)
if !ok {
return nil, fmt.Errorf("must specify path")
}
buildPathPrefix, _ := config["buildPathPrefix"].(string)
buildPath, err := ioutil.TempDir(buildPathPrefix, "scorch-offline-build")
if err != nil {
return nil, err
}
rv := &Builder{
path: path,
buildPath: buildPath,
mergeMax: DefaultBuilderMergeMax,
batchSize: DefaultBuilderBatchSize,
batch: index.NewBatch(),
segPlugin: defaultSegmentPlugin,
}
err = rv.parseConfig(config)
if err != nil {
return nil, fmt.Errorf("error parsing builder config: %v", err)
}
return rv, nil
}
func (o *Builder) parseConfig(config map[string]interface{}) (err error) {
if v, ok := config["mergeMax"]; ok {
var t int
if t, err = parseToInteger(v); err != nil {
return fmt.Errorf("mergeMax parse err: %v", err)
}
if t > 0 {
o.mergeMax = t
}
}
if v, ok := config["batchSize"]; ok {
var t int
if t, err = parseToInteger(v); err != nil {
return fmt.Errorf("batchSize parse err: %v", err)
}
if t > 0 {
o.batchSize = t
}
}
if v, ok := config["internal"]; ok {
if vinternal, ok := v.(map[string][]byte); ok {
o.internal = vinternal
}
}
forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
if err != nil {
return err
}
if forcedSegmentType != "" && forcedSegmentVersion != 0 {
segPlugin, err := chooseSegmentPlugin(forcedSegmentType,
uint32(forcedSegmentVersion))
if err != nil {
o.segPlugin = segPlugin
}
}
return nil
}
// Index will place the document into the index.
// It is invalid to index the same document multiple times.
func (o *Builder) Index(doc *document.Document) error {
o.m.Lock()
defer o.m.Unlock()
o.batch.Update(doc)
return o.maybeFlushBatchLOCKED(o.batchSize)
}
func (o *Builder) maybeFlushBatchLOCKED(moreThan int) error {
if len(o.batch.IndexOps) >= moreThan {
defer o.batch.Reset()
return o.executeBatchLOCKED(o.batch)
}
return nil
}
func (o *Builder) executeBatchLOCKED(batch *index.Batch) (err error) {
analysisResults := make([]*index.AnalysisResult, 0, len(batch.IndexOps))
for _, doc := range batch.IndexOps {
if doc != nil {
// insert _id field
doc.AddField(document.NewTextFieldCustom("_id", nil, []byte(doc.ID), document.IndexField|document.StoreField, nil))
// perform analysis directly
analysisResult := analyze(doc)
analysisResults = append(analysisResults, analysisResult)
}
}
seg, _, err := o.segPlugin.New(analysisResults)
if err != nil {
return fmt.Errorf("error building segment base: %v", err)
}
filename := zapFileName(o.segCount)
o.segCount++
path := o.buildPath + string(os.PathSeparator) + filename
if segUnpersisted, ok := seg.(segment.UnpersistedSegment); ok {
err = segUnpersisted.Persist(path)
if err != nil {
return fmt.Errorf("error persisting segment base to %s: %v", path, err)
}
o.segPaths = append(o.segPaths, path)
return nil
}
return fmt.Errorf("new segment does not implement unpersisted: %T", seg)
}
func (o *Builder) doMerge() error {
// as long as we have more than 1 segment, keep merging
for len(o.segPaths) > 1 {
// merge the next <mergeMax> number of segments into one new one
// or, if there are fewer than <mergeMax> remaining, merge them all
mergeCount := o.mergeMax
if mergeCount > len(o.segPaths) {
mergeCount = len(o.segPaths)
}
mergePaths := o.segPaths[0:mergeCount]
o.segPaths = o.segPaths[mergeCount:]
// open each of the segments to be merged
mergeSegs := make([]segment.Segment, 0, mergeCount)
// closeOpenedSegs attempts to close all opened
// segments even if an error occurs, in which case
// the first error is returned
closeOpenedSegs := func() error {
var err error
for _, seg := range mergeSegs {
clErr := seg.Close()
if clErr != nil && err == nil {
err = clErr
}
}
return err
}
for _, mergePath := range mergePaths {
seg, err := o.segPlugin.Open(mergePath)
if err != nil {
_ = closeOpenedSegs()
return fmt.Errorf("error opening segment (%s) for merge: %v", mergePath, err)
}
mergeSegs = append(mergeSegs, seg)
}
// do the merge
mergedSegPath := o.buildPath + string(os.PathSeparator) + zapFileName(o.segCount)
drops := make([]*roaring.Bitmap, mergeCount)
_, _, err := o.segPlugin.Merge(mergeSegs, drops, mergedSegPath, nil, nil)
if err != nil {
_ = closeOpenedSegs()
return fmt.Errorf("error merging segments (%v): %v", mergePaths, err)
}
o.segCount++
o.segPaths = append(o.segPaths, mergedSegPath)
// close segments opened for merge
err = closeOpenedSegs()
if err != nil {
return fmt.Errorf("error closing opened segments: %v", err)
}
// remove merged segments
for _, mergePath := range mergePaths {
err = os.RemoveAll(mergePath)
if err != nil {
return fmt.Errorf("error removing segment %s after merge: %v", mergePath, err)
}
}
}
return nil
}
func (o *Builder) Close() error {
o.m.Lock()
defer o.m.Unlock()
// see if there is a partial batch
err := o.maybeFlushBatchLOCKED(1)
if err != nil {
return fmt.Errorf("error flushing batch before close: %v", err)
}
// perform all the merging
err = o.doMerge()
if err != nil {
return fmt.Errorf("error while merging: %v", err)
}
// ensure the store path exists
err = os.MkdirAll(o.path, 0700)
if err != nil {
return err
}
// move final segment into place
// segment id 2 is chosen to match the behavior of a scorch
// index which indexes a single batch of data
finalSegPath := o.path + string(os.PathSeparator) + zapFileName(2)
err = os.Rename(o.segPaths[0], finalSegPath)
if err != nil {
return fmt.Errorf("error moving final segment into place: %v", err)
}
// remove the buildPath, as it is no longer needed
err = os.RemoveAll(o.buildPath)
if err != nil {
return fmt.Errorf("error removing build path: %v", err)
}
// prepare wrapping
seg, err := o.segPlugin.Open(finalSegPath)
if err != nil {
return fmt.Errorf("error opening final segment")
}
// create a segment snapshot for this segment
ss := &SegmentSnapshot{
segment: seg,
}
is := &IndexSnapshot{
epoch: 3, // chosen to match scorch behavior when indexing a single batch
segment: []*SegmentSnapshot{ss},
creator: "scorch-builder",
internal: o.internal,
}
// create the root bolt
rootBoltPath := o.path + string(os.PathSeparator) + "root.bolt"
rootBolt, err := bolt.Open(rootBoltPath, 0600, nil)
if err != nil {
return err
}
// start a write transaction
tx, err := rootBolt.Begin(true)
if err != nil {
return err
}
// fill the root bolt with this fake index snapshot
_, _, err = prepareBoltSnapshot(is, tx, o.path, o.segPlugin)
if err != nil {
_ = tx.Rollback()
_ = rootBolt.Close()
return fmt.Errorf("error preparing bolt snapshot in root.bolt: %v", err)
}
// commit bolt data
err = tx.Commit()
if err != nil {
_ = rootBolt.Close()
return fmt.Errorf("error committing bolt tx in root.bolt: %v", err)
}
// close bolt
err = rootBolt.Close()
if err != nil {
return fmt.Errorf("error closing root.bolt: %v", err)
}
// close final segment
err = seg.Close()
if err != nil {
return fmt.Errorf("error closing final segment: %v", err)
}
return nil
}

View file

@ -45,13 +45,7 @@ type epochWatcher struct {
notifyCh notificationChan
}
type snapshotReversion struct {
snapshot *IndexSnapshot
applied chan error
persisted chan error
}
func (s *Scorch) mainLoop() {
func (s *Scorch) introducerLoop() {
var epochWatchers []*epochWatcher
OUTER:
for {

View file

@ -15,6 +15,7 @@
package scorch
import (
"context"
"encoding/json"
"fmt"
"os"
@ -29,12 +30,16 @@ import (
func (s *Scorch) mergerLoop() {
var lastEpochMergePlanned uint64
var ctrlMsg *mergerCtrl
mergePlannerOptions, err := s.parseMergePlannerOptions()
if err != nil {
s.fireAsyncError(fmt.Errorf("mergePlannerOption json parsing err: %v", err))
s.asyncTasks.Done()
return
}
ctrlMsgDflt := &mergerCtrl{ctx: context.Background(),
options: mergePlannerOptions,
doneCh: nil}
OUTER:
for {
@ -53,16 +58,30 @@ OUTER:
atomic.StoreUint64(&s.iStats.mergeEpoch, ourSnapshot.epoch)
s.rootLock.Unlock()
if ourSnapshot.epoch != lastEpochMergePlanned {
if ctrlMsg == nil && ourSnapshot.epoch != lastEpochMergePlanned {
ctrlMsg = ctrlMsgDflt
}
if ctrlMsg != nil {
startTime := time.Now()
// lets get started
err := s.planMergeAtSnapshot(ourSnapshot, mergePlannerOptions)
err := s.planMergeAtSnapshot(ctrlMsg.ctx, ctrlMsg.options,
ourSnapshot)
if err != nil {
atomic.StoreUint64(&s.iStats.mergeEpoch, 0)
if err == segment.ErrClosed {
// index has been closed
_ = ourSnapshot.DecRef()
// continue the workloop on a user triggered cancel
if ctrlMsg.doneCh != nil {
close(ctrlMsg.doneCh)
ctrlMsg = nil
continue OUTER
}
// exit the workloop on index closure
ctrlMsg = nil
break OUTER
}
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
@ -70,6 +89,12 @@ OUTER:
atomic.AddUint64(&s.stats.TotFileMergeLoopErr, 1)
continue OUTER
}
if ctrlMsg.doneCh != nil {
close(ctrlMsg.doneCh)
}
ctrlMsg = nil
lastEpochMergePlanned = ourSnapshot.epoch
atomic.StoreUint64(&s.stats.LastMergedEpoch, ourSnapshot.epoch)
@ -90,6 +115,8 @@ OUTER:
case <-s.closeCh:
break OUTER
case s.persisterNotifier <- ew:
case ctrlMsg = <-s.forceMergeRequestCh:
continue OUTER
}
// now wait for persister (but also detect close)
@ -97,6 +124,7 @@ OUTER:
case <-s.closeCh:
break OUTER
case <-ew.notifyCh:
case ctrlMsg = <-s.forceMergeRequestCh:
}
}
@ -106,6 +134,58 @@ OUTER:
s.asyncTasks.Done()
}
type mergerCtrl struct {
ctx context.Context
options *mergeplan.MergePlanOptions
doneCh chan struct{}
}
// ForceMerge helps users trigger a merge operation on
// an online scorch index.
func (s *Scorch) ForceMerge(ctx context.Context,
mo *mergeplan.MergePlanOptions) error {
// check whether force merge is already under processing
s.rootLock.Lock()
if s.stats.TotFileMergeForceOpsStarted >
s.stats.TotFileMergeForceOpsCompleted {
s.rootLock.Unlock()
return fmt.Errorf("force merge already in progress")
}
s.stats.TotFileMergeForceOpsStarted++
s.rootLock.Unlock()
if mo != nil {
err := mergeplan.ValidateMergePlannerOptions(mo)
if err != nil {
return err
}
} else {
// assume the default single segment merge policy
mo = &mergeplan.SingleSegmentMergePlanOptions
}
msg := &mergerCtrl{options: mo,
doneCh: make(chan struct{}),
ctx: ctx,
}
// request the merger perform a force merge
select {
case s.forceMergeRequestCh <- msg:
case <-s.closeCh:
return nil
}
// wait for the force merge operation completion
select {
case <-msg.doneCh:
atomic.AddUint64(&s.stats.TotFileMergeForceOpsCompleted, 1)
case <-s.closeCh:
}
return nil
}
func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions,
error) {
mergePlannerOptions := mergeplan.DefaultMergePlanOptions
@ -128,8 +208,39 @@ func (s *Scorch) parseMergePlannerOptions() (*mergeplan.MergePlanOptions,
return &mergePlannerOptions, nil
}
func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
options *mergeplan.MergePlanOptions) error {
type closeChWrapper struct {
ch1 chan struct{}
ctx context.Context
closeCh chan struct{}
}
func newCloseChWrapper(ch1 chan struct{},
ctx context.Context) *closeChWrapper {
return &closeChWrapper{ch1: ch1,
ctx: ctx,
closeCh: make(chan struct{})}
}
func (w *closeChWrapper) close() {
select {
case <-w.closeCh:
default:
close(w.closeCh)
}
}
func (w *closeChWrapper) listen() {
select {
case <-w.ch1:
w.close()
case <-w.ctx.Done():
w.close()
case <-w.closeCh:
}
}
func (s *Scorch) planMergeAtSnapshot(ctx context.Context,
options *mergeplan.MergePlanOptions, ourSnapshot *IndexSnapshot) error {
// build list of persisted segments in this snapshot
var onlyPersistedSnapshots []mergeplan.Segment
for _, segmentSnapshot := range ourSnapshot.segment {
@ -158,6 +269,11 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
// process tasks in serial for now
var filenames []string
cw := newCloseChWrapper(s.closeCh, ctx)
defer cw.close()
go cw.listen()
for _, task := range resultMergePlan.Tasks {
if len(task.Segments) == 0 {
atomic.AddUint64(&s.stats.TotFileMergePlanTasksSegmentsEmpty, 1)
@ -203,7 +319,7 @@ func (s *Scorch) planMergeAtSnapshot(ourSnapshot *IndexSnapshot,
atomic.AddUint64(&s.stats.TotFileMergeZapBeg, 1)
newDocNums, _, err := s.segPlugin.Merge(segmentsToMerge, docsToDrop, path,
s.closeCh, s)
cw.closeCh, s)
atomic.AddUint64(&s.stats.TotFileMergeZapEnd, 1)
fileMergeZapTime := uint64(time.Since(fileMergeZapStartTime))

View file

@ -134,6 +134,17 @@ var DefaultMergePlanOptions = MergePlanOptions{
ReclaimDeletesWeight: 2.0,
}
// SingleSegmentMergePlanOptions helps in creating a
// single segment index.
var SingleSegmentMergePlanOptions = MergePlanOptions{
MaxSegmentsPerTier: 1,
MaxSegmentSize: 1 << 30,
TierGrowth: 1.0,
SegmentsPerMergeTask: 10,
FloorSegmentSize: 1 << 30,
ReclaimDeletesWeight: 2.0,
}
// -------------------------------------------
func plan(segmentsIn []Segment, o *MergePlanOptions) (*MergePlan, error) {

View file

@ -428,6 +428,100 @@ func (s *Scorch) persistSnapshotMaybeMerge(snapshot *IndexSnapshot) (
return true, nil
}
func prepareBoltSnapshot(snapshot *IndexSnapshot, tx *bolt.Tx, path string,
segPlugin segment.Plugin) ([]string, map[uint64]string, error) {
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
if err != nil {
return nil, nil, err
}
newSnapshotKey := segment.EncodeUvarintAscending(nil, snapshot.epoch)
snapshotBucket, err := snapshotsBucket.CreateBucketIfNotExists(newSnapshotKey)
if err != nil {
return nil, nil, err
}
// persist meta values
metaBucket, err := snapshotBucket.CreateBucketIfNotExists(boltMetaDataKey)
if err != nil {
return nil, nil, err
}
err = metaBucket.Put(boltMetaDataSegmentTypeKey, []byte(segPlugin.Type()))
if err != nil {
return nil, nil, err
}
buf := make([]byte, binary.MaxVarintLen32)
binary.BigEndian.PutUint32(buf, segPlugin.Version())
err = metaBucket.Put(boltMetaDataSegmentVersionKey, buf)
if err != nil {
return nil, nil, err
}
// persist internal values
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey)
if err != nil {
return nil, nil, err
}
// TODO optimize writing these in order?
for k, v := range snapshot.internal {
err = internalBucket.Put([]byte(k), v)
if err != nil {
return nil, nil, err
}
}
var filenames []string
newSegmentPaths := make(map[uint64]string)
// first ensure that each segment in this snapshot has been persisted
for _, segmentSnapshot := range snapshot.segment {
snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id)
snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
if err != nil {
return nil, nil, err
}
switch seg := segmentSnapshot.segment.(type) {
case segment.PersistedSegment:
segPath := seg.Path()
filename := strings.TrimPrefix(segPath, path+string(os.PathSeparator))
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
if err != nil {
return nil, nil, err
}
filenames = append(filenames, filename)
case segment.UnpersistedSegment:
// need to persist this to disk
filename := zapFileName(segmentSnapshot.id)
path := path + string(os.PathSeparator) + filename
err = seg.Persist(path)
if err != nil {
return nil, nil, fmt.Errorf("error persisting segment: %v", err)
}
newSegmentPaths[segmentSnapshot.id] = path
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
if err != nil {
return nil, nil, err
}
filenames = append(filenames, filename)
default:
return nil, nil, fmt.Errorf("unknown segment type: %T", seg)
}
// store current deleted bits
var roaringBuf bytes.Buffer
if segmentSnapshot.deleted != nil {
_, err = segmentSnapshot.deleted.WriteTo(&roaringBuf)
if err != nil {
return nil, nil, fmt.Errorf("error persisting roaring bytes: %v", err)
}
err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes())
if err != nil {
return nil, nil, err
}
}
}
return filenames, newSegmentPaths, nil
}
func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
// start a write transaction
tx, err := s.rootBolt.Begin(true)
@ -441,95 +535,10 @@ func (s *Scorch) persistSnapshotDirect(snapshot *IndexSnapshot) (err error) {
}
}()
snapshotsBucket, err := tx.CreateBucketIfNotExists(boltSnapshotsBucket)
filenames, newSegmentPaths, err := prepareBoltSnapshot(snapshot, tx, s.path, s.segPlugin)
if err != nil {
return err
}
newSnapshotKey := segment.EncodeUvarintAscending(nil, snapshot.epoch)
snapshotBucket, err := snapshotsBucket.CreateBucketIfNotExists(newSnapshotKey)
if err != nil {
return err
}
// persist meta values
metaBucket, err := snapshotBucket.CreateBucketIfNotExists(boltMetaDataKey)
if err != nil {
return err
}
err = metaBucket.Put(boltMetaDataSegmentTypeKey, []byte(s.segPlugin.Type()))
if err != nil {
return err
}
buf := make([]byte, binary.MaxVarintLen32)
binary.BigEndian.PutUint32(buf, s.segPlugin.Version())
err = metaBucket.Put(boltMetaDataSegmentVersionKey, buf)
if err != nil {
return err
}
// persist internal values
internalBucket, err := snapshotBucket.CreateBucketIfNotExists(boltInternalKey)
if err != nil {
return err
}
// TODO optimize writing these in order?
for k, v := range snapshot.internal {
err = internalBucket.Put([]byte(k), v)
if err != nil {
return err
}
}
var filenames []string
newSegmentPaths := make(map[uint64]string)
// first ensure that each segment in this snapshot has been persisted
for _, segmentSnapshot := range snapshot.segment {
snapshotSegmentKey := segment.EncodeUvarintAscending(nil, segmentSnapshot.id)
snapshotSegmentBucket, err := snapshotBucket.CreateBucketIfNotExists(snapshotSegmentKey)
if err != nil {
return err
}
switch seg := segmentSnapshot.segment.(type) {
case segment.PersistedSegment:
path := seg.Path()
filename := strings.TrimPrefix(path, s.path+string(os.PathSeparator))
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
if err != nil {
return err
}
filenames = append(filenames, filename)
case segment.UnpersistedSegment:
// need to persist this to disk
filename := zapFileName(segmentSnapshot.id)
path := s.path + string(os.PathSeparator) + filename
err = seg.Persist(path)
if err != nil {
return fmt.Errorf("error persisting segment: %v", err)
}
newSegmentPaths[segmentSnapshot.id] = path
err = snapshotSegmentBucket.Put(boltPathKey, []byte(filename))
if err != nil {
return err
}
filenames = append(filenames, filename)
default:
return fmt.Errorf("unknown segment type: %T", seg)
}
// store current deleted bits
var roaringBuf bytes.Buffer
if segmentSnapshot.deleted != nil {
_, err = segmentSnapshot.deleted.WriteTo(&roaringBuf)
if err != nil {
return fmt.Errorf("error persisting roaring bytes: %v", err)
}
err = snapshotSegmentBucket.Put(boltDeletedKey, roaringBuf.Bytes())
if err != nil {
return err
}
}
}
// we need to swap in a new root only when we've persisted 1 or
// more segments -- whereby the new root would have 1-for-1
@ -780,12 +789,6 @@ func (s *Scorch) loadSegment(segmentBucket *bolt.Bucket) (*SegmentSnapshot, erro
return rv, nil
}
type uint64Descending []uint64
func (p uint64Descending) Len() int { return len(p) }
func (p uint64Descending) Less(i, j int) bool { return p[i] > p[j] }
func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (s *Scorch) removeOldData() {
removed, err := s.removeOldBoltSnapshots()
if err != nil {

View file

@ -77,6 +77,8 @@ type Scorch struct {
pauseCount uint64
forceMergeRequestCh chan *mergerCtrl
segPlugin segment.Plugin
}
@ -101,18 +103,15 @@ func NewScorch(storeName string,
nextSnapshotEpoch: 1,
closeCh: make(chan struct{}),
ineligibleForRemoval: map[string]bool{},
forceMergeRequestCh: make(chan *mergerCtrl, 1),
segPlugin: defaultSegmentPlugin,
}
// check if the caller has requested a specific segment type/version
forcedSegmentVersion, ok := config["forceSegmentVersion"].(int)
if ok {
forcedSegmentType, ok2 := config["forceSegmentType"].(string)
if !ok2 {
return nil, fmt.Errorf(
"forceSegmentVersion set to %d, must also specify forceSegmentType", forcedSegmentVersion)
}
forcedSegmentType, forcedSegmentVersion, err := configForceSegmentTypeVersion(config)
if err != nil {
return nil, err
}
if forcedSegmentType != "" && forcedSegmentVersion != 0 {
err := rv.loadSegmentPlugin(forcedSegmentType,
uint32(forcedSegmentVersion))
if err != nil {
@ -140,6 +139,22 @@ func NewScorch(storeName string,
return rv, nil
}
// configForceSegmentTypeVersion checks if the caller has requested a
// specific segment type/version
func configForceSegmentTypeVersion(config map[string]interface{}) (string, uint32, error) {
forcedSegmentVersion, ok := config["forceSegmentVersion"].(int)
if ok {
forcedSegmentType, ok2 := config["forceSegmentType"].(string)
if !ok2 {
return "", 0, fmt.Errorf(
"forceSegmentVersion set to %d, must also specify forceSegmentType", forcedSegmentVersion)
}
return forcedSegmentType, uint32(forcedSegmentVersion), nil
}
return "", 0, nil
}
func (s *Scorch) paused() uint64 {
s.pauseLock.Lock()
pc := s.pauseCount
@ -181,7 +196,7 @@ func (s *Scorch) Open() error {
}
s.asyncTasks.Add(1)
go s.mainLoop()
go s.introducerLoop()
if !s.readOnly && s.path != "" {
s.asyncTasks.Add(1)
@ -241,6 +256,7 @@ func (s *Scorch) openBolt() error {
s.introducerNotifier = make(chan *epochWatcher, 1)
s.persisterNotifier = make(chan *epochWatcher, 1)
s.closeCh = make(chan struct{})
s.forceMergeRequestCh = make(chan *mergerCtrl, 1)
if !s.readOnly && s.path != "" {
err := s.removeOldZapFiles() // Before persister or merger create any new files.
@ -567,6 +583,10 @@ func (s *Scorch) StatsMap() map[string]interface{} {
}
func (s *Scorch) Analyze(d *document.Document) *index.AnalysisResult {
return analyze(d)
}
func analyze(d *document.Document) *index.AnalysisResult {
rv := &index.AnalysisResult{
Document: d,
Analyzed: make([]analysis.TokenFrequencies, len(d.Fields)+len(d.CompositeFields)),

View file

@ -21,6 +21,7 @@ import (
zapv11 "github.com/blevesearch/zap/v11"
zapv12 "github.com/blevesearch/zap/v12"
zapv13 "github.com/blevesearch/zap/v13"
)
var supportedSegmentPlugins map[string]map[uint32]segment.Plugin
@ -28,6 +29,7 @@ var defaultSegmentPlugin segment.Plugin
func init() {
ResetPlugins()
RegisterPlugin(zapv13.Plugin(), false)
RegisterPlugin(zapv12.Plugin(), false)
RegisterPlugin(zapv11.Plugin(), true)
}
@ -60,18 +62,28 @@ func SupportedSegmentTypeVersions(typ string) (rv []uint32) {
return rv
}
func (s *Scorch) loadSegmentPlugin(forcedSegmentType string,
forcedSegmentVersion uint32) error {
func chooseSegmentPlugin(forcedSegmentType string,
forcedSegmentVersion uint32) (segment.Plugin, error) {
if versions, ok := supportedSegmentPlugins[forcedSegmentType]; ok {
if segPlugin, ok := versions[uint32(forcedSegmentVersion)]; ok {
s.segPlugin = segPlugin
return nil
return segPlugin, nil
}
return fmt.Errorf(
return nil, fmt.Errorf(
"unsupported version %d for segment type: %s, supported: %v",
forcedSegmentVersion, forcedSegmentType,
SupportedSegmentTypeVersions(forcedSegmentType))
}
return fmt.Errorf("unsupported segment type: %s, supported: %v",
return nil, fmt.Errorf("unsupported segment type: %s, supported: %v",
forcedSegmentType, SupportedSegmentTypes())
}
func (s *Scorch) loadSegmentPlugin(forcedSegmentType string,
forcedSegmentVersion uint32) error {
segPlugin, err := chooseSegmentPlugin(forcedSegmentType,
forcedSegmentVersion)
if err != nil {
return err
}
s.segPlugin = segPlugin
return nil
}

View file

@ -82,6 +82,9 @@ type Stats struct {
TotFileMergeLoopErr uint64
TotFileMergeLoopEnd uint64
TotFileMergeForceOpsStarted uint64
TotFileMergeForceOpsCompleted uint64
TotFileMergePlan uint64
TotFileMergePlanErr uint64
TotFileMergePlanNone uint64

View file

@ -44,6 +44,16 @@ func NewIndexAlias(indexes ...Index) *indexAliasImpl {
}
}
// VisitIndexes invokes the visit callback on every
// indexes included in the index alias.
func (i *indexAliasImpl) VisitIndexes(visit func(Index)) {
i.mutex.RLock()
for _, idx := range i.indexes {
visit(idx)
}
i.mutex.RUnlock()
}
func (i *indexAliasImpl) isAliasToSingleIndex() error {
if len(i.indexes) < 1 {
return ErrorAliasEmpty

View file

@ -251,7 +251,6 @@ func (dm *DocumentMapping) AddFieldMapping(fm *FieldMapping) {
// UnmarshalJSON offers custom unmarshaling with optional strict validation
func (dm *DocumentMapping) UnmarshalJSON(data []byte) error {
var tmp map[string]json.RawMessage
err := json.Unmarshal(data, &tmp)
if err != nil {
@ -308,8 +307,8 @@ func (dm *DocumentMapping) UnmarshalJSON(data []byte) error {
}
func (dm *DocumentMapping) defaultAnalyzerName(path []string) string {
rv := ""
current := dm
rv := current.DefaultAnalyzer
for _, pathElement := range path {
var ok bool
current, ok = current.Properties[pathElement]

View file

@ -233,7 +233,11 @@ func (so SortOrder) Compare(cachedScoring, cachedDesc []bool, i, j *DocumentMatc
} else {
iVal := i.Sort[x]
jVal := j.Sort[x]
c = strings.Compare(iVal, jVal)
if iVal < jVal {
c = -1
} else if iVal > jVal {
c = 1
}
}
if c == 0 {