mirror of https://github.com/prometheus/prometheus
Merge pull request #2695 from prometheus/nolock
Allow disabling of lock file for TSDB (pull/2700/head)
commit
ff0ee264ae
|
@ -125,6 +125,10 @@ func init() {
|
|||
&cfg.localStoragePath, "storage.local.path", "data",
|
||||
"Base path for metrics storage.",
|
||||
)
|
||||
cfg.fs.BoolVar(
|
||||
&cfg.tsdb.NoLockfile, "storage.tsdb.no-lockfile", false,
|
||||
"Disable lock file usage.",
|
||||
)
|
||||
cfg.fs.DurationVar(
|
||||
&cfg.tsdb.MinBlockDuration, "storage.tsdb.min-block-duration", 2*time.Hour,
|
||||
"Minimum duration of a data block before being persisted.",
|
||||
|
|
|
@ -50,6 +50,9 @@ type Options struct {
|
|||
|
||||
// Duration for how long to retain data.
|
||||
Retention time.Duration
|
||||
|
||||
// Disable creation and consideration of lockfile.
|
||||
NoLockfile bool
|
||||
}
|
||||
|
||||
// Open returns a new storage backed by a tsdb database.
|
||||
|
@ -60,6 +63,7 @@ func Open(path string, r prometheus.Registerer, opts *Options) (storage.Storage,
|
|||
MaxBlockDuration: uint64(opts.MaxBlockDuration.Seconds() * 1000),
|
||||
AppendableBlocks: opts.AppendableBlocks,
|
||||
RetentionDuration: uint64(opts.Retention.Seconds() * 1000),
|
||||
NoLockfile: opts.NoLockfile,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -0,0 +1,201 @@
|
|||
Apache License
|
||||
Version 2.0, January 2004
|
||||
http://www.apache.org/licenses/
|
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
|
||||
|
||||
1. Definitions.
|
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction,
|
||||
and distribution as defined by Sections 1 through 9 of this document.
|
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by
|
||||
the copyright owner that is granting the License.
|
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all
|
||||
other entities that control, are controlled by, or are under common
|
||||
control with that entity. For the purposes of this definition,
|
||||
"control" means (i) the power, direct or indirect, to cause the
|
||||
direction or management of such entity, whether by contract or
|
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the
|
||||
outstanding shares, or (iii) beneficial ownership of such entity.
|
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity
|
||||
exercising permissions granted by this License.
|
||||
|
||||
"Source" form shall mean the preferred form for making modifications,
|
||||
including but not limited to software source code, documentation
|
||||
source, and configuration files.
|
||||
|
||||
"Object" form shall mean any form resulting from mechanical
|
||||
transformation or translation of a Source form, including but
|
||||
not limited to compiled object code, generated documentation,
|
||||
and conversions to other media types.
|
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or
|
||||
Object form, made available under the License, as indicated by a
|
||||
copyright notice that is included in or attached to the work
|
||||
(an example is provided in the Appendix below).
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object
|
||||
form, that is based on (or derived from) the Work and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship. For the purposes
|
||||
of this License, Derivative Works shall not include works that remain
|
||||
separable from, or merely link (or bind by name) to the interfaces of,
|
||||
the Work and Derivative Works thereof.
|
||||
|
||||
"Contribution" shall mean any work of authorship, including
|
||||
the original version of the Work and any modifications or additions
|
||||
to that Work or Derivative Works thereof, that is intentionally
|
||||
submitted to Licensor for inclusion in the Work by the copyright owner
|
||||
or by an individual or Legal Entity authorized to submit on behalf of
|
||||
the copyright owner. For the purposes of this definition, "submitted"
|
||||
means any form of electronic, verbal, or written communication sent
|
||||
to the Licensor or its representatives, including but not limited to
|
||||
communication on electronic mailing lists, source code control systems,
|
||||
and issue tracking systems that are managed by, or on behalf of, the
|
||||
Licensor for the purpose of discussing and improving the Work, but
|
||||
excluding communication that is conspicuously marked or otherwise
|
||||
designated in writing by the copyright owner as "Not a Contribution."
|
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity
|
||||
on behalf of whom a Contribution has been received by Licensor and
|
||||
subsequently incorporated within the Work.
|
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
copyright license to reproduce, prepare Derivative Works of,
|
||||
publicly display, publicly perform, sublicense, and distribute the
|
||||
Work and such Derivative Works in Source or Object form.
|
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of
|
||||
this License, each Contributor hereby grants to You a perpetual,
|
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
|
||||
(except as stated in this section) patent license to make, have made,
|
||||
use, offer to sell, sell, import, and otherwise transfer the Work,
|
||||
where such license applies only to those patent claims licensable
|
||||
by such Contributor that are necessarily infringed by their
|
||||
Contribution(s) alone or by combination of their Contribution(s)
|
||||
with the Work to which such Contribution(s) was submitted. If You
|
||||
institute patent litigation against any entity (including a
|
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work
|
||||
or a Contribution incorporated within the Work constitutes direct
|
||||
or contributory patent infringement, then any patent licenses
|
||||
granted to You under this License for that Work shall terminate
|
||||
as of the date such litigation is filed.
|
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the
|
||||
Work or Derivative Works thereof in any medium, with or without
|
||||
modifications, and in Source or Object form, provided that You
|
||||
meet the following conditions:
|
||||
|
||||
(a) You must give any other recipients of the Work or
|
||||
Derivative Works a copy of this License; and
|
||||
|
||||
(b) You must cause any modified files to carry prominent notices
|
||||
stating that You changed the files; and
|
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works
|
||||
that You distribute, all copyright, patent, trademark, and
|
||||
attribution notices from the Source form of the Work,
|
||||
excluding those notices that do not pertain to any part of
|
||||
the Derivative Works; and
|
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its
|
||||
distribution, then any Derivative Works that You distribute must
|
||||
include a readable copy of the attribution notices contained
|
||||
within such NOTICE file, excluding those notices that do not
|
||||
pertain to any part of the Derivative Works, in at least one
|
||||
of the following places: within a NOTICE text file distributed
|
||||
as part of the Derivative Works; within the Source form or
|
||||
documentation, if provided along with the Derivative Works; or,
|
||||
within a display generated by the Derivative Works, if and
|
||||
wherever such third-party notices normally appear. The contents
|
||||
of the NOTICE file are for informational purposes only and
|
||||
do not modify the License. You may add Your own attribution
|
||||
notices within Derivative Works that You distribute, alongside
|
||||
or as an addendum to the NOTICE text from the Work, provided
|
||||
that such additional attribution notices cannot be construed
|
||||
as modifying the License.
|
||||
|
||||
You may add Your own copyright statement to Your modifications and
|
||||
may provide additional or different license terms and conditions
|
||||
for use, reproduction, or distribution of Your modifications, or
|
||||
for any such Derivative Works as a whole, provided Your use,
|
||||
reproduction, and distribution of the Work otherwise complies with
|
||||
the conditions stated in this License.
|
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise,
|
||||
any Contribution intentionally submitted for inclusion in the Work
|
||||
by You to the Licensor shall be under the terms and conditions of
|
||||
this License, without any additional terms or conditions.
|
||||
Notwithstanding the above, nothing herein shall supersede or modify
|
||||
the terms of any separate license agreement you may have executed
|
||||
with Licensor regarding such Contributions.
|
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade
|
||||
names, trademarks, service marks, or product names of the Licensor,
|
||||
except as required for reasonable and customary use in describing the
|
||||
origin of the Work and reproducing the content of the NOTICE file.
|
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or
|
||||
agreed to in writing, Licensor provides the Work (and each
|
||||
Contributor provides its Contributions) on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
implied, including, without limitation, any warranties or conditions
|
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
|
||||
PARTICULAR PURPOSE. You are solely responsible for determining the
|
||||
appropriateness of using or redistributing the Work and assume any
|
||||
risks associated with Your exercise of permissions under this License.
|
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory,
|
||||
whether in tort (including negligence), contract, or otherwise,
|
||||
unless required by applicable law (such as deliberate and grossly
|
||||
negligent acts) or agreed to in writing, shall any Contributor be
|
||||
liable to You for damages, including any direct, indirect, special,
|
||||
incidental, or consequential damages of any character arising as a
|
||||
result of this License or out of the use or inability to use the
|
||||
Work (including but not limited to damages for loss of goodwill,
|
||||
work stoppage, computer failure or malfunction, or any and all
|
||||
other commercial damages or losses), even if such Contributor
|
||||
has been advised of the possibility of such damages.
|
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing
|
||||
the Work or Derivative Works thereof, You may choose to offer,
|
||||
and charge a fee for, acceptance of support, warranty, indemnity,
|
||||
or other liability obligations and/or rights consistent with this
|
||||
License. However, in accepting such obligations, You may act only
|
||||
on Your own behalf and on Your sole responsibility, not on behalf
|
||||
of any other Contributor, and only if You agree to indemnify,
|
||||
defend, and hold each Contributor harmless for any liability
|
||||
incurred by, or claims asserted against, such Contributor by reason
|
||||
of your accepting any such warranty or additional liability.
|
||||
|
||||
END OF TERMS AND CONDITIONS
|
||||
|
||||
APPENDIX: How to apply the Apache License to your work.
|
||||
|
||||
To apply the Apache License to your work, attach the following
|
||||
boilerplate notice, with the fields enclosed by brackets "[]"
|
||||
replaced with your own identifying information. (Don't include
|
||||
the brackets!) The text should be enclosed in the appropriate
|
||||
comment syntax for the file format. We also recommend that a
|
||||
file or class name and description of purpose be included on the
|
||||
same "printed page" as the copyright notice for easier
|
||||
identification within third-party archives.
|
||||
|
||||
Copyright [yyyy] [name of copyright owner]
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
|
@ -0,0 +1,5 @@
|
|||
# TSDB
|
||||
|
||||
This repository contains the new Prometheus storage layer that will be used in its 2.0 release.
|
||||
|
||||
A writeup of its design can be found [here](https://fabxc.org/blog/2017-04-10-writing-a-tsdb/).
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
|
@ -6,7 +19,6 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
|
||||
"github.com/oklog/ulid"
|
||||
"github.com/pkg/errors"
|
||||
|
@ -23,7 +35,7 @@ type DiskBlock interface {
|
|||
// Index returns an IndexReader over the block's data.
|
||||
Index() IndexReader
|
||||
|
||||
// Series returns a SeriesReader over the block's data.
|
||||
// Chunks returns a ChunkReader over the block's data.
|
||||
Chunks() ChunkReader
|
||||
|
||||
// Close releases all underlying resources of the block.
|
||||
|
@ -125,8 +137,10 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
|
|||
enc := json.NewEncoder(f)
|
||||
enc.SetIndent("", "\t")
|
||||
|
||||
if err := enc.Encode(&blockMeta{Version: 1, BlockMeta: meta}); err != nil {
|
||||
return err
|
||||
var merr MultiError
|
||||
if merr.Add(enc.Encode(&blockMeta{Version: 1, BlockMeta: meta})); merr.Err() != nil {
|
||||
merr.Add(f.Close())
|
||||
return merr
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
return err
|
||||
|
@ -228,30 +242,3 @@ func (f *mmapFile) Close() error {
|
|||
}
|
||||
return err1
|
||||
}
|
||||
|
||||
// A skiplist maps offsets to values. The values found in the data at an
|
||||
// offset are strictly greater than the indexed value.
|
||||
type skiplist interface {
|
||||
// offset returns the offset to data containing values of x and lower.
|
||||
offset(x int64) (uint32, bool)
|
||||
}
|
||||
|
||||
// simpleSkiplist is a slice of plain value/offset pairs.
|
||||
type simpleSkiplist []skiplistPair
|
||||
|
||||
type skiplistPair struct {
|
||||
value int64
|
||||
offset uint32
|
||||
}
|
||||
|
||||
func (sl simpleSkiplist) offset(x int64) (uint32, bool) {
|
||||
// Search for the first offset that contains data greater than x.
|
||||
i := sort.Search(len(sl), func(i int) bool { return sl[i].value >= x })
|
||||
|
||||
// If no element was found return false. If the first element is found,
|
||||
// there's no previous offset actually containing values that are x or lower.
|
||||
if i == len(sl) || i == 0 {
|
||||
return 0, false
|
||||
}
|
||||
return sl[i-1].offset, true
|
||||
}
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
|
@ -15,7 +28,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
// MagicChunks is 4 bytes at the head of series file.
|
||||
// MagicChunks is 4 bytes at the head of a series file.
|
||||
MagicChunks = 0x85BD40DD
|
||||
)
|
||||
|
||||
|
@ -30,12 +43,23 @@ type ChunkMeta struct {
|
|||
MinTime, MaxTime int64 // time range the data covers
|
||||
}
|
||||
|
||||
// writeHash writes the chunk encoding and raw data into the provided hash.
|
||||
func (cm *ChunkMeta) writeHash(h hash.Hash) error {
|
||||
if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ChunkWriter serializes a time block of chunked series data.
|
||||
type ChunkWriter interface {
|
||||
// WriteChunks writes several chunks. The data field of the ChunkMetas
|
||||
// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
|
||||
// must be populated.
|
||||
// After returning successfully, the Ref fields in the ChunkMetas
|
||||
// is set and can be used to retrieve the chunks from the written data.
|
||||
// are set and can be used to retrieve the chunks from the written data.
|
||||
WriteChunks(chunks ...*ChunkMeta) error
|
||||
|
||||
// Close writes any required finalization and closes the resources
|
||||
|
@ -112,7 +136,9 @@ func (w *chunkWriter) finalizeTail() error {
|
|||
|
||||
func (w *chunkWriter) cut() error {
|
||||
// Sync current tail to disk and close.
|
||||
w.finalizeTail()
|
||||
if err := w.finalizeTail(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
p, _, err := nextSequenceFile(w.dirFile.Name(), "")
|
||||
if err != nil {
|
||||
|
@ -150,8 +176,8 @@ func (w *chunkWriter) cut() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (w *chunkWriter) write(wr io.Writer, b []byte) error {
|
||||
n, err := wr.Write(b)
|
||||
func (w *chunkWriter) write(b []byte) error {
|
||||
n, err := w.wbuf.Write(b)
|
||||
w.n += int64(n)
|
||||
return err
|
||||
}
|
||||
|
@ -159,9 +185,9 @@ func (w *chunkWriter) write(wr io.Writer, b []byte) error {
|
|||
func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
|
||||
// Calculate maximum space we need and cut a new segment in case
|
||||
// we don't fit into the current one.
|
||||
maxLen := int64(binary.MaxVarintLen32)
|
||||
maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
|
||||
for _, c := range chks {
|
||||
maxLen += binary.MaxVarintLen32 + 1
|
||||
maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
|
||||
maxLen += int64(len(c.Chunk.Bytes()))
|
||||
}
|
||||
newsz := w.n + maxLen
|
||||
|
@ -172,14 +198,10 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Write chunks sequentially and set the reference field in the ChunkMeta.
|
||||
w.crc32.Reset()
|
||||
wr := io.MultiWriter(w.crc32, w.wbuf)
|
||||
|
||||
b := make([]byte, binary.MaxVarintLen32)
|
||||
n := binary.PutUvarint(b, uint64(len(chks)))
|
||||
|
||||
if err := w.write(wr, b[:n]); err != nil {
|
||||
if err := w.write(b[:n]); err != nil {
|
||||
return err
|
||||
}
|
||||
seq := uint64(w.seq()) << 32
|
||||
|
@ -189,21 +211,25 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
|
|||
|
||||
n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
|
||||
|
||||
if err := w.write(wr, b[:n]); err != nil {
|
||||
if err := w.write(b[:n]); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil {
|
||||
if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.write(wr, chk.Chunk.Bytes()); err != nil {
|
||||
if err := w.write(chk.Chunk.Bytes()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.crc32.Reset()
|
||||
if err := chk.writeHash(w.crc32); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.write(w.crc32.Sum(nil)); err != nil {
|
||||
return err
|
||||
}
|
||||
chk.Chunk = nil
|
||||
}
|
||||
|
||||
if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,8 +1,49 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// The code in this file was largely written by Damian Gryski as part of
|
||||
// https://github.com/dgryski/go-tsz and published under the license below.
|
||||
// It received minor modifications to suit Prometheus's needs.
|
||||
|
||||
// Copyright (c) 2015,2016 Damian Gryski <damian@gryski.com>
|
||||
// All rights reserved.
|
||||
|
||||
// Redistribution and use in source and binary forms, with or without
|
||||
// modification, are permitted provided that the following conditions are met:
|
||||
|
||||
// * Redistributions of source code must retain the above copyright notice,
|
||||
// this list of conditions and the following disclaimer.
|
||||
//
|
||||
// * Redistributions in binary form must reproduce the above copyright notice,
|
||||
// this list of conditions and the following disclaimer in the documentation
|
||||
// and/or other materials provided with the distribution.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
package chunks
|
||||
|
||||
import "io"
|
||||
|
||||
// bstream is a stream of bits
|
||||
// bstream is a stream of bits.
|
||||
type bstream struct {
|
||||
stream []byte // the data stream
|
||||
count uint8 // how many bits are valid in current byte
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package chunks
|
||||
|
||||
import (
|
||||
|
@ -5,7 +18,7 @@ import (
|
|||
"fmt"
|
||||
)
|
||||
|
||||
// Encoding is the identifier for a chunk encoding
|
||||
// Encoding is the identifier for a chunk encoding.
|
||||
type Encoding uint8
|
||||
|
||||
func (e Encoding) String() string {
|
||||
|
|
|
@ -1,3 +1,46 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// The code in this file was largely written by Damian Gryski as part of
|
||||
// https://github.com/dgryski/go-tsz and published under the license below.
|
||||
// It was modified to accommodate reading from byte slices without modifying
|
||||
// the underlying bytes, which would panic when reading from mmaped
|
||||
// read-only byte slices.
|
||||
|
||||
// Copyright (c) 2015,2016 Damian Gryski <damian@gryski.com>
|
||||
// All rights reserved.
|
||||
|
||||
// Redistribution and use in source and binary forms, with or without
|
||||
// modification, are permitted provided that the following conditions are met:
|
||||
|
||||
// * Redistributions of source code must retain the above copyright notice,
|
||||
// this list of conditions and the following disclaimer.
|
||||
//
|
||||
// * Redistributions in binary form must reproduce the above copyright notice,
|
||||
// this list of conditions and the following disclaimer in the documentation
|
||||
// and/or other materials provided with the distribution.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
package chunks
|
||||
|
||||
import (
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package tsdb implements a time series storage for float64 sample data.
|
||||
package tsdb
|
||||
|
||||
|
@ -33,6 +46,7 @@ var DefaultOptions = &Options{
|
|||
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
||||
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
|
||||
AppendableBlocks: 2,
|
||||
NoLockfile: false,
|
||||
}
|
||||
|
||||
// Options of the DB storage.
|
||||
|
@ -56,10 +70,15 @@ type Options struct {
|
|||
// After a new block is started for timestamp t0 or higher, appends with
|
||||
// timestamps as early as t0 - (n-1) * MinBlockDuration are valid.
|
||||
AppendableBlocks int
|
||||
|
||||
// NoLockfile disables creation and consideration of a lock file.
|
||||
NoLockfile bool
|
||||
}
|
||||
|
||||
// Appender allows appending a batch of data. It must be completed with a
|
||||
// call to Commit or Rollback and must not be reused afterwards.
|
||||
//
|
||||
// Operations on the Appender interface are not goroutine-safe.
|
||||
type Appender interface {
|
||||
// Add adds a sample pair for the given series. A reference number is
|
||||
// returned which can be used to add further samples in the same or later
|
||||
|
@ -80,13 +99,11 @@ type Appender interface {
|
|||
Rollback() error
|
||||
}
|
||||
|
||||
const sep = '\xff'
|
||||
|
||||
// DB handles reads and writes of time series falling into
|
||||
// a hashed partition of a series database.
|
||||
type DB struct {
|
||||
dir string
|
||||
lockf lockfile.Lockfile
|
||||
lockf *lockfile.Lockfile
|
||||
|
||||
logger log.Logger
|
||||
metrics *dbMetrics
|
||||
|
@ -146,13 +163,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := lockf.TryLock(); err != nil {
|
||||
return nil, errors.Wrapf(err, "open DB in %s", dir)
|
||||
}
|
||||
|
||||
if l == nil {
|
||||
l = log.NewLogfmtLogger(os.Stdout)
|
||||
|
@ -168,7 +178,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
|
||||
db = &DB{
|
||||
dir: dir,
|
||||
lockf: lockf,
|
||||
logger: l,
|
||||
metrics: newDBMetrics(r),
|
||||
opts: opts,
|
||||
|
@ -176,6 +185,17 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
donec: make(chan struct{}),
|
||||
stopc: make(chan struct{}),
|
||||
}
|
||||
if !opts.NoLockfile {
|
||||
lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := lockf.TryLock(); err != nil {
|
||||
return nil, errors.Wrapf(err, "open DB in %s", dir)
|
||||
}
|
||||
db.lockf = &lockf
|
||||
}
|
||||
|
||||
db.compactor = newCompactor(r, l, &compactorOptions{
|
||||
maxBlockRange: opts.MaxBlockDuration,
|
||||
})
|
||||
|
@ -452,7 +472,9 @@ func (db *DB) Close() error {
|
|||
var merr MultiError
|
||||
|
||||
merr.Add(g.Wait())
|
||||
merr.Add(db.lockf.Unlock())
|
||||
if db.lockf != nil {
|
||||
merr.Add(db.lockf.Unlock())
|
||||
}
|
||||
|
||||
return merr.Err()
|
||||
}
|
||||
|
@ -505,8 +527,8 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error)
|
|||
return 0, err
|
||||
}
|
||||
a.samples++
|
||||
// Store last byte of sequence number in 3rd byte of refernece.
|
||||
return ref | (uint64(h.meta.Sequence^0xff) << 40), nil
|
||||
// Store last byte of sequence number in 3rd byte of reference.
|
||||
return ref | (uint64(h.meta.Sequence&0xff) << 40), nil
|
||||
}
|
||||
|
||||
func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||
|
@ -519,7 +541,7 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
|
|||
return err
|
||||
}
|
||||
// If the last byte of the sequence does not add up, the reference is not valid.
|
||||
if uint64(h.meta.Sequence^0xff) != gen {
|
||||
if uint64(h.meta.Sequence&0xff) != gen {
|
||||
return ErrNotFound
|
||||
}
|
||||
if err := h.app.AddFast(ref, t, v); err != nil {
|
||||
|
@ -641,7 +663,7 @@ func (a *dbAppender) Rollback() error {
|
|||
var g errgroup.Group
|
||||
|
||||
for _, h := range a.heads {
|
||||
g.Go(h.app.Commit)
|
||||
g.Go(h.app.Rollback)
|
||||
}
|
||||
|
||||
return g.Wait()
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// +build !windows,!plan9,!solaris
|
||||
|
||||
package tsdb
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
|
|
|
@ -0,0 +1,157 @@
|
|||
package tsdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"hash"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// encbuf is a helper type to populate a byte slice with various types.
type encbuf struct {
	// b accumulates the encoded output.
	b []byte
	// c is scratch space used to stage a single fixed-width or varint value
	// before it is appended to b.
	c [binary.MaxVarintLen64]byte
}

// reset truncates the buffer to zero length, keeping the underlying storage.
func (e *encbuf) reset() {
	e.b = e.b[:0]
}

// get returns the bytes accumulated so far.
func (e *encbuf) get() []byte {
	return e.b
}

// len returns the number of bytes accumulated so far.
func (e *encbuf) len() int {
	return len(e.b)
}

// putString appends the raw bytes of s without any length prefix.
func (e *encbuf) putString(s string) {
	e.b = append(e.b, s...)
}

// putBytes appends b without any length prefix.
func (e *encbuf) putBytes(b []byte) {
	e.b = append(e.b, b...)
}

// putByte appends the single byte c.
func (e *encbuf) putByte(c byte) {
	e.b = append(e.b, c)
}

// putBE32int appends x converted to uint32 as 4 big-endian bytes.
func (e *encbuf) putBE32int(x int) {
	e.putBE32(uint32(x))
}

// putBE64int appends x converted to uint64 as 8 big-endian bytes.
func (e *encbuf) putBE64int(x int) {
	e.putBE64(uint64(x))
}

// putUvarint32 appends x as an unsigned varint.
func (e *encbuf) putUvarint32(x uint32) {
	e.putUvarint64(uint64(x))
}

// putUvarint appends x converted to uint64 as an unsigned varint.
func (e *encbuf) putUvarint(x int) {
	e.putUvarint64(uint64(x))
}

// putBE32 appends x as 4 big-endian bytes.
func (e *encbuf) putBE32(x uint32) {
	scratch := e.c[:4]
	binary.BigEndian.PutUint32(scratch, x)
	e.b = append(e.b, scratch...)
}

// putBE64 appends x as 8 big-endian bytes.
func (e *encbuf) putBE64(x uint64) {
	scratch := e.c[:8]
	binary.BigEndian.PutUint64(scratch, x)
	e.b = append(e.b, scratch...)
}

// putUvarint64 appends x as an unsigned varint.
func (e *encbuf) putUvarint64(x uint64) {
	size := binary.PutUvarint(e.c[:], x)
	e.b = append(e.b, e.c[:size]...)
}

// putVarint64 appends x as a signed varint.
func (e *encbuf) putVarint64(x int64) {
	size := binary.PutVarint(e.c[:], x)
	e.b = append(e.b, e.c[:size]...)
}

// putUvarintStr writes a string to the buffer prefixed by its uvarint length (in bytes!).
func (e *encbuf) putUvarintStr(s string) {
	raw := *(*[]byte)(unsafe.Pointer(&s))
	e.putUvarint(len(raw))
	e.putString(s)
}

// putHash appends a hash over the buffer's current contents to the buffer.
// The hash is reset first, so only the buffer contents contribute to the sum.
func (e *encbuf) putHash(h hash.Hash) {
	h.Reset()
	if _, err := h.Write(e.b); err != nil {
		panic(err) // The CRC32 implementation does not error
	}
	e.b = h.Sum(e.b)
}
|
||||
|
||||
// decbuf provides safe methods to extract data from a byte slice. It does all
|
||||
// necessary bounds checking and advancing of the byte slice.
|
||||
// Several datums can be extracted without checking for errors. However, before using
|
||||
// any datum, the err() method must be checked.
|
||||
type decbuf struct {
|
||||
b []byte
|
||||
e error
|
||||
}
|
||||
|
||||
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
|
||||
func (d *decbuf) be32int() int { return int(d.be32()) }
|
||||
|
||||
func (d *decbuf) uvarintStr() string {
|
||||
l := d.uvarint64()
|
||||
if d.e != nil {
|
||||
return ""
|
||||
}
|
||||
if len(d.b) < int(l) {
|
||||
d.e = errInvalidSize
|
||||
return ""
|
||||
}
|
||||
s := yoloString(d.b[:l])
|
||||
d.b = d.b[l:]
|
||||
return s
|
||||
}
|
||||
|
||||
func (d *decbuf) varint64() int64 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
x, n := binary.Varint(d.b)
|
||||
if n < 1 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
d.b = d.b[n:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) uvarint64() uint64 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
x, n := binary.Uvarint(d.b)
|
||||
if n < 1 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
d.b = d.b[n:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) be64() uint64 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
if len(d.b) < 4 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
x := binary.BigEndian.Uint64(d.b)
|
||||
d.b = d.b[8:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) be32() uint32 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
if len(d.b) < 4 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
x := binary.BigEndian.Uint32(d.b)
|
||||
d.b = d.b[4:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) decbuf(l int) decbuf {
|
||||
if d.e != nil {
|
||||
return decbuf{e: d.e}
|
||||
}
|
||||
if l > len(d.b) {
|
||||
return decbuf{e: errInvalidSize}
|
||||
}
|
||||
r := decbuf{b: d.b[:l]}
|
||||
d.b = d.b[l:]
|
||||
return r
|
||||
}
|
||||
|
||||
func (d *decbuf) err() error { return d.e }
|
||||
func (d *decbuf) len() int { return len(d.b) }
|
||||
func (d *decbuf) get() []byte { return d.b }
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
|
@ -283,6 +296,10 @@ type refdSample struct {
|
|||
}
|
||||
|
||||
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||
if !a.inBounds(t) {
|
||||
return 0, ErrOutOfBounds
|
||||
}
|
||||
|
||||
hash := lset.Hash()
|
||||
|
||||
if ms := a.get(hash, lset); ms != nil {
|
||||
|
@ -344,7 +361,10 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
|
|||
if t < c.maxTime {
|
||||
return ErrOutOfOrderSample
|
||||
}
|
||||
if c.maxTime == t && ms.lastValue != v {
|
||||
|
||||
// We are allowing exact duplicates as we can encounter them in valid cases
|
||||
// like federation and erroring out at that time would be extremely noisy.
|
||||
if c.maxTime == t && math.Float64bits(ms.lastValue) != math.Float64bits(v) {
|
||||
return ErrAmendSample
|
||||
}
|
||||
}
|
||||
|
@ -410,23 +430,12 @@ func (a *headAppender) Commit() error {
|
|||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
total = uint64(len(a.samples))
|
||||
mint = int64(math.MaxInt64)
|
||||
maxt = int64(math.MinInt64)
|
||||
)
|
||||
total := uint64(len(a.samples))
|
||||
|
||||
for _, s := range a.samples {
|
||||
if !a.series[s.ref].append(s.t, s.v) {
|
||||
total--
|
||||
}
|
||||
|
||||
if s.t < mint {
|
||||
mint = s.t
|
||||
}
|
||||
if s.t > maxt {
|
||||
maxt = s.t
|
||||
}
|
||||
}
|
||||
|
||||
a.mtx.RUnlock()
|
||||
|
@ -632,8 +641,8 @@ func (s *memSeries) append(t int64, v float64) bool {
|
|||
c.minTime = t
|
||||
} else {
|
||||
c = s.head()
|
||||
// Skip duplicate samples.
|
||||
if c.maxTime == t && s.lastValue != v {
|
||||
// Skip duplicate and out of order samples.
|
||||
if c.maxTime >= t {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
|
@ -12,6 +25,8 @@ import (
|
|||
"sort"
|
||||
"strings"
|
||||
|
||||
"math"
|
||||
|
||||
"github.com/coreos/etcd/pkg/fileutil"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
|
@ -26,10 +41,48 @@ const (
|
|||
|
||||
const compactionPageBytes = minSectorSize * 64
|
||||
|
||||
// IndexWriter serialized the index for a block of series data.
|
||||
// The methods must generally be called in order they are specified.
|
||||
type indexWriterSeries struct {
|
||||
labels labels.Labels
|
||||
chunks []*ChunkMeta // series file offset of chunks
|
||||
offset uint32 // index file offset of series reference
|
||||
}
|
||||
|
||||
type indexWriterSeriesSlice []*indexWriterSeries
|
||||
|
||||
func (s indexWriterSeriesSlice) Len() int { return len(s) }
|
||||
func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||
|
||||
func (s indexWriterSeriesSlice) Less(i, j int) bool {
|
||||
return labels.Compare(s[i].labels, s[j].labels) < 0
|
||||
}
|
||||
|
||||
// indexWriterStage identifies a phase of writing an index file. The
// constants are declared in ascending order.
type indexWriterStage uint8

const (
	idxStagePopulate indexWriterStage = iota
	idxStageLabelIndex
	idxStagePostings
	idxStageDone
)

// String returns a human-readable name for the stage, or "<unknown>" for
// values outside the declared set.
func (s indexWriterStage) String() string {
	names := [...]string{
		idxStagePopulate:   "populate",
		idxStageLabelIndex: "label index",
		idxStagePostings:   "postings",
		idxStageDone:       "done",
	}
	if int(s) >= len(names) {
		return "<unknown>"
	}
	return names[s]
}
|
||||
|
||||
// IndexWriter serializes the index for a block of series data.
|
||||
// The methods must generally be called in the order they are specified in.
|
||||
type IndexWriter interface {
|
||||
// AddSeries populates the index writer witha series and its offsets
|
||||
// AddSeries populates the index writer with a series and its offsets
|
||||
// of chunks that the index can reference.
|
||||
// The reference number is used to resolve a series against the postings
|
||||
// list iterator. It only has to be available during the write processing.
|
||||
|
@ -48,22 +101,19 @@ type IndexWriter interface {
|
|||
Close() error
|
||||
}
|
||||
|
||||
type indexWriterSeries struct {
|
||||
labels labels.Labels
|
||||
chunks []*ChunkMeta // series file offset of chunks
|
||||
offset uint32 // index file offset of series reference
|
||||
}
|
||||
|
||||
// indexWriter implements the IndexWriter interface for the standard
|
||||
// serialization format.
|
||||
type indexWriter struct {
|
||||
f *os.File
|
||||
bufw *bufio.Writer
|
||||
n int64
|
||||
started bool
|
||||
f *os.File
|
||||
fbuf *bufio.Writer
|
||||
pos uint64
|
||||
|
||||
toc indexTOC
|
||||
stage indexWriterStage
|
||||
|
||||
// Reusable memory.
|
||||
b []byte
|
||||
buf1 encbuf
|
||||
buf2 encbuf
|
||||
uint32s []uint32
|
||||
|
||||
series map[uint32]*indexWriterSeries
|
||||
|
@ -74,6 +124,15 @@ type indexWriter struct {
|
|||
crc32 hash.Hash
|
||||
}
|
||||
|
||||
// indexTOC is the table of contents of an index file. Each field holds the
// file position recorded when the corresponding section was started.
type indexTOC struct {
	// Positions of the symbol table and of the series section.
	symbols, series uint64
	// Positions of the label index section and of its offset table.
	labelIndices, labelIndicesTable uint64
	// Positions of the postings section and of its offset table.
	postings, postingsTable uint64
}
|
||||
|
||||
func newIndexWriter(dir string) (*indexWriter, error) {
|
||||
df, err := fileutil.OpenDir(dir)
|
||||
if err != nil {
|
||||
|
@ -88,12 +147,14 @@ func newIndexWriter(dir string) (*indexWriter, error) {
|
|||
}
|
||||
|
||||
iw := &indexWriter{
|
||||
f: f,
|
||||
bufw: bufio.NewWriterSize(f, 1<<22),
|
||||
n: 0,
|
||||
f: f,
|
||||
fbuf: bufio.NewWriterSize(f, 1<<22),
|
||||
pos: 0,
|
||||
stage: idxStagePopulate,
|
||||
|
||||
// Reusable memory.
|
||||
b: make([]byte, 0, 1<<23),
|
||||
buf1: encbuf{b: make([]byte, 0, 1<<22)},
|
||||
buf2: encbuf{b: make([]byte, 0, 1<<22)},
|
||||
uint32s: make([]uint32, 0, 1<<15),
|
||||
|
||||
// Caches.
|
||||
|
@ -107,40 +168,87 @@ func newIndexWriter(dir string) (*indexWriter, error) {
|
|||
return iw, nil
|
||||
}
|
||||
|
||||
func (w *indexWriter) write(wr io.Writer, b []byte) error {
|
||||
n, err := wr.Write(b)
|
||||
w.n += int64(n)
|
||||
return err
|
||||
}
|
||||
|
||||
// section writes a CRC32 checksummed section of length l and guarded by flag.
|
||||
func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error {
|
||||
w.crc32.Reset()
|
||||
wr := io.MultiWriter(w.crc32, w.bufw)
|
||||
|
||||
b := [5]byte{flag, 0, 0, 0, 0}
|
||||
binary.BigEndian.PutUint32(b[1:], uint32(l))
|
||||
|
||||
if err := w.write(wr, b[:]); err != nil {
|
||||
return errors.Wrap(err, "writing header")
|
||||
}
|
||||
|
||||
if err := f(wr); err != nil {
|
||||
return errors.Wrap(err, "write contents")
|
||||
}
|
||||
if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil {
|
||||
return errors.Wrap(err, "writing checksum")
|
||||
func (w *indexWriter) write(bufs ...[]byte) error {
|
||||
for _, b := range bufs {
|
||||
n, err := w.fbuf.Write(b)
|
||||
w.pos += uint64(n)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// For now the index file must not grow beyond 4GiB. Some of the fixed-sized
|
||||
// offset references in v1 are only 4 bytes large.
|
||||
// Once we move to compressed/varint representations in those areas, this limitation
|
||||
// can be lifted.
|
||||
if w.pos > math.MaxUint32 {
|
||||
return errors.Errorf("exceeding max size of 4GiB")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// addPadding adds zero byte padding until the file size is a multiple of n.
|
||||
func (w *indexWriter) addPadding(n int) error {
|
||||
p := n - (int(w.pos) % n)
|
||||
if p == 0 {
|
||||
return nil
|
||||
}
|
||||
return errors.Wrap(w.write(make([]byte, p)), "add padding")
|
||||
}
|
||||
|
||||
// ensureStage handles transitions between write stages and ensures that IndexWriter
|
||||
// methods are called in an order valid for the implementation.
|
||||
func (w *indexWriter) ensureStage(s indexWriterStage) error {
|
||||
if w.stage == s {
|
||||
return nil
|
||||
}
|
||||
if w.stage > s {
|
||||
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
|
||||
}
|
||||
|
||||
// Complete population stage by writing symbols and series.
|
||||
if w.stage == idxStagePopulate {
|
||||
w.toc.symbols = w.pos
|
||||
if err := w.writeSymbols(); err != nil {
|
||||
return err
|
||||
}
|
||||
w.toc.series = w.pos
|
||||
if err := w.writeSeries(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Mark start of sections in table of contents.
|
||||
switch s {
|
||||
case idxStageLabelIndex:
|
||||
w.toc.labelIndices = w.pos
|
||||
|
||||
case idxStagePostings:
|
||||
w.toc.labelIndicesTable = w.pos
|
||||
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
|
||||
return err
|
||||
}
|
||||
w.toc.postings = w.pos
|
||||
|
||||
case idxStageDone:
|
||||
w.toc.postingsTable = w.pos
|
||||
if err := w.writeOffsetTable(w.postings); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.writeTOC(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
w.stage = s
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *indexWriter) writeMeta() error {
|
||||
b := [8]byte{}
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32(MagicIndex)
|
||||
w.buf1.putByte(indexFormatV1)
|
||||
|
||||
binary.BigEndian.PutUint32(b[:4], MagicIndex)
|
||||
b[4] = flagStd
|
||||
|
||||
return w.write(w.bufw, b[:])
|
||||
return w.write(w.buf1.get())
|
||||
}
|
||||
|
||||
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
|
||||
|
@ -168,33 +276,27 @@ func (w *indexWriter) writeSymbols() error {
|
|||
}
|
||||
sort.Strings(symbols)
|
||||
|
||||
// The start of the section plus a 5 byte section header are our base.
|
||||
// TODO(fabxc): switch to relative offsets and hold sections in a TOC.
|
||||
base := uint32(w.n) + 5
|
||||
const headerSize = 4
|
||||
|
||||
buf := [binary.MaxVarintLen32]byte{}
|
||||
w.b = append(w.b[:0], flagStd)
|
||||
w.buf1.reset()
|
||||
w.buf2.reset()
|
||||
|
||||
w.buf2.putBE32int(len(symbols))
|
||||
|
||||
for _, s := range symbols {
|
||||
w.symbols[s] = base + uint32(len(w.b))
|
||||
w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
|
||||
|
||||
n := binary.PutUvarint(buf[:], uint64(len(s)))
|
||||
w.b = append(w.b, buf[:n]...)
|
||||
w.b = append(w.b, s...)
|
||||
// NOTE: len(s) gives the number of runes, not the number of bytes.
|
||||
// Therefore the read-back length for strings with unicode characters will
|
||||
// be off when not using putCstr.
|
||||
w.buf2.putUvarintStr(s)
|
||||
}
|
||||
|
||||
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
|
||||
return w.write(wr, w.b)
|
||||
})
|
||||
}
|
||||
w.buf1.putBE32int(w.buf2.len())
|
||||
w.buf2.putHash(w.crc32)
|
||||
|
||||
type indexWriterSeriesSlice []*indexWriterSeries
|
||||
|
||||
func (s indexWriterSeriesSlice) Len() int { return len(s) }
|
||||
func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||
|
||||
func (s indexWriterSeriesSlice) Less(i, j int) bool {
|
||||
return labels.Compare(s[i].labels, s[j].labels) < 0
|
||||
err := w.write(w.buf1.get(), w.buf2.get())
|
||||
return errors.Wrap(err, "write symbols")
|
||||
}
|
||||
|
||||
func (w *indexWriter) writeSeries() error {
|
||||
|
@ -206,64 +308,52 @@ func (w *indexWriter) writeSeries() error {
|
|||
}
|
||||
sort.Sort(series)
|
||||
|
||||
// Current end of file plus 5 bytes for section header.
|
||||
// TODO(fabxc): switch to relative offsets.
|
||||
base := uint32(w.n) + 5
|
||||
// Header holds number of series.
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32int(len(series))
|
||||
|
||||
w.b = w.b[:0]
|
||||
buf := make([]byte, binary.MaxVarintLen64)
|
||||
if err := w.write(w.buf1.get()); err != nil {
|
||||
return errors.Wrap(err, "write series count")
|
||||
}
|
||||
|
||||
for _, s := range series {
|
||||
// Write label set symbol references.
|
||||
s.offset = base + uint32(len(w.b))
|
||||
s.offset = uint32(w.pos)
|
||||
|
||||
n := binary.PutUvarint(buf, uint64(len(s.labels)))
|
||||
w.b = append(w.b, buf[:n]...)
|
||||
w.buf2.reset()
|
||||
w.buf2.putUvarint(len(s.labels))
|
||||
|
||||
for _, l := range s.labels {
|
||||
n = binary.PutUvarint(buf, uint64(w.symbols[l.Name]))
|
||||
w.b = append(w.b, buf[:n]...)
|
||||
n = binary.PutUvarint(buf, uint64(w.symbols[l.Value]))
|
||||
w.b = append(w.b, buf[:n]...)
|
||||
w.buf2.putUvarint32(w.symbols[l.Name])
|
||||
w.buf2.putUvarint32(w.symbols[l.Value])
|
||||
}
|
||||
|
||||
// Write chunks meta data including reference into chunk file.
|
||||
n = binary.PutUvarint(buf, uint64(len(s.chunks)))
|
||||
w.b = append(w.b, buf[:n]...)
|
||||
w.buf2.putUvarint(len(s.chunks))
|
||||
|
||||
for _, c := range s.chunks {
|
||||
n = binary.PutVarint(buf, c.MinTime)
|
||||
w.b = append(w.b, buf[:n]...)
|
||||
n = binary.PutVarint(buf, c.MaxTime)
|
||||
w.b = append(w.b, buf[:n]...)
|
||||
w.buf2.putVarint64(c.MinTime)
|
||||
w.buf2.putVarint64(c.MaxTime)
|
||||
w.buf2.putUvarint64(c.Ref)
|
||||
}
|
||||
|
||||
n = binary.PutUvarint(buf, uint64(c.Ref))
|
||||
w.b = append(w.b, buf[:n]...)
|
||||
w.buf1.reset()
|
||||
w.buf1.putUvarint(w.buf2.len())
|
||||
|
||||
w.buf2.putHash(w.crc32)
|
||||
|
||||
if err := w.write(w.buf1.get(), w.buf2.get()); err != nil {
|
||||
return errors.Wrap(err, "write series data")
|
||||
}
|
||||
}
|
||||
|
||||
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
|
||||
return w.write(wr, w.b)
|
||||
})
|
||||
}
|
||||
|
||||
func (w *indexWriter) init() error {
|
||||
if err := w.writeSymbols(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.writeSeries(); err != nil {
|
||||
return err
|
||||
}
|
||||
w.started = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||
if !w.started {
|
||||
if err := w.init(); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(values)%len(names) != 0 {
|
||||
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
|
||||
}
|
||||
if err := w.ensureStage(idxStageLabelIndex); err != nil {
|
||||
return errors.Wrap(err, "ensure stage")
|
||||
}
|
||||
|
||||
valt, err := newStringTuples(values, len(names))
|
||||
|
@ -272,45 +362,84 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
|||
}
|
||||
sort.Sort(valt)
|
||||
|
||||
// Align beginning to 4 bytes for more efficient index list scans.
|
||||
if err := w.addPadding(4); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.labelIndexes = append(w.labelIndexes, hashEntry{
|
||||
name: strings.Join(names, string(sep)),
|
||||
offset: uint32(w.n),
|
||||
keys: names,
|
||||
offset: w.pos,
|
||||
})
|
||||
|
||||
buf := make([]byte, binary.MaxVarintLen32)
|
||||
n := binary.PutUvarint(buf, uint64(len(names)))
|
||||
w.buf2.reset()
|
||||
w.buf2.putBE32int(len(names))
|
||||
w.buf2.putBE32int(valt.Len())
|
||||
|
||||
l := n + len(values)*4
|
||||
for _, v := range valt.s {
|
||||
w.buf2.putBE32(w.symbols[v])
|
||||
}
|
||||
|
||||
return w.section(l, flagStd, func(wr io.Writer) error {
|
||||
// First byte indicates tuple size for index.
|
||||
if err := w.write(wr, buf[:n]); err != nil {
|
||||
return err
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32int(w.buf2.len())
|
||||
|
||||
w.buf2.putHash(w.crc32)
|
||||
|
||||
err = w.write(w.buf1.get(), w.buf2.get())
|
||||
return errors.Wrap(err, "write label index")
|
||||
}
|
||||
|
||||
// writeOffsetTable writes a sequence of readable hash entries.
|
||||
func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32int(len(entries))
|
||||
|
||||
w.buf2.reset()
|
||||
|
||||
for _, e := range entries {
|
||||
w.buf2.putUvarint(len(e.keys))
|
||||
for _, k := range e.keys {
|
||||
w.buf2.putUvarintStr(k)
|
||||
}
|
||||
w.buf2.putUvarint64(e.offset)
|
||||
}
|
||||
|
||||
for _, v := range valt.s {
|
||||
binary.BigEndian.PutUint32(buf, w.symbols[v])
|
||||
w.buf1.putBE32int(w.buf2.len())
|
||||
w.buf2.putHash(w.crc32)
|
||||
|
||||
if err := w.write(wr, buf[:4]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return w.write(w.buf1.get(), w.buf2.get())
|
||||
}
|
||||
|
||||
const indexTOCLen = 6*8 + 4
|
||||
|
||||
func (w *indexWriter) writeTOC() error {
|
||||
w.buf1.reset()
|
||||
|
||||
w.buf1.putBE64(w.toc.symbols)
|
||||
w.buf1.putBE64(w.toc.series)
|
||||
w.buf1.putBE64(w.toc.labelIndices)
|
||||
w.buf1.putBE64(w.toc.labelIndicesTable)
|
||||
w.buf1.putBE64(w.toc.postings)
|
||||
w.buf1.putBE64(w.toc.postingsTable)
|
||||
|
||||
w.buf1.putHash(w.crc32)
|
||||
|
||||
return w.write(w.buf1.get())
|
||||
}
|
||||
|
||||
func (w *indexWriter) WritePostings(name, value string, it Postings) error {
|
||||
if !w.started {
|
||||
if err := w.init(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.ensureStage(idxStagePostings); err != nil {
|
||||
return errors.Wrap(err, "ensure stage")
|
||||
}
|
||||
|
||||
key := name + string(sep) + value
|
||||
// Align beginning to 4 bytes for more efficient postings list scans.
|
||||
if err := w.addPadding(4); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.postings = append(w.postings, hashEntry{
|
||||
name: key,
|
||||
offset: uint32(w.n),
|
||||
keys: []string{name, value},
|
||||
offset: w.pos,
|
||||
})
|
||||
|
||||
// Order of the references in the postings list does not imply order
|
||||
|
@ -328,22 +457,22 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
|
|||
if err := it.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sort.Sort(uint32slice(refs))
|
||||
|
||||
w.b = w.b[:0]
|
||||
buf := make([]byte, 4)
|
||||
w.buf2.reset()
|
||||
w.buf2.putBE32int(len(refs))
|
||||
|
||||
for _, r := range refs {
|
||||
binary.BigEndian.PutUint32(buf, r)
|
||||
w.b = append(w.b, buf...)
|
||||
w.buf2.putBE32(r)
|
||||
}
|
||||
|
||||
w.uint32s = refs[:0]
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32int(w.buf2.len())
|
||||
|
||||
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
|
||||
return w.write(wr, w.b)
|
||||
})
|
||||
w.buf2.putHash(w.crc32)
|
||||
|
||||
err := w.write(w.buf1.get(), w.buf2.get())
|
||||
return errors.Wrap(err, "write postings")
|
||||
}
|
||||
|
||||
type uint32slice []uint32
|
||||
|
@ -353,56 +482,15 @@ func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
|||
func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
|
||||
|
||||
type hashEntry struct {
|
||||
name string
|
||||
offset uint32
|
||||
}
|
||||
|
||||
func (w *indexWriter) writeHashmap(h []hashEntry) error {
|
||||
w.b = w.b[:0]
|
||||
buf := [binary.MaxVarintLen32]byte{}
|
||||
|
||||
for _, e := range h {
|
||||
n := binary.PutUvarint(buf[:], uint64(len(e.name)))
|
||||
w.b = append(w.b, buf[:n]...)
|
||||
w.b = append(w.b, e.name...)
|
||||
|
||||
n = binary.PutUvarint(buf[:], uint64(e.offset))
|
||||
w.b = append(w.b, buf[:n]...)
|
||||
}
|
||||
|
||||
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
|
||||
return w.write(wr, w.b)
|
||||
})
|
||||
}
|
||||
|
||||
func (w *indexWriter) finalize() error {
|
||||
// Write out hash maps to jump to correct label index and postings sections.
|
||||
lo := uint32(w.n)
|
||||
if err := w.writeHashmap(w.labelIndexes); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
po := uint32(w.n)
|
||||
if err := w.writeHashmap(w.postings); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Terminate index file with offsets to hashmaps. This is the entry Pointer
|
||||
// for any index query.
|
||||
// TODO(fabxc): also store offset to series section to allow plain
|
||||
// iteration over all existing series?
|
||||
b := [8]byte{}
|
||||
binary.BigEndian.PutUint32(b[:4], lo)
|
||||
binary.BigEndian.PutUint32(b[4:], po)
|
||||
|
||||
return w.write(w.bufw, b[:])
|
||||
keys []string
|
||||
offset uint64
|
||||
}
|
||||
|
||||
func (w *indexWriter) Close() error {
|
||||
if err := w.finalize(); err != nil {
|
||||
if err := w.ensureStage(idxStageDone); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.bufw.Flush(); err != nil {
|
||||
if err := w.fbuf.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := fileutil.Fsync(w.f); err != nil {
|
||||
|
@ -440,7 +528,8 @@ type StringTuples interface {
|
|||
|
||||
type indexReader struct {
|
||||
// The underlying byte slice holding the encoded series data.
|
||||
b []byte
|
||||
b []byte
|
||||
toc indexTOC
|
||||
|
||||
// Close that releases the underlying resources of the byte slice.
|
||||
c io.Closer
|
||||
|
@ -471,57 +560,77 @@ func newIndexReader(dir string) (*indexReader, error) {
|
|||
return nil, errors.Errorf("invalid magic number %x", m)
|
||||
}
|
||||
|
||||
// The last two 4 bytes hold the pointers to the hashmaps.
|
||||
loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4])
|
||||
poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:])
|
||||
if err := r.readTOC(); err != nil {
|
||||
return nil, errors.Wrap(err, "read TOC")
|
||||
}
|
||||
|
||||
flag, b, err := r.section(loff)
|
||||
r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "label index hashmap section at %d", loff)
|
||||
return nil, errors.Wrap(err, "read label index table")
|
||||
}
|
||||
if r.labels, err = readHashmap(flag, b); err != nil {
|
||||
return nil, errors.Wrap(err, "read label index hashmap")
|
||||
}
|
||||
flag, b, err = r.section(poff)
|
||||
r.postings, err = r.readOffsetTable(r.toc.postingsTable)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "postings hashmap section at %d", loff)
|
||||
}
|
||||
if r.postings, err = readHashmap(flag, b); err != nil {
|
||||
return nil, errors.Wrap(err, "read postings hashmap")
|
||||
return nil, errors.Wrap(err, "read postings table")
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func readHashmap(flag byte, b []byte) (map[string]uint32, error) {
|
||||
if flag != flagStd {
|
||||
return nil, errInvalidFlag
|
||||
func (r *indexReader) readTOC() error {
|
||||
d := r.decbufAt(len(r.b) - indexTOCLen)
|
||||
|
||||
r.toc.symbols = d.be64()
|
||||
r.toc.series = d.be64()
|
||||
r.toc.labelIndices = d.be64()
|
||||
r.toc.labelIndicesTable = d.be64()
|
||||
r.toc.postings = d.be64()
|
||||
r.toc.postingsTable = d.be64()
|
||||
|
||||
// TODO(fabxc): validate checksum.
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *indexReader) decbufAt(off int) decbuf {
|
||||
if len(r.b) < off {
|
||||
return decbuf{e: errInvalidSize}
|
||||
}
|
||||
h := make(map[string]uint32, 512)
|
||||
return decbuf{b: r.b[off:]}
|
||||
}
|
||||
|
||||
for len(b) > 0 {
|
||||
l, n := binary.Uvarint(b)
|
||||
if n < 1 {
|
||||
return nil, errors.Wrap(errInvalidSize, "read key length")
|
||||
}
|
||||
b = b[n:]
|
||||
|
||||
if len(b) < int(l) {
|
||||
return nil, errors.Wrap(errInvalidSize, "read key")
|
||||
}
|
||||
s := string(b[:l])
|
||||
b = b[l:]
|
||||
|
||||
o, n := binary.Uvarint(b)
|
||||
if n < 1 {
|
||||
return nil, errors.Wrap(errInvalidSize, "read offset value")
|
||||
}
|
||||
b = b[n:]
|
||||
|
||||
h[s] = uint32(o)
|
||||
// readOffsetTable reads an offset table at the given position and returns a map
|
||||
// with the key strings concatenated by the 0xff unicode non-character.
|
||||
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
|
||||
// A table might not have been written at all, in which case the position
|
||||
// is zeroed out.
|
||||
if off == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return h, nil
|
||||
const sep = "\xff"
|
||||
|
||||
var (
|
||||
d1 = r.decbufAt(int(off))
|
||||
cnt = d1.be32()
|
||||
d2 = d1.decbuf(d1.be32int())
|
||||
)
|
||||
|
||||
res := make(map[string]uint32, 512)
|
||||
|
||||
for d2.err() == nil && d2.len() > 0 && cnt > 0 {
|
||||
keyCount := int(d2.uvarint())
|
||||
keys := make([]string, 0, keyCount)
|
||||
|
||||
for i := 0; i < keyCount; i++ {
|
||||
keys = append(keys, d2.uvarintStr())
|
||||
}
|
||||
res[strings.Join(keys, sep)] = uint32(d2.uvarint())
|
||||
|
||||
cnt--
|
||||
}
|
||||
|
||||
// TODO(fabxc): verify checksum from remainer of d1.
|
||||
return res, d2.err()
|
||||
}
|
||||
|
||||
func (r *indexReader) Close() error {
|
||||
|
@ -548,25 +657,19 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
|
|||
}
|
||||
|
||||
func (r *indexReader) lookupSymbol(o uint32) (string, error) {
|
||||
if int(o) > len(r.b) {
|
||||
return "", errors.Errorf("invalid symbol offset %d", o)
|
||||
}
|
||||
l, n := binary.Uvarint(r.b[o:])
|
||||
if n < 0 {
|
||||
return "", errors.New("reading symbol length failed")
|
||||
}
|
||||
d := r.decbufAt(int(o))
|
||||
|
||||
end := int(o) + n + int(l)
|
||||
if end > len(r.b) {
|
||||
return "", errors.New("invalid length")
|
||||
s := d.uvarintStr()
|
||||
if d.err() != nil {
|
||||
return "", errors.Wrapf(d.err(), "read symbol at %d", o)
|
||||
}
|
||||
b := r.b[int(o)+n : end]
|
||||
|
||||
return yoloString(b), nil
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
||||
key := strings.Join(names, string(sep))
|
||||
const sep = "\xff"
|
||||
|
||||
key := strings.Join(names, sep)
|
||||
off, ok := r.labels[key]
|
||||
if !ok {
|
||||
// XXX(fabxc): hot fix. Should return a partial data error and handle cases
|
||||
|
@ -575,21 +678,21 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
|||
//return nil, fmt.Errorf("label index doesn't exist")
|
||||
}
|
||||
|
||||
flag, b, err := r.section(off)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "section at %d", off)
|
||||
}
|
||||
if flag != flagStd {
|
||||
return nil, errInvalidFlag
|
||||
}
|
||||
l, n := binary.Uvarint(b)
|
||||
if n < 1 {
|
||||
return nil, errors.Wrap(errInvalidSize, "read label index size")
|
||||
d1 := r.decbufAt(int(off))
|
||||
d2 := d1.decbuf(d1.be32int())
|
||||
|
||||
nc := d2.be32int()
|
||||
d2.be32() // consume unused value entry count.
|
||||
|
||||
if d2.err() != nil {
|
||||
return nil, errors.Wrap(d2.err(), "read label value index")
|
||||
}
|
||||
|
||||
// TODO(fabxc): verify checksum in 4 remaining bytes of d1.
|
||||
|
||||
st := &serializedStringTuples{
|
||||
l: int(l),
|
||||
b: b[n:],
|
||||
l: nc,
|
||||
b: d2.get(),
|
||||
lookup: r.lookupSymbol,
|
||||
}
|
||||
return st, nil
|
||||
|
@ -601,110 +704,89 @@ func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil }
|
|||
func (emptyStringTuples) Len() int { return 0 }
|
||||
|
||||
func (r *indexReader) LabelIndices() ([][]string, error) {
|
||||
const sep = "\xff"
|
||||
|
||||
res := [][]string{}
|
||||
|
||||
for s := range r.labels {
|
||||
res = append(res, strings.Split(s, string(sep)))
|
||||
res = append(res, strings.Split(s, sep))
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
|
||||
k, n := binary.Uvarint(r.b[ref:])
|
||||
if n < 1 {
|
||||
return nil, nil, errors.Wrap(errInvalidSize, "number of labels")
|
||||
}
|
||||
d1 := r.decbufAt(int(ref))
|
||||
d2 := d1.decbuf(int(d1.uvarint()))
|
||||
|
||||
b := r.b[int(ref)+n:]
|
||||
k := int(d2.uvarint())
|
||||
lbls := make(labels.Labels, 0, k)
|
||||
|
||||
for i := 0; i < 2*int(k); i += 2 {
|
||||
o, m := binary.Uvarint(b)
|
||||
if m < 1 {
|
||||
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
|
||||
}
|
||||
n, err := r.lookupSymbol(uint32(o))
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "symbol lookup")
|
||||
}
|
||||
b = b[m:]
|
||||
for i := 0; i < k; i++ {
|
||||
lno := uint32(d2.uvarint())
|
||||
lvo := uint32(d2.uvarint())
|
||||
|
||||
o, m = binary.Uvarint(b)
|
||||
if m < 1 {
|
||||
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
|
||||
if d2.err() != nil {
|
||||
return nil, nil, errors.Wrap(d2.err(), "read series label offsets")
|
||||
}
|
||||
v, err := r.lookupSymbol(uint32(o))
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "symbol lookup")
|
||||
}
|
||||
b = b[m:]
|
||||
|
||||
lbls = append(lbls, labels.Label{
|
||||
Name: n,
|
||||
Value: v,
|
||||
})
|
||||
ln, err := r.lookupSymbol(lno)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "lookup label name")
|
||||
}
|
||||
lv, err := r.lookupSymbol(lvo)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "lookup label value")
|
||||
}
|
||||
|
||||
lbls = append(lbls, labels.Label{Name: ln, Value: lv})
|
||||
}
|
||||
|
||||
// Read the chunks meta data.
|
||||
k, n = binary.Uvarint(b)
|
||||
if n < 1 {
|
||||
return nil, nil, errors.Wrap(errInvalidSize, "number of chunks")
|
||||
}
|
||||
|
||||
b = b[n:]
|
||||
k = int(d2.uvarint())
|
||||
chunks := make([]*ChunkMeta, 0, k)
|
||||
|
||||
for i := 0; i < int(k); i++ {
|
||||
firstTime, n := binary.Varint(b)
|
||||
if n < 1 {
|
||||
return nil, nil, errors.Wrap(errInvalidSize, "first time")
|
||||
}
|
||||
b = b[n:]
|
||||
for i := 0; i < k; i++ {
|
||||
mint := d2.varint64()
|
||||
maxt := d2.varint64()
|
||||
off := d2.uvarint64()
|
||||
|
||||
lastTime, n := binary.Varint(b)
|
||||
if n < 1 {
|
||||
return nil, nil, errors.Wrap(errInvalidSize, "last time")
|
||||
if d2.err() != nil {
|
||||
return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i)
|
||||
}
|
||||
b = b[n:]
|
||||
|
||||
o, n := binary.Uvarint(b)
|
||||
if n < 1 {
|
||||
return nil, nil, errors.Wrap(errInvalidSize, "chunk offset")
|
||||
}
|
||||
b = b[n:]
|
||||
|
||||
chunks = append(chunks, &ChunkMeta{
|
||||
Ref: o,
|
||||
MinTime: firstTime,
|
||||
MaxTime: lastTime,
|
||||
Ref: off,
|
||||
MinTime: mint,
|
||||
MaxTime: maxt,
|
||||
})
|
||||
}
|
||||
|
||||
// TODO(fabxc): verify CRC32.
|
||||
|
||||
return lbls, chunks, nil
|
||||
}
|
||||
|
||||
func (r *indexReader) Postings(name, value string) (Postings, error) {
|
||||
key := name + string(sep) + value
|
||||
const sep = "\xff"
|
||||
key := strings.Join([]string{name, value}, sep)
|
||||
|
||||
off, ok := r.postings[key]
|
||||
if !ok {
|
||||
return emptyPostings, nil
|
||||
}
|
||||
|
||||
flag, b, err := r.section(off)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "section at %d", off)
|
||||
d1 := r.decbufAt(int(off))
|
||||
d2 := d1.decbuf(d1.be32int())
|
||||
|
||||
d2.be32() // consume unused postings list length.
|
||||
|
||||
if d2.err() != nil {
|
||||
return nil, errors.Wrap(d2.err(), "get postings bytes")
|
||||
}
|
||||
|
||||
if flag != flagStd {
|
||||
return nil, errors.Wrapf(errInvalidFlag, "section at %d", off)
|
||||
}
|
||||
// TODO(fabxc): read checksum from 4 remainer bytes of d1 and verify.
|
||||
|
||||
// Add iterator over the bytes.
|
||||
if len(b)%4 != 0 {
|
||||
return nil, errors.Wrap(errInvalidSize, "plain postings entry")
|
||||
}
|
||||
return newBigEndianPostings(b), nil
|
||||
return newBigEndianPostings(d2.get()), nil
|
||||
}
|
||||
|
||||
type stringTuples struct {
|
||||
|
@ -753,7 +835,6 @@ type serializedStringTuples struct {
|
|||
}
|
||||
|
||||
func (t *serializedStringTuples) Len() int {
|
||||
// TODO(fabxc): Cache this?
|
||||
return len(t.b) / (4 * t.l)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package labels
|
||||
|
||||
import (
|
||||
|
@ -71,7 +84,7 @@ func (ls Labels) Equals(o Labels) bool {
|
|||
return false
|
||||
}
|
||||
for i, l := range ls {
|
||||
if l.Name != o[i].Name || l.Value != o[i].Value {
|
||||
if o[i] != l {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package labels
|
||||
|
||||
import "regexp"
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
|
@ -139,30 +152,30 @@ func Merge(its ...Postings) Postings {
|
|||
a := its[0]
|
||||
|
||||
for _, b := range its[1:] {
|
||||
a = newMergePostings(a, b)
|
||||
a = newMergedPostings(a, b)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
type mergePostings struct {
|
||||
type mergedPostings struct {
|
||||
a, b Postings
|
||||
aok, bok bool
|
||||
cur uint32
|
||||
}
|
||||
|
||||
func newMergePostings(a, b Postings) *mergePostings {
|
||||
it := &mergePostings{a: a, b: b}
|
||||
func newMergedPostings(a, b Postings) *mergedPostings {
|
||||
it := &mergedPostings{a: a, b: b}
|
||||
it.aok = it.a.Next()
|
||||
it.bok = it.b.Next()
|
||||
|
||||
return it
|
||||
}
|
||||
|
||||
func (it *mergePostings) At() uint32 {
|
||||
func (it *mergedPostings) At() uint32 {
|
||||
return it.cur
|
||||
}
|
||||
|
||||
func (it *mergePostings) Next() bool {
|
||||
func (it *mergedPostings) Next() bool {
|
||||
if !it.aok && !it.bok {
|
||||
return false
|
||||
}
|
||||
|
@ -197,13 +210,14 @@ func (it *mergePostings) Next() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (it *mergePostings) Seek(id uint32) bool {
|
||||
func (it *mergedPostings) Seek(id uint32) bool {
|
||||
it.aok = it.a.Seek(id)
|
||||
it.bok = it.b.Seek(id)
|
||||
|
||||
return it.Next()
|
||||
}
|
||||
|
||||
func (it *mergePostings) Err() error {
|
||||
func (it *mergedPostings) Err() error {
|
||||
if it.a.Err() != nil {
|
||||
return it.a.Err()
|
||||
}
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
|
@ -153,6 +166,9 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
|
|||
mint: q.mint,
|
||||
maxt: q.maxt,
|
||||
},
|
||||
|
||||
mint: q.mint,
|
||||
maxt: q.maxt,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -397,11 +413,15 @@ func (s *populatedChunkSeries) Next() bool {
|
|||
for s.set.Next() {
|
||||
lset, chks := s.set.At()
|
||||
|
||||
for i, c := range chks {
|
||||
if c.MaxTime < s.mint {
|
||||
chks = chks[1:]
|
||||
continue
|
||||
for len(chks) > 0 {
|
||||
if chks[0].MaxTime >= s.mint {
|
||||
break
|
||||
}
|
||||
chks = chks[1:]
|
||||
}
|
||||
|
||||
// Break out at the first chunk that has no overlap with mint, maxt.
|
||||
for i, c := range chks {
|
||||
if c.MinTime > s.maxt {
|
||||
chks = chks[:i]
|
||||
break
|
||||
|
@ -411,6 +431,7 @@ func (s *populatedChunkSeries) Next() bool {
|
|||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if len(chks) == 0 {
|
||||
continue
|
||||
}
|
||||
|
@ -431,12 +452,14 @@ type blockSeriesSet struct {
|
|||
set chunkSeriesSet
|
||||
err error
|
||||
cur Series
|
||||
|
||||
mint, maxt int64
|
||||
}
|
||||
|
||||
func (s *blockSeriesSet) Next() bool {
|
||||
for s.set.Next() {
|
||||
lset, chunks := s.set.At()
|
||||
s.cur = &chunkSeries{labels: lset, chunks: chunks}
|
||||
s.cur = &chunkSeries{labels: lset, chunks: chunks, mint: s.mint, maxt: s.maxt}
|
||||
return true
|
||||
}
|
||||
if s.set.Err() != nil {
|
||||
|
@ -453,6 +476,8 @@ func (s *blockSeriesSet) Err() error { return s.err }
|
|||
type chunkSeries struct {
|
||||
labels labels.Labels
|
||||
chunks []*ChunkMeta // in-order chunk refs
|
||||
|
||||
mint, maxt int64
|
||||
}
|
||||
|
||||
func (s *chunkSeries) Labels() labels.Labels {
|
||||
|
@ -460,14 +485,14 @@ func (s *chunkSeries) Labels() labels.Labels {
|
|||
}
|
||||
|
||||
func (s *chunkSeries) Iterator() SeriesIterator {
|
||||
return newChunkSeriesIterator(s.chunks)
|
||||
return newChunkSeriesIterator(s.chunks, s.mint, s.maxt)
|
||||
}
|
||||
|
||||
// SeriesIterator iterates over the data of a time series.
|
||||
type SeriesIterator interface {
|
||||
// Seek advances the iterator forward to the given timestamp.
|
||||
// If there's no value exactly at ts, it advances to the last value
|
||||
// before tt.
|
||||
// If there's no value exactly at t, it advances to the first value
|
||||
// after t.
|
||||
Seek(t int64) bool
|
||||
// At returns the current timestamp/value pair.
|
||||
At() (t int64, v float64)
|
||||
|
@ -488,7 +513,7 @@ func (s *chainedSeries) Labels() labels.Labels {
|
|||
}
|
||||
|
||||
func (s *chainedSeries) Iterator() SeriesIterator {
|
||||
return &chainedSeriesIterator{series: s.series}
|
||||
return newChainedSeriesIterator(s.series...)
|
||||
}
|
||||
|
||||
// chainedSeriesIterator implements a series iterator over a list
|
||||
|
@ -500,6 +525,14 @@ type chainedSeriesIterator struct {
|
|||
cur SeriesIterator
|
||||
}
|
||||
|
||||
func newChainedSeriesIterator(s ...Series) *chainedSeriesIterator {
|
||||
return &chainedSeriesIterator{
|
||||
series: s,
|
||||
i: 0,
|
||||
cur: s[0].Iterator(),
|
||||
}
|
||||
}
|
||||
|
||||
func (it *chainedSeriesIterator) Seek(t int64) bool {
|
||||
// We just scan the chained series sequentially as they are already
|
||||
// pre-selected by relevant time and should be accessed sequentially anyway.
|
||||
|
@ -516,9 +549,6 @@ func (it *chainedSeriesIterator) Seek(t int64) bool {
|
|||
}
|
||||
|
||||
func (it *chainedSeriesIterator) Next() bool {
|
||||
if it.cur == nil {
|
||||
it.cur = it.series[it.i].Iterator()
|
||||
}
|
||||
if it.cur.Next() {
|
||||
return true
|
||||
}
|
||||
|
@ -550,17 +580,35 @@ type chunkSeriesIterator struct {
|
|||
|
||||
i int
|
||||
cur chunks.Iterator
|
||||
|
||||
maxt, mint int64
|
||||
}
|
||||
|
||||
func newChunkSeriesIterator(cs []*ChunkMeta) *chunkSeriesIterator {
|
||||
func newChunkSeriesIterator(cs []*ChunkMeta, mint, maxt int64) *chunkSeriesIterator {
|
||||
return &chunkSeriesIterator{
|
||||
chunks: cs,
|
||||
i: 0,
|
||||
cur: cs[0].Chunk.Iterator(),
|
||||
|
||||
mint: mint,
|
||||
maxt: maxt,
|
||||
}
|
||||
}
|
||||
|
||||
func (it *chunkSeriesIterator) inBounds(t int64) bool {
|
||||
return t >= it.mint && t <= it.maxt
|
||||
}
|
||||
|
||||
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
|
||||
if t > it.maxt {
|
||||
return false
|
||||
}
|
||||
|
||||
// Seek to the first valid value after t.
|
||||
if t < it.mint {
|
||||
t = it.mint
|
||||
}
|
||||
|
||||
// Only do binary search forward to stay in line with other iterators
|
||||
// that can only move forward.
|
||||
x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t })
|
||||
|
@ -569,10 +617,10 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
|
|||
// If the timestamp was not found, it might be in the last chunk.
|
||||
if x == len(it.chunks) {
|
||||
x--
|
||||
}
|
||||
// Go to previous chunk if the chunk doesn't exactly start with t.
|
||||
// If we are already at the first chunk, we use it as it's the best we have.
|
||||
if x > 0 && it.chunks[x].MinTime > t {
|
||||
|
||||
// Go to previous chunk if the chunk doesn't exactly start with t.
|
||||
// If we are already at the first chunk, we use it as it's the best we have.
|
||||
} else if x > 0 && it.chunks[x].MinTime > t {
|
||||
x--
|
||||
}
|
||||
|
||||
|
@ -593,9 +641,13 @@ func (it *chunkSeriesIterator) At() (t int64, v float64) {
|
|||
}
|
||||
|
||||
func (it *chunkSeriesIterator) Next() bool {
|
||||
if it.cur.Next() {
|
||||
return true
|
||||
for it.cur.Next() {
|
||||
t, _ := it.cur.At()
|
||||
if it.inBounds(t) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
if err := it.cur.Err(); err != nil {
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -1,3 +1,16 @@
|
|||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
|
@ -134,7 +147,7 @@ func (w *WAL) initSegments() error {
|
|||
if len(fns) == 0 {
|
||||
return nil
|
||||
}
|
||||
// We must open all file in read mode as we may have to truncate along
|
||||
// We must open all files in read/write mode as we may have to truncate along
|
||||
// the way and any file may become the tail.
|
||||
for _, fn := range fns {
|
||||
f, err := os.OpenFile(fn, os.O_RDWR, 0666)
|
||||
|
@ -165,10 +178,10 @@ func (w *WAL) initSegments() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// cut finishes the currently active segments and open the next one.
|
||||
// cut finishes the currently active segments and opens the next one.
|
||||
// The encoder is reset to point to the new segment.
|
||||
func (w *WAL) cut() error {
|
||||
// Sync current tail to disc and close.
|
||||
// Sync current tail to disk and close.
|
||||
if tf := w.tail(); tf != nil {
|
||||
if err := w.sync(); err != nil {
|
||||
return err
|
||||
|
@ -263,7 +276,7 @@ func (w *WAL) run(interval time.Duration) {
|
|||
}
|
||||
}
|
||||
|
||||
// Close sync all data and closes the underlying resources.
|
||||
// Close syncs all data and closes the underlying resources.
|
||||
func (w *WAL) Close() error {
|
||||
close(w.stopc)
|
||||
<-w.donec
|
||||
|
@ -296,9 +309,10 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error {
|
|||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
|
||||
// Cut to the next segment if exceeds the file size unless it would also
|
||||
// Cut to the next segment if the entry exceeds the file size unless it would also
|
||||
// exceed the size of a new segment.
|
||||
var (
|
||||
// 6-byte header + 4-byte CRC32 + buf.
|
||||
sz = int64(6 + 4 + len(buf))
|
||||
newsz = w.curN + sz
|
||||
)
|
||||
|
|
|
@ -645,22 +645,22 @@
|
|||
"revisionTime": "2016-04-11T19:08:41Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "R7aNHvNnDiTb6t7z8FehPAo3PdM=",
|
||||
"checksumSHA1": "0wu/AzUWMurN/T5VBKCrvhf7c7E=",
|
||||
"path": "github.com/prometheus/tsdb",
|
||||
"revision": "721df536eb1a6e6f91785f138ca24917222f8461",
|
||||
"revisionTime": "2017-04-09T08:18:19Z"
|
||||
"revision": "44769c1654f699931b2d3a2928326ac2d02d9149",
|
||||
"revisionTime": "2017-05-09T10:52:47Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "Qwlzvcx5Lbo9Nzb75AGgiiGszZI=",
|
||||
"checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=",
|
||||
"path": "github.com/prometheus/tsdb/chunks",
|
||||
"revision": "10c7c9acbe0175a411bce90cd7a0d9d7a13d6a83",
|
||||
"revisionTime": "2017-04-04T09:27:26Z"
|
||||
"revision": "44769c1654f699931b2d3a2928326ac2d02d9149",
|
||||
"revisionTime": "2017-05-09T10:52:47Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "ed5dnejBTbr0FKdzKRAC91bHRgE=",
|
||||
"checksumSHA1": "3RHZcB/ZvIae9K0tJxNlajJg0jA=",
|
||||
"path": "github.com/prometheus/tsdb/labels",
|
||||
"revision": "770d00800212502dfef71a6a7df23e3ced4459d9",
|
||||
"revisionTime": "2017-04-05T12:14:30Z"
|
||||
"revision": "44769c1654f699931b2d3a2928326ac2d02d9149",
|
||||
"revisionTime": "2017-05-09T10:52:47Z"
|
||||
},
|
||||
{
|
||||
"checksumSHA1": "+49Vr4Me28p3cR+gxX5SUQHbbas=",
|
||||
|
|
Loading…
Reference in New Issue