Merge branch 'main' into celery-2-dramatiq
This commit is contained in:
@ -2,6 +2,7 @@ package web
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
@ -13,11 +14,15 @@ import (
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/securecookie"
|
||||
"github.com/pires/go-proxyproto"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"goauthentik.io/api/v3"
|
||||
"goauthentik.io/internal/config"
|
||||
"goauthentik.io/internal/constants"
|
||||
"goauthentik.io/internal/gounicorn"
|
||||
"goauthentik.io/internal/outpost/ak"
|
||||
"goauthentik.io/internal/outpost/proxyv2"
|
||||
"goauthentik.io/internal/utils"
|
||||
"goauthentik.io/internal/utils/web"
|
||||
@ -25,6 +30,12 @@ import (
|
||||
"goauthentik.io/internal/worker"
|
||||
)
|
||||
|
||||
const (
|
||||
IPCKeyFile = "authentik-core-ipc.key"
|
||||
MetricsKeyFile = "authentik-core-metrics.key"
|
||||
UnixSocketName = "authentik-core.sock"
|
||||
)
|
||||
|
||||
type WebServer struct {
|
||||
Bind string
|
||||
BindTLS bool
|
||||
@ -42,9 +53,10 @@ type WebServer struct {
|
||||
log *log.Entry
|
||||
upstreamClient *http.Client
|
||||
upstreamURL *url.URL
|
||||
}
|
||||
|
||||
const UnixSocketName = "authentik-core.sock"
|
||||
metricsKey string
|
||||
ipcKey string
|
||||
}
|
||||
|
||||
func NewWebServer() *WebServer {
|
||||
l := log.WithField("logger", "authentik.router")
|
||||
@ -78,7 +90,7 @@ func NewWebServer() *WebServer {
|
||||
mainRouter: mainHandler,
|
||||
loggingRouter: loggingHandler,
|
||||
log: l,
|
||||
gunicornReady: true,
|
||||
gunicornReady: false,
|
||||
upstreamClient: upstreamClient,
|
||||
upstreamURL: u,
|
||||
}
|
||||
@ -105,7 +117,59 @@ func NewWebServer() *WebServer {
|
||||
return ws
|
||||
}
|
||||
|
||||
func (ws *WebServer) prepareKeys() {
|
||||
tmp := os.TempDir()
|
||||
key := base64.StdEncoding.EncodeToString(securecookie.GenerateRandomKey(64))
|
||||
err := os.WriteFile(path.Join(tmp, MetricsKeyFile), []byte(key), 0o600)
|
||||
if err != nil {
|
||||
ws.log.WithError(err).Warning("failed to save metrics key")
|
||||
return
|
||||
}
|
||||
ws.metricsKey = key
|
||||
|
||||
key = base64.StdEncoding.EncodeToString(securecookie.GenerateRandomKey(64))
|
||||
err = os.WriteFile(path.Join(tmp, IPCKeyFile), []byte(key), 0o600)
|
||||
if err != nil {
|
||||
ws.log.WithError(err).Warning("failed to save ipc key")
|
||||
return
|
||||
}
|
||||
ws.ipcKey = key
|
||||
}
|
||||
|
||||
func (ws *WebServer) Start() {
|
||||
ws.prepareKeys()
|
||||
|
||||
u, err := url.Parse(fmt.Sprintf("http://%s%s", config.Get().Listen.HTTP, config.Get().Web.Path))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
apiConfig := api.NewConfiguration()
|
||||
apiConfig.Host = u.Host
|
||||
apiConfig.Scheme = u.Scheme
|
||||
apiConfig.HTTPClient = &http.Client{
|
||||
Transport: web.NewUserAgentTransport(
|
||||
constants.UserAgentIPC(),
|
||||
ak.GetTLSTransport(),
|
||||
),
|
||||
}
|
||||
apiConfig.Servers = api.ServerConfigurations{
|
||||
{
|
||||
URL: fmt.Sprintf("%sapi/v3", u.Path),
|
||||
},
|
||||
}
|
||||
apiConfig.AddDefaultHeader("Authorization", fmt.Sprintf("Bearer %s", ws.ipcKey))
|
||||
|
||||
// create the API client, with the transport
|
||||
apiClient := api.NewAPIClient(apiConfig)
|
||||
|
||||
// Init brand_tls here too since it requires an API Client,
|
||||
// so we just reuse the same one as the outpost uses
|
||||
tw := brand_tls.NewWatcher(apiClient)
|
||||
ws.BrandTLS = tw
|
||||
ws.g.AddHealthyCallback(func() {
|
||||
go tw.Start()
|
||||
})
|
||||
|
||||
go ws.runMetricsServer()
|
||||
go ws.attemptStartBackend()
|
||||
go ws.attemptStartWorker()
|
||||
@ -115,23 +179,23 @@ func (ws *WebServer) Start() {
|
||||
|
||||
func (ws *WebServer) attemptStartBackend() {
|
||||
for {
|
||||
if !ws.gunicornReady {
|
||||
if ws.gunicornReady {
|
||||
return
|
||||
}
|
||||
err := ws.g.Start()
|
||||
log.WithField("logger", "authentik.router").WithError(err).Warning("gunicorn process died, restarting")
|
||||
ws.log.WithError(err).Warning("gunicorn process died, restarting")
|
||||
if err != nil {
|
||||
log.WithField("logger", "authentik.router").WithError(err).Error("gunicorn failed to start, restarting")
|
||||
ws.log.WithError(err).Error("gunicorn failed to start, restarting")
|
||||
continue
|
||||
}
|
||||
failedChecks := 0
|
||||
for range time.NewTicker(30 * time.Second).C {
|
||||
if !ws.g.IsRunning() {
|
||||
log.WithField("logger", "authentik.router").Warningf("gunicorn process failed healthcheck %d times", failedChecks)
|
||||
ws.log.Warningf("gunicorn process failed healthcheck %d times", failedChecks)
|
||||
failedChecks += 1
|
||||
}
|
||||
if failedChecks >= 3 {
|
||||
log.WithField("logger", "authentik.router").WithError(err).Error("gunicorn process failed healthcheck three times, restarting")
|
||||
ws.log.WithError(err).Error("gunicorn process failed healthcheck three times, restarting")
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -155,6 +219,15 @@ func (ws *WebServer) upstreamHttpClient() *http.Client {
|
||||
func (ws *WebServer) Shutdown() {
|
||||
ws.log.Info("shutting down gunicorn")
|
||||
ws.g.Kill()
|
||||
tmp := os.TempDir()
|
||||
err := os.Remove(path.Join(tmp, MetricsKeyFile))
|
||||
if err != nil {
|
||||
ws.log.WithError(err).Warning("failed to remove metrics key file")
|
||||
}
|
||||
err = os.Remove(path.Join(tmp, IPCKeyFile))
|
||||
if err != nil {
|
||||
ws.log.WithError(err).Warning("failed to remove ipc key file")
|
||||
}
|
||||
ws.stop <- struct{}{}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user