Merge pull request #14000 from aknuds1/arve/query-logger-munmap

promql.ActiveQueryTracker: Unmap mmapped file when done
pull/14154/head
Arve Knudsen 6 months ago committed by GitHub
commit 707e9d917e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -21,6 +21,7 @@ import (
"os" "os"
"sort" "sort"
"strconv" "strconv"
"sync"
"testing" "testing"
"time" "time"
@ -58,7 +59,9 @@ func TestQueryConcurrency(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
queryTracker := promql.NewActiveQueryTracker(dir, maxConcurrency, nil) queryTracker := promql.NewActiveQueryTracker(dir, maxConcurrency, nil)
t.Cleanup(queryTracker.Close) t.Cleanup(func() {
require.NoError(t, queryTracker.Close())
})
opts := promql.EngineOpts{ opts := promql.EngineOpts{
Logger: nil, Logger: nil,
@ -90,9 +93,14 @@ func TestQueryConcurrency(t *testing.T) {
return nil return nil
} }
var wg sync.WaitGroup
for i := 0; i < maxConcurrency; i++ { for i := 0; i < maxConcurrency; i++ {
q := engine.NewTestQuery(f) q := engine.NewTestQuery(f)
go q.Exec(ctx) wg.Add(1)
go func() {
q.Exec(ctx)
wg.Done()
}()
select { select {
case <-processing: case <-processing:
// Expected. // Expected.
@ -102,7 +110,11 @@ func TestQueryConcurrency(t *testing.T) {
} }
q := engine.NewTestQuery(f) q := engine.NewTestQuery(f)
go q.Exec(ctx) wg.Add(1)
go func() {
q.Exec(ctx)
wg.Done()
}()
select { select {
case <-processing: case <-processing:
@ -125,6 +137,8 @@ func TestQueryConcurrency(t *testing.T) {
for i := 0; i < maxConcurrency; i++ { for i := 0; i < maxConcurrency; i++ {
block <- struct{}{} block <- struct{}{}
} }
wg.Wait()
} }
// contextDone returns an error if the context was canceled or timed out. // contextDone returns an error if the context was canceled or timed out.

@ -16,6 +16,8 @@ package promql
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
@ -36,6 +38,8 @@ type ActiveQueryTracker struct {
maxConcurrent int maxConcurrent int
} }
var _ io.Closer = &ActiveQueryTracker{}
type Entry struct { type Entry struct {
Query string `json:"query"` Query string `json:"query"`
Timestamp int64 `json:"timestamp_sec"` Timestamp int64 `json:"timestamp_sec"`
@ -83,6 +87,23 @@ func logUnfinishedQueries(filename string, filesize int, logger log.Logger) {
} }
} }
type mmapedFile struct {
f io.Closer
m mmap.MMap
}
func (f *mmapedFile) Close() error {
err := f.m.Unmap()
if err != nil {
err = fmt.Errorf("mmapedFile: unmapping: %w", err)
}
if fErr := f.f.Close(); fErr != nil {
return errors.Join(fmt.Errorf("close mmapedFile.f: %w", fErr), err)
}
return err
}
func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, io.Closer, error) { func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, io.Closer, error) {
file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666) file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666)
if err != nil { if err != nil {
@ -108,7 +129,7 @@ func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, io
return nil, nil, err return nil, nil, err
} }
return fileAsBytes, file, err return fileAsBytes, &mmapedFile{f: file, m: fileAsBytes}, err
} }
func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger log.Logger) *ActiveQueryTracker { func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger log.Logger) *ActiveQueryTracker {
@ -204,9 +225,13 @@ func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int
} }
} }
func (tracker *ActiveQueryTracker) Close() { // Close closes tracker.
func (tracker *ActiveQueryTracker) Close() error {
if tracker == nil || tracker.closer == nil { if tracker == nil || tracker.closer == nil {
return return nil
}
if err := tracker.closer.Close(); err != nil {
return fmt.Errorf("close ActiveQueryTracker.closer: %w", err)
} }
tracker.closer.Close() return nil
} }

@ -16,6 +16,7 @@ package promql
import ( import (
"context" "context"
"os" "os"
"path/filepath"
"testing" "testing"
"github.com/grafana/regexp" "github.com/grafana/regexp"
@ -104,26 +105,26 @@ func TestIndexReuse(t *testing.T) {
} }
func TestMMapFile(t *testing.T) { func TestMMapFile(t *testing.T) {
file, err := os.CreateTemp("", "mmapedFile") dir := t.TempDir()
require.NoError(t, err) fpath := filepath.Join(dir, "mmapedFile")
const data = "ab"
filename := file.Name()
defer os.Remove(filename)
fileAsBytes, _, err := getMMapedFile(filename, 2, nil)
fileAsBytes, closer, err := getMMapedFile(fpath, 2, nil)
require.NoError(t, err) require.NoError(t, err)
copy(fileAsBytes, "ab") copy(fileAsBytes, data)
require.NoError(t, closer.Close())
f, err := os.Open(filename) f, err := os.Open(fpath)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() {
_ = f.Close()
})
bytes := make([]byte, 4) bytes := make([]byte, 4)
n, err := f.Read(bytes) n, err := f.Read(bytes)
require.Equal(t, 2, n)
require.NoError(t, err, "Unexpected error while reading file.") require.NoError(t, err, "Unexpected error while reading file.")
require.Equal(t, 2, n)
require.Equal(t, fileAsBytes, bytes[:2], "Mmap failed") require.Equal(t, []byte(data), bytes[:2], "Mmap failed")
} }
func TestParseBrokenJSON(t *testing.T) { func TestParseBrokenJSON(t *testing.T) {

Loading…
Cancel
Save