|
|
|
@ -41,8 +41,8 @@ const (
|
|
|
|
|
entrySize int = 1000 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func parseBrokenJson(brokenJson []byte) (bool, string) { |
|
|
|
|
queries := strings.ReplaceAll(string(brokenJson), "\x00", "") |
|
|
|
|
func parseBrokenJSON(brokenJSON []byte) (bool, string) { |
|
|
|
|
queries := strings.ReplaceAll(string(brokenJSON), "\x00", "") |
|
|
|
|
if len(queries) > 0 { |
|
|
|
|
queries = queries[:len(queries)-1] + "]" |
|
|
|
|
} |
|
|
|
@ -63,14 +63,14 @@ func logUnfinishedQueries(filename string, filesize int, logger log.Logger) {
|
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
brokenJson := make([]byte, filesize) |
|
|
|
|
_, err = fd.Read(brokenJson) |
|
|
|
|
brokenJSON := make([]byte, filesize) |
|
|
|
|
_, err = fd.Read(brokenJSON) |
|
|
|
|
if err != nil { |
|
|
|
|
level.Error(logger).Log("msg", "Failed to read query log file", "err", err) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
queriesExist, queries := parseBrokenJson(brokenJson) |
|
|
|
|
queriesExist, queries := parseBrokenJSON(brokenJSON) |
|
|
|
|
if !queriesExist { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
@ -141,7 +141,7 @@ func trimStringByBytes(str string, size int) string {
|
|
|
|
|
return string(bytesStr[:trimIndex]) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func _newJsonEntry(query string, timestamp int64, logger log.Logger) []byte { |
|
|
|
|
func _newJSONEntry(query string, timestamp int64, logger log.Logger) []byte { |
|
|
|
|
entry := Entry{query, timestamp} |
|
|
|
|
jsonEntry, err := json.Marshal(entry) |
|
|
|
|
|
|
|
|
@ -153,12 +153,12 @@ func _newJsonEntry(query string, timestamp int64, logger log.Logger) []byte {
|
|
|
|
|
return jsonEntry |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newJsonEntry(query string, logger log.Logger) []byte { |
|
|
|
|
func newJSONEntry(query string, logger log.Logger) []byte { |
|
|
|
|
timestamp := time.Now().Unix() |
|
|
|
|
minEntryJson := _newJsonEntry("", timestamp, logger) |
|
|
|
|
minEntryJSON := _newJSONEntry("", timestamp, logger) |
|
|
|
|
|
|
|
|
|
query = trimStringByBytes(query, entrySize-(len(minEntryJson)+1)) |
|
|
|
|
jsonEntry := _newJsonEntry(query, timestamp, logger) |
|
|
|
|
query = trimStringByBytes(query, entrySize-(len(minEntryJSON)+1)) |
|
|
|
|
jsonEntry := _newJSONEntry(query, timestamp, logger) |
|
|
|
|
|
|
|
|
|
return jsonEntry |
|
|
|
|
} |
|
|
|
@ -176,7 +176,7 @@ func (tracker ActiveQueryTracker) Delete(insertIndex int) {
|
|
|
|
|
|
|
|
|
|
func (tracker ActiveQueryTracker) Insert(query string) int { |
|
|
|
|
i, fileBytes := <-tracker.getNextIndex, tracker.mmapedFile |
|
|
|
|
entry := newJsonEntry(query, tracker.logger) |
|
|
|
|
entry := newJSONEntry(query, tracker.logger) |
|
|
|
|
start, end := i, i+entrySize |
|
|
|
|
|
|
|
|
|
copy(fileBytes[start:], entry) |
|
|
|
|