Untitled Diff
162 lines
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"
package healthcheckextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension"
import (
import (
"context"
"context"
"errors"
"errors"
"net"
"net"
"net/http"
"net/http"
"strconv"
"strconv"
"time"
"time"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"go.opencensus.io/stats/view"
"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"go.uber.org/zap"
)
)
type healthCheckExtension struct {
type healthCheckExtension struct {
config Config
config Config
logger *zap.Logger
logger *zap.Logger
state *healthcheck.HealthCheck
state *healthcheck.HealthCheck
server http.Server
server http.Server
stopCh chan struct{}
stopCh chan struct{}
exporter *healthCheckExporter
exporter *healthCheckExporter
}
}
// Compile-time check that healthCheckExtension implements component.PipelineWatcher.
var _ component.PipelineWatcher = (*healthCheckExtension)(nil)
func (hc *healthCheckExtension) Start(_ context.Context, host component.Host) error {
func (hc *healthCheckExtension) Start(_ context.Context, host component.Host) error {
hc.logger.Info("Starting health_check extension", zap.Any("config", hc.config))
hc.logger.Info("Starting health_check extension", zap.Any("config", hc.config))
// Initialize listener
// Initialize listener
var (
var (
ln net.Listener
ln net.Listener
err error
err error
)
)
if hc.config.Port != 0 && hc.config.TCPAddr.Endpoint == defaultEndpoint {
if hc.config.Port != 0 && hc.config.TCPAddr.Endpoint == defaultEndpoint {
hc.logger.Warn("`Port` is deprecated, use `Endpoint` instead")
hc.logger.Warn("`Port` is deprecated, use `Endpoint` instead")
portStr := ":" + strconv.Itoa(int(hc.config.Port))
portStr := ":" + strconv.Itoa(int(hc.config.Port))
ln, err = net.Listen("tcp", portStr)
ln, err = net.Listen("tcp", portStr)
} else {
} else {
ln, err = hc.config.TCPAddr.Listen()
ln, err = hc.config.TCPAddr.Listen()
}
}
if err != nil {
if err != nil {
return err
return err
}
}
if !hc.config.CheckCollectorPipeline.Enabled {
if !hc.config.CheckCollectorPipeline.Enabled {
// Mount HC handler
// Mount HC handler
mux := http.NewServeMux()
mux := http.NewServeMux()
mux.Handle(hc.config.Path, hc.state.Handler())
mux.Handle(hc.config.Path, hc.state.Handler())
hc.server.Handler = mux
hc.server.Handler = mux
hc.stopCh = make(chan struct{})
hc.stopCh = make(chan struct{})
go func() {
go func() {
defer close(hc.stopCh)
defer close(hc.stopCh)
// The listener ownership goes to the server.
// The listener ownership goes to the server.
if err = hc.server.Serve(ln); !errors.Is(err, http.ErrServerClosed) && err != nil {
if err = hc.server.Serve(ln); !errors.Is(err, http.ErrServerClosed) && err != nil {
host.ReportFatalError(err)
host.ReportFatalError(err)
}
}
}()
}()
} else {
} else {
// collector pipeline health check
// collector pipeline health check
hc.exporter = newHealthCheckExporter()
hc.exporter = newHealthCheckExporter(hc.logger)
view.RegisterExporter(hc.exporter)
view.RegisterExporter(hc.exporter)
interval, err := time.ParseDuration(hc.config.CheckCollectorPipeline.Interval)
interval, err := time.ParseDuration(hc.config.CheckCollectorPipeline.Interval)
if err != nil {
if err != nil {
return err
return err
}
}
// ticker used by collector pipeline health check for rotation
// ticker used by collector pipeline health check for rotation
ticker := time.NewTicker(time.Second)
ticker := time.NewTicker(time.Second)
mux := http.NewServeMux()
mux := http.NewServeMux()
mux.Handle(hc.config.Path, hc.handler())
mux.Handle(hc.config.Path, hc.handler())
hc.server.Handler = mux
hc.server.Handler = mux
hc.stopCh = make(chan struct{})
hc.stopCh = make(chan struct{})
go func() {
go func() {
defer close(hc.stopCh)
defer close(hc.stopCh)
defer view.UnregisterExporter(hc.exporter)
defer view.UnregisterExporter(hc.exporter)
go func() {
go func() {
for {
for {
select {
select {
case <-ticker.C:
case <-ticker.C:
hc.exporter.rotate(interval)
hc.exporter.rotate(interval)
case <-hc.stopCh:
case <-hc.stopCh:
return
return
}
}
}
}
}()
}()
if errHTTP := hc.server.Serve(ln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
if errHTTP := hc.server.Serve(ln); !errors.Is(errHTTP, http.ErrServerClosed) && errHTTP != nil {
host.ReportFatalError(errHTTP)
host.ReportFatalError(errHTTP)
}
}
}()
}()
}
}
return nil
return nil
}
}
// new handler function used for check collector pipeline
// new handler function used for check collector pipeline
func (hc *healthCheckExtension) handler() http.Handler {
func (hc *healthCheckExtension) handler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
if hc.check() && hc.state.Get() == healthcheck.Ready {
if hc.check() && hc.state.Get() == healthcheck.Ready {
w.WriteHeader(200)
w.WriteHeader(200)
} else {
} else {
w.WriteHeader(500)
w.WriteHeader(500)
}
}
})
})
}
}
func (hc *healthCheckExtension) check() bool {
func (hc *healthCheckExtension) check() bool {
return hc.exporter.checkHealthStatus(hc.config.CheckCollectorPipeline.ExporterFailureThreshold)
return hc.exporter.checkHealthStatus(hc.config.CheckCollectorPipeline.ExporterFailureThreshold)
}
}
func (hc *healthCheckExtension) Shutdown(context.Context) error {
func (hc *healthCheckExtension) Shutdown(context.Context) error {
err := hc.server.Close()
err := hc.server.Close()
if hc.stopCh != nil {
if hc.stopCh != nil {
<-hc.stopCh
<-hc.stopCh
}
}
return err
return err
}
}
func (hc *healthCheckExtension) Ready() error {
func (hc *healthCheckExtension) Ready() error {
hc.state.Set(healthcheck.Ready)
hc.state.Set(healthcheck.Ready)
return nil
return nil
}
}
func (hc *healthCheckExtension) NotReady() error {
func (hc *healthCheckExtension) NotReady() error {
hc.state.Set(healthcheck.Unavailable)
hc.state.Set(healthcheck.Unavailable)
return nil
return nil
}
}
func newServer(config Config, logger *zap.Logger) *healthCheckExtension {
func newServer(config Config, logger *zap.Logger) *healthCheckExtension {
hc := &healthCheckExtension{
hc := &healthCheckExtension{
config: config,
config: config,
logger: logger,
logger: logger,
state: healthcheck.New(),
state: healthcheck.New(),
server: http.Server{},
server: http.Server{},
}
}
hc.state.SetLogger(logger)
hc.state.SetLogger(logger)
return hc
return hc
}
}