// Command main is a small example program that instruments itself with the
// Prometheus client library and periodically pushes its gauge and counter
// samples to a Mimir endpoint using the Prometheus remote-write protocol
// (snappy-compressed protobuf over HTTP with basic auth).
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"os"
	"os/signal"
	"sort"
	"syscall"
	"time"

	"github.com/gogo/protobuf/proto" // Using gogo/protobuf for compatibility with prompb
	"github.com/golang/snappy"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/prompb"
)

// --- Configuration ---

// Default settings. The URL and credentials are placeholders: each one is
// overridden by the matching environment variable below when it is set, so
// that real secrets never have to be committed to source.
const (
	mimirPushURL  = "https:///api/v1/push" // placeholder: has no host, set MIMIR_PUSH_URL
	mimirUsername = "your-tenant-id"       // placeholder: set MIMIR_USERNAME
	mimirPassword = "your-api-key"         // placeholder: set MIMIR_PASSWORD
	pushInterval  = 15 * time.Second

	// requestTimeout bounds a single push, including the final push that
	// runs after the shutdown signal has been received.
	requestTimeout = 10 * time.Second

	// maxErrorBodyBytes caps how much of a failed response body is read
	// into the returned error message.
	maxErrorBodyBytes = 4096
)

// Names of the environment variables that override the defaults above.
const (
	envPushURL  = "MIMIR_PUSH_URL"
	envUsername = "MIMIR_USERNAME"
	envPassword = "MIMIR_PASSWORD"
)

