Untitled Diff

Created Diff never expires
2 removals
Lines
Total
Removed
Words
Total
Removed
To continue using this feature, upgrade to
Diffchecker logo
Diffchecker Pro
69 lines
12 additions
Lines
Total
Added
Words
Total
Added
To continue using this feature, upgrade to
Diffchecker logo
Diffchecker Pro
79 lines
// Copyright The OpenTelemetry Authors
// Copyright The OpenTelemetry Authors
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// You may obtain a copy of the License at
//
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
//
// Unless required by applicable law or agreed to in writing, software
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// See the License for the specific language governing permissions and
// limitations under the License.
// limitations under the License.


package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"
package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"


import (
import (
"fmt"
"sync"
"sync"
"time"
"time"


"github.com/davecgh/go-spew/spew"
"go.opencensus.io/stats/view"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
)
)


const (
	// exporterFailureView is the name of the OpenCensus view whose data is
	// compared against incoming view data in ExportView.
	exporterFailureView = "exporter/send_failed_requests"
)


// healthCheckExporter is a struct implement the exporter interface in open census that could export metrics
// healthCheckExporter is a struct implement the exporter interface in open census that could export metrics
type healthCheckExporter struct {
type healthCheckExporter struct {
mu sync.Mutex
mu sync.Mutex
exporterFailureQueue []*view.Data
exporterFailureQueue []*view.Data
logger *zap.Logger
}
}


func newHealthCheckExporter() *healthCheckExporter {
func newHealthCheckExporter(l *zap.Logger) *healthCheckExporter {
return &healthCheckExporter{}
return &healthCheckExporter{logger: l}
}
}


// ExportView function could export the failure view to the queue
// ExportView function could export the failure view to the queue
func (e *healthCheckExporter) ExportView(vd *view.Data) {
func (e *healthCheckExporter) ExportView(vd *view.Data) {
e.mu.Lock()
e.mu.Lock()
defer e.mu.Unlock()
defer e.mu.Unlock()


if vd.View.Name == exporterFailureView {
if vd.View.Name == exporterFailureView {
e.exporterFailureQueue = append(e.exporterFailureQueue, vd)
e.exporterFailureQueue = append(e.exporterFailureQueue, vd)
e.logger.Info(fmt.Sprintf("adding element %v to the queue", spew.Sdump(vd)))
e.logger.Info(fmt.Sprintf("HC queue size: %d", len(e.exporterFailureQueue)))
}
}
}
}


func (e *healthCheckExporter) checkHealthStatus(exporterFailureThreshold int) bool {
func (e *healthCheckExporter) checkHealthStatus(exporterFailureThreshold int) bool {
e.mu.Lock()
e.mu.Lock()
defer e.mu.Unlock()
defer e.mu.Unlock()


fmt.Printf("HC queue size: %d\n", len(e.exporterFailureQueue))
return exporterFailureThreshold >= len(e.exporterFailureQueue)
return exporterFailureThreshold >= len(e.exporterFailureQueue)
}
}


// rotate function could rotate the error logs that expired the time interval
// rotate function could rotate the error logs that expired the time interval
func (e *healthCheckExporter) rotate(interval time.Duration) {
func (e *healthCheckExporter) rotate(interval time.Duration) {
e.mu.Lock()
e.mu.Lock()
defer e.mu.Unlock()
defer e.mu.Unlock()


viewNum := len(e.exporterFailureQueue)
viewNum := len(e.exporterFailureQueue)
e.logger.Info(fmt.Sprintf("entering `rotate` function, queue size is %d", viewNum))
currentTime := time.Now()
currentTime := time.Now()
for i := 0; i < viewNum; i++ {
for i := 0; i < viewNum; i++ {
vd := e.exporterFailureQueue[0]
vd := e.exporterFailureQueue[0]
e.logger.Info(fmt.Sprintf("\telem %+v, curTime %v, interval %v\n", spew.Sdump(vd), currentTime, interval))
if vd.Start.Add(interval).After(currentTime) {
if vd.Start.Add(interval).After(currentTime) {
e.exporterFailureQueue = append(e.exporterFailureQueue, vd)
e.exporterFailureQueue = append(e.exporterFailureQueue, vd)
}
}
e.exporterFailureQueue = e.exporterFailureQueue[1:]
e.exporterFailureQueue = e.exporterFailureQueue[1:]
}
}
e.logger.Info(fmt.Sprintf("exiting `rotate` function, queue size is %d", viewNum))
}
}