diff --git a/CHANGELOG.md b/CHANGELOG.md
index 758e9a3..daa173a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,11 @@
 ## unreleased
+
+* [FEATURE] add support for all metric types; deprecated the --metric-count flag in favor of --gauge-metric-count; --*-interval flags set to 0 now mean no change; added OpenMetrics support.
+
+## 0.5.0 / 2024-09-15
+
 * [FEATURE] add two new operation modes for metrics generation and cycling; gradual-change between min and max and double-halve #64
+* (other changes, not captured due to lost 0.4.0 tag)

 ## 0.4.0 / 2022-03-08

diff --git a/README.md b/README.md
index e1678dc..e2b87e4 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,11 @@
 # avalanche

-Avalanche serves a text-based [Prometheus metrics](https://prometheus.io/docs/instrumenting/exposition_formats/) endpoint for load testing [Prometheus](https://prometheus.io/) and possibly other [OpenMetrics](https://github.com/OpenObservability/OpenMetrics) consumers.
+Avalanche is a load-testing binary capable of generating metrics that can be either:

-Avalanche also supports load testing for services accepting data via Prometheus remote_write API such as [Thanos](https://github.com/improbable-eng/thanos), [Cortex](https://github.com/weaveworks/cortex), [M3DB](https://m3db.github.io/m3/integrations/prometheus/), [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/) and other services [listed here](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage).
+* scraped from an endpoint exposing the [Prometheus scrape formats](https://prometheus.io/docs/instrumenting/exposition_formats/) (including [OpenMetrics](https://github.com/OpenObservability/OpenMetrics)).
+* written via Prometheus Remote Write (v1 only for now) to a target endpoint.
+
+This allows load testing of services that can scrape (e.g. Prometheus, the OpenTelemetry Collector and so on), as well as services accepting data via the Prometheus remote_write API, such as [Thanos](https://github.com/thanos-io/thanos), [Cortex](https://github.com/cortexproject/cortex), [M3DB](https://m3db.github.io/m3/integrations/prometheus/), [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/) and other services [listed here](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage).

 Metric names and unique series change over time to simulate series churn.

diff --git a/cmd/avalanche.go b/cmd/avalanche.go
index 7c5725c..b856fbd 100644
--- a/cmd/avalanche.go
+++ b/cmd/avalanche.go
@@ -14,51 +14,29 @@ package main

 import (
+	"context"
 	"crypto/tls"
 	"fmt"
 	"log"
 	"math/rand"
+	"net/http"
 	"os"
 	"strconv"
 	"sync"
+	"syscall"
 	"time"

+	"github.com/nelkinda/health-go"
+	"github.com/oklog/run"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 	"github.com/prometheus/common/version"
-	kingpin "gopkg.in/alecthomas/kingpin.v2"
+	"gopkg.in/alecthomas/kingpin.v2"

 	"github.com/prometheus-community/avalanche/metrics"
 	"github.com/prometheus-community/avalanche/pkg/download"
 )

-var (
-	metricCount          = kingpin.Flag("metric-count", "Number of metrics to serve.").Default("500").Int()
-	labelCount           = kingpin.Flag("label-count", "Number of labels per-metric.").Default("10").Int()
-	seriesCount          = kingpin.Flag("series-count", "Number of series per-metric.").Default("100").Int()
-	seriesChangeRate     = kingpin.Flag("series-change-rate", "The rate at which the number of active series changes over time. 
Applies to 'gradual-change' mode.").Default("100").Int() - maxSeriesCount = kingpin.Flag("max-series-count", "Maximum number of series to serve. Applies to 'gradual-change' mode.").Default("1000").Int() - minSeriesCount = kingpin.Flag("min-series-count", "Minimum number of series to serve. Applies to 'gradual-change' mode.").Default("100").Int() - spikeMultiplier = kingpin.Flag("spike-multiplier", "Multiplier for the spike mode.").Default("1.5").Float64() - metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int() - labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int() - constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings() - valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int() - labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int() - metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int() - seriesChangeInterval = kingpin.Flag("series-change-interval", "Change the number of series every {interval} seconds. Applies to 'gradual-change', 'double-halve' and 'spike' modes.").Default("30").Int() - seriesOperationMode = kingpin.Flag("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve', 'spike'").Default("default").String() - port = kingpin.Flag("port", "Port to serve at").Default("9001").Int() - remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL() - remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList() - remotePprofInterval = kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles.When not provided it will download a profile once before the end of the test.").Duration() - remoteBatchSize = kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int() - remoteRequestCount = kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int() - remoteReqsInterval = kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration() - remoteTenant = kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String() - tlsClientInsecure = kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool() - remoteTenantHeader = kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String() - outOfOrder = kingpin.Flag("out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true").Bool() -) - func main() { kingpin.Version(version.Print("avalanche")) log.SetFlags(log.Ltime | log.Lshortfile) // Show file name and line in logs. @@ -84,27 +62,35 @@ func main() { " then returns it to the original count on the next tick. 
This pattern repeats indefinitely,\n" + " creating a spiking effect in the series count.\n" + cfg := metrics.NewConfigFromFlags(kingpin.Flag) + port := kingpin.Flag("port", "Port to serve at").Default("9001").Int() + remoteURL := kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL() + // TODO(bwplotka): Kill pprof feature, you can install OSS continuous profiling easily instead. + remotePprofURLs := kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList() + remotePprofInterval := kingpin.Flag("remote-pprof-interval", "how often to download pprof profiles. When not provided it will download a profile once before the end of the test.").Duration() + remoteBatchSize := kingpin.Flag("remote-batch-size", "how many samples to send with each remote_write API request.").Default("2000").Int() + remoteRequestCount := kingpin.Flag("remote-requests-count", "how many requests to send in total to the remote_write API.").Default("100").Int() + remoteReqsInterval := kingpin.Flag("remote-write-interval", "delay between each remote write request.").Default("100ms").Duration() + remoteTenant := kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String() + tlsClientInsecure := kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool() + remoteTenantHeader := kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String() + // TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time). + outOfOrder := kingpin.Flag("out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true").Bool() + kingpin.Parse() - if *maxSeriesCount <= *minSeriesCount { - fmt.Fprintf(os.Stderr, "Error: --max-series-count (%d) must be greater than --min-series-count (%d)\n", *maxSeriesCount, *minSeriesCount) - os.Exit(1) - } - if *minSeriesCount < 0 { - fmt.Fprintf(os.Stderr, "Error: --min-series-count must be 0 or higher, got %d\n", *minSeriesCount) - os.Exit(1) - } - if *seriesChangeRate <= 0 { - fmt.Fprintf(os.Stderr, "Error: --series-change-rate must be greater than 0, got %d\n", *seriesChangeRate) - os.Exit(1) + if err := cfg.Validate(); err != nil { + kingpin.FatalUsage("configuration error: %v", err) } - stop := make(chan struct{}) - defer close(stop) - updateNotify, err := metrics.RunMetrics(*metricCount, *labelCount, *seriesCount, *seriesChangeRate, *maxSeriesCount, *minSeriesCount, *metricLength, *labelLength, *valueInterval, *labelInterval, *metricInterval, *seriesChangeInterval, *spikeMultiplier, *seriesOperationMode, *constLabels, stop) - if err != nil { - log.Fatal(err) - } + collector := metrics.NewCollector(*cfg) + reg := prometheus.NewRegistry() + reg.MustRegister(collector) + + var g run.Group + g.Add(run.SignalHandler(context.Background(), os.Interrupt, syscall.SIGTERM)) + g.Add(collector.Run, collector.Stop) + // One-off remote write send mode. 
if *remoteURL != nil { if (**remoteURL).Host == "" || (**remoteURL).Scheme == "" { log.Fatal("remote host and scheme can't be empty") @@ -118,7 +104,7 @@ func main() { RequestInterval: *remoteReqsInterval, BatchSize: *remoteBatchSize, RequestCount: *remoteRequestCount, - UpdateNotify: updateNotify, + UpdateNotify: collector.UpdateNotifyCh(), Tenant: *remoteTenant, TLSClientConfig: tls.Config{ InsecureSkipVerify: *tlsClientInsecure, @@ -158,11 +144,10 @@ func main() { wg.Done() } }() - } + // First cut: just send the metrics once then exit - err := metrics.SendRemoteWrite(config) - if err != nil { + if err := metrics.SendRemoteWrite(config, reg); err != nil { log.Fatal(err) } if *remotePprofInterval > 0 { @@ -172,9 +157,16 @@ func main() { return } - fmt.Printf("Serving your metrics at localhost:%v/metrics\n", *port) - err = metrics.ServeMetrics(*port) - if err != nil { - log.Fatal(err) - } + // Standard mode for continuous exposure of metrics. + httpSrv := &http.Server{Addr: fmt.Sprintf(":%v", *port)} + g.Add(func() error { + fmt.Printf("Serving your metrics at :%v/metrics\n", *port) + http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{ + EnableOpenMetrics: true, + })) + http.HandleFunc("/health", health.New(health.Health{}).Handler) + return httpSrv.ListenAndServe() + }, func(_ error) { + _ = httpSrv.Shutdown(context.Background()) + }) } diff --git a/go.mod b/go.mod index a3e4382..76e4540 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 github.com/nelkinda/health-go v0.0.1 + github.com/oklog/run v1.1.0 github.com/prometheus/client_golang v1.20.3 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.57.0 diff --git a/go.sum b/go.sum index 7e2c816..c8de86a 100644 --- a/go.sum +++ b/go.sum @@ -92,6 +92,8 @@ github.com/nelkinda/health-go v0.0.1/go.mod h1:oNvFVrveHIH/xPW5DqjFfdtlyhLXHFmNz github.com/nelkinda/http-go v0.0.1 h1:RL3RttZzzs/kzQaVPmtv5dxMaWValqCqGNjCXyjPI1k= github.com/nelkinda/http-go v0.0.1/go.mod h1:DxPiZGVufTVSeO63nmVR5QO01TmSC0HHtEIZTHL5QEk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= +github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/metrics/serve.go b/metrics/serve.go index e0d033c..407e7ed 100644 --- a/metrics/serve.go +++ b/metrics/serve.go @@ -15,39 +15,238 @@ package metrics import ( "fmt" + "math" "math/rand" - "net/http" "strings" "sync" "time" - health "github.com/nelkinda/health-go" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" + "gopkg.in/alecthomas/kingpin.v2" ) -var ( - promRegistry = prometheus.NewRegistry() // local Registry so we don't get Go metrics, etc. 
- valGenerator = rand.New(rand.NewSource(time.Now().UnixNano())) - metrics = make([]*prometheus.GaugeVec, 0) - metricsMux = &sync.Mutex{} -) +type Collector struct { + cfg Config + valGen *rand.Rand + + gauges []*prometheus.GaugeVec + counters []*prometheus.CounterVec + histograms []*prometheus.HistogramVec + nativeHistograms []*prometheus.HistogramVec + summaries []*prometheus.SummaryVec + + updateNotifyCh chan struct{} + stopCh chan struct{} + valueTick *time.Ticker + seriesTick *time.Ticker + metricTick *time.Ticker + changeSeriesTick *time.Ticker + + mu sync.Mutex +} + +// NewCollector returns Prometheus collector that can be registered in registry +// that handles metric creation and changes, based on the given configuration. +func NewCollector(cfg Config) *Collector { + if cfg.GaugeMetricCount == 0 { + cfg.GaugeMetricCount = cfg.MetricCount // Handle deprecated field. + } -func registerMetrics(metricCount, metricLength, metricCycle int, labelKeys []string) { - metrics = make([]*prometheus.GaugeVec, metricCount) - for idx := 0; idx < metricCount; idx++ { - gauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Name: fmt.Sprintf("avalanche_metric_%s_%v_%v", strings.Repeat("m", metricLength), metricCycle, idx), - Help: "A tasty metric morsel", - }, append([]string{"series_id", "cycle_id"}, labelKeys...)) - promRegistry.MustRegister(gauge) - metrics[idx] = gauge + c := &Collector{ + cfg: cfg, + valGen: rand.New(rand.NewSource(time.Now().UnixNano())), + updateNotifyCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), } + + if cfg.ValueInterval > 0 { + c.valueTick = time.NewTicker(time.Duration(cfg.ValueInterval) * time.Second) + } + if cfg.SeriesInterval > 0 { + c.seriesTick = time.NewTicker(time.Duration(cfg.SeriesInterval) * time.Second) + } + if cfg.MetricInterval > 0 { + c.metricTick = time.NewTicker(time.Duration(cfg.MetricInterval) * time.Second) + } + if cfg.SeriesChangeInterval > 0 { + c.changeSeriesTick = time.NewTicker(time.Duration(cfg.SeriesChangeInterval) * time.Second) + } + return c } -func unregisterMetrics() { - for _, metric := range metrics { - promRegistry.Unregister(metric) +func (c *Collector) UpdateNotifyCh() chan struct{} { + return c.updateNotifyCh +} + +type Config struct { + MetricCount, GaugeMetricCount, CounterMetricCount, HistogramMetricCount, NativeHistogramMetricCount, SummaryMetricCount int + HistogramBuckets int + + LabelCount, SeriesCount int + MaxSeriesCount, MinSeriesCount int + MetricLength, LabelLength int + + ValueInterval, SeriesInterval, MetricInterval, SeriesChangeInterval, SeriesChangeRate int + + SpikeMultiplier float64 + SeriesOperationMode string + ConstLabels []string +} + +func NewConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause) *Config { + cfg := &Config{} + flagReg("metric-count", "Number of gauge metrics to serve. DEPRECATED use --gauge-metric-count instead").Default("0"). + IntVar(&cfg.MetricCount) + // 500 in total by default, just a healthy distribution of types. + flagReg("gauge-metric-count", "Number of gauge metrics to serve.").Default("200"). + IntVar(&cfg.GaugeMetricCount) + flagReg("counter-metric-count", "Number of counter metrics to serve.").Default("200"). + IntVar(&cfg.CounterMetricCount) + flagReg("histogram-metric-count", "Number of explicit (classic) histogram metrics to serve.").Default("10"). + IntVar(&cfg.HistogramMetricCount) + flagReg("histogram-metric-bucket-count", "Number of explicit buckets (classic) histogram metrics.").Default("8"). 
+		IntVar(&cfg.HistogramBuckets)
+	flagReg("native-histogram-metric-count", "Number of native (exponential) histogram metrics to serve.").Default("0").
+		IntVar(&cfg.NativeHistogramMetricCount)
+	flagReg("summary-metric-count", "Number of summary metrics to serve.").Default("0").
+		IntVar(&cfg.SummaryMetricCount)
+
+	flagReg("label-count", "Number of labels per-metric.").Default("10").
+		IntVar(&cfg.LabelCount)
+	flagReg("series-count", "Number of series per-metric. This excludes the extra series (e.g. _bucket) that will be added for complex types like classic histograms and summaries.").Default("100").
+		IntVar(&cfg.SeriesCount)
+	flagReg("max-series-count", "Maximum number of series to serve. Applies to 'gradual-change' mode.").Default("1000").
+		IntVar(&cfg.MaxSeriesCount)
+	flagReg("min-series-count", "Minimum number of series to serve. Applies to 'gradual-change' mode.").Default("100").
+		IntVar(&cfg.MinSeriesCount)
+	flagReg("spike-multiplier", "Multiplier for the spike mode.").Default("1.5").
+		Float64Var(&cfg.SpikeMultiplier)
+	flagReg("metricname-length", "Modify length of metric names.").Default("5").
+		IntVar(&cfg.MetricLength)
+	flagReg("labelname-length", "Modify length of label names.").Default("5").
+		IntVar(&cfg.LabelLength)
+	flagReg("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").
+		StringsVar(&cfg.ConstLabels)
+
+	flagReg("value-interval", "Change series values every {interval} seconds. 0 means no change.").Default("30").
+		IntVar(&cfg.ValueInterval)
+	flagReg("series-interval", "Change series_id label values every {interval} seconds. 0 means no change.").Default("60").
+		IntVar(&cfg.SeriesInterval)
+	flagReg("metric-interval", "Change __name__ label values every {interval} seconds. 0 means no change.").Default("120").
+		IntVar(&cfg.MetricInterval)
+	flagReg("series-change-interval", "Change the number of series every {interval} seconds. Applies to 'gradual-change', 'double-halve' and 'spike' modes. 0 means no change.").Default("30").
+		IntVar(&cfg.SeriesChangeInterval)
+	flagReg("series-change-rate", "The rate at which the number of active series changes over time. Applies to 'gradual-change' mode.").Default("100").
+		IntVar(&cfg.SeriesChangeRate)
+
+	flagReg("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve', 'spike'").Default("default").
+		StringVar(&cfg.SeriesOperationMode)
+	return cfg
+}
+
+func (c Config) Validate() error {
+	if c.MaxSeriesCount <= c.MinSeriesCount {
+		return fmt.Errorf("--max-series-count (%d) must be greater than --min-series-count (%d)", c.MaxSeriesCount, c.MinSeriesCount)
+	}
+	if c.MinSeriesCount < 0 {
+		return fmt.Errorf("--min-series-count must be 0 or higher, got %d", c.MinSeriesCount)
+	}
+	if c.MetricCount > 0 && c.GaugeMetricCount > 0 {
+		return fmt.Errorf("--metric-count (set to %v) is deprecated and means the same as --gauge-metric-count (set to %v); both can't be used at the same time", c.MetricCount, c.GaugeMetricCount)
+	}
+	for _, cLabel := range c.ConstLabels {
+		split := strings.Split(cLabel, "=")
+		if len(split) != 2 {
+			return fmt.Errorf("constant label argument must have format labelName=labelValue but got %s", cLabel)
+		}
+	}
+	if c.SeriesOperationMode == "gradual-change" && c.SeriesChangeRate <= 0 {
+		return fmt.Errorf("--series-change-rate must be greater than 0, got %d", c.SeriesChangeRate)
+	}
+	if c.SeriesOperationMode == "spike" && c.SpikeMultiplier < 1 {
+		return fmt.Errorf("--spike-multiplier must be greater than or equal to 1, got %f", c.SpikeMultiplier)
+	}
+	return nil
+}
+
+// Describe is used when registering metrics. It is intentionally a no-op, so the set of metrics can change dynamically at runtime.
+// Returning no descriptors makes this an "unchecked" collector, which is also more efficient for what we do here.
+func (c *Collector) Describe(chan<- *prometheus.Desc) {}
+
+func (c *Collector) Collect(metricCh chan<- prometheus.Metric) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for _, m := range c.gauges {
+		m.Collect(metricCh)
+	}
+	for _, m := range c.counters {
+		m.Collect(metricCh)
+	}
+	for _, m := range c.histograms {
+		m.Collect(metricCh)
+	}
+	for _, m := range c.nativeHistograms {
+		m.Collect(metricCh)
+	}
+	for _, m := range c.summaries {
+		m.Collect(metricCh)
+	}
+}
+
+func help(mName string) string {
+	return fmt.Sprintf("Metric %v is generated by the https://github.com/prometheus-community/avalanche project, allowing you to load test your Prometheus or Prometheus-compatible systems. It's not too long, not too short, to simulate the often chunky descriptions on user metrics. 
It also contains metric name, so help is slighly different across metrics.", mName) +} + +func (c *Collector) recreateMetrics(metricCycle int, labelKeys []string) { + c.mu.Lock() + defer c.mu.Unlock() + for id := range c.gauges { + mName := fmt.Sprintf("avalanche_gauge_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id) + gauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{Name: mName, Help: help(mName)}, + append([]string{"series_id", "cycle_id"}, labelKeys...), + ) + c.gauges[id] = gauge + } + for id := range c.counters { + mName := fmt.Sprintf("avalanche_counter_metric_%s_%v_%v_total", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id) + counter := prometheus.NewCounterVec( + prometheus.CounterOpts{Name: mName, Help: help(mName)}, + append([]string{"series_id", "cycle_id"}, labelKeys...), + ) + c.counters[id] = counter + } + + bkts := make([]float64, c.cfg.HistogramBuckets) + for i := range bkts { + bkts[i] = 0.0001 * math.Pow10(i) + } + for id := range c.histograms { + mName := fmt.Sprintf("avalanche_histogram_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id) + histogram := prometheus.NewHistogramVec( + prometheus.HistogramOpts{Name: mName, Help: help(mName), Buckets: bkts}, + append([]string{"series_id", "cycle_id"}, labelKeys...), + ) + c.histograms[id] = histogram + } + + for id := range c.nativeHistograms { + mName := fmt.Sprintf("avalanche_native_histogram_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id) + histogram := prometheus.NewHistogramVec( + prometheus.HistogramOpts{Name: mName, Help: help(mName), NativeHistogramBucketFactor: 1.1}, + append([]string{"series_id", "cycle_id"}, labelKeys...), + ) + c.nativeHistograms[id] = histogram + } + + for id := range c.summaries { + mName := fmt.Sprintf("avalanche_summary_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id) + summary := prometheus.NewSummaryVec( + prometheus.SummaryOpts{Name: mName, Help: help(mName)}, + append([]string{"series_id", "cycle_id"}, labelKeys...), + ) + c.summaries[id] = summary } } @@ -56,15 +255,17 @@ func seriesLabels(seriesID, cycleID int, labelKeys, labelValues []string) promet "series_id": fmt.Sprintf("%v", seriesID), "cycle_id": fmt.Sprintf("%v", cycleID), } - for idx, key := range labelKeys { labels[key] = labelValues[idx] } - return labels } -func deleteValues(labelKeys, labelValues []string, seriesCount, seriesCycle int) { +type seriesDeleter interface { + Delete(labels prometheus.Labels) bool +} + +func deleteValues[T seriesDeleter](metrics []T, labelKeys, labelValues []string, seriesCount, seriesCycle int) { for _, metric := range metrics { for idx := 0; idx < seriesCount; idx++ { labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues) @@ -73,55 +274,91 @@ func deleteValues(labelKeys, labelValues []string, seriesCount, seriesCycle int) } } -func cycleValues(labelKeys, labelValues []string, seriesCount, seriesCycle int) { - for _, metric := range metrics { +func (c *Collector) cycleValues(labelKeys, labelValues []string, seriesCount, seriesCycle int) { + c.mu.Lock() + defer c.mu.Unlock() + + for _, metric := range c.gauges { for idx := 0; idx < seriesCount; idx++ { labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues) - metric.With(labels).Set(float64(valGenerator.Intn(100))) + metric.With(labels).Set(float64(c.valGen.Intn(100))) + } + } + for _, metric := range c.counters { + for idx := 0; idx < seriesCount; idx++ { + labels := seriesLabels(idx, seriesCycle, 
labelKeys, labelValues) + metric.With(labels).Add(float64(c.valGen.Intn(100))) + } + } + for _, metric := range c.histograms { + for idx := 0; idx < seriesCount; idx++ { + labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues) + metric.With(labels).Observe(float64(c.valGen.Intn(100))) + } + } + for _, metric := range c.nativeHistograms { + for idx := 0; idx < seriesCount; idx++ { + labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues) + metric.With(labels).Observe(float64(c.valGen.Intn(100))) + } + } + for _, metric := range c.summaries { + for idx := 0; idx < seriesCount; idx++ { + labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues) + metric.With(labels).Observe(float64(c.valGen.Intn(100))) } } } -func handleValueTicks(labelKeys, labelValues *[]string, currentSeriesCount, seriesCycle *int, updateNotify chan struct{}, valueTick *time.Ticker) { - for tick := range valueTick.C { - metricsMux.Lock() +func (c *Collector) handleValueTicks(labelKeys, labelValues *[]string, currentSeriesCount, seriesCycle *int) { + if c.valueTick == nil { + return + } + for tick := range c.valueTick.C { fmt.Printf("%v: refreshing metric values\n", tick) - cycleValues(*labelKeys, *labelValues, *currentSeriesCount, *seriesCycle) - metricsMux.Unlock() + c.cycleValues(*labelKeys, *labelValues, *currentSeriesCount, *seriesCycle) select { - case updateNotify <- struct{}{}: + case c.updateNotifyCh <- struct{}{}: default: } } } -func handleSeriesTicks(labelKeys, labelValues *[]string, currentSeriesCount, seriesCycle *int, updateNotify chan struct{}, seriesTick *time.Ticker) { - for tick := range seriesTick.C { - metricsMux.Lock() +func (c *Collector) handleSeriesTicks(labelKeys, labelValues *[]string, currentSeriesCount, seriesCycle *int) { + if c.seriesTick == nil { + return + } + for tick := range c.seriesTick.C { + c.mu.Lock() fmt.Printf("%v: refreshing series cycle\n", tick) - deleteValues(*labelKeys, *labelValues, *currentSeriesCount, *seriesCycle) + deleteValues(c.gauges, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle) + deleteValues(c.counters, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle) + deleteValues(c.histograms, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle) + deleteValues(c.nativeHistograms, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle) + deleteValues(c.summaries, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle) (*seriesCycle)++ - cycleValues(*labelKeys, *labelValues, *currentSeriesCount, *seriesCycle) - metricsMux.Unlock() + c.mu.Unlock() + c.cycleValues(*labelKeys, *labelValues, *currentSeriesCount, *seriesCycle) select { - case updateNotify <- struct{}{}: + case c.updateNotifyCh <- struct{}{}: default: } } } -func handleMetricTicks(metricCount, metricLength, metricCycle *int, labelKeys *[]string, updateNotify chan struct{}, metricTick *time.Ticker) { - for tick := range metricTick.C { - metricsMux.Lock() +func (c *Collector) handleMetricTicks(metricCycle *int, labelKeys *[]string) { + if c.metricTick == nil { + return + } + + for tick := range c.metricTick.C { fmt.Printf("%v: refreshing metric cycle\n", tick) (*metricCycle)++ - unregisterMetrics() - registerMetrics(*metricCount, *metricLength, *metricCycle, *labelKeys) - metricsMux.Unlock() + c.recreateMetrics(*metricCycle, *labelKeys) select { - case updateNotify <- struct{}{}: + case c.updateNotifyCh <- struct{}{}: default: } } @@ -156,56 +393,58 @@ func changeSeriesDoubleHalve(currentSeriesCount *int, seriesIncrease *bool) { *seriesIncrease = 
!*seriesIncrease } -func handleDoubleHalveMode(metricCount, metricLength, metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int, changeSeriesChan <-chan time.Time, updateNotify chan struct{}) { +func (c *Collector) handleDoubleHalveMode(metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int) { + if c.changeSeriesTick == nil { + return + } + seriesIncrease := true - for tick := range changeSeriesChan { - metricsMux.Lock() - unregisterMetrics() - registerMetrics(metricCount, metricLength, metricCycle, labelKeys) - cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle) - metricsMux.Unlock() + for tick := range c.changeSeriesTick.C { + c.recreateMetrics(metricCycle, labelKeys) + c.cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle) changeSeriesDoubleHalve(currentSeriesCount, &seriesIncrease) fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *currentSeriesCount) select { - case updateNotify <- struct{}{}: + case c.updateNotifyCh <- struct{}{}: default: } } } -func handleGradualChangeMode(metricCount, metricLength, metricCycle, seriesCycle int, labelKeys, labelValues []string, seriesChangeRate, minSeriesCount, maxSeriesCount int, seriesCount *int, changeSeriesChan <-chan time.Time, updateNotify chan struct{}) { - *seriesCount = minSeriesCount - seriesIncrease := true +func (c *Collector) handleGradualChangeMode(metricCycle, seriesCycle int, labelKeys, labelValues []string, seriesCount *int) { + if c.changeSeriesTick == nil { + return + } - for tick := range changeSeriesChan { - metricsMux.Lock() - unregisterMetrics() - registerMetrics(metricCount, metricLength, metricCycle, labelKeys) - cycleValues(labelKeys, labelValues, *seriesCount, seriesCycle) - metricsMux.Unlock() + *seriesCount = c.cfg.MinSeriesCount + seriesIncrease := true + for tick := range c.changeSeriesTick.C { + c.recreateMetrics(metricCycle, labelKeys) + c.cycleValues(labelKeys, labelValues, *seriesCount, seriesCycle) - changeSeriesGradual(&seriesChangeRate, &maxSeriesCount, &minSeriesCount, seriesCount, &seriesIncrease) + changeSeriesGradual(&c.cfg.SeriesChangeRate, &c.cfg.MaxSeriesCount, &c.cfg.MinSeriesCount, seriesCount, &seriesIncrease) fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *seriesCount) select { - case updateNotify <- struct{}{}: + case c.updateNotifyCh <- struct{}{}: default: } } } -func handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int, spikeMultiplier float64, changeSeriesChan <-chan time.Time, updateNotify chan struct{}) { +func (c *Collector) handleSpikeMode(metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int, spikeMultiplier float64) { + if c.changeSeriesTick == nil { + return + } + initialSeriesCount := *currentSeriesCount - for tick := range changeSeriesChan { - metricsMux.Lock() - unregisterMetrics() - registerMetrics(metricCount, metricLength, metricCycle, labelKeys) - cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle) - metricsMux.Unlock() + for tick := range c.changeSeriesTick.C { + c.recreateMetrics(metricCycle, labelKeys) + c.cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle) if *currentSeriesCount > initialSeriesCount { *currentSeriesCount = initialSeriesCount @@ -216,102 +455,94 @@ func handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle int, la fmt.Printf("%v: Adjusting series count. 
New count: %d\n", tick, *currentSeriesCount) select { - case updateNotify <- struct{}{}: + case c.updateNotifyCh <- struct{}{}: default: } } } -// RunMetrics creates a set of Prometheus test series that update over time -func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval int, spikeMultiplier float64, seriesOperationMode string, constLabels []string, stop chan struct{}) (chan struct{}, error) { - labelKeys := make([]string, labelCount) - for idx := 0; idx < labelCount; idx++ { - labelKeys[idx] = fmt.Sprintf("label_key_%s_%v", strings.Repeat("k", labelLength), idx) +// Run creates a set of Prometheus test series that update over time. +// NOTE: Only one execution of RunMetrics is currently expected. +func (c *Collector) Run() error { + labelKeys := make([]string, c.cfg.LabelCount) + for idx := 0; idx < c.cfg.LabelCount; idx++ { + labelKeys[idx] = fmt.Sprintf("label_key_%s_%v", strings.Repeat("k", c.cfg.LabelLength), idx) } - labelValues := make([]string, labelCount) - for idx := 0; idx < labelCount; idx++ { - labelValues[idx] = fmt.Sprintf("label_val_%s_%v", strings.Repeat("v", labelLength), idx) + labelValues := make([]string, c.cfg.LabelCount) + for idx := 0; idx < c.cfg.LabelCount; idx++ { + labelValues[idx] = fmt.Sprintf("label_val_%s_%v", strings.Repeat("v", c.cfg.LabelLength), idx) } - for _, cLabel := range constLabels { + for _, cLabel := range c.cfg.ConstLabels { split := strings.Split(cLabel, "=") - if len(split) != 2 { - return make(chan struct{}, 1), fmt.Errorf("Constant label argument must have format labelName=labelValue but got %s", cLabel) - } labelKeys = append(labelKeys, split[0]) labelValues = append(labelValues, split[1]) } metricCycle := 0 seriesCycle := 0 - valueTick := time.NewTicker(time.Duration(valueInterval) * time.Second) - seriesTick := time.NewTicker(time.Duration(seriesInterval) * time.Second) - metricTick := time.NewTicker(time.Duration(metricInterval) * time.Second) - changeSeriesTick := time.NewTicker(time.Duration(seriesChangeInterval) * time.Second) - updateNotify := make(chan struct{}, 1) + currentSeriesCount := c.cfg.SeriesCount - currentSeriesCount := seriesCount + c.mu.Lock() + c.gauges = make([]*prometheus.GaugeVec, c.cfg.GaugeMetricCount) + c.counters = make([]*prometheus.CounterVec, c.cfg.CounterMetricCount) + c.histograms = make([]*prometheus.HistogramVec, c.cfg.HistogramMetricCount) + c.nativeHistograms = make([]*prometheus.HistogramVec, c.cfg.NativeHistogramMetricCount) + c.summaries = make([]*prometheus.SummaryVec, c.cfg.SummaryMetricCount) + c.mu.Unlock() - switch seriesOperationMode { + switch c.cfg.SeriesOperationMode { case "double-halve": - registerMetrics(metricCount, metricLength, metricCycle, labelKeys) - cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle) - fmt.Printf("Starting double-halve mode; starting series: %d, change series interval: %d seconds\n", currentSeriesCount, seriesChangeInterval) - go handleDoubleHalveMode(metricCount, metricLength, metricCycle, seriesCycle, labelKeys, labelValues, ¤tSeriesCount, changeSeriesTick.C, updateNotify) - go handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, valueTick) - go handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, seriesTick) + c.recreateMetrics(metricCycle, labelKeys) + c.cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle) + 
fmt.Printf("Starting double-halve mode; starting series: %d, change series interval: %d seconds\n", currentSeriesCount, c.cfg.SeriesChangeInterval) + go c.handleDoubleHalveMode(metricCycle, seriesCycle, labelKeys, labelValues, ¤tSeriesCount) + go c.handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle) + go c.handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle) + go c.handleMetricTicks(&metricCycle, &labelKeys) case "gradual-change": - if minSeriesCount >= maxSeriesCount { - return nil, fmt.Errorf("error: minSeriesCount must be less than maxSeriesCount, got %d and %d", minSeriesCount, maxSeriesCount) - } - fmt.Printf("Starting gradual-change mode; min series: %d, max series: %d, series change rate: %d, change series interval: %d seconds\n", minSeriesCount, maxSeriesCount, seriesChangeRate, seriesChangeInterval) + fmt.Printf("Starting gradual-change mode; min series: %d, max series: %d, series change rate: %d, change series interval: %d seconds\n", c.cfg.MinSeriesCount, c.cfg.MaxSeriesCount, c.cfg.SeriesChangeRate, c.cfg.SeriesChangeInterval) - registerMetrics(metricCount, metricLength, metricCycle, labelKeys) - cycleValues(labelKeys, labelValues, minSeriesCount, seriesCycle) - go handleGradualChangeMode(metricCount, metricLength, metricCycle, seriesCycle, labelKeys, labelValues, - seriesChangeRate, minSeriesCount, maxSeriesCount, ¤tSeriesCount, changeSeriesTick.C, updateNotify) - go handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, valueTick) - go handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, seriesTick) + c.recreateMetrics(metricCycle, labelKeys) + c.cycleValues(labelKeys, labelValues, c.cfg.MinSeriesCount, seriesCycle) + go c.handleGradualChangeMode(metricCycle, seriesCycle, labelKeys, labelValues, ¤tSeriesCount) + go c.handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle) + go c.handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle) + go c.handleMetricTicks(&metricCycle, &labelKeys) case "spike": - if spikeMultiplier < 1 { - return nil, fmt.Errorf("error: spikeMultiplier must be greater than or equal to 1, got %f", spikeMultiplier) - } - registerMetrics(metricCount, metricLength, metricCycle, labelKeys) - cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle) - fmt.Printf("Starting spike mode; initial series: %d, spike multiplier: %f, spike interval: %v\n", currentSeriesCount, spikeMultiplier, seriesChangeInterval) - go handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle, labelKeys, labelValues, ¤tSeriesCount, spikeMultiplier, changeSeriesTick.C, updateNotify) - go handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, valueTick) - go handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, seriesTick) + c.recreateMetrics(metricCycle, labelKeys) + c.cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle) + fmt.Printf("Starting spike mode; initial series: %d, spike multiplier: %f, spike interval: %v\n", currentSeriesCount, c.cfg.SpikeMultiplier, c.cfg.SeriesChangeInterval) + go c.handleSpikeMode(metricCycle, seriesCycle, labelKeys, labelValues, ¤tSeriesCount, c.cfg.SpikeMultiplier) + go c.handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle) + go c.handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle) + go c.handleMetricTicks(&metricCycle, &labelKeys) default: - registerMetrics(metricCount, 
metricLength, metricCycle, labelKeys) - cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle) - go handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, valueTick) - go handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, seriesTick) - go handleMetricTicks(&metricCount, &metricLength, &metricCycle, &labelKeys, updateNotify, metricTick) - } - - go func() { - <-stop - valueTick.Stop() - seriesTick.Stop() - metricTick.Stop() - changeSeriesTick.Stop() - }() - - return updateNotify, nil -} - -// ServeMetrics serves a prometheus metrics endpoint with test series -func ServeMetrics(port int) error { - http.Handle("/metrics", promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{})) - h := health.New(health.Health{}) - http.HandleFunc("/health", h.Handler) - err := http.ListenAndServe(fmt.Sprintf(":%v", port), nil) - if err != nil { - return err + c.recreateMetrics(metricCycle, labelKeys) + c.cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle) + go c.handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle) + go c.handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle) + go c.handleMetricTicks(&metricCycle, &labelKeys) } + <-c.stopCh return nil } + +func (c *Collector) Stop(_ error) { + if c.valueTick != nil { + c.valueTick.Stop() + } + if c.seriesTick != nil { + c.seriesTick.Stop() + } + if c.metricTick != nil { + c.metricTick.Stop() + } + if c.changeSeriesTick != nil { + c.changeSeriesTick.Stop() + } + close(c.stopCh) +} diff --git a/metrics/serve_test.go b/metrics/serve_test.go index 357f82e..2ae58ac 100644 --- a/metrics/serve_test.go +++ b/metrics/serve_test.go @@ -19,102 +19,171 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + io_prometheus_client "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" ) // Helper function to count the series in the registry -func countSeries(t *testing.T, registry *prometheus.Registry) int { +func countSeries(t *testing.T, registry *prometheus.Registry) (seriesCount int) { metricsFamilies, err := registry.Gather() assert.NoError(t, err) - seriesCount := 0 for _, mf := range metricsFamilies { for range mf.Metric { seriesCount++ } } - return seriesCount } -func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) { - const ( - initialSeriesCount = 5 - metricCount = 1 - labelCount = 1 - maxSeriesCount = 10 - minSeriesCount = 1 - spikeMultiplier = 1.5 - seriesChangeRate = 1 - metricLength = 1 - labelLength = 1 - valueInterval = 100 - seriesInterval = 100 - metricInterval = 100 - seriesChangeInterval = 3 - operationMode = "double-halve" - constLabel = "constLabel=test" - ) - - stop := make(chan struct{}) - defer close(stop) - - promRegistry = prometheus.NewRegistry() - - _, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop) +func countSeriesTypes(t *testing.T, registry *prometheus.Registry) (gauges, counters, histograms, nhistograms, summaries int) { + metricsFamilies, err := registry.Gather() assert.NoError(t, err) + + for _, mf := range metricsFamilies { + for _, m := range mf.Metric { + switch mf.GetType() { + case io_prometheus_client.MetricType_GAUGE: + gauges++ + case io_prometheus_client.MetricType_COUNTER: + counters++ + case io_prometheus_client.MetricType_HISTOGRAM: 
+ if len(m.GetHistogram().Bucket) == 0 { + nhistograms++ + } else { + histograms++ + } + case io_prometheus_client.MetricType_SUMMARY: + summaries++ + default: + t.Fatalf("unknown metric type found %v", mf.GetType()) + } + } + } + return gauges, counters, histograms, nhistograms, summaries +} + +func TestRunMetrics(t *testing.T) { + testCfg := Config{ + GaugeMetricCount: 200, + CounterMetricCount: 200, + HistogramMetricCount: 10, + HistogramBuckets: 8, + NativeHistogramMetricCount: 10, + SummaryMetricCount: 10, + + MinSeriesCount: 0, + MaxSeriesCount: 1000, + LabelCount: 1, + SeriesCount: 10, + MetricLength: 1, + LabelLength: 1, + ConstLabels: []string{"constLabel=test"}, + } + assert.NoError(t, testCfg.Validate()) + + reg := prometheus.NewRegistry() + coll := NewCollector(testCfg) + reg.MustRegister(coll) + + go coll.Run() + t.Cleanup(func() { + coll.Stop(nil) + }) + + time.Sleep(2 * time.Second) + + g, c, h, nh, s := countSeriesTypes(t, reg) + assert.Equal(t, testCfg.GaugeMetricCount*testCfg.SeriesCount, g) + assert.Equal(t, testCfg.CounterMetricCount*testCfg.SeriesCount, c) + assert.Equal(t, testCfg.HistogramMetricCount*testCfg.SeriesCount, h) + assert.Equal(t, testCfg.NativeHistogramMetricCount*testCfg.SeriesCount, nh) + assert.Equal(t, testCfg.SummaryMetricCount*testCfg.SeriesCount, s) +} + +func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) { + testCfg := Config{ + GaugeMetricCount: 1, + LabelCount: 1, + SeriesCount: 5, // Initial. + MaxSeriesCount: 10, + MinSeriesCount: 1, + SpikeMultiplier: 1.5, + SeriesChangeRate: 1, + MetricLength: 1, + LabelLength: 1, + ValueInterval: 100, + SeriesInterval: 100, + MetricInterval: 100, + SeriesChangeInterval: 3, + SeriesOperationMode: "double-halve", + ConstLabels: []string{"constLabel=test"}, + } + assert.NoError(t, testCfg.Validate()) + + reg := prometheus.NewRegistry() + coll := NewCollector(testCfg) + reg.MustRegister(coll) + + go coll.Run() + t.Cleanup(func() { + coll.Stop(nil) + }) + time.Sleep(2 * time.Second) for i := 0; i < 4; i++ { - time.Sleep(time.Duration(seriesChangeInterval) * time.Second) + time.Sleep(time.Duration(testCfg.SeriesChangeInterval) * time.Second) if i%2 == 0 { // Expecting halved series count - currentCount := countSeries(t, promRegistry) - expectedCount := initialSeriesCount + currentCount := countSeries(t, reg) + expectedCount := testCfg.SeriesCount assert.Equal(t, expectedCount, currentCount, "Halved series count should be %d but got %d", expectedCount, currentCount) } else { // Expecting doubled series count - currentCount := countSeries(t, promRegistry) - expectedCount := initialSeriesCount * 2 + currentCount := countSeries(t, reg) + expectedCount := testCfg.SeriesCount * 2 assert.Equal(t, expectedCount, currentCount, "Doubled series count should be %d but got %d", expectedCount, currentCount) } } } func TestRunMetricsGradualChange(t *testing.T) { - const ( - metricCount = 1 - labelCount = 1 - seriesCount = 100 - maxSeriesCount = 30 - minSeriesCount = 10 - spikeMultiplier = 1.5 - seriesChangeRate = 10 - metricLength = 1 - labelLength = 1 - valueInterval = 100 - seriesInterval = 100 - metricInterval = 100 - seriesChangeInterval = 3 - operationMode = "gradual-change" - constLabel = "constLabel=test" - ) - - stop := make(chan struct{}) - defer close(stop) - - promRegistry = prometheus.NewRegistry() - - _, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, 
spikeMultiplier, operationMode, []string{constLabel}, stop) - assert.NoError(t, err) + testCfg := Config{ + GaugeMetricCount: 1, + LabelCount: 1, + SeriesCount: 100, // Initial. + MaxSeriesCount: 30, + MinSeriesCount: 10, + SpikeMultiplier: 1.5, + SeriesChangeRate: 10, + MetricLength: 1, + LabelLength: 1, + ValueInterval: 100, + SeriesInterval: 100, + MetricInterval: 100, + SeriesChangeInterval: 3, + SeriesOperationMode: "gradual-change", + ConstLabels: []string{"constLabel=test"}, + } + assert.NoError(t, testCfg.Validate()) + + reg := prometheus.NewRegistry() + coll := NewCollector(testCfg) + reg.MustRegister(coll) + + go coll.Run() + t.Cleanup(func() { + coll.Stop(nil) + }) time.Sleep(2 * time.Second) - currentCount := countSeries(t, promRegistry) + currentCount := countSeries(t, reg) expectedInitialCount := currentCount assert.Equal(t, expectedInitialCount, currentCount, "Initial series count should be minSeriesCount %d but got %d", expectedInitialCount, currentCount) assert.Eventually(t, func() bool { - graduallyIncreasedCount := countSeries(t, promRegistry) + graduallyIncreasedCount := countSeries(t, reg) fmt.Println("seriesCount: ", graduallyIncreasedCount) - if graduallyIncreasedCount > maxSeriesCount { - t.Fatalf("Gradually increased series count should be less than maxSeriesCount %d but got %d", maxSeriesCount, graduallyIncreasedCount) + if graduallyIncreasedCount > testCfg.MaxSeriesCount { + t.Fatalf("Gradually increased series count should be less than maxSeriesCount %d but got %d", testCfg.MaxSeriesCount, graduallyIncreasedCount) } if currentCount > graduallyIncreasedCount { t.Fatalf("Gradually increased series count should be greater than initial series count %d but got %d", currentCount, graduallyIncreasedCount) @@ -122,87 +191,81 @@ func TestRunMetricsGradualChange(t *testing.T) { currentCount = graduallyIncreasedCount } - return graduallyIncreasedCount == maxSeriesCount - }, 15*time.Second, seriesChangeInterval*time.Second, "Did not receive update notification for series count gradual increase in time") + return graduallyIncreasedCount == testCfg.MaxSeriesCount + }, 15*time.Second, time.Duration(testCfg.SeriesChangeInterval)*time.Second, "Did not receive update notification for series count gradual increase in time") assert.Eventually(t, func() bool { - graduallyIncreasedCount := countSeries(t, promRegistry) + graduallyIncreasedCount := countSeries(t, reg) fmt.Println("seriesCount: ", graduallyIncreasedCount) - if graduallyIncreasedCount < minSeriesCount { - t.Fatalf("Gradually increased series count should be less than maxSeriesCount %d but got %d", maxSeriesCount, graduallyIncreasedCount) + if graduallyIncreasedCount < testCfg.MinSeriesCount { + t.Fatalf("Gradually increased series count should be less than maxSeriesCount %d but got %d", testCfg.MaxSeriesCount, graduallyIncreasedCount) } - return graduallyIncreasedCount == minSeriesCount - }, 15*time.Second, seriesChangeInterval*time.Second, "Did not receive update notification for series count gradual increase in time") + return graduallyIncreasedCount == testCfg.MinSeriesCount + }, 15*time.Second, time.Duration(testCfg.SeriesChangeInterval)*time.Second, "Did not receive update notification for series count gradual increase in time") } -// if min is bigger than maxSeriesCount, fail in GradualChange func TestRunMetricsWithInvalidSeriesCounts(t *testing.T) { - const ( - metricCount = 1 - labelCount = 1 - seriesCount = 100 - maxSeriesCount = 10 - minSeriesCount = 100 - spikeMultiplier = 1.5 - seriesChangeRate = 10 - 
metricLength = 1 - labelLength = 1 - valueInterval = 100 - seriesInterval = 100 - metricInterval = 100 - seriesChangeInterval = 3 - operationMode = "gradual-change" - constLabel = "constLabel=test" - ) - - stop := make(chan struct{}) - defer close(stop) - - promRegistry = prometheus.NewRegistry() - - _, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop) - assert.Error(t, err) + testCfg := Config{ + GaugeMetricCount: 1, + LabelCount: 1, + SeriesCount: 100, + MaxSeriesCount: 10, + MinSeriesCount: 100, + SpikeMultiplier: 1.5, + SeriesChangeRate: 10, + MetricLength: 1, + LabelLength: 1, + ValueInterval: 100, + SeriesInterval: 100, + MetricInterval: 100, + SeriesChangeInterval: 3, + SeriesOperationMode: "gradual-change", + ConstLabels: []string{"constLabel=test"}, + } + assert.Error(t, testCfg.Validate()) } func TestRunMetricsSpikeChange(t *testing.T) { - const ( - metricCount = 1 - labelCount = 1 - initialSeriesCount = 100 - maxSeriesCount = 30 - minSeriesCount = 10 - spikeMultiplier = 1.5 - seriesChangeRate = 10 - metricLength = 1 - labelLength = 1 - valueInterval = 100 - seriesInterval = 100 - metricInterval = 100 - seriesChangeInterval = 10 - operationMode = "spike" - constLabel = "constLabel=test" - ) - - stop := make(chan struct{}) - defer close(stop) - - promRegistry = prometheus.NewRegistry() - - _, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop) - assert.NoError(t, err) + testCfg := Config{ + GaugeMetricCount: 1, + LabelCount: 1, + SeriesCount: 100, + MaxSeriesCount: 30, + MinSeriesCount: 10, + SpikeMultiplier: 1.5, + SeriesChangeRate: 10, + MetricLength: 1, + LabelLength: 1, + ValueInterval: 100, + SeriesInterval: 100, + MetricInterval: 100, + SeriesChangeInterval: 10, + SeriesOperationMode: "spike", + ConstLabels: []string{"constLabel=test"}, + } + assert.NoError(t, testCfg.Validate()) + + reg := prometheus.NewRegistry() + coll := NewCollector(testCfg) + reg.MustRegister(coll) + + go coll.Run() + t.Cleanup(func() { + coll.Stop(nil) + }) time.Sleep(2 * time.Second) for i := 0; i < 4; i++ { - time.Sleep(time.Duration(seriesChangeInterval) * time.Second) + time.Sleep(time.Duration(testCfg.SeriesChangeInterval) * time.Second) if i%2 == 0 { - currentCount := countSeries(t, promRegistry) - expectedCount := initialSeriesCount + currentCount := countSeries(t, reg) + expectedCount := testCfg.SeriesCount assert.Equal(t, expectedCount, currentCount, fmt.Sprintf("Halved series count should be %d but got %d", expectedCount, currentCount)) } else { - currentCount := countSeries(t, promRegistry) - expectedCount := int(float64(initialSeriesCount) * spikeMultiplier) - assert.Equal(t, expectedCount, currentCount, fmt.Sprintf("Multiplied the series count by %.1f, should be %d but got %d", spikeMultiplier, expectedCount, currentCount)) + currentCount := countSeries(t, reg) + expectedCount := int(float64(testCfg.SeriesCount) * testCfg.SpikeMultiplier) + assert.Equal(t, expectedCount, currentCount, fmt.Sprintf("Multiplied the series count by %.1f, should be %d but got %d", testCfg.SpikeMultiplier, expectedCount, currentCount)) } } } diff --git a/metrics/write.go b/metrics/write.go 
index d185f26..d20af03 100644 --- a/metrics/write.go +++ b/metrics/write.go @@ -27,16 +27,15 @@ import ( "sync" "time" + "github.com/prometheus-community/avalanche/pkg/download" + "github.com/prometheus-community/avalanche/pkg/errors" + "github.com/gogo/protobuf/proto" "github.com/golang/snappy" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/prompb" - - "github.com/prometheus-community/avalanche/pkg/download" - - dto "github.com/prometheus/client_model/go" - - "github.com/prometheus-community/avalanche/pkg/errors" ) const maxErrMsgLen = 256 @@ -59,14 +58,15 @@ type ConfigWrite struct { // Client for the remote write requests. type Client struct { - client *http.Client - timeout time.Duration - config *ConfigWrite + client *http.Client + timeout time.Duration + config *ConfigWrite + gatherer prometheus.Gatherer } // SendRemoteWrite initializes a http client and // sends metrics to a prometheus compatible remote endpoint. -func SendRemoteWrite(config *ConfigWrite) error { +func SendRemoteWrite(config *ConfigWrite, gatherer prometheus.Gatherer) error { var rt http.RoundTripper = &http.Transport{ TLSClientConfig: &config.TLSClientConfig, } @@ -74,9 +74,10 @@ func SendRemoteWrite(config *ConfigWrite) error { httpClient := &http.Client{Transport: rt} c := Client{ - client: httpClient, - timeout: time.Minute, - config: config, + client: httpClient, + timeout: time.Minute, + config: config, + gatherer: gatherer, } return c.write() } @@ -109,7 +110,7 @@ func cloneRequest(r *http.Request) *http.Request { } func (c *Client) write() error { - tss, err := collectMetrics(c.config.OutOfOrder) + tss, err := collectMetrics(c.gatherer, c.config.OutOfOrder) if err != nil { return err } @@ -141,7 +142,7 @@ func (c *Client) write() error { select { case <-c.config.UpdateNotify: log.Println("updating remote write metrics") - tss, err = collectMetrics(c.config.OutOfOrder) + tss, err = collectMetrics(c.gatherer, c.config.OutOfOrder) if err != nil { merr.Add(err) } @@ -194,10 +195,8 @@ func updateTimetamps(tss []prompb.TimeSeries) []prompb.TimeSeries { return tss } -func collectMetrics(outOfOrder bool) ([]prompb.TimeSeries, error) { - metricsMux.Lock() - defer metricsMux.Unlock() - metricFamilies, err := promRegistry.Gather() +func collectMetrics(gatherer prometheus.Gatherer, outOfOrder bool) ([]prompb.TimeSeries, error) { + metricFamilies, err := gatherer.Gather() if err != nil { return nil, err } @@ -207,6 +206,7 @@ func collectMetrics(outOfOrder bool) ([]prompb.TimeSeries, error) { } return tss, nil } + func shuffleTimestamps(tss []prompb.TimeSeries) []prompb.TimeSeries { now := time.Now().UnixMilli() offsets := []int64{0, -60 * 1000, -5 * 60 * 1000}
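
For context on how the refactored pieces fit together: the diff replaces the package-level registry, `RunMetrics` and `ServeMetrics` with a `Collector` that is registered into a caller-owned `prometheus.Registry`. The sketch below is an illustration only, not part of the change; it uses the `Config` fields and `Collector` methods introduced in `metrics/serve.go`, with arbitrary example values, to show the minimal wiring outside of `cmd/avalanche.go`:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	"github.com/prometheus-community/avalanche/metrics"
)

func main() {
	// Example values only; the real binary fills this struct via NewConfigFromFlags.
	cfg := metrics.Config{
		GaugeMetricCount:     200,
		CounterMetricCount:   200,
		HistogramMetricCount: 10,
		HistogramBuckets:     8,
		SummaryMetricCount:   10,
		LabelCount:           10,
		SeriesCount:          100,
		MetricLength:         5,
		LabelLength:          5,
		ValueInterval:        30,
		SeriesInterval:       60,
		MetricInterval:       120,
		MinSeriesCount:       100,
		MaxSeriesCount:       1000,
		SeriesOperationMode:  "default",
	}
	if err := cfg.Validate(); err != nil {
		log.Fatal(err)
	}

	// The collector implements prometheus.Collector, so it registers like any other.
	collector := metrics.NewCollector(cfg)
	reg := prometheus.NewRegistry()
	reg.MustRegister(collector)

	// Run blocks until Stop is called; it drives the value/series/metric tickers.
	go func() { _ = collector.Run() }()
	defer collector.Stop(nil)

	// Same handler setup as cmd/avalanche.go, with OpenMetrics negotiation enabled.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{EnableOpenMetrics: true}))
	log.Fatal(http.ListenAndServe(":9001", nil))
}
```

In `cmd/avalanche.go` the same pieces are tied together with `oklog/run`, so SIGTERM, the collector and the HTTP server shut down as a group.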
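
The flag handling was refactored in the same spirit: `NewConfigFromFlags` takes any function matching `kingpin.Flag`'s signature, which is why `cmd/avalanche.go` simply passes `kingpin.Flag`. A hypothetical embedding into a separate kingpin application (application name and argument values below are made up for illustration) could look like:

```go
package main

import (
	"log"

	"gopkg.in/alecthomas/kingpin.v2"

	"github.com/prometheus-community/avalanche/metrics"
)

func main() {
	// Register avalanche's flags on a custom kingpin application instead of the
	// package-level one; NewConfigFromFlags only needs the Flag registration func.
	app := kingpin.New("example", "hypothetical wrapper around avalanche's metrics package")
	cfg := metrics.NewConfigFromFlags(app.Flag)

	if _, err := app.Parse([]string{
		"--gauge-metric-count=10",
		"--counter-metric-count=0",
		"--series-count=5",
		"--series-operation-mode=double-halve",
	}); err != nil {
		log.Fatal(err)
	}
	if err := cfg.Validate(); err != nil {
		log.Fatal(err)
	}
	log.Printf("gauges=%d series=%d mode=%s", cfg.GaugeMetricCount, cfg.SeriesCount, cfg.SeriesOperationMode)
}
```

Keeping flag registration injected like this leaves the metrics package free of global flag state, which is also what lets the tests in `metrics/serve_test.go` build `Config` literals directly.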
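
Finally, the update-notification contract: every value/series/metric refresh performs a non-blocking send on the channel returned by `UpdateNotifyCh()`, and the remote-write client in `metrics/write.go` re-gathers from the registry when it receives that signal. A small consumer sketch (illustrative only, arbitrary config values) showing the same pattern:

```go
package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus-community/avalanche/metrics"
)

func main() {
	cfg := metrics.Config{
		GaugeMetricCount: 5, SeriesCount: 2, LabelCount: 1,
		MetricLength: 1, LabelLength: 1, ValueInterval: 1,
		MaxSeriesCount: 10, SeriesOperationMode: "default",
	}
	collector := metrics.NewCollector(cfg)
	reg := prometheus.NewRegistry()
	reg.MustRegister(collector)
	go func() { _ = collector.Run() }()
	defer collector.Stop(nil)

	// Each refresh does a non-blocking send on this channel; the remote-write
	// client uses the same signal to rebuild its prompb.TimeSeries batch.
	for range collector.UpdateNotifyCh() {
		mfs, err := reg.Gather()
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("update received, %d metric families gathered", len(mfs))
	}
}
```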