Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fixes #390

Merged
merged 3 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion collector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -183,13 +184,15 @@ func (c *MetricsClient) FetchMetricsIterator(target string) (*prompb.Iterator, e
}

if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
return nil, fmt.Errorf("collect node metrics for %s: %s", target, resp.Status)
}

br := resp.Body
if resp.Header.Get("Content-Encoding") == "gzip" {
br, err = gzip.NewReader(resp.Body)
br, err = NewGzipReaderWithClose(resp.Body)
if err != nil {
defer br.Close()
return nil, fmt.Errorf("collect node metrics for %s: %w", target, err)
}
}
Expand Down Expand Up @@ -267,3 +270,30 @@ func (c *MetricsClient) refreshToken() {
}
}
}

// gzipReaderWithClose wraps gzip.Reader and the underlying reader
type gzipReaderWithClose struct {
*gzip.Reader
underlying io.ReadCloser
}

// Close closes both the gzip.Reader and the underlying reader
func (gr *gzipReaderWithClose) Close() error {
err := gr.Reader.Close()
if err != nil {
return err
}
return gr.underlying.Close()
}

// NewGzipReaderWithClose creates a new gzipReaderWithClose
func NewGzipReaderWithClose(r io.ReadCloser) (*gzipReaderWithClose, error) {
gr, err := gzip.NewReader(r)
if err != nil {
return nil, err
}
return &gzipReaderWithClose{
Reader: gr,
underlying: r,
}, nil
}
5 changes: 5 additions & 0 deletions collector/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (s *Scraper) scrapeTargets(ctx context.Context) {
scrapeTime := time.Now().UnixNano() / 1e6
wr := &prompb.WriteRequest{}
for _, target := range targets {
logger.Infof("Scraping %s", target.String())
iter, err := s.scrapeClient.FetchMetricsIterator(target.Addr)
if err != nil {
logger.Errorf("Failed to scrape %s: %s", target.Addr, err.Error())
Expand Down Expand Up @@ -257,6 +258,10 @@ func (s *Scraper) scrapeTargets(ctx context.Context) {
wr.Timeseries = append(wr.Timeseries, ts)
wr = s.flushBatchIfNecessary(ctx, wr)
}
if err := iter.Err(); err != nil {
logger.Errorf("Failed to scrape %s: %s", target.Addr, err.Error())
}

if err := iter.Close(); err != nil {
logger.Errorf("Failed to close iterator: %s", err.Error())
}
Expand Down
2 changes: 0 additions & 2 deletions ingestor/cluster/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,6 @@ func (b *batcher) BatchSegments() error {
// thresholds. In addition, the batches are ordered as oldest first to allow for prioritizing
// lagging segments over new ones.
func (b *batcher) processSegments() ([]*Batch, []*Batch, error) {
metrics.IngestorSegmentsTotal.Reset()

// Groups is a map of metrics name to a list of segments for that metric.
groups := make(map[string][]wal.SegmentInfo)

Expand Down
8 changes: 8 additions & 0 deletions pkg/prompb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func NewIterator(r io.ReadCloser) *Iterator {

func (i *Iterator) Next() bool {
if i.scanner.Scan() {
if i.scanner.Err() != nil {
return false
}

i.current = i.scanner.Text()
if i.isComment(i.current) || i.isSpace(i.current) {
return i.Next()
Expand Down Expand Up @@ -168,6 +172,10 @@ func parseLabels(labels Labels, line string) (Labels, string, error) {
}

key := line[:idx]
if len(key) == 0 {
return nil, "", fmt.Errorf("invalid label: no key: %s", orig)
}

line = line[idx+1:]

if len(line) == 0 {
Expand Down
1 change: 1 addition & 0 deletions pkg/prompb/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func TestIterator_TimeSeries_Malformed(t *testing.T) {
`http_requests_total {key="a",b="c"} 1 a`,
`http_requests_total {} 1 a`,
`http_requests_total {} `,
`http_requests_total {a="b",=} 1027`,
} {
t.Run(c, func(t *testing.T) {
iter := NewIterator(io.NopCloser(strings.NewReader(c)))
Expand Down
Loading