Untitled Diff
69 lines
// Copyright The OpenTelemetry Authors
// Copyright The OpenTelemetry Authors
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// See the License for the specific language governing permissions and
// limitations under the License.
// limitations under the License.
package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"
package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"
import (
import (
"fmt"
"sync"
"sync"
"time"
"time"
"github.com/davecgh/go-spew/spew"
"go.opencensus.io/stats/view"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
)
)
const (
const (
exporterFailureView = "exporter/send_failed_requests"
exporterFailureView = "exporter/send_failed_requests"
)
)
// healthCheckExporter is a struct implement the exporter interface in open census that could export metrics
// healthCheckExporter is a struct implement the exporter interface in open census that could export metrics
type healthCheckExporter struct {
type healthCheckExporter struct {
mu sync.Mutex
mu sync.Mutex
exporterFailureQueue []*view.Data
exporterFailureQueue []*view.Data
logger *zap.Logger
}
}
func newHealthCheckExporter() *healthCheckExporter {
func newHealthCheckExporter(l *zap.Logger) *healthCheckExporter {
return &healthCheckExporter{}
return &healthCheckExporter{logger: l}
}
}
// ExportView function could export the failure view to the queue
// ExportView function could export the failure view to the queue
func (e *healthCheckExporter) ExportView(vd *view.Data) {
func (e *healthCheckExporter) ExportView(vd *view.Data) {
e.mu.Lock()
e.mu.Lock()
defer e.mu.Unlock()
defer e.mu.Unlock()
if vd.View.Name == exporterFailureView {
if vd.View.Name == exporterFailureView {
e.exporterFailureQueue = append(e.exporterFailureQueue, vd)
e.exporterFailureQueue = append(e.exporterFailureQueue, vd)
e.logger.Info(fmt.Sprintf("adding element %v to the queue", spew.Sdump(vd)))
e.logger.Info(fmt.Sprintf("HC queue size: %d", len(e.exporterFailureQueue)))
}
}
}
}
func (e *healthCheckExporter) checkHealthStatus(exporterFailureThreshold int) bool {
func (e *healthCheckExporter) checkHealthStatus(exporterFailureThreshold int) bool {
e.mu.Lock()
e.mu.Lock()
defer e.mu.Unlock()
defer e.mu.Unlock()
fmt.Printf("HC queue size: %d\n", len(e.exporterFailureQueue))
return exporterFailureThreshold >= len(e.exporterFailureQueue)
return exporterFailureThreshold >= len(e.exporterFailureQueue)
}
}
// rotate function could rotate the error logs that expired the time interval
// rotate function could rotate the error logs that expired the time interval
func (e *healthCheckExporter) rotate(interval time.Duration) {
func (e *healthCheckExporter) rotate(interval time.Duration) {
e.mu.Lock()
e.mu.Lock()
defer e.mu.Unlock()
defer e.mu.Unlock()
viewNum := len(e.exporterFailureQueue)
viewNum := len(e.exporterFailureQueue)
e.logger.Info(fmt.Sprintf("entering `rotate` function, queue size is %d", viewNum))
currentTime := time.Now()
currentTime := time.Now()
for i := 0; i < viewNum; i++ {
for i := 0; i < viewNum; i++ {
vd := e.exporterFailureQueue[0]
vd := e.exporterFailureQueue[0]
e.logger.Info(fmt.Sprintf("\telem %+v, curTime %v, interval %v\n", spew.Sdump(vd), currentTime, interval))
if vd.Start.Add(interval).After(currentTime) {
if vd.Start.Add(interval).After(currentTime) {
e.exporterFailureQueue = append(e.exporterFailureQueue, vd)
e.exporterFailureQueue = append(e.exporterFailureQueue, vd)
}
}
e.exporterFailureQueue = e.exporterFailureQueue[1:]
e.exporterFailureQueue = e.exporterFailureQueue[1:]
}
}
e.logger.Info(fmt.Sprintf("exiting `rotate` function, queue size is %d", viewNum))
}
}