目录

1. 前言

Traefik版本为2.5.2,从今天开始,计划陆续看完Traefik的核心部分的源码。 本文是第一部分,Traefik的程序启动分析。

2. 主流程

main.go在cmd/traefik/traefik.go内,其主要流程为:

image

先构造3个Command启动器,分别是traefik,healthcheck,version,Command结构如下:

源码文件:github.com/traefik/paerser@v0.1.4/cli/commands.go

package cli

type Command struct {
	Name           string
	Description    string
	Configuration  interface{}
	Resources      []ResourceLoader
	Run            func([]string) error
	CustomHelpFunc func(io.Writer, *Command) error
	Hidden         bool
	// AllowArg if not set, disallows any argument that is not a known command or a sub-command.
	AllowArg    bool
	subCommands []*Command
}

可以看到,每个Command都可以配置名字,资源,Run函数等。

然后通过cli.Execute(cmdTraefik)来启动上面3个服务,也就是调用Command内的Run函数。

ResourceLoader是用于配置的加载,有文件,参数,环境变量3种加载方法,下面会提到。

源码文件:cmd/traefik/traefik.go

package main

//启动函数
func main() {
	// traefik config inits
	tConfig := cmd.NewTraefikConfiguration()

	//Traefik可以从File,Flag,ENV三个地方loader配置
	//ResourceLoader数据类型定义了一个Load接口
	//将pkg/cli包内的3个loader引入一个cli.ResourceLoader类型的list
	loaders := []cli.ResourceLoader{&tcli.FileLoader{}, &tcli.FlagLoader{}, &tcli.EnvLoader{}}

	//构造一个新的cli.Command类型初始数据,用于启动主程序,名字为traefik
	//启动func 为runCmd
	cmdTraefik := &cli.Command{
		Name: "traefik",
		Description: `Traefik is a modern HTTP reverse proxy and load balancer made to deploy microservices with ease.
Complete documentation is available at https://traefik.io`,
		Configuration: tConfig,
		Resources:     loaders,
		Run: func(_ []string) error {
			return runCmd(&tConfig.Configuration)
		},
	}

	//增加health check服务
	//名字为healthcheck,func为healthcheck包内的runCmd
	err := cmdTraefik.AddCommand(healthcheck.NewCmd(&tConfig.Configuration, loaders))
	if err != nil {
		stdlog.Println(err)
		os.Exit(1)
	}

	//增加version服务
	//名字为version,启动func内主要功能为输出版本号
	err = cmdTraefik.AddCommand(cmdVersion.NewCmd())
	if err != nil {
		stdlog.Println(err)
		os.Exit(1)
	}

	//cli.Execute会检查命令行参数,并通过58行的3个加载器来加载配置
	//然后执行cmdTraefik.Run
	//对主服务traefik来说也就是main.runCmd()
	//对healthcheck服务来说是healthcheck.runCmd()
	err = cli.Execute(cmdTraefik)
	if err != nil {
		stdlog.Println(err)
		logrus.Exit(1)
	}

	logrus.Exit(0)
}

下面是ResourceLoader的数据结构:

源码文件:github.com/traefik/paerser@v0.1.4/cli/loader.go

package cli

// ResourceLoader is a configuration resource loader.
type ResourceLoader interface {
	// Load populates cmd.Configuration, optionally using args to do so.
	Load(args []string, cmd *Command) (bool, error)
}

ResourceLoader是一个接口,paerser模块还通过这个接口实现了FileLoader(文件加载),FlagLoader(命令行参数加载),EnvLoader(环境变量加载)这三个配置加载器。

3. healthcheck服务

healthcheck服务比较简单,先看这个服务的源码再看traefik主服务,会更容易理解。

healthcheck服务的主要逻辑是:

  1. 先找到静态配置内的健康检查相关的配置,如果没有自定义EntryPoint,就把traefik服务当成EntryPoint
  2. 构建http 请求,超时为5s,这个超时代码内写死了,不可修改,请求URL为http://EntryPoint地址/ping,除了EntryPoint地址,其它部分也是写死的。
  3. 执行http 请求,并返回http Response

下面是主要源码,非常的简单。

源码文件:cmd/healthcheck/healthcheck.go

package healthcheck

func runCmd(traefikConfiguration *static.Configuration) func(_ []string) error {
	return func(_ []string) error {
		//如果healthcheck相关的配置没有设置,将选择默认最优配置
		traefikConfiguration.SetEffectiveConfiguration()

		//调用Do来执行http请求,并返回结果
		resp, errPing := Do(*traefikConfiguration)
		if resp != nil {
			resp.Body.Close()
		}
		if errPing != nil {
			fmt.Printf("Error calling healthcheck: %s\n", errPing)
			os.Exit(1)
		}

		if resp.StatusCode != http.StatusOK {
			fmt.Printf("Bad healthcheck status: %s\n", resp.Status)
			os.Exit(1)
		}
		fmt.Printf("OK: %s\n", resp.Request.URL)
		os.Exit(0)
		return nil
	}
}

