mirror of https://github.com/prometheus/prometheus
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
236 lines
5.8 KiB
236 lines
5.8 KiB
// Copyright 2019 The Prometheus Authors |
|
// Licensed under the Apache License, Version 2.0 (the "License"); |
|
// you may not use this file except in compliance with the License. |
|
// You may obtain a copy of the License at |
|
// |
|
// http://www.apache.org/licenses/LICENSE-2.0 |
|
// |
|
// Unless required by applicable law or agreed to in writing, software |
|
// distributed under the License is distributed on an "AS IS" BASIS, |
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
// See the License for the specific language governing permissions and |
|
// limitations under the License. |
|
|
|
package promql |
|
|
|
import ( |
|
"context" |
|
"encoding/json" |
|
"errors" |
|
"fmt" |
|
"io" |
|
"log/slog" |
|
"os" |
|
"path/filepath" |
|
"strings" |
|
"time" |
|
"unicode/utf8" |
|
|
|
"github.com/edsrzf/mmap-go" |
|
) |
|
|
|
type ActiveQueryTracker struct { |
|
mmappedFile []byte |
|
getNextIndex chan int |
|
logger *slog.Logger |
|
closer io.Closer |
|
maxConcurrent int |
|
} |
|
|
|
var _ io.Closer = &ActiveQueryTracker{} |
|
|
|
type Entry struct { |
|
Query string `json:"query"` |
|
Timestamp int64 `json:"timestamp_sec"` |
|
} |
|
|
|
const ( |
|
entrySize int = 1000 |
|
) |
|
|
|
func parseBrokenJSON(brokenJSON []byte) (string, bool) { |
|
queries := strings.ReplaceAll(string(brokenJSON), "\x00", "") |
|
if len(queries) > 0 { |
|
queries = queries[:len(queries)-1] + "]" |
|
} |
|
|
|
// Conditional because of implementation detail: len() = 1 implies file consisted of a single char: '['. |
|
if len(queries) <= 1 { |
|
return "[]", false |
|
} |
|
|
|
return queries, true |
|
} |
|
|
|
func logUnfinishedQueries(filename string, filesize int, logger *slog.Logger) { |
|
if _, err := os.Stat(filename); err == nil { |
|
fd, err := os.Open(filename) |
|
if err != nil { |
|
logger.Error("Failed to open query log file", "err", err) |
|
return |
|
} |
|
defer fd.Close() |
|
|
|
brokenJSON := make([]byte, filesize) |
|
_, err = fd.Read(brokenJSON) |
|
if err != nil { |
|
logger.Error("Failed to read query log file", "err", err) |
|
return |
|
} |
|
|
|
queries, queriesExist := parseBrokenJSON(brokenJSON) |
|
if !queriesExist { |
|
return |
|
} |
|
logger.Info("These queries didn't finish in prometheus' last run:", "queries", queries) |
|
} |
|
} |
|
|
|
type mmappedFile struct { |
|
f io.Closer |
|
m mmap.MMap |
|
} |
|
|
|
func (f *mmappedFile) Close() error { |
|
err := f.m.Unmap() |
|
if err != nil { |
|
err = fmt.Errorf("mmappedFile: unmapping: %w", err) |
|
} |
|
if fErr := f.f.Close(); fErr != nil { |
|
return errors.Join(fmt.Errorf("close mmappedFile.f: %w", fErr), err) |
|
} |
|
|
|
return err |
|
} |
|
|
|
func getMMappedFile(filename string, filesize int, logger *slog.Logger) ([]byte, io.Closer, error) { |
|
file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666) |
|
if err != nil { |
|
absPath, pathErr := filepath.Abs(filename) |
|
if pathErr != nil { |
|
absPath = filename |
|
} |
|
logger.Error("Error opening query log file", "file", absPath, "err", err) |
|
return nil, nil, err |
|
} |
|
|
|
err = file.Truncate(int64(filesize)) |
|
if err != nil { |
|
file.Close() |
|
logger.Error("Error setting filesize.", "filesize", filesize, "err", err) |
|
return nil, nil, err |
|
} |
|
|
|
fileAsBytes, err := mmap.Map(file, mmap.RDWR, 0) |
|
if err != nil { |
|
file.Close() |
|
logger.Error("Failed to mmap", "file", filename, "Attempted size", filesize, "err", err) |
|
return nil, nil, err |
|
} |
|
|
|
return fileAsBytes, &mmappedFile{f: file, m: fileAsBytes}, err |
|
} |
|
|
|
func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger *slog.Logger) *ActiveQueryTracker { |
|
err := os.MkdirAll(localStoragePath, 0o777) |
|
if err != nil { |
|
logger.Error("Failed to create directory for logging active queries") |
|
} |
|
|
|
filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxConcurrent*entrySize |
|
logUnfinishedQueries(filename, filesize, logger) |
|
|
|
fileAsBytes, closer, err := getMMappedFile(filename, filesize, logger) |
|
if err != nil { |
|
panic("Unable to create mmap-ed active query log") |
|
} |
|
|
|
copy(fileAsBytes, "[") |
|
activeQueryTracker := ActiveQueryTracker{ |
|
mmappedFile: fileAsBytes, |
|
closer: closer, |
|
getNextIndex: make(chan int, maxConcurrent), |
|
logger: logger, |
|
maxConcurrent: maxConcurrent, |
|
} |
|
|
|
activeQueryTracker.generateIndices(maxConcurrent) |
|
|
|
return &activeQueryTracker |
|
} |
|
|
|
func trimStringByBytes(str string, size int) string { |
|
bytesStr := []byte(str) |
|
|
|
trimIndex := len(bytesStr) |
|
if size < len(bytesStr) { |
|
for !utf8.RuneStart(bytesStr[size]) { |
|
size-- |
|
} |
|
trimIndex = size |
|
} |
|
|
|
return string(bytesStr[:trimIndex]) |
|
} |
|
|
|
func _newJSONEntry(query string, timestamp int64, logger *slog.Logger) []byte { |
|
entry := Entry{query, timestamp} |
|
jsonEntry, err := json.Marshal(entry) |
|
if err != nil { |
|
logger.Error("Cannot create json of query", "query", query) |
|
return []byte{} |
|
} |
|
|
|
return jsonEntry |
|
} |
|
|
|
func newJSONEntry(query string, logger *slog.Logger) []byte { |
|
timestamp := time.Now().Unix() |
|
minEntryJSON := _newJSONEntry("", timestamp, logger) |
|
|
|
query = trimStringByBytes(query, entrySize-(len(minEntryJSON)+1)) |
|
jsonEntry := _newJSONEntry(query, timestamp, logger) |
|
|
|
return jsonEntry |
|
} |
|
|
|
func (tracker ActiveQueryTracker) generateIndices(maxConcurrent int) { |
|
for i := 0; i < maxConcurrent; i++ { |
|
tracker.getNextIndex <- 1 + (i * entrySize) |
|
} |
|
} |
|
|
|
func (tracker ActiveQueryTracker) GetMaxConcurrent() int { |
|
return tracker.maxConcurrent |
|
} |
|
|
|
func (tracker ActiveQueryTracker) Delete(insertIndex int) { |
|
copy(tracker.mmappedFile[insertIndex:], strings.Repeat("\x00", entrySize)) |
|
tracker.getNextIndex <- insertIndex |
|
} |
|
|
|
func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int, error) { |
|
select { |
|
case i := <-tracker.getNextIndex: |
|
fileBytes := tracker.mmappedFile |
|
entry := newJSONEntry(query, tracker.logger) |
|
start, end := i, i+entrySize |
|
|
|
copy(fileBytes[start:], entry) |
|
copy(fileBytes[end-1:], ",") |
|
return i, nil |
|
case <-ctx.Done(): |
|
return 0, ctx.Err() |
|
} |
|
} |
|
|
|
// Close closes tracker. |
|
func (tracker *ActiveQueryTracker) Close() error { |
|
if tracker == nil || tracker.closer == nil { |
|
return nil |
|
} |
|
if err := tracker.closer.Close(); err != nil { |
|
return fmt.Errorf("close ActiveQueryTracker.closer: %w", err) |
|
} |
|
return nil |
|
}
|
|
|