// envOrDefault returns the value of the environment variable key, or
// fallback when the variable is unset or empty.
func envOrDefault(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// MimirPusher is responsible for pushing metrics to Mimir.
type MimirPusher struct {
	registry *prometheus.Registry
	client   *http.Client

	url      string
	username string
	password string
}

// NewMimirPusher creates a pusher that gathers from registry. The endpoint
// URL and basic-auth credentials are taken from the environment, falling back
// to the package defaults when a variable is not set.
func NewMimirPusher(registry *prometheus.Registry) *MimirPusher {
	return &MimirPusher{
		registry: registry,
		client: &http.Client{
			Timeout: requestTimeout,
		},
		url:      envOrDefault(envPushURL, mimirPushURL),
		username: envOrDefault(envUsername, mimirUsername),
		password: envOrDefault(envPassword, mimirPassword),
	}
}

// Start begins the periodic push of metrics, once every pushInterval.
// It blocks until ctx is canceled, performs one final push, and then returns;
// callers that run it in a goroutine should wait for it to return before
// exiting the process, otherwise the final push may be cut short.
func (p *MimirPusher) Start(ctx context.Context) {
	ticker := time.NewTicker(pushInterval)
	defer ticker.Stop()

	log.Println("Starting Mimir pusher...")
	for {
		select {
		case <-ticker.C:
			if err := p.pushMetrics(ctx); err != nil {
				log.Printf("ERROR: Failed to push metrics: %v", err)
				continue
			}
			log.Println("Successfully pushed metrics to Mimir.")
		case <-ctx.Done():
			log.Println("Stopping Mimir pusher...")
			// Final push on shutdown. ctx is already canceled here, so the
			// request needs its own, time-bounded context to have any
			// chance of completing.
			finalCtx, cancel := context.WithTimeout(context.Background(), requestTimeout)
			err := p.pushMetrics(finalCtx)
			cancel()
			if err != nil {
				log.Printf("ERROR: Failed to push final metrics: %v", err)
			}
			return
		}
	}
}

// pushMetrics gathers, transforms, and sends metrics to Mimir.
// The HTTP request is bound to ctx, so canceling ctx aborts an in-flight push.
func (p *MimirPusher) pushMetrics(ctx context.Context) error {
	// 1-2. Gather metrics from the registry and transform them.
	series, err := p.gatherTimeSeries()
	if err != nil {
		return err
	}
	writeRequest := &prompb.WriteRequest{Timeseries: series}

	// 3. Marshal to protobuf and compress with Snappy (block format).
	data, err := proto.Marshal(writeRequest)
	if err != nil {
		return fmt.Errorf("could not marshal protobuf: %w", err)
	}
	compressedData := snappy.Encode(nil, data)

	// 4. Create and send the HTTP request.
	return p.send(ctx, compressedData)
}

// gatherTimeSeries collects the current state of the registry and converts
// every gauge and counter into a single-sample prompb.TimeSeries stamped with
// the current wall-clock time in milliseconds. Other metric types (Summary,
// Histogram, ...) are skipped for simplicity.
func (p *MimirPusher) gatherTimeSeries() ([]prompb.TimeSeries, error) {
	metricFamilies, err := p.registry.Gather()
	if err != nil {
		return nil, fmt.Errorf("could not gather metrics: %w", err)
	}

	now := time.Now().UnixMilli()
	var series []prompb.TimeSeries

	for _, mf := range metricFamilies {
		for _, m := range mf.GetMetric() {
			var value float64
			switch {
			case m.GetGauge() != nil:
				value = m.GetGauge().GetValue()
			case m.GetCounter() != nil:
				value = m.GetCounter().GetValue()
			default:
				// Skipping other types (Summary, Histogram) for simplicity
				continue
			}

			labels := make([]prompb.Label, 0, len(m.GetLabel())+1)
			labels = append(labels, prompb.Label{
				Name:  model.MetricNameLabel,
				Value: mf.GetName(),
			})
			for _, l := range m.GetLabel() {
				labels = append(labels, prompb.Label{
					Name:  l.GetName(),
					Value: l.GetValue(),
				})
			}
			// Remote write requires label names in lexicographical order.
			// Prepending __name__ is not enough: a label starting with an
			// uppercase letter sorts before the underscore.
			sort.Slice(labels, func(i, j int) bool {
				return labels[i].Name < labels[j].Name
			})

			series = append(series, prompb.TimeSeries{
				Labels: labels,
				Samples: []prompb.Sample{
					{
						Value:     value,
						Timestamp: now,
					},
				},
			})
		}
	}
	return series, nil
}

// send POSTs an already snappy-compressed remote-write payload to the
// configured URL using basic auth. It returns an error for transport failures
// and for any non-2xx response; in the latter case the error includes the
// status and up to maxErrorBodyBytes of the response body.
func (p *MimirPusher) send(ctx context.Context, payload []byte) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, p.url, bytes.NewReader(payload))
	if err != nil {
		return fmt.Errorf("could not create http request: %w", err)
	}

	// Set necessary headers
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("Content-Encoding", "snappy")
	req.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

	// Add authentication
	req.SetBasicAuth(p.username, p.password)

	resp, err := p.client.Do(req)
	if err != nil {
		return fmt.Errorf("http request failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// Best effort: the body only enriches the error message, so a read
		// failure here is deliberately ignored.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
		return fmt.Errorf("received non-2xx response: %s - %s", resp.Status, string(body))
	}

	// Drain the body so the transport can reuse the connection; a failure
	// to drain does not affect the already-successful push.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}

func main() {
	// Set up graceful shutdown first so every goroutine below can observe it.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Use a custom registry instead of the global one. This is best practice.
	registry := prometheus.NewRegistry()

	// Instrument your application with some metrics
	opsProcessed := promauto.With(registry).NewCounter(prometheus.CounterOpts{
		Name: "myapp_processed_ops_total",
		Help: "The total number of processed operations.",
	})
	temperature := promauto.With(registry).NewGaugeVec(prometheus.GaugeOpts{
		Name: "myapp_temperature_celsius",
		Help: "The current temperature of a sensor.",
	}, []string{"sensor_id"})

	// Simulate work in a separate goroutine until shutdown is requested.
	go func() {
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		for {
			opsProcessed.Inc()
			temperature.WithLabelValues("A1").Set(rand.Float64()*10 + 15) // Temp between 15-25
			temperature.WithLabelValues("B2").Set(rand.Float64()*15 + 20) // Temp between 20-35

			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()

	// Create and start the Mimir pusher
	pusher := NewMimirPusher(registry)
	pusherDone := make(chan struct{})
	go func() {
		defer close(pusherDone)
		pusher.Start(ctx)
	}()

	<-ctx.Done() // Wait for shutdown signal
	// Restore default signal handling so a second Ctrl-C terminates
	// immediately instead of being swallowed while the final push runs.
	stop()
	log.Println("Application shutting down.")

	// Wait for the pusher to finish its final push before the process exits.
	<-pusherDone
}