// Do try to do a healthcheck.
func Do(staticConfiguration static.Configuration) (*http.Response, error) {
	if staticConfiguration.Ping == nil {
		return nil, errors.New("please enable `ping` to use health check")
	}

	//健康检查可以指定EntryPoint返回结果,如果没有指定,默认为traefik自身服务
	ep := staticConfiguration.Ping.EntryPoint
	if ep == "" {
		ep = "traefik"
	}

	//取到EntryPoint
	pingEntryPoint, ok := staticConfiguration.EntryPoints[ep]
	if !ok {
		return nil, fmt.Errorf("ping: missing %s entry point", ep)
	}

	//通过上面的EntryPoint,发起一个http请求,超时为5s,所以健康检查的超时为5s
	client := &http.Client{Timeout: 5 * time.Second}
	protocol := "http"

	path := "/"

	//执行http请求,返回http Response
	return client.Head(protocol + "://" + pingEntryPoint.GetAddress() + path + "ping")
}

4. traefik主服务

traefik主服务的启动入口函数是main.runCmd()。主要流程为:

  1. 初始化日志系统
  2. 解析配置
  3. 通过main.setupServer()生成server
  4. 通过server.Start()启动server

核心在main.setupServer()及server.Start()。

下面是启动入口函数main.runCmd(),只保留了核心代码。

源码文件:cmd/traefik/traefik.go

package main

func runCmd(staticConfiguration *static.Configuration) error {
	//初始化日志
	configureLogging(staticConfiguration)

	//配置proxy
	http.DefaultTransport.(*http.Transport).Proxy = http.ProxyFromEnvironment

	//配置轮循模式的默认权重为0
	if err := roundrobin.SetDefaultWeight(0); err != nil {
		log.WithoutContext().Errorf("Could not set round robin default weight: %v", err)
	}

	//针对配置文件中没有配置的参数,配置上默认的最优配置
	staticConfiguration.SetEffectiveConfiguration()
	if err := staticConfiguration.ValidateConfiguration(); err != nil {
		return err
	}

	log.WithoutContext().Infof("Traefik version %s built on %s", version.Version, version.BuildDate)

	//解析配置文件
	jsonConf, err := json.Marshal(staticConfiguration)
	if err != nil {
		log.WithoutContext().Errorf("Could not marshal static configuration: %v", err)
		log.WithoutContext().Debugf("Static configuration loaded [struct] %#v", staticConfiguration)
	} else {
		log.WithoutContext().Debugf("Static configuration loaded %s", string(jsonConf))
	}

	//生成server的配置
	svr, err := setupServer(staticConfiguration)
	if err != nil {
		return err
	}

	ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)

	if staticConfiguration.Ping != nil {
		//ctx如终止将导致ping EntryPont返回非200状态码
		staticConfiguration.Ping.WithContext(ctx)
	}

	//启动server
	svr.Start(ctx)
	//runCmd结束时执行close优雅退出
	defer svr.Close()

	svr.Wait()
	log.WithoutContext().Info("Shutting down")
	return nil
}


