Update server dependencies

This commit is contained in:
Ken-Håvard Lieng 2016-01-15 19:48:03 +01:00
parent 6865bf2832
commit f1c3326af1
152 changed files with 16097 additions and 4318 deletions

View file

@ -29,6 +29,14 @@ type Tx struct {
pages map[pgid]*page
stats TxStats
commitHandlers []func()
// WriteFlag specifies the flag for write-related methods like WriteTo().
// Tx opens the database file with the specified flag to copy the data.
//
// By default, the flag is unset, which works well for mostly in-memory
// workloads. For databases that are much larger than available RAM,
// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
WriteFlag int
}
// init initializes the transaction.
@ -87,18 +95,21 @@ func (tx *Tx) Stats() TxStats {
// Bucket retrieves a bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) Bucket(name []byte) *Bucket {
return tx.root.Bucket(name)
}
// CreateBucket creates a new bucket.
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
return tx.root.CreateBucket(name)
}
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
return tx.root.CreateBucketIfNotExists(name)
}
@ -127,7 +138,8 @@ func (tx *Tx) OnCommit(fn func()) {
}
// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
func (tx *Tx) Commit() error {
_assert(!tx.managed, "managed tx commit not allowed")
if tx.db == nil {
@ -156,6 +168,8 @@ func (tx *Tx) Commit() error {
// Free the old root bucket.
tx.meta.root.root = tx.root.root
opgid := tx.meta.pgid
// Free the freelist and allocate new pages for it. This will overestimate
// the size of the freelist but not underestimate the size (which would be bad).
tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
@ -170,6 +184,14 @@ func (tx *Tx) Commit() error {
}
tx.meta.freelist = p.id
// If the high water mark has moved up then attempt to grow the database.
if tx.meta.pgid > opgid {
if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
tx.rollback()
return err
}
}
// Write dirty pages to disk.
startTime = time.Now()
if err := tx.write(); err != nil {
@ -203,7 +225,8 @@ func (tx *Tx) Commit() error {
return nil
}
// Rollback closes the transaction and ignores all previous updates.
// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
func (tx *Tx) Rollback() error {
_assert(!tx.managed, "managed tx rollback not allowed")
if tx.db == nil {
@ -234,7 +257,8 @@ func (tx *Tx) close() {
var freelistPendingN = tx.db.freelist.pending_count()
var freelistAlloc = tx.db.freelist.size()
// Remove writer lock.
// Remove transaction ref & writer lock.
tx.db.rwtx = nil
tx.db.rwlock.Unlock()
// Merge statistics.
@ -248,41 +272,47 @@ func (tx *Tx) close() {
} else {
tx.db.removeTx(tx)
}
// Clear all references.
tx.db = nil
tx.meta = nil
tx.root = Bucket{tx: tx}
tx.pages = nil
}
// Copy writes the entire database to a writer.
// A reader transaction is maintained during the copy so it is safe to continue
// using the database while a copy is in progress.
// Copy will write exactly tx.Size() bytes into the writer.
// This function exists for backwards compatibility. Use WriteTo() instead.
func (tx *Tx) Copy(w io.Writer) error {
var f *os.File
var err error
_, err := tx.WriteTo(w)
return err
}
// Attempt to open reader directly.
if f, err = os.OpenFile(tx.db.path, os.O_RDONLY|odirect, 0); err != nil {
// Fallback to a regular open if that doesn't work.
if f, err = os.OpenFile(tx.db.path, os.O_RDONLY, 0); err != nil {
return err
}
// WriteTo writes the entire database to a writer.
// If err == nil then exactly tx.Size() bytes will be written into the writer.
func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
// Attempt to open reader with WriteFlag
f, err := os.OpenFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
if err != nil {
return 0, err
}
defer func() { _ = f.Close() }()
// Copy the meta pages.
tx.db.metalock.Lock()
_, err = io.CopyN(w, f, int64(tx.db.pageSize*2))
n, err = io.CopyN(w, f, int64(tx.db.pageSize*2))
tx.db.metalock.Unlock()
if err != nil {
_ = f.Close()
return fmt.Errorf("meta copy: %s", err)
return n, fmt.Errorf("meta copy: %s", err)
}
// Copy data pages.
if _, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2)); err != nil {
_ = f.Close()
return err
wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
n += wn
if err != nil {
return n, err
}
return f.Close()
return n, f.Close()
}
// CopyFile copies the entire database to file at the given path.
@ -416,15 +446,39 @@ func (tx *Tx) write() error {
// Write pages to disk in order.
for _, p := range pages {
size := (int(p.overflow) + 1) * tx.db.pageSize
buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
offset := int64(p.id) * int64(tx.db.pageSize)
if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
return err
}
// Update statistics.
tx.stats.Write++
// Write out page in "max allocation" sized chunks.
ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
for {
// Limit our write to our max allocation size.
sz := size
if sz > maxAllocSize-1 {
sz = maxAllocSize - 1
}
// Write chunk to disk.
buf := ptr[:sz]
if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
return err
}
// Update statistics.
tx.stats.Write++
// Exit inner for loop if we've written all the chunks.
size -= sz
if size == 0 {
break
}
// Otherwise move offset forward and move pointer to next chunk.
offset += int64(sz)
ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
}
}
// Ignore file sync if flag is set on DB.
if !tx.db.NoSync || IgnoreNoSync {
if err := fdatasync(tx.db); err != nil {
return err
@ -461,7 +515,7 @@ func (tx *Tx) writeMeta() error {
}
// page returns a reference to the page with a given id.
// If page has been written to then a temporary bufferred page is returned.
// If page has been written to then a temporary buffered page is returned.
func (tx *Tx) page(id pgid) *page {
// Check the dirty pages first.
if tx.pages != nil {