上文已经知道了 LlamaIndex 中基本都 Tools 和 Agents ,现在创建可配置和可管理的 Workflow。

创建异步 workdlow

  • @step 装饰器用于将一个方法标记为一个 Workflow 步骤.
  • StartEvent 是 LlamaIndex 提供的特殊事件,用于开始 Workflow。
  • StopEvent 是 LlamaIndex 提供的特殊事件,用于结束 Workflow。

创建了简单的异步工作流。w.run() 是异步的,需要 await,实例:

from llama_index.core.workflow import StartEvent, StopEvent, Workflow, step

class MyWorkflow(Workflow):
    @step
    async def my_step(self, ev: StartEvent) -> StopEvent:
        # do something here
        # ...
        return StopEvent(result="Hello, world!")

w = MyWorkflow(timeout=10, verbose=False)
result = await w.run()

创建多步的waorkflow:

class MultiStepWorkflow(Workflow):
    @step
    async def step1(self, ev: StartEvent) -> IntermediateEvent:
        return IntermediateEvent(data="processed")

    @step
    async def step2(self, ev: IntermediateEvent) -> StopEvent:
        return StopEvent(result=f"Final: {ev.data}")

workflow 中可以可串联多个 @step,通过自定义 Event 类传递数据。比如上例中的 IntermediateEvent

创建多步骤 workflow

要连接多个步骤,我们需要创建自定义事件来在步骤之间传递数据。为此,我们需要添加一个 Event ,它在步骤之间传递,并将第一个步骤的输出传递给第二个步骤

  • @step 装饰器用于将一个方法标记为一个 Workflow 步骤.
  • StartEvent 是 LlamaIndex 提供的特殊事件,用于开始 Workflow。
  • StopEvent 是 LlamaIndex 提供的特殊事件,用于结束 Workflow。
from llama_index.core.workflow import Event

class ProcessingEvent(Event):
    intermediate_result: str

class MultiStepWorkflow(Workflow):
    # 装饰器用于将一个方法标记为一个 Workflow 步骤
    # `StartEvent` 作为输入
    # 返回一个 ProcessingEvent 对象
    @step  
    async def step_one(self, ev: StartEvent) -> ProcessingEvent:
        # Process initial data
        return ProcessingEvent(intermediate_result="Step 1 complete")

    # 接收一个 ProcessingEvent 作为输入。这意味着 step_two 会在 step_one 完成并发出 ProcessingEvent 后被触发。
    # 返回一个 StopEvent 对象。StopEvent 是 LlamaIndex 提供的特殊事件,用于结束 Workflow。
    @step
    async def step_two(self, ev: ProcessingEvent) -> StopEvent:
        # Use the intermediate result
        final_result = f"Finished processing: {ev.intermediate_result}"
        return StopEvent(result=final_result)

w = MultiStepWorkflow(timeout=10, verbose=False)
result = await w.run()
result

上述 将step_one 和 step_two 连接起来了,通过自定义 Event 类传递数据:ProcessingEvent

使用分支和循环

from llama_index.core.workflow import Event
import random

class ProcessingEvent(Event):
    intermediate_result: str

class LoopEvent(Event):
    loop_output: str

class MultiStepWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> ProcessingEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return ProcessingEvent(intermediate_result="First step complete.")

    @step
    async def step_two(self, ev: ProcessingEvent) -> StopEvent:
        # Use the intermediate result
        final_result = f"Finished processing: {ev.intermediate_result}"
        return StopEvent(result=final_result)


w = MultiStepWorkflow(verbose=False)
result = await w.run()
result

可能的一个返回:

Bad thing happened
Bad thing happened
Bad thing happened
Good thing happened
'Finished processing: First step complete.'

绘制工作流 draw_all_possible_flows

draw_all_possible_flows(w, "flow.html")

向工作流中添加状态

# store query in the context
await ctx.set("query", "What is the capital of France?")