func setupServer(staticConfiguration *static.Configuration) (*server.Server, error) {
	// 查找静态配置文件,生成Provider
	providerAggregator := aggregator.NewProviderAggregator(*staticConfiguration.Providers)

	ctx := context.Background()
	routinesPool := safe.NewPool(ctx)

	// adds internal provider
	// 增加一个内部provider
	err := providerAggregator.AddProvider(traefik.New(*staticConfiguration))
	if err != nil {
		return nil, err
	}

	// ACME
	//ACME是自动获得Let's Encrypt https证书的客户端
	//acmeProviders是一个自动获得Let's Encrypt https证书的提供者
	tlsManager := traefiktls.NewManager()
	httpChallengeProvider := acme.NewChallengeHTTP()

	// we need to wait at least 2 times the ProvidersThrottleDuration to be sure to handle the challenge.
	tlsChallengeProvider := acme.NewChallengeTLSALPN(time.Duration(staticConfiguration.Providers.ProvidersThrottleDuration) * 2)
	err = providerAggregator.AddProvider(tlsChallengeProvider)
	if err != nil {
		return nil, err
	}

	acmeProviders := initACMEProvider(staticConfiguration, &providerAggregator, tlsManager, httpChallengeProvider, tlsChallengeProvider)

	// Entrypoints
	// 生成TCP及UDP的EntryPoints
	serverEntryPointsTCP, err := server.NewTCPEntryPoints(staticConfiguration.EntryPoints)
	if err != nil {
		return nil, err
	}

	serverEntryPointsUDP, err := server.NewUDPEntryPoints(staticConfiguration.EntryPoints)
	if err != nil {
		return nil, err
	}

	// Pilot
	// Pilot是traefik官方的一个统一的监控系统
	var aviator *pilot.Pilot
	var pilotRegistry *metrics.PilotRegistry
	if isPilotEnabled(staticConfiguration) {
		pilotRegistry = metrics.RegisterPilot()

		aviator = pilot.New(staticConfiguration.Pilot.Token, pilotRegistry, routinesPool)

		routinesPool.GoCtx(func(ctx context.Context) {
			aviator.Tick(ctx)
		})
	}

	if staticConfiguration.Pilot != nil {
		version.PilotEnabled = staticConfiguration.Pilot.Dashboard
	}

	// Plugins
	// 生成第三方的Plugins Builder
	pluginBuilder, err := createPluginBuilder(staticConfiguration)
	if err != nil {
		return nil, err
	}

	// Providers plugins
	// 生成第三方的Providers plugins Builder
	for name, conf := range staticConfiguration.Providers.Plugin {
		p, err := pluginBuilder.BuildProvider(name, conf)
		if err != nil {
			return nil, fmt.Errorf("plugin: failed to build provider: %w", err)
		}

		err = providerAggregator.AddProvider(p)
		if err != nil {
			return nil, fmt.Errorf("plugin: failed to add provider: %w", err)
		}
	}

	// Metrics

	metricRegistries := registerMetricClients(staticConfiguration.Metrics)
	if pilotRegistry != nil {
		metricRegistries = append(metricRegistries, pilotRegistry)
	}
	metricsRegistry := metrics.NewMultiRegistry(metricRegistries)

	// Service manager factory

	roundTripperManager := service.NewRoundTripperManager()
	acmeHTTPHandler := getHTTPChallengeHandler(acmeProviders, httpChallengeProvider)
	managerFactory := service.NewManagerFactory(*staticConfiguration, routinesPool, metricsRegistry, roundTripperManager, acmeHTTPHandler)

	// Router factory

	accessLog := setupAccessLog(staticConfiguration.AccessLog)
	chainBuilder := middleware.NewChainBuilder(*staticConfiguration, metricsRegistry, accessLog)
	routerFactory := server.NewRouterFactory(*staticConfiguration, managerFactory, tlsManager, chainBuilder, pluginBuilder, metricsRegistry)

	return server.NewServer(routinesPool, serverEntryPointsTCP, serverEntryPointsUDP, watcher, chainBuilder, accessLog), nil
}

上面的main.setupServer()只摘录了部分代码。其主要做了下面这些事:

  1. 查找静态配置文件,生成Provider
  2. 生成TCP及UDP的EntryPoints
  3. 生成第三方的Plugins Builder
  4. 生成第三方的Providers plugins Builder
  5. 以及生成其它traefik相关的对象
  6. 最后通过上面的对象组合,返回的是Server对象(参考server.NewServer)

server.Start()启动服务代码很简单,就是启动各个对象中的Start()函数。可参考以下代码:

源码文件:pkg/server/server.go

package server

// Server结构体:
// Server is the reverse-proxy/load-balancer engine.
type Server struct {
	watcher        *ConfigurationWatcher
	tcpEntryPoints TCPEntryPoints
	udpEntryPoints UDPEntryPoints
	chainBuilder   *middleware.ChainBuilder
	accessLoggerMiddleware *accesslog.Handler
	signals  chan os.Signal
	stopChan chan bool
	routinesPool *safe.Pool
}

// NewServer returns an initialized Server.
func NewServer(routinesPool *safe.Pool, entryPoints TCPEntryPoints, entryPointsUDP UDPEntryPoints, watcher *ConfigurationWatcher,
	chainBuilder *middleware.ChainBuilder, accessLoggerMiddleware *accesslog.Handler) *Server {
	srv := &Server{
		watcher:                watcher,
		tcpEntryPoints:         entryPoints,
		chainBuilder:           chainBuilder,
		accessLoggerMiddleware: accessLoggerMiddleware,
		signals:                make(chan os.Signal, 1),
		stopChan:               make(chan bool, 1),
		routinesPool:           routinesPool,
		udpEntryPoints:         entryPointsUDP,
	}
	srv.configureSignals()

	return srv
}

// Start starts the server and Stop/Close it when context is Done.
func (s *Server) Start(ctx context.Context) {
	go func() {
		<-ctx.Done()
		logger := log.FromContext(ctx)
		logger.Info("I have to go...")
		logger.Info("Stopping server gracefully")
		s.Stop()
	}()

	// 调用tcp及udp的EntryPoints的Start()启动服务
	s.tcpEntryPoints.Start()
	s.udpEntryPoints.Start()
	// 启动watcher服务
	s.watcher.Start()
	// 监听系统信号,用于配置重载等
	s.routinesPool.GoCtx(s.listenSignals)
}