上文已经知道了 LlamaIndex 中基本都 Tools 和 Agents ,现在创建可配置和可管理的 Workflow。
创建异步 workdlow
@step装饰器用于将一个方法标记为一个 Workflow 步骤.StartEvent是 LlamaIndex 提供的特殊事件,用于开始 Workflow。StopEvent是 LlamaIndex 提供的特殊事件,用于结束 Workflow。
创建了简单的异步工作流。w.run() 是异步的,需要 await,实例:
from llama_index.core.workflow import StartEvent, StopEvent, Workflow, step
class MyWorkflow(Workflow):
@step
async def my_step(self, ev: StartEvent) -> StopEvent:
# do something here
# ...
return StopEvent(result="Hello, world!")
w = MyWorkflow(timeout=10, verbose=False)
result = await w.run()
创建多步的waorkflow:
class MultiStepWorkflow(Workflow):
@step
async def step1(self, ev: StartEvent) -> IntermediateEvent:
return IntermediateEvent(data="processed")
@step
async def step2(self, ev: IntermediateEvent) -> StopEvent:
return StopEvent(result=f"Final: {ev.data}")
workflow 中可以可串联多个 @step,通过自定义 Event 类传递数据。比如上例中的 IntermediateEvent。
创建多步骤 workflow
要连接多个步骤,我们需要创建自定义事件来在步骤之间传递数据。为此,我们需要添加一个 Event ,它在步骤之间传递,并将第一个步骤的输出传递给第二个步骤。
@step装饰器用于将一个方法标记为一个 Workflow 步骤.StartEvent是 LlamaIndex 提供的特殊事件,用于开始 Workflow。StopEvent是 LlamaIndex 提供的特殊事件,用于结束 Workflow。
from llama_index.core.workflow import Event
class ProcessingEvent(Event):
intermediate_result: str
class MultiStepWorkflow(Workflow):
# 装饰器用于将一个方法标记为一个 Workflow 步骤
# `StartEvent` 作为输入
# 返回一个 ProcessingEvent 对象
@step
async def step_one(self, ev: StartEvent) -> ProcessingEvent:
# Process initial data
return ProcessingEvent(intermediate_result="Step 1 complete")
# 接收一个 ProcessingEvent 作为输入。这意味着 step_two 会在 step_one 完成并发出 ProcessingEvent 后被触发。
# 返回一个 StopEvent 对象。StopEvent 是 LlamaIndex 提供的特殊事件,用于结束 Workflow。
@step
async def step_two(self, ev: ProcessingEvent) -> StopEvent:
# Use the intermediate result
final_result = f"Finished processing: {ev.intermediate_result}"
return StopEvent(result=final_result)
w = MultiStepWorkflow(timeout=10, verbose=False)
result = await w.run()
result
上述 将step_one 和 step_two 连接起来了,通过自定义 Event 类传递数据:ProcessingEvent。
使用分支和循环
from llama_index.core.workflow import Event
import random
class ProcessingEvent(Event):
intermediate_result: str
class LoopEvent(Event):
loop_output: str
class MultiStepWorkflow(Workflow):
@step
async def step_one(self, ev: StartEvent | LoopEvent) -> ProcessingEvent | LoopEvent:
if random.randint(0, 1) == 0:
print("Bad thing happened")
return LoopEvent(loop_output="Back to step one.")
else:
print("Good thing happened")
return ProcessingEvent(intermediate_result="First step complete.")
@step
async def step_two(self, ev: ProcessingEvent) -> StopEvent:
# Use the intermediate result
final_result = f"Finished processing: {ev.intermediate_result}"
return StopEvent(result=final_result)
w = MultiStepWorkflow(verbose=False)
result = await w.run()
result
可能的一个返回:
Bad thing happened
Bad thing happened
Bad thing happened
Good thing happened
'Finished processing: First step complete.'
绘制工作流 draw_all_possible_flows
draw_all_possible_flows(w, "flow.html")
向工作流中添加状态
# store query in the context
await ctx.set("query", "What is the capital of France?")