root: initial merging of outpost and main project
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
106
internal/outpost/ak/api.go
Normal file
106
internal/outpost/ak/api.go
Normal file
@ -0,0 +1,106 @@
|
||||
package ak
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/recws-org/recws"
|
||||
pkg "goauthentik.io/internal/outpost"
|
||||
"goauthentik.io/outpost/api"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const ConfigLogLevel = "log_level"
|
||||
const ConfigErrorReportingEnabled = "error_reporting_enabled"
|
||||
const ConfigErrorReportingEnvironment = "error_reporting_environment"
|
||||
|
||||
// APIController main controller which connects to the authentik api via http and ws
|
||||
type APIController struct {
|
||||
Client *api.APIClient
|
||||
token string
|
||||
|
||||
Server Outpost
|
||||
|
||||
logger *log.Entry
|
||||
|
||||
reloadOffset time.Duration
|
||||
|
||||
wsConn *recws.RecConn
|
||||
instanceUUID uuid.UUID
|
||||
}
|
||||
|
||||
// NewAPIController initialise new API Controller instance from URL and API token
|
||||
func NewAPIController(akURL url.URL, token string) *APIController {
|
||||
config := api.NewConfiguration()
|
||||
config.Host = akURL.Host
|
||||
config.Scheme = akURL.Scheme
|
||||
config.HTTPClient = &http.Client{
|
||||
Transport: SetUserAgent(GetTLSTransport(), pkg.UserAgent()),
|
||||
}
|
||||
config.AddDefaultHeader("Authorization", fmt.Sprintf("Bearer %s", token))
|
||||
|
||||
// create the API client, with the transport
|
||||
apiClient := api.NewAPIClient(config)
|
||||
|
||||
log := log.WithField("logger", "authentik.outpost.ak-api-controller")
|
||||
|
||||
// Because we don't know the outpost UUID, we simply do a list and pick the first
|
||||
// The service account this token belongs to should only have access to a single outpost
|
||||
outposts, _, err := apiClient.OutpostsApi.OutpostsInstancesList(context.Background()).Execute()
|
||||
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to fetch configuration")
|
||||
os.Exit(1)
|
||||
}
|
||||
outpost := outposts.Results[0]
|
||||
doGlobalSetup(outpost.Config)
|
||||
|
||||
ac := &APIController{
|
||||
Client: apiClient,
|
||||
token: token,
|
||||
|
||||
logger: log,
|
||||
|
||||
reloadOffset: time.Duration(rand.Intn(10)) * time.Second,
|
||||
instanceUUID: uuid.New(),
|
||||
}
|
||||
ac.logger.Debugf("HA Reload offset: %s", ac.reloadOffset)
|
||||
ac.initWS(akURL, strfmt.UUID(outpost.Pk))
|
||||
return ac
|
||||
}
|
||||
|
||||
// Start Starts all handlers, non-blocking
|
||||
func (a *APIController) Start() error {
|
||||
err := a.Server.Refresh()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to run initial refresh")
|
||||
}
|
||||
go func() {
|
||||
a.logger.Debug("Starting WS Handler...")
|
||||
a.startWSHandler()
|
||||
}()
|
||||
go func() {
|
||||
a.logger.Debug("Starting WS Health notifier...")
|
||||
a.startWSHealth()
|
||||
}()
|
||||
go func() {
|
||||
a.logger.Debug("Starting Interval updater...")
|
||||
a.startIntervalUpdater()
|
||||
}()
|
||||
go func() {
|
||||
err := a.Server.Start()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
20
internal/outpost/ak/api_uag.go
Normal file
20
internal/outpost/ak/api_uag.go
Normal file
@ -0,0 +1,20 @@
|
||||
package ak
|
||||
|
||||
import "net/http"
|
||||
|
||||
func SetUserAgent(inner http.RoundTripper, userAgent string) http.RoundTripper {
|
||||
return &addUGA{
|
||||
inner: inner,
|
||||
Agent: userAgent,
|
||||
}
|
||||
}
|
||||
|
||||
type addUGA struct {
|
||||
inner http.RoundTripper
|
||||
Agent string
|
||||
}
|
||||
|
||||
func (ug *addUGA) RoundTrip(r *http.Request) (*http.Response, error) {
|
||||
r.Header.Set("User-Agent", ug.Agent)
|
||||
return ug.inner.RoundTrip(r)
|
||||
}
|
||||
16
internal/outpost/ak/api_update.go
Normal file
16
internal/outpost/ak/api_update.go
Normal file
@ -0,0 +1,16 @@
|
||||
package ak
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"goauthentik.io/outpost/api"
|
||||
)
|
||||
|
||||
func (a *APIController) Update() ([]api.ProxyOutpostConfig, error) {
|
||||
providers, _, err := a.Client.OutpostsApi.OutpostsProxyList(context.Background()).Execute()
|
||||
if err != nil {
|
||||
a.logger.WithError(err).Error("Failed to fetch providers")
|
||||
return nil, err
|
||||
}
|
||||
return providers.Results, nil
|
||||
}
|
||||
128
internal/outpost/ak/api_ws.go
Normal file
128
internal/outpost/ak/api_ws.go
Normal file
@ -0,0 +1,128 @@
|
||||
package ak
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/recws-org/recws"
|
||||
pkg "goauthentik.io/internal/outpost"
|
||||
)
|
||||
|
||||
func (ac *APIController) initWS(akURL url.URL, outpostUUID strfmt.UUID) {
|
||||
pathTemplate := "%s://%s/ws/outpost/%s/"
|
||||
scheme := strings.ReplaceAll(akURL.Scheme, "http", "ws")
|
||||
|
||||
authHeader := fmt.Sprintf("Bearer %s", ac.token)
|
||||
|
||||
header := http.Header{
|
||||
"Authorization": []string{authHeader},
|
||||
"User-Agent": []string{pkg.UserAgent()},
|
||||
}
|
||||
|
||||
value, set := os.LookupEnv("AUTHENTIK_INSECURE")
|
||||
if !set {
|
||||
value = "false"
|
||||
}
|
||||
|
||||
ws := &recws.RecConn{
|
||||
NonVerbose: true,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: strings.ToLower(value) == "true",
|
||||
},
|
||||
}
|
||||
ws.Dial(fmt.Sprintf(pathTemplate, scheme, akURL.Host, outpostUUID.String()), header)
|
||||
|
||||
ac.logger.WithField("logger", "authentik.outpost.ak-ws").WithField("outpost", outpostUUID.String()).Debug("connecting to authentik")
|
||||
|
||||
ac.wsConn = ws
|
||||
// Send hello message with our version
|
||||
msg := websocketMessage{
|
||||
Instruction: WebsocketInstructionHello,
|
||||
Args: map[string]interface{}{
|
||||
"version": pkg.VERSION,
|
||||
"buildHash": pkg.BUILD(),
|
||||
"uuid": ac.instanceUUID.String(),
|
||||
},
|
||||
}
|
||||
err := ws.WriteJSON(msg)
|
||||
if err != nil {
|
||||
ac.logger.WithField("logger", "authentik.outpost.ak-ws").WithError(err).Warning("Failed to hello to authentik")
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown Gracefully stops all workers, disconnects from websocket
|
||||
func (ac *APIController) Shutdown() {
|
||||
// Cleanly close the connection by sending a close message and then
|
||||
// waiting (with timeout) for the server to close the connection.
|
||||
err := ac.wsConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||
if err != nil {
|
||||
ac.logger.Println("write close:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *APIController) startWSHandler() {
|
||||
logger := ac.logger.WithField("loop", "ws-handler")
|
||||
for {
|
||||
if !ac.wsConn.IsConnected() {
|
||||
continue
|
||||
}
|
||||
var wsMsg websocketMessage
|
||||
err := ac.wsConn.ReadJSON(&wsMsg)
|
||||
if err != nil {
|
||||
logger.WithError(err).Warning("ws write error, reconnecting")
|
||||
ac.wsConn.CloseAndReconnect()
|
||||
continue
|
||||
}
|
||||
if wsMsg.Instruction == WebsocketInstructionTriggerUpdate {
|
||||
time.Sleep(ac.reloadOffset)
|
||||
logger.Debug("Got update trigger...")
|
||||
err := ac.Server.Refresh()
|
||||
if err != nil {
|
||||
logger.WithError(err).Debug("Failed to update")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *APIController) startWSHealth() {
|
||||
ticker := time.NewTicker(time.Second * 10)
|
||||
for ; true; <-ticker.C {
|
||||
if !ac.wsConn.IsConnected() {
|
||||
continue
|
||||
}
|
||||
aliveMsg := websocketMessage{
|
||||
Instruction: WebsocketInstructionHello,
|
||||
Args: map[string]interface{}{
|
||||
"version": pkg.VERSION,
|
||||
"buildHash": pkg.BUILD(),
|
||||
"uuid": ac.instanceUUID.String(),
|
||||
},
|
||||
}
|
||||
err := ac.wsConn.WriteJSON(aliveMsg)
|
||||
ac.logger.WithField("loop", "ws-health").Trace("hello'd")
|
||||
if err != nil {
|
||||
ac.logger.WithField("loop", "ws-health").WithError(err).Warning("ws write error, reconnecting")
|
||||
ac.wsConn.CloseAndReconnect()
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *APIController) startIntervalUpdater() {
|
||||
logger := ac.logger.WithField("loop", "interval-updater")
|
||||
ticker := time.NewTicker(time.Second * 150)
|
||||
for ; true; <-ticker.C {
|
||||
err := ac.Server.Refresh()
|
||||
if err != nil {
|
||||
logger.WithError(err).Debug("Failed to update")
|
||||
}
|
||||
}
|
||||
}
|
||||
17
internal/outpost/ak/api_ws_msg.go
Normal file
17
internal/outpost/ak/api_ws_msg.go
Normal file
@ -0,0 +1,17 @@
|
||||
package ak
|
||||
|
||||
type websocketInstruction int
|
||||
|
||||
const (
|
||||
// WebsocketInstructionAck Code used to acknowledge a previous message
|
||||
WebsocketInstructionAck websocketInstruction = 0
|
||||
// WebsocketInstructionHello Code used to send a healthcheck keepalive
|
||||
WebsocketInstructionHello websocketInstruction = 1
|
||||
// WebsocketInstructionTriggerUpdate Code received to trigger a config update
|
||||
WebsocketInstructionTriggerUpdate websocketInstruction = 2
|
||||
)
|
||||
|
||||
type websocketMessage struct {
|
||||
Instruction websocketInstruction `json:"instruction"`
|
||||
Args map[string]interface{} `json:"args"`
|
||||
}
|
||||
63
internal/outpost/ak/cert.go
Normal file
63
internal/outpost/ak/cert.go
Normal file
@ -0,0 +1,63 @@
|
||||
package ak
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"crypto/rsa"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"encoding/pem"
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// GenerateSelfSignedCert Generate a self-signed TLS Certificate, to be used as fallback
|
||||
func GenerateSelfSignedCert() (tls.Certificate, error) {
|
||||
priv, err := rsa.GenerateKey(rand.Reader, 2048)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to generate private key: %v", err)
|
||||
return tls.Certificate{}, err
|
||||
}
|
||||
|
||||
keyUsage := x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment
|
||||
|
||||
notBefore := time.Now()
|
||||
notAfter := notBefore.Add(365 * 24 * time.Hour)
|
||||
|
||||
serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 128)
|
||||
serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to generate serial number: %v", err)
|
||||
return tls.Certificate{}, err
|
||||
}
|
||||
|
||||
template := x509.Certificate{
|
||||
SerialNumber: serialNumber,
|
||||
Subject: pkix.Name{
|
||||
Organization: []string{"authentik"},
|
||||
CommonName: "authentik Proxy default certificate",
|
||||
},
|
||||
NotBefore: notBefore,
|
||||
NotAfter: notAfter,
|
||||
|
||||
KeyUsage: keyUsage,
|
||||
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
|
||||
BasicConstraintsValid: true,
|
||||
}
|
||||
|
||||
template.DNSNames = []string{"*"}
|
||||
|
||||
derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv)
|
||||
if err != nil {
|
||||
log.Warning(err)
|
||||
}
|
||||
pemBytes := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
|
||||
privBytes, err := x509.MarshalPKCS8PrivateKey(priv)
|
||||
if err != nil {
|
||||
log.Warning(err)
|
||||
}
|
||||
privPemByes := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: privBytes})
|
||||
return tls.X509KeyPair(pemBytes, privPemByes)
|
||||
}
|
||||
68
internal/outpost/ak/global.go
Normal file
68
internal/outpost/ak/global.go
Normal file
@ -0,0 +1,68 @@
|
||||
package ak
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
httptransport "github.com/go-openapi/runtime/client"
|
||||
log "github.com/sirupsen/logrus"
|
||||
pkg "goauthentik.io/internal/outpost"
|
||||
)
|
||||
|
||||
func doGlobalSetup(config map[string]interface{}) {
|
||||
log.SetFormatter(&log.JSONFormatter{
|
||||
FieldMap: log.FieldMap{
|
||||
log.FieldKeyMsg: "event",
|
||||
log.FieldKeyTime: "timestamp",
|
||||
},
|
||||
})
|
||||
switch config[ConfigLogLevel].(string) {
|
||||
case "trace":
|
||||
log.SetLevel(log.TraceLevel)
|
||||
case "debug":
|
||||
log.SetLevel(log.DebugLevel)
|
||||
case "info":
|
||||
log.SetLevel(log.InfoLevel)
|
||||
case "warning":
|
||||
log.SetLevel(log.WarnLevel)
|
||||
case "error":
|
||||
log.SetLevel(log.ErrorLevel)
|
||||
default:
|
||||
log.SetLevel(log.DebugLevel)
|
||||
}
|
||||
log.WithField("buildHash", pkg.BUILD()).WithField("version", pkg.VERSION).Info("Starting authentik outpost")
|
||||
|
||||
var dsn string
|
||||
if config[ConfigErrorReportingEnabled].(bool) {
|
||||
dsn = "https://a579bb09306d4f8b8d8847c052d3a1d3@sentry.beryju.org/8"
|
||||
log.Debug("Error reporting enabled")
|
||||
}
|
||||
|
||||
err := sentry.Init(sentry.ClientOptions{
|
||||
Dsn: dsn,
|
||||
Environment: config[ConfigErrorReportingEnvironment].(string),
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatalf("sentry.Init: %s", err)
|
||||
}
|
||||
|
||||
defer sentry.Flush(2 * time.Second)
|
||||
}
|
||||
|
||||
// GetTLSTransport Get a TLS transport instance, that skips verification if configured via environment variables.
|
||||
func GetTLSTransport() http.RoundTripper {
|
||||
value, set := os.LookupEnv("AUTHENTIK_INSECURE")
|
||||
if !set {
|
||||
value = "false"
|
||||
}
|
||||
tlsTransport, err := httptransport.TLSTransport(httptransport.TLSClientOptions{
|
||||
InsecureSkipVerify: strings.ToLower(value) == "true",
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return tlsTransport
|
||||
}
|
||||
6
internal/outpost/ak/outpost.go
Normal file
6
internal/outpost/ak/outpost.go
Normal file
@ -0,0 +1,6 @@
|
||||
package ak
|
||||
|
||||
type Outpost interface {
|
||||
Start() error
|
||||
Refresh() error
|
||||
}
|
||||
Reference in New Issue
Block a user