FlytePropeller is the component that actually executes workflows in the cluster: it watches the FlyteWorkflow CRD, allocates cluster resources, talks to third-party subsystems, and reports status back to the admin server.
1. Overall Architecture
The propeller command line has three subcommands, outlined below (a sketch of the CLI wiring follows the list):
- init-certs: generates the certificates the webhook uses to talk to the api-server
- webhook: injects environment variables into watched pods and provides the secrets required to run the Flyte SDK and access the api-server
- controller-server: receives the CRDs submitted via admin, creates the controller, renders the requested templates from the plugin registry, and then creates the corresponding pods or CRDs and submits them to k8s or to downstream subsystems.
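For orientation, a CLI with these subcommands could be wired up with cobra roughly as below. This is a minimal sketch: the subcommand names mirror the list above, but the handlers are placeholder stubs and do not reflect Propeller's actual cmd package (where the controller may well run from the root command itself).
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	rootCmd := &cobra.Command{Use: "flytepropeller"}

	// Each subcommand maps to one of the responsibilities listed above.
	rootCmd.AddCommand(
		&cobra.Command{
			Use:   "init-certs",
			Short: "Generate webhook certificates (illustrative stub)",
			RunE: func(cmd *cobra.Command, args []string) error {
				fmt.Println("would generate webhook certs here")
				return nil
			},
		},
		&cobra.Command{
			Use:   "webhook",
			Short: "Run the pod-mutating webhook (illustrative stub)",
			RunE: func(cmd *cobra.Command, args []string) error {
				fmt.Println("would start the webhook server here")
				return nil
			},
		},
		&cobra.Command{
			Use:   "controller-server",
			Short: "Run the workflow controller (illustrative stub)",
			RunE: func(cmd *cobra.Command, args []string) error {
				fmt.Println("would call executeRootCmd here")
				return nil
			},
		},
	)

	if err := rootCmd.Execute(); err != nil {
		fmt.Println(err)
	}
}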

Let's start with the controller.
2. Creating the Controller
The controller's entry point is executeRootCmd:
func executeRootCmd(baseCtx context.Context, cfg *config2.Config) error {
// Handle shutdown signals
ctx := signals.SetupSignalHandler(baseCtx)
// Set up Prometheus metric labels
keys := contextutils.MetricKeysFromStrings(cfg.MetricKeys)
logger.Infof(context.TODO(), "setting metrics keys to %+v", keys)
if len(keys) > 0 {
labeled.SetMetricKeys(keys...)
}
// Register tracer providers for the downstream services
for _, serviceName := range []string{otelutils.AdminClientTracer, otelutils.BlobstoreClientTracer,
otelutils.DataCatalogClientTracer, otelutils.FlytePropellerTracer, otelutils.K8sClientTracer} {
if err := otelutils.RegisterTracerProviderWithContext(ctx, serviceName, otelutils.GetConfig()); err != nil {
logger.Errorf(ctx, "Failed to create otel tracer provider. %v", err)
return err
}
}
// Add the propeller subscope because the MetricsPrefix only has "flyte:" to get uniform collection of metrics.
propellerScope := promutils.NewScope(cfg.MetricsPrefix).NewSubScope("propeller").NewSubScope(cfg.LimitNamespace)
limitNamespace := ""
var namespaceConfigs map[string]cache.Config
if cfg.LimitNamespace != defaultNamespace {
limitNamespace = cfg.LimitNamespace
namespaceConfigs = map[string]cache.Config{
limitNamespace: {},
}
}
options := manager.Options{
Cache: cache.Options{
SyncPeriod: &cfg.DownstreamEval.Duration,
DefaultNamespaces: namespaceConfigs,
},
NewCache: executors.NewCache,
NewClient: executors.BuildNewClientFunc(propellerScope),
Metrics: metricsserver.Options{
// Disable metrics serving
BindAddress: "0",
},
}
// Create the controller manager; controller-runtime sets up the informer, client, cache, and leader-election resources against the api-server
/**
┌────────────────────────────┐
│ Controller Manager │
│ (control center) │
│ │
│ ┌──────────────────────┐ │
│ │ Controllers │ │
│ │ (Reconciler) │ │
│ └─────────┬────────────┘ │
│ │ │
│ uses Client │
│ │ │
│ uses Cache │
│ (Informer) │
│ │ │
│ ▼ │
│ API Server │
│ │
│ Webhook Server(admission) │
└────────────────────────────┘
**/
// API Request -> Authentication & Authorization -> Admission(Webhook) ->
// Persist to etcd -> Reconciliation (controller) -> Status Update
mgr, err := controller.CreateControllerManager(ctx, cfg, options)
if err != nil {
logger.Fatalf(ctx, "Failed to create controller manager. Error: %v", err)
return err
}
handlers := map[string]http.Handler{
"/k8smetrics": promhttp.HandlerFor(metrics.Registry,
promhttp.HandlerOpts{
ErrorHandling: promhttp.HTTPErrorOnError,
},
),
}
g, childCtx := errgroup.WithContext(ctx)
g.Go(func() error {
err := profutils.StartProfilingServerWithDefaultHandlers(childCtx, cfg.ProfilerPort.Port, handlers)
if err != nil {
logger.Fatalf(childCtx, "Failed to Start profiling and metrics server. Error: %v", err)
}
return err
})
// 1. Set up profiling labels
// 2. Start the controller manager
g.Go(func() error {
err := controller.StartControllerManager(childCtx, mgr)
if err != nil {
logger.Fatalf(childCtx, "Failed to start controller manager. Error: %v", err)
}
return err
})
g.Go(func() error {
// The controller's core function
err := controller.StartController(childCtx, cfg, defaultNamespace, mgr, &propellerScope)
if err != nil {
logger.Fatalf(childCtx, "Failed to start controller. Error: %v", err)
}
return err
})
return g.Wait()
}
Ultimately an asynchronous goroutine runs controller.StartController; let's take a look at that code:
func StartController(ctx context.Context, cfg *config.Config, defaultNamespace string, mgr manager.Manager, scope *promutils.Scope) error {
// Setup cancel on the context
ctx, cancel := context.WithCancel(ctx)
defer cancel()
kubeClient, kubecfg, err := utils.GetKubeConfig(ctx, cfg)
if err != nil {
return errors.Wrapf(err, "error building Kubernetes Clientset")
}
// k8sResolver resolves Kubernetes services for gRPC;
// the resolver watches the target service's IP/port changes and updates connections accordingly
resolver.Register(k8sResolver.NewBuilder(ctx, kubeClient, k8sResolver.Schema))
flyteworkflowClient, err := clientset.NewForConfig(kubecfg)
if err != nil {
return errors.Wrapf(err, "error building FlyteWorkflow clientset")
}
// If the FlyteWorkflow CRD is not registered yet, create it
if cfg.CreateFlyteWorkflowCRD {
logger.Infof(ctx, "creating FlyteWorkflow CRD")
apiextensionsClient, err := apiextensionsclientset.NewForConfig(kubecfg)
if err != nil {
return errors.Wrapf(err, "error building apiextensions clientset")
}
_, err = apiextensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, &flyteworkflow.CRD, v1.CreateOptions{})
if err != nil {
if apierrors.IsAlreadyExists(err) {
logger.Warnf(ctx, "FlyteWorkflow CRD already exists")
} else {
return errors.Wrapf(err, "failed to create FlyteWorkflow CRD")
}
}
}
opts := SharedInformerOptions(cfg, defaultNamespace)
flyteworkflowInformerFactory := informers.NewSharedInformerFactoryWithOptions(flyteworkflowClient, cfg.WorkflowReEval.Duration, opts...)
informerFactory := k8sInformers.NewSharedInformerFactoryWithOptions(kubeClient, flyteK8sConfig.GetK8sPluginConfig().DefaultPodTemplateResync.Duration)
// 1. Create the admin gRPC client
// 2. Create the metadata (S3) store
// 3. Wrap clients for the other systems propeller talks to (e.g. the admin client)
// 4. Create the GC that reclaims CRD resources in the target namespaces
// 5. Create the CRD informer
// 6. Create the catalogClient
// 7. Create the work queue
// 8. Create the metrics monitor and launch plan status sync
// 9. Create the node and workflow executors
// 10. Wire up the informer callbacks
c, err := New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, informerFactory, mgr, *scope)
if err != nil {
return errors.Wrap(err, "failed to start FlytePropeller")
} else if c == nil {
return errors.Errorf("Failed to create a new instance of FlytePropeller")
}
// Watch CRD changes
go flyteworkflowInformerFactory.Start(ctx.Done())
// Watch k8s resource changes
go informerFactory.Start(ctx.Done())
// Run the controller: start the worker pool, monitoring, GC checks, etc.
if err = c.Run(ctx); err != nil {
return errors.Wrapf(err, "Error running FlytePropeller.")
}
return nil
}
As you can see, the New function is where the service components are actually created:
func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Interface, flytepropellerClientset clientset.Interface,
flyteworkflowInformerFactory informers.SharedInformerFactory, informerFactory k8sInformers.SharedInformerFactory,
kubeClient executors.Client, scope promutils.Scope) (*Controller, error) {
// Create the admin server client
adminClient, signalClient, authOpts, err := getAdminClient(ctx)
if err != nil {
logger.Errorf(ctx, "failed to initialize Admin client, err :%s", err.Error())
return nil, err
}
sCfg := storage.GetConfig()
if sCfg == nil {
logger.Errorf(ctx, "Storage configuration missing.")
}
store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore"))
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Metadata storage")
}
logger.Info(ctx, "Setting up event sink and recorder")
eventSink, err := events.ConstructEventSink(ctx, events.GetConfig(ctx), scope.NewSubScope("event_sink"))
if err != nil {
return nil, errors.Wrapf(err, "Failed to create EventSink [%v], error %v", events.GetConfig(ctx).Type, err)
}
gc, err := NewGarbageCollector(cfg, scope, clock.RealClock{}, kubeClientset.CoreV1().Namespaces(), flytepropellerClientset.FlyteworkflowV1alpha1())
if err != nil {
logger.Errorf(ctx, "failed to initialize GC for workflows")
return nil, errors.Wrapf(err, "failed to initialize WF GC")
}
eventRecorder, err := utils.NewK8sEventRecorder(ctx, kubeClientset, controllerAgentName, cfg.PublishK8sEvents)
if err != nil {
logger.Errorf(ctx, "failed to event recorder %v", err)
return nil, errors.Wrapf(err, "failed to initialize resource lock.")
}
controller := &Controller{
metrics: newControllerMetrics(scope),
recorder: eventRecorder,
gc: gc,
numWorkers: cfg.Workers,
}
// Try to acquire the leader (master) lock
lock, err := leader.NewResourceLock(kubeClientset.CoreV1(), kubeClientset.CoordinationV1(), eventRecorder, cfg.LeaderElection)
if err != nil {
logger.Errorf(ctx, "failed to initialize resource lock.")
return nil, errors.Wrapf(err, "failed to initialize resource lock.")
}
if lock != nil {
logger.Infof(ctx, "Creating leader elector for the controller.")
controller.leaderElector, err = leader.NewLeaderElector(lock, cfg.LeaderElection, controller.onStartedLeading, func() {
logger.Fatal(ctx, "Lost leader state. Shutting down.")
})
if err != nil {
logger.Errorf(ctx, "failed to initialize leader elector.")
return nil, errors.Wrapf(err, "failed to initialize leader elector.")
}
}
// Create the client/informers/externalversions/flyteworkflow/v1alpha1 informer
flyteworkflowInformer := flyteworkflowInformerFactory.Flyteworkflow().V1alpha1().FlyteWorkflows()
controller.flyteworkflowSynced = flyteworkflowInformer.Informer().HasSynced
podTemplateInformer := informerFactory.Core().V1().PodTemplates()
// set default namespace(flyte) for pod template store
podNamespace, found := os.LookupEnv(podNamespaceEnvVar)
if !found {
podNamespace = podDefaultNamespace
}
// Default namespace for created pods: flyte
flytek8s.DefaultPodTemplateStore.SetDefaultNamespace(podNamespace)
logger.Info(ctx, "Setting up Catalog client.")
catalogClient, err := catalog.NewCatalogClient(ctx, authOpts...)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create datacatalog client")
}
workQ, err := NewCompositeWorkQueue(ctx, cfg.Queue, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create WorkQueue [%v]", scope.CurrentScope())
}
controller.workQueue = workQ
// In-memory workflow store middleware; the workflowStore policy controls the caching granularity
controller.workflowStore, err = workflowstore.NewWorkflowStore(ctx, workflowstore.GetConfig(), flyteworkflowInformer.Lister(), flytepropellerClientset.FlyteworkflowV1alpha1(), scope)
if err != nil {
return nil, stdErrs.Wrapf(errors3.CausedByError, err, "failed to initialize workflow store")
}
// Tracks how much time workflow processing takes
controller.levelMonitor = NewResourceLevelMonitor(scope.NewSubScope("collector"), flyteworkflowInformer.Lister())
// Launch plan sync module: keeps launch plan state in sync with admin automatically
var launchPlanActor launchplan.FlyteAdmin
if cfg.EnableAdminLauncher {
launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, launchplan.GetAdminConfig(),
scope.NewSubScope("admin_launcher"), store, controller.enqueueWorkflowForNodeUpdates)
if err != nil {
logger.Errorf(ctx, "failed to create Admin workflow Launcher, err: %v", err.Error())
return nil, err
}
if err := launchPlanActor.Initialize(ctx); err != nil {
logger.Errorf(ctx, "failed to initialize Admin workflow Launcher, err: %v", err.Error())
return nil, err
}
} else {
launchPlanActor = launchplan.NewFailFastLaunchPlanExecutor()
}
recoveryClient := recovery.NewClient(adminClient)
// Register handlers for the task, branch, workflow, gate, array, start, and end node types in the DAG
nodeHandlerFactory, err := factory.NewHandlerFactory(ctx, launchPlanActor, launchPlanActor,
kubeClient, kubeClientset, catalogClient, recoveryClient, &cfg.EventConfig, cfg.LiteralOffloadingConfig, cfg.ClusterID, signalClient, scope)
if err != nil {
return nil, errors.Wrapf(err, "failed to create node handler factory")
}
// Create the node executor, used to execute the DAG
nodeExecutor, err := nodes.NewExecutor(ctx, cfg.NodeConfig, store, controller.enqueueWorkflowForNodeUpdates, eventSink,
launchPlanActor, launchPlanActor, storage.DataReference(cfg.DefaultRawOutputPrefix), kubeClient,
catalogClient, recoveryClient, cfg.LiteralOffloadingConfig, &cfg.EventConfig, cfg.ClusterID, signalClient, nodeHandlerFactory, scope)
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Controller.")
}
activeExecutions, err := workflowstore.NewExecutionStatsHolder()
if err != nil {
return nil, err
}
// Collect and monitor execution time statistics
controller.executionStats = workflowstore.NewExecutionStatsMonitor(scope.NewSubScope("execstats"), flyteworkflowInformer.Lister(), activeExecutions)
// Create the top-level workflow executor
workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope, activeExecutions)
if err != nil {
return nil, err
}
// Execution entry point
handler := NewPropellerHandler(ctx, cfg, store, controller.workflowStore, workflowExecutor, scope)
// Hand the handler over to the worker pool
controller.workerPool = NewWorkerPool(ctx, scope, workQ, handler)
if cfg.EnableGrpcLatencyMetrics {
grpc_prometheus.EnableClientHandlingTimeHistogram()
}
logger.Info(ctx, "Setting up event handlers")
// Set up an event handler for when FlyteWorkflow resources change
_, err = flyteworkflowInformer.Informer().AddEventHandler(controller.getWorkflowUpdatesHandler())
if err != nil {
return nil, fmt.Errorf("failed to register event handler for FlyteWorkflow resource changes: %w", err)
}
// Keep pod templates cached in memory
updateHandler := flytek8s.GetPodTemplateUpdatesHandler(&flytek8s.DefaultPodTemplateStore)
_, err = podTemplateInformer.Informer().AddEventHandler(updateHandler)
if err != nil {
return nil, fmt.Errorf("failed to register event handler for PodTemplate resource changes: %w", err)
}
return controller, nil
}
After New returns, the remaining code actually starts the server. The Run function starts the controller together with the worker pool, workflow GC checks, monitoring, and other components.
func (c *Controller) Run(ctx context.Context) error {
if c.leaderElector == nil {
logger.Infof(ctx, "Running without leader election.")
// With multiple propeller instances a leader must be elected; a single plain propeller just starts directly
return c.run(ctx)
}
// Otherwise, try to become the leader
logger.Infof(ctx, "Attempting to acquire leader lease and act as leader.")
go c.leaderElector.Run(ctx)
<-ctx.Done()
return nil
}
func (le *LeaderElector) Run(ctx context.Context) {
defer runtime.HandleCrash()
defer le.config.Callbacks.OnStoppedLeading()
// Acquire the lock and report the host name to Prometheus
if !le.acquire(ctx) {
return // ctx signalled done
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// On acquiring the lock, the callback updates state and calls run
go le.config.Callbacks.OnStartedLeading(ctx)
// Keep renewing the lock against k8s; on failure, report to Prometheus that the leader has exited
le.renew(ctx)
}
func (c *Controller) run(ctx context.Context) error {
// Initializing WorkerPool
logger.Info(ctx, "Initializing controller")
if err := c.workerPool.Initialize(ctx); err != nil {
return err
}
// Start the WF GC
if err := c.gc.StartGC(ctx); err != nil {
logger.Errorf(ctx, "failed to start background GC")
return err
}
// Start the collector process
c.levelMonitor.RunCollector(ctx)
c.executionStats.RunStatsMonitor(ctx)
// Start the informer factories to begin populating the informer caches
logger.Info(ctx, "Starting FlyteWorkflow controller")
return c.workerPool.Run(ctx, c.numWorkers, c.flyteworkflowSynced)
}
At this point the controller starts watching k8s events and the FlyteWorkflow CRD; CRD changes trigger controller actions through the informer callbacks returned by getWorkflowUpdatesHandler.

3. CRD Execution Logic
When getWorkflowUpdatesHandler receives a CRD event, it places the workflow into the queue (CompositeWorkQueue); if an in-flight workflow is updated, it is re-enqueued for another round of processing. A generic sketch of this informer-to-workqueue pattern is shown below.
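The CompositeWorkQueue builds on the standard client-go controller pattern: informer event handlers push namespace/name keys into a rate-limited work queue, and workers pull keys off and reconcile them. The sketch below shows only that generic pattern; it is not Flyte's CompositeWorkQueue, and the event handler and reconcile function are stand-ins.
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// enqueue turns an object into a namespace/name key and adds it to the queue.
func enqueue(q workqueue.RateLimitingInterface, obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		return
	}
	q.Add(key)
}

// runWorker pulls keys off the queue and reconciles them, re-enqueueing on failure.
func runWorker(ctx context.Context, q workqueue.RateLimitingInterface, reconcile func(ctx context.Context, key string) error) {
	for {
		item, shutdown := q.Get()
		if shutdown {
			return
		}
		key := item.(string)
		if err := reconcile(ctx, key); err != nil {
			// Failed items get another round, similar to how an updated in-flight
			// workflow is re-enqueued for re-processing.
			q.AddRateLimited(key)
		} else {
			q.Forget(key)
		}
		q.Done(item)
	}
}

func main() {
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// In propeller these callbacks come from getWorkflowUpdatesHandler and are
	// registered on the FlyteWorkflow informer; here they are just a stand-in.
	_ = cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { enqueue(q, obj) },
		UpdateFunc: func(_, newObj interface{}) { enqueue(q, newObj) },
	}

	go runWorker(context.Background(), q, func(ctx context.Context, key string) error {
		fmt.Println("reconciling", key)
		return nil
	})

	// Simulate a single event and let the worker drain it.
	q.Add("flyte/example-workflow")
	time.Sleep(100 * time.Millisecond)
	q.ShutDown()
}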
Eventually a worker in the pool takes the item off the queue and calls the Handle function, which drives each workflow through its execution states. The state machine looks like this:
func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
//
// +--------+ +---------+ +------------+ +---------+
// | | | | | | | |
// | Ready +--------> Running +--------> Succeeding +-----> Success |
// | | | | | | | |
// +--------+ +---------+ +------------+ +---------+
// | |
// | |
// | +----v----+ +---------------------+ +--------+
// | | | | (optional) | | |
// +-------------> Failing +--------> HandlingFailureNode +--------> Failed |
// | | | | | |
// +---------+ +---------------------+ +--------+
//
// Fetch the workflow from the in-memory store
w, fetchErr := p.wfStore.Get(ctx, namespace, name)
// ...
// Terminal state
if w.GetExecutionStatus().IsTerminated() {
// Checking for the old finalizer for backwards compatibility
// This should be eventually removed
if HasCompletedLabel(w) && !controllerutil.ContainsFinalizer(w, Finalizer) && !controllerutil.ContainsFinalizer(w, OldFinalizer) {
logger.Debugf(ctx, "Workflow is terminated.")
// This workflow had previously completed, let us ignore it
return nil
}
}
// If there is a workflow closure reference (the workflow spec is too large, so it is stored in blob storage and re-loaded via the reference)
var wfClosureCrdFields *k8s.WfClosureCrdFields
var err error
// Load the referenced workflow closure
if len(w.WorkflowClosureReference) > 0 {
wfClosureCrdFields, err = p.parseWorkflowClosureCrdFields(ctx, w.WorkflowClosureReference)
if err != nil {
return err
}
}
// Fast-follow: keep re-evaluating the workflow while its state keeps changing, up to maxLength rounds
for streak = 0; streak < maxLength; streak++ {
w, err = p.streak(ctx, w, wfClosureCrdFields)
if err != nil {
return err
} else if w == nil {
break
}
logger.Infof(ctx, "FastFollow Enabled. Detected State change, we will try another round. StreakLength [%d]", streak)
}
}
Each streak iteration calls TryMutateWorkflow to advance the current workflow:
func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {
t := p.metrics.DeepCopyTime.Start()
mutableW := originalW.DeepCopy()
t.Stop()
ctx = contextutils.WithWorkflowID(ctx, mutableW.GetID())
if execID := mutableW.GetExecutionID(); execID.WorkflowExecutionIdentifier != nil {
ctx = contextutils.WithProjectDomain(ctx, mutableW.GetExecutionID().Project, mutableW.GetExecutionID().Domain)
}
ctx = contextutils.WithResourceVersion(ctx, mutableW.GetResourceVersion())
maxRetries := uint32(p.cfg.MaxWorkflowRetries) // #nosec G115
// Marked for deletion by the user, or too many failed attempts
if !mutableW.GetDeletionTimestamp().IsZero() || mutableW.Status.FailedAttempts > maxRetries {
var err error
func() {
defer func() {
if r := recover(); r != nil {
stack := debug.Stack()
err = fmt.Errorf("panic when aborting workflow, Stack: [%s]", string(stack))
logger.Errorf(ctx, err.Error())
p.metrics.PanicObserved.Inc(ctx)
}
}()
// Run the abort flow: notifies k8s to terminate the work and syncs the result with admin
err = p.workflowExecutor.HandleAbortedWorkflow(ctx, mutableW, maxRetries)
}()
if err != nil {
p.metrics.AbortError.Inc(ctx)
return nil, err
}
return mutableW, nil
}
// Otherwise check the current status and reconcile
if !mutableW.GetExecutionStatus().IsTerminated() {
var err error
_ = controllerutil.AddFinalizer(mutableW, Finalizer)
SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion)
func() {
t := p.metrics.RawWorkflowTraversalTime.Start(ctx)
defer func() {
t.Stop()
if r := recover(); r != nil {
stack := debug.Stack()
err = fmt.Errorf("panic when reconciling workflow, Stack: [%s]", string(stack))
logger.Errorf(ctx, err.Error())
p.metrics.PanicObserved.Inc(ctx)
}
}()
// This function performs the normal state handling and transitions
err = p.workflowExecutor.HandleFlyteWorkflow(ctx, mutableW)
}()
if err != nil {
logger.Errorf(ctx, "Error when trying to reconcile workflow. Error [%v]. Error Type[%v]",
err, reflect.TypeOf(err))
p.metrics.SystemError.Inc(ctx)
return nil, err
}
} else {
logger.Warn(ctx, "Workflow is marked as terminated but doesn't have the completed label, marking it as completed.")
}
return mutableW, nil
}
HandleFlyteWorkflow dispatches to a different handler depending on the workflow's current phase, and the node executor then uses different plugins, depending on the task type, to perform the actual state changes. A simplified sketch of that dispatch follows, and then we look at handleReadyWorkflow as a concrete example.
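The sketch below is self-contained and only mirrors the shape of the dispatch; the phase names, the workflow struct, and the handler bodies are simplified stand-ins for the real v1alpha1 types and workflowExecutor methods.
package main

import (
	"context"
	"fmt"
)

// Hypothetical, trimmed-down stand-ins for the workflow phases in the state
// diagram above; the real constants live in the v1alpha1 package.
type WorkflowPhase int

const (
	PhaseReady WorkflowPhase = iota
	PhaseRunning
	PhaseSucceeding
	PhaseFailing
	PhaseTerminal
)

type workflow struct {
	Name  string
	Phase WorkflowPhase
}

// handleWorkflow mirrors the shape of HandleFlyteWorkflow: pick a handler based
// on the current phase and apply the status it returns.
func handleWorkflow(ctx context.Context, w *workflow) error {
	switch w.Phase {
	case PhaseReady:
		// handleReadyWorkflow: set up data dirs and start-node inputs, then move to Running.
		w.Phase = PhaseRunning
	case PhaseRunning:
		// handleRunningWorkflow: traverse the DAG through the node executor.
		fmt.Println("traversing DAG for", w.Name)
	case PhaseSucceeding, PhaseFailing:
		// Finalize outputs or the failure node, then reach a terminal phase.
		w.Phase = PhaseTerminal
	default:
		// Terminal workflows need no further reconciliation.
	}
	return nil
}

func main() {
	w := &workflow{Name: "example", Phase: PhaseReady}
	_ = handleWorkflow(context.Background(), w)
	fmt.Println("phase is now", w.Phase)
}
Now handleReadyWorkflow itself: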
func (c *workflowExecutor) handleReadyWorkflow(ctx context.Context, w *v1alpha1.FlyteWorkflow) (Status, error) {
startNode := w.StartNode()
if startNode == nil {
return StatusFailing(&core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: errors.BadSpecificationError.String(),
Message: "StartNode not found."}), nil
}
ref, err := c.constructWorkflowMetadataPrefix(ctx, w)
if err != nil {
return StatusFailing(&core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: "MetadataPrefixCreationFailure",
Message: err.Error()}), nil
}
w.GetExecutionStatus().SetDataDir(ref)
inputs := &core.LiteralMap{}
if w.Inputs != nil {
if len(w.OffloadedInputs) > 0 {
return StatusFailing(&core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: errors.BadSpecificationError.String(),
Message: "cannot specify inline inputs AND offloaded inputs"}), nil
}
inputs = w.Inputs.LiteralMap
} else if len(w.OffloadedInputs) > 0 {
err = c.store.ReadProtobuf(ctx, w.OffloadedInputs, inputs)
if err != nil {
return StatusFailing(&core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: "OffloadedInputsReadFailure",
Message: err.Error()}), nil
}
}
// Before starting the subworkflow, lets set the inputs for the Workflow. The inputs for a SubWorkflow are essentially
// Copy of the inputs to the Node
nodeStatus := w.GetNodeExecutionStatus(ctx, startNode.GetID())
dataDir, err := c.store.ConstructReference(ctx, ref, startNode.GetID(), "data")
if err != nil {
return StatusFailing(&core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: "MetadataPrefixCreationFailure",
Message: err.Error()}), nil
}
outputDir, err := c.store.ConstructReference(ctx, dataDir, "0")
if err != nil {
return StatusFailing(&core.ExecutionError{
Kind: core.ExecutionError_SYSTEM,
Code: "MetadataPrefixCreationFailure",
Message: err.Error()}), nil
}
logger.Infof(ctx, "Setting the MetadataDir for StartNode [%v]", dataDir)
nodeStatus.SetDataDir(dataDir)
nodeStatus.SetOutputDir(outputDir)
execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow())
// Delegate to the nodeExecutor to set the inputs for the start node
s, err := c.nodeExecutor.SetInputsForStartNode(ctx, execcontext, w, executors.NewNodeLookup(w, w.GetExecutionStatus(), w), inputs)
if err != nil {
return StatusReady, err
}
if s.HasFailed() {
return StatusFailing(s.Err), nil
}
// Return the state to transition to
return StatusRunning, nil
}
After the inputs are handed to SetInputsForStartNode, the node executor looks up the plugin for the task through the PluginRegistry and kicks off the corresponding CRD/resource creation flow. That completes the end-to-end path described above.

4. Plugin Registration and Usage
Finally, let's look at propeller's plugin system. Plugins are one of Flyte's highlights, covering everything from the client and admin through to propeller. Propeller's plugins are pulled in at the program entry point via side-effect imports:
package main
import (
"github.com/flyteorg/flyte/flytepropeller/cmd/controller/cmd"
// Plugin registration (side-effect imports)
_ "github.com/flyteorg/flyte/flytepropeller/plugins"
)
func main() {
cmd.Execute()
}
// flytepropeller/plugins/loader.go
package plugins
import (
// Common place to import all plugins, so that it can be imported by Singlebinary (flytelite) or by propeller main
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/array/awsbatch"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/array/k8s"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/dask"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/kfoperators/mpi"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/kfoperators/tensorflow"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/pod"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/ray"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/spark"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/testing"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/webapi/athena"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/webapi/bigquery"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/webapi/databricks"
_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/webapi/snowflake"
)
// 以"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch"为例
// 会在模块初始化的时候注册插件
func init() {
if err := kubeflowv1.AddToScheme(scheme.Scheme); err != nil {
panic(err)
}
pluginmachinery.PluginRegistry().RegisterK8sPlugin(
k8s.PluginEntry{
ID: common.PytorchTaskType,
RegisteredTaskTypes: []pluginsCore.TaskType{common.PytorchTaskType},
ResourceToWatch: &kubeflowv1.PyTorchJob{},
Plugin: pytorchOperatorResourceHandler{},
IsDefault: false,
})
}
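Once registered, the task machinery can look up a plugin by task type. The snippet below is only a hedged illustration of such a lookup: the GetK8sPlugins accessor on the registry and the example task type string are assumptions, and the real lookup happens inside propeller's task handler/plugin manager rather than in user code.
package main

import (
	"fmt"

	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery"
	// The side-effect import registers the plugin, as in flytepropeller/plugins/loader.go.
	_ "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/kfoperators/pytorch"
)

func main() {
	taskType := "pytorch" // hypothetical task type taken from a task spec
	for _, entry := range pluginmachinery.PluginRegistry().GetK8sPlugins() {
		for _, t := range entry.RegisteredTaskTypes {
			if string(t) == taskType {
				fmt.Printf("task type %q would be handled by plugin %q\n", taskType, entry.ID)
			}
		}
	}
}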
A plugin needs to implement the following interface so that the node executor can schedule it and track its resources:
type Plugin interface {
// Defines a func to create a query object (typically just object and type meta portions) that's used to query k8s
// resources.
BuildIdentityResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionMetadata) (client.Object, error)
// Defines a func to create the full resource object that will be posted to k8s.
BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error)
// Analyses the k8s resource and reports the status as TaskPhase. This call is expected to be relatively fast,
// any operations that might take a long time (limits are configured system-wide) should be offloaded to the
// background.
GetTaskPhase(ctx context.Context, pluginContext PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error)
// Properties desired by the plugin
GetProperties() PluginProperties
}
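To make the interface concrete, here is a minimal skeleton of a custom k8s plugin. It is a sketch, not a production plugin: the import paths follow the pluginmachinery layout used by the built-in plugins above, the package and plugin names are made up, and it simply wraps a bare pod.
package myplugin

import (
	"context"

	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery"
	pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

type echoResourceHandler struct{}

// BuildIdentityResource returns an empty object of the watched type so propeller
// knows what kind of resource to query.
func (echoResourceHandler) BuildIdentityResource(ctx context.Context, _ pluginsCore.TaskExecutionMetadata) (client.Object, error) {
	return &corev1.Pod{TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}}, nil
}

// BuildResource builds the full object that will be submitted to k8s for this task.
func (echoResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext) (client.Object, error) {
	name := taskCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: name},
		Spec: corev1.PodSpec{
			RestartPolicy: corev1.RestartPolicyNever,
			Containers:    []corev1.Container{{Name: "main", Image: "alpine", Command: []string{"echo", "hello"}}},
		},
	}, nil
}

// GetTaskPhase maps the observed k8s resource onto a Flyte task phase.
func (echoResourceHandler) GetTaskPhase(ctx context.Context, _ k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) {
	pod := resource.(*corev1.Pod)
	switch pod.Status.Phase {
	case corev1.PodSucceeded:
		return pluginsCore.PhaseInfoSuccess(nil), nil
	case corev1.PodFailed:
		return pluginsCore.PhaseInfoFailure("TaskFailed", pod.Status.Message, nil), nil
	default:
		return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, nil), nil
	}
}

func (echoResourceHandler) GetProperties() k8s.PluginProperties {
	return k8s.PluginProperties{}
}

// Register the plugin at init time, mirroring the pytorch example above.
func init() {
	pluginmachinery.PluginRegistry().RegisterK8sPlugin(k8s.PluginEntry{
		ID:                  "echo-pod",
		RegisteredTaskTypes: []pluginsCore.TaskType{"echo_task"},
		ResourceToWatch:     &corev1.Pod{},
		Plugin:              echoResourceHandler{},
		IsDefault:           false,
	})
}
With an entry like this imported from the loader file, the node executor can pick it up for the registered task type in the same way as the built-in plugins.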
5. Summary
The steps above walk through propeller from server startup to CRD watching, state transitions, and execution. With that, the main Flyte flow has been covered; follow-up posts will look at auxiliary components such as flytecopilot, datacatalog, and flytectl.