Mastra中的工作流程可以帮助编排复杂的操作序列,具有如分支、并行执行、资源暂停等功能。

大多数的AI应用都需要不止一次的调用语言模型。如果想要运行多个步骤,有条件地跳过某些步骤,甚至完全暂停执行步骤,可以使用Workflow。

Mastra的Workflow提供以下功能:

  1. 定义步骤,并将这些步骤聚合链式执行的标准。
  2. 支持简单(线性)和高级(分支、并行)的执行方式。
  3. 调试和跟踪每个工作流的运行。

一个简单的Workflow

以下是一个简单的将执行步骤添加到workflow中并执行的例子

import {Step, Workflow} from "@mastra/core/workflows";
import {z} from "zod";

//创建一个workflow
const myWorkflow = new Workflow({
name: "my-workflow",
//这个workflow接收一个number类型的input变量
triggerSchema: z.object({
input: z.number()
})
})


//定义步骤1
const stepOne = new Step({
id: "stepOne",
//输出一个包含number类型的doubledValue字段的对象
outputSchema: z.object({
doubledValue: z.number()
}),
execute: async ({context}) => {
//从上下文对象的triggerData中获取input参数 将其*2 并返回到对象中
const doubledValue = context?.triggerData?.input * 2
return { doubledValue }
}
})

//将步骤添加到workflow中,并提交该workflow
myWorkflow.step(stepOne).commit()

//运行workflow
const { runId, start } = myWorkflow.createRun()

const res = await start({
triggerData: {
input: 90
}
})

//查看结果
console.log('res', JSON.stringify(res))

线性执行

如下,当有多个步骤的时候,将这些步骤以线性的方式排列起来,一步一步执行

import {Step, Workflow} from "@mastra/core/workflows";
import {z} from "zod";


//创建步骤1,接受triggerData并将其结果 * 2 返回
const stepOne = new Step({
id: 'stepOne',
execute: async ({context}) => ({
doubledValue: context.triggerData.inputValue * 2
})
})


//创建步骤2
const stepTwo = new Step({
id: 'stepTwo',
execute: async ({context}) => {
//判断上一个步骤是否执行成功,如果执行失败,返回一个预设值
if (context.steps.stepOne.status !== 'success') {
return { incrementedValue: 0}
}

//执行成功的话,将结果 + 1 返回
return {
incrementedValue: context.getStepResult<{doubledValue: number}>('stepOne').doubledValue + 1
}
}
})


//创建步骤3
const stepThree = new Step({
id: 'stepThree',
execute: async ({context}) => {
//判断上一个步骤是否执行成功,如果执行失败,返回一个预设值
if (context.steps.stepTwo.status !== 'success') {
return { tripledValue: 0}
}

//执行成功的话,将结果 * 3 返回
return { tripledValue: context.getStepResult<{incrementedValue: number}>('stepTwo').incrementedValue * 3}
}
})


//创建一个workflow 并设置传入参数的类型
const seq_workflow = new Workflow({
name: 'seqWorkflow',
triggerSchema: z.object({
inputValue: z.number()
})
})


//将步骤线性排列,stepOne -> stepTwo -> stepThree
seq_workflow.step(stepOne).then(stepTwo).then(stepThree)

//提交工作流
seq_workflow.commit()

//执行工作流
const { start } = seq_workflow.createRun()
const res = await start({
triggerData: {
inputValue: 90
}
})

console.log('res', JSON.stringify(res))

并行执行

在workflow中可以设置多个步骤同时执行,同时有好几个执行链路。

import {Step, Workflow} from "@mastra/core/workflows";
import {z} from "zod";


//步骤1
const stepOne = new Step({
id: "stepOne",
execute: async ({context}) => {
return {
doubledValue: context.triggerData.inputValue * 2
}
}
})


//步骤2 数据来自步骤1
const stepTwo = new Step({
id: "stepTwo",
execute: async ({context}) => {
if (context.steps.stepOne.status !== 'success') {
return {
incrementedValue: 0
}
}

return {
incrementedValue: context.getStepResult<{doubledValue: number}>('stepOne').doubledValue + 1
}
}
})


//步骤3 数据来自workflow的triggerData
const stepThree = new Step({
id: "stepThree",
execute: async ({context}) => {
return {
tripledValue: context.triggerData.inputValue * 3
}
}
})

//步骤4 数据来子步骤3
const stepFour = new Step({
id: "stepFour",
execute: async ({context}) => {
if (context.steps.stepThree.status !== 'success') {
return {
isEven: false
}
}

return {
isEven: context.getStepResult<{tripledValue: number}>('stepThree').tripledValue % 2
}
}
})


//创建工作流以及对应输入参数的类型
const parallelWorkflow = new Workflow({
name: 'parallelWorkflow',
triggerSchema: z.object({
inputValue: z.number()
})
})

parallelWorkflow
//设置步骤1
.step(stepOne)
//设置步骤2,跟着步骤1
.then(stepTwo)
//另外开一个链路,设置步骤3
.step(stepThree)
//设置步骤4,跟着步骤3
.then(stepFour)
//提交workflow
.commit()

//运行workflow
const { start } = parallelWorkflow.createRun()
const result = await start({ triggerData: {inputValue: 3}})
console.log('result', JSON.stringify(result))