Mastra中的工作流程可以帮助编排复杂的操作序列,具有如分支、并行执行、资源暂停等功能。
大多数的AI应用都需要不止一次的调用语言模型。如果想要运行多个步骤,有条件地跳过某些步骤,甚至完全暂停执行步骤,可以使用Workflow。
Mastra的Workflow提供以下功能:
- 定义步骤,并将这些步骤聚合链式执行的标准。
- 支持简单(线性)和高级(分支、并行)的执行方式。
- 调试和跟踪每个工作流的运行。
一个简单的Workflow
以下是一个简单的将执行步骤添加到workflow中并执行的例子
import {Step, Workflow} from "@mastra/core/workflows"; import {z} from "zod";
const myWorkflow = new Workflow({ name: "my-workflow", triggerSchema: z.object({ input: z.number() }) })
const stepOne = new Step({ id: "stepOne", outputSchema: z.object({ doubledValue: z.number() }), execute: async ({context}) => { const doubledValue = context?.triggerData?.input * 2 return { doubledValue } } })
myWorkflow.step(stepOne).commit()
const { runId, start } = myWorkflow.createRun()
const res = await start({ triggerData: { input: 90 } })
console.log('res', JSON.stringify(res))
|

线性执行
如下,当有多个步骤的时候,将这些步骤以线性的方式排列起来,一步一步执行

import {Step, Workflow} from "@mastra/core/workflows"; import {z} from "zod";
const stepOne = new Step({ id: 'stepOne', execute: async ({context}) => ({ doubledValue: context.triggerData.inputValue * 2 }) })
const stepTwo = new Step({ id: 'stepTwo', execute: async ({context}) => { if (context.steps.stepOne.status !== 'success') { return { incrementedValue: 0} }
return { incrementedValue: context.getStepResult<{doubledValue: number}>('stepOne').doubledValue + 1 } } })
const stepThree = new Step({ id: 'stepThree', execute: async ({context}) => { if (context.steps.stepTwo.status !== 'success') { return { tripledValue: 0} }
return { tripledValue: context.getStepResult<{incrementedValue: number}>('stepTwo').incrementedValue * 3} } })
const seq_workflow = new Workflow({ name: 'seqWorkflow', triggerSchema: z.object({ inputValue: z.number() }) })
seq_workflow.step(stepOne).then(stepTwo).then(stepThree)
seq_workflow.commit()
const { start } = seq_workflow.createRun() const res = await start({ triggerData: { inputValue: 90 } })
console.log('res', JSON.stringify(res))
|

并行执行
在workflow中可以设置多个步骤同时执行,同时有好几个执行链路。

import {Step, Workflow} from "@mastra/core/workflows"; import {z} from "zod";
const stepOne = new Step({ id: "stepOne", execute: async ({context}) => { return { doubledValue: context.triggerData.inputValue * 2 } } })
const stepTwo = new Step({ id: "stepTwo", execute: async ({context}) => { if (context.steps.stepOne.status !== 'success') { return { incrementedValue: 0 } }
return { incrementedValue: context.getStepResult<{doubledValue: number}>('stepOne').doubledValue + 1 } } })
const stepThree = new Step({ id: "stepThree", execute: async ({context}) => { return { tripledValue: context.triggerData.inputValue * 3 } } })
const stepFour = new Step({ id: "stepFour", execute: async ({context}) => { if (context.steps.stepThree.status !== 'success') { return { isEven: false } }
return { isEven: context.getStepResult<{tripledValue: number}>('stepThree').tripledValue % 2 } } })
const parallelWorkflow = new Workflow({ name: 'parallelWorkflow', triggerSchema: z.object({ inputValue: z.number() }) })
parallelWorkflow .step(stepOne) .then(stepTwo) .step(stepThree) .then(stepFour) .commit()
const { start } = parallelWorkflow.createRun() const result = await start({ triggerData: {inputValue: 3}}) console.log('result', JSON.stringify(result))
|
