Traefik 2.5 源码分析1 – 程序启动部分
目录
1. 前言
Traefik版本为2.5.2,从今天开始,计划陆续看完Traefik的核心部分的源码。 本文是第一部分,Traefik的程序启动分析。
2. 主流程
main.go在cmd/traefik/traefik.go内,其主要流程为:
先构造3个Command启动器,分别是traefik,healthcheck,version,Command结构如下:
源码文件:github.com/traefik/paerser@v0.1.4/cli/commands.go
package cli
type Command struct {
Name string
Description string
Configuration interface{}
Resources []ResourceLoader
Run func([]string) error
CustomHelpFunc func(io.Writer, *Command) error
Hidden bool
// AllowArg if not set, disallows any argument that is not a known command or a sub-command.
AllowArg bool
subCommands []*Command
}
可以看到,每个Command都可以配置名字,资源,Run函数等。
然后通过cli.Execute(cmdTraefik)来启动上面3个服务,也就是调用Command内的Run函数。
ResourceLoader是用于配置的加载,有文件,参数,环境变量3种加载方法,下面会提到。
源码文件:cmd/traefik/traefik.go
package main
//启动函数
func main() {
// traefik config inits
tConfig := cmd.NewTraefikConfiguration()
//Traefik可以从File,Flag,ENV三个地方loader配置
//ResourceLoader数据类型定义了一个Load接口
//将pkg/cli包内的3个loader引入一个cli.ResourceLoader类型的list
loaders := []cli.ResourceLoader{&tcli.FileLoader{}, &tcli.FlagLoader{}, &tcli.EnvLoader{}}
//构造一个新的cli.Command类型初始数据,用于启动主程序,名字为traefik
//启动func 为runCmd
cmdTraefik := &cli.Command{
Name: "traefik",
Description: `Traefik is a modern HTTP reverse proxy and load balancer made to deploy microservices with ease.
Complete documentation is available at https://traefik.io`,
Configuration: tConfig,
Resources: loaders,
Run: func(_ []string) error {
return runCmd(&tConfig.Configuration)
},
}
//增加health check服务
//名字为healthcheck,func为healthcheck包内的runCmd
err := cmdTraefik.AddCommand(healthcheck.NewCmd(&tConfig.Configuration, loaders))
if err != nil {
stdlog.Println(err)
os.Exit(1)
}
//增加version服务
//名字为version,启动func内主要功能为输出版本号
err = cmdTraefik.AddCommand(cmdVersion.NewCmd())
if err != nil {
stdlog.Println(err)
os.Exit(1)
}
//cli.Execute会检查命令行参数,并通过58行的3个加载器来加载配置
//然后执行cmdTraefik.Run
//对主服务traefik来说也就是main.runCmd()
//对healthcheck服务来说是healthcheck.runCmd()
err = cli.Execute(cmdTraefik)
if err != nil {
stdlog.Println(err)
logrus.Exit(1)
}
logrus.Exit(0)
}
下面是ResourceLoader的数据结构:
源码文件:github.com/traefik/paerser@v0.1.4/cli/loader.go
package cli
// ResourceLoader is a configuration resource loader.
type ResourceLoader interface {
// Load populates cmd.Configuration, optionally using args to do so.
Load(args []string, cmd *Command) (bool, error)
}
ResourceLoader是一个接口,paerser模块还通过这个接口实现了FileLoader(文件加载),FlagLoader(命令行参数加载),EnvLoader(环境变量加载)这三个配置加载器。
3. healthcheck服务
healthcheck服务比较简单,先看这个服务的源码再看traefik主服务,会更容易理解。
healthcheck服务的主要逻辑是:
- 先找到静态配置内的健康检查相关的配置,如果没有自定义EntryPoint,就把traefik服务当成EntryPoint
- 构建http 请求,超时为5s,这个超时代码内写死了,不可修改,请求URL为http://EntryPoint地址/ping,除了EntryPoint地址,其它部分也是写死的。
- 执行http 请求,并返回http Response
下面是主要源码,非常的简单。
源码文件:cmd/healthcheck/healthcheck.go
package healthcheck
func runCmd(traefikConfiguration *static.Configuration) func(_ []string) error {
return func(_ []string) error {
//如果healthcheck相关的配置没有设置,将选择默认最优配置
traefikConfiguration.SetEffectiveConfiguration()
//调用Do来执行http请求,并返回结果
resp, errPing := Do(*traefikConfiguration)
if resp != nil {
resp.Body.Close()
}
if errPing != nil {
fmt.Printf("Error calling healthcheck: %s\n", errPing)
os.Exit(1)
}
if resp.StatusCode != http.StatusOK {
fmt.Printf("Bad healthcheck status: %s\n", resp.Status)
os.Exit(1)
}
fmt.Printf("OK: %s\n", resp.Request.URL)
os.Exit(0)
return nil
}
}
// Do try to do a healthcheck.
func Do(staticConfiguration static.Configuration) (*http.Response, error) {
if staticConfiguration.Ping == nil {
return nil, errors.New("please enable `ping` to use health check")
}
//健康检查可以指定EntryPoint返回结果,如果没有指定,默认为traefik自身服务
ep := staticConfiguration.Ping.EntryPoint
if ep == "" {
ep = "traefik"
}
//取到EntryPoint
pingEntryPoint, ok := staticConfiguration.EntryPoints[ep]
if !ok {
return nil, fmt.Errorf("ping: missing %s entry point", ep)
}
//通过上面的EntryPoint,发起一个http请求,超时为5s,所以健康检查的超时为5s
client := &http.Client{Timeout: 5 * time.Second}
protocol := "http"
path := "/"
//执行http请求,返回http Response
return client.Head(protocol + "://" + pingEntryPoint.GetAddress() + path + "ping")
}
4. traefik主服务
traefik主服务的启动入口函数是main.runCmd()。主要流程为:
- 初始化日志系统
- 解析配置
- 通过main.setupServer()生成server
- 通过server.Start()启动server
核心在main.setupServer()及server.Start()。
下面是启动入口函数main.runCmd(),只保留了核心代码。
源码文件:cmd/traefik/traefik.go
package main
func runCmd(staticConfiguration *static.Configuration) error {
//初始化日志
configureLogging(staticConfiguration)
//配置proxy
http.DefaultTransport.(*http.Transport).Proxy = http.ProxyFromEnvironment
//配置轮循模式的默认权重为0
if err := roundrobin.SetDefaultWeight(0); err != nil {
log.WithoutContext().Errorf("Could not set round robin default weight: %v", err)
}
//针对配置文件中没有配置的参数,配置上默认的最优配置
staticConfiguration.SetEffectiveConfiguration()
if err := staticConfiguration.ValidateConfiguration(); err != nil {
return err
}
log.WithoutContext().Infof("Traefik version %s built on %s", version.Version, version.BuildDate)
//解析配置文件
jsonConf, err := json.Marshal(staticConfiguration)
if err != nil {
log.WithoutContext().Errorf("Could not marshal static configuration: %v", err)
log.WithoutContext().Debugf("Static configuration loaded [struct] %#v", staticConfiguration)
} else {
log.WithoutContext().Debugf("Static configuration loaded %s", string(jsonConf))
}
//生成server的配置
svr, err := setupServer(staticConfiguration)
if err != nil {
return err
}
ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
if staticConfiguration.Ping != nil {
//ctx如终止将导致ping EntryPont返回非200状态码
staticConfiguration.Ping.WithContext(ctx)
}
//启动server
svr.Start(ctx)
//runCmd结束时执行close优雅退出
defer svr.Close()
svr.Wait()
log.WithoutContext().Info("Shutting down")
return nil
}
func setupServer(staticConfiguration *static.Configuration) (*server.Server, error) {
// 查找静态配置文件,生成Provider
providerAggregator := aggregator.NewProviderAggregator(*staticConfiguration.Providers)
ctx := context.Background()
routinesPool := safe.NewPool(ctx)
// adds internal provider
// 增加一个内部provider
err := providerAggregator.AddProvider(traefik.New(*staticConfiguration))
if err != nil {
return nil, err
}
// ACME
//ACME是自动获得Let's Encrypt https证书的客户端
//acmeProviders是一个自动获得Let's Encrypt https证书的提供者
tlsManager := traefiktls.NewManager()
httpChallengeProvider := acme.NewChallengeHTTP()
// we need to wait at least 2 times the ProvidersThrottleDuration to be sure to handle the challenge.
tlsChallengeProvider := acme.NewChallengeTLSALPN(time.Duration(staticConfiguration.Providers.ProvidersThrottleDuration) * 2)
err = providerAggregator.AddProvider(tlsChallengeProvider)
if err != nil {
return nil, err
}
acmeProviders := initACMEProvider(staticConfiguration, &providerAggregator, tlsManager, httpChallengeProvider, tlsChallengeProvider)
// Entrypoints
// 生成TCP及UDP的EntryPoints
serverEntryPointsTCP, err := server.NewTCPEntryPoints(staticConfiguration.EntryPoints)
if err != nil {
return nil, err
}
serverEntryPointsUDP, err := server.NewUDPEntryPoints(staticConfiguration.EntryPoints)
if err != nil {
return nil, err
}
// Pilot
// Pilot是traefik官方的一个统一的监控系统
var aviator *pilot.Pilot
var pilotRegistry *metrics.PilotRegistry
if isPilotEnabled(staticConfiguration) {
pilotRegistry = metrics.RegisterPilot()
aviator = pilot.New(staticConfiguration.Pilot.Token, pilotRegistry, routinesPool)
routinesPool.GoCtx(func(ctx context.Context) {
aviator.Tick(ctx)
})
}
if staticConfiguration.Pilot != nil {
version.PilotEnabled = staticConfiguration.Pilot.Dashboard
}
// Plugins
// 生成第三方的Plugins Builder
pluginBuilder, err := createPluginBuilder(staticConfiguration)
if err != nil {
return nil, err
}
// Providers plugins
// 生成第三方的Providers plugins Builder
for name, conf := range staticConfiguration.Providers.Plugin {
p, err := pluginBuilder.BuildProvider(name, conf)
if err != nil {
return nil, fmt.Errorf("plugin: failed to build provider: %w", err)
}
err = providerAggregator.AddProvider(p)
if err != nil {
return nil, fmt.Errorf("plugin: failed to add provider: %w", err)
}
}
// Metrics
metricRegistries := registerMetricClients(staticConfiguration.Metrics)
if pilotRegistry != nil {
metricRegistries = append(metricRegistries, pilotRegistry)
}
metricsRegistry := metrics.NewMultiRegistry(metricRegistries)
// Service manager factory
roundTripperManager := service.NewRoundTripperManager()
acmeHTTPHandler := getHTTPChallengeHandler(acmeProviders, httpChallengeProvider)
managerFactory := service.NewManagerFactory(*staticConfiguration, routinesPool, metricsRegistry, roundTripperManager, acmeHTTPHandler)
// Router factory
accessLog := setupAccessLog(staticConfiguration.AccessLog)
chainBuilder := middleware.NewChainBuilder(*staticConfiguration, metricsRegistry, accessLog)
routerFactory := server.NewRouterFactory(*staticConfiguration, managerFactory, tlsManager, chainBuilder, pluginBuilder, metricsRegistry)
return server.NewServer(routinesPool, serverEntryPointsTCP, serverEntryPointsUDP, watcher, chainBuilder, accessLog), nil
}
上面的main.setupServer()只摘录了部分代码。其主要做了下面这些事:
- 查找静态配置文件,生成Provider
- 生成TCP及UDP的EntryPoints
- 生成第三方的Plugins Builder
- 生成第三方的Providers plugins Builder
- 以及生成其它traefik相关的对象
- 最后通过上面的对象组合,返回的是Server对象(参考server.NewServer)
server.Start()启动服务代码很简单,就是启动各个对象中的Start()函数。可参考以下代码:
源码文件:pkg/server/server.go
package server
// Server结构体:
// Server is the reverse-proxy/load-balancer engine.
type Server struct {
watcher *ConfigurationWatcher
tcpEntryPoints TCPEntryPoints
udpEntryPoints UDPEntryPoints
chainBuilder *middleware.ChainBuilder
accessLoggerMiddleware *accesslog.Handler
signals chan os.Signal
stopChan chan bool
routinesPool *safe.Pool
}
// NewServer returns an initialized Server.
func NewServer(routinesPool *safe.Pool, entryPoints TCPEntryPoints, entryPointsUDP UDPEntryPoints, watcher *ConfigurationWatcher,
chainBuilder *middleware.ChainBuilder, accessLoggerMiddleware *accesslog.Handler) *Server {
srv := &Server{
watcher: watcher,
tcpEntryPoints: entryPoints,
chainBuilder: chainBuilder,
accessLoggerMiddleware: accessLoggerMiddleware,
signals: make(chan os.Signal, 1),
stopChan: make(chan bool, 1),
routinesPool: routinesPool,
udpEntryPoints: entryPointsUDP,
}
srv.configureSignals()
return srv
}
// Start starts the server and Stop/Close it when context is Done.
func (s *Server) Start(ctx context.Context) {
go func() {
<-ctx.Done()
logger := log.FromContext(ctx)
logger.Info("I have to go...")
logger.Info("Stopping server gracefully")
s.Stop()
}()
// 调用tcp及udp的EntryPoints的Start()启动服务
s.tcpEntryPoints.Start()
s.udpEntryPoints.Start()
// 启动watcher服务
s.watcher.Start()
// 监听系统信号,用于配置重载等
s.routinesPool.GoCtx(s.listenSignals)
}