# Chapter 3: Workflow Engine (Pocket Flow)
Welcome back! In the previous chapters, we learned about the Web Interface (Frontend) that takes your request and the Serverless Deployment (Azure Functions) that receives that request and puts it on a waiting list (a queue) for processing.
But what exactly happens inside the Azure Function that takes a job from the queue? Generating a tutorial isn't a single, simple step. It involves fetching code, understanding it, talking to AI, organizing content, and saving files. That's a lot of tasks! How do we make sure they happen in the right order, handle information between them, and deal with potential issues?
This is where our Workflow Engine, a small framework called Pocket Flow, comes in.
## The Problem: Orchestrating Complex Tasks
Imagine you're baking a complicated cake. You don't just mix everything at once. You follow steps: measure flour, add sugar, mix wet ingredients, mix dry ingredients, combine, bake, cool, frost. Each step needs the result of the previous one, and you need to make sure you don't add the frosting before baking!
Similarly, generating a tutorial involves a sequence of operations. We need:

1. To get the code.
2. To figure out the main ideas or "abstractions" in the code.
3. To understand how those ideas connect.
4. To decide the best order to explain them in the tutorial.
5. To write each chapter based on its concept and relevant code.
6. To put all the chapters together into the final tutorial format.
If any step fails (like the AI call times out), we might need to retry that step without starting everything over. We also need a way for the results of one step (like the fetched code) to be easily available to the next step (like the step that identifies concepts).
## The Solution: Pocket Flow
Pocket Flow acts like a project supervisor or the recipe book for our tutorial generation process. It breaks the entire job into smaller, manageable pieces called Nodes. It then defines the order in which these Nodes should run, making sure the 'ingredients' (data) from one Node are passed correctly to the next.
Pocket Flow is designed to be lightweight and flexible. Its core ideas are simple but powerful:
- Nodes: The individual steps or tasks (like "Fetch Code" or "Write Chapter").
- Flows: How the Nodes are connected and the order they run in.
- Shared Store: A central place where all Nodes can read and write data they need.
Let's dive into these concepts using analogies.
## Nodes: The Task Boxes
Think of a Node as a small, self-contained box that does just one specific job. Our Nodes are written in Python.
Each Node typically has three main phases:
- `prep(shared)`: This is the "Get Ready" phase. The Node looks at the `shared` data (our central table) and gathers everything it needs to do its job. It prepares the 'ingredients' and returns this prepared data.
- `exec(prep_res)`: This is the "Do the Work" phase. The Node performs its core task using the data prepared in `prep`. This is where the main computation or external interaction happens (like calling the AI). This phase can be set up to automatically retry if it fails. It returns the result of its work.
- `post(shared, prep_res, exec_res)`: This is the "Clean Up and Pass On" phase. The Node takes the result from `exec` and saves it back into the `shared` data so other Nodes can use it. It also decides what happens next in the workflow (which Node should run after this one).
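To make this concrete, here is a minimal sketch of a custom Node showing the three phases. It assumes the `pocketflow` package; the greeting task itself is invented purely for illustration:

```python
from pocketflow import Node

class GreetNode(Node):
    """Illustrative node: the greeting task is invented for this example."""

    def prep(self, shared):
        # "Get Ready": gather inputs from the shared store
        return shared.get("name", "world")

    def exec(self, prep_res):
        # "Do the Work": the core task (could be an LLM call, a download, ...)
        return f"Hello, {prep_res}!"

    def post(self, shared, prep_res, exec_res):
        # "Clean Up and Pass On": save the result and pick the next action
        shared["greeting"] = exec_res
        return "default"  # follow the default >> connection
```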
```mermaid
flowchart LR
    shared_in[(Shared Store)] --> prep_node["Node.prep()"]
    prep_node --> prep_res_data[(Prepared Data)]
    prep_res_data --> exec_node["Node.exec()"]
    exec_node --> exec_res_data[(Execution Result)]
    exec_res_data --> post_node["Node.post()"]
    prep_res_data --> post_node
    shared_in --> post_node
    post_node --> shared_out[(Shared Store)]
    post_node --> action_out[Action]
```

This diagram shows the flow within a single Node: how it interacts with the `Shared Store` and returns an `Action` to decide the next step.
## Flows: The Recipe Steps
A Flow is like the full recipe. It defines the sequence of Nodes and tells Pocket Flow which Node to run after the current one finishes.
You connect Nodes using a simple `>>` syntax for the default path, or `- "action_name" >>` for branching based on what a Node returns from its `post` method (a hypothetical branching example appears below). In our tutorial generation, it's mostly a straightforward sequence.
```python
# Example connection
node_a >> node_b
node_b >> node_c

# Create the flow starting point
my_flow = Flow(start=node_a)
```
When you run `my_flow.run(shared)`, Pocket Flow starts `node_a`, waits for it to finish and decide the next action, then runs that next Node, and so on, until there are no more connected Nodes.
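Branching is driven by the string that `post` returns. Here is a small self-contained sketch of a branching flow; none of these nodes are from our project, they are invented to show the mechanics:

```python
from pocketflow import Node, Flow

class Review(Node):
    def post(self, shared, prep_res, exec_res):
        # The returned string picks which branch runs next
        return "approved" if shared.get("quality", 0) > 0.8 else "rejected"

class Publish(Node):
    def post(self, shared, prep_res, exec_res):
        shared["status"] = "published"

class Revise(Node):
    def post(self, shared, prep_res, exec_res):
        shared["quality"] = shared.get("quality", 0) + 0.5  # improve, then retry

review, publish, revise = Review(), Publish(), Revise()
review - "approved" >> publish   # taken when post returns "approved"
review - "rejected" >> revise    # taken when post returns "rejected"
revise >> review                 # loop back for another review pass

flow = Flow(start=review)
flow.run({"quality": 0.5})       # rejected once, revised, then published
```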
## Shared Store: The Central Whiteboard

The `Shared Store` is a central place (like a Python dictionary in memory) where all Nodes can access and modify data. This is how information flows between the steps. The `prep` phase reads from it, and the `post` phase writes results back to it.
```python
from pocketflow import Node, Flow

# Example using the shared store
class NodeA(Node):
    def post(self, shared, prep_res, exec_res):
        # Put some data into the shared store
        shared["step_a_result"] = "Data from A"

class NodeB(Node):
    def prep(self, shared):
        # Read data from the shared store
        data_from_a = shared["step_a_result"]
        return data_from_a  # Pass to exec

# Connect NodeA >> NodeB and run the flow
node_a, node_b = NodeA(), NodeB()
node_a >> node_b
Flow(start=node_a).run({})
```
This central shared store makes it easy for any Node in the workflow to get the information it needs from previous steps without complex direct connections between Nodes.
## Our Project's Workflow
In our tutorial generation project, the `generate` Azure Function (from Chapter 2) is where a Pocket Flow runs. This Flow orchestrates all the steps needed to turn a GitHub repository or local code into a tutorial.

The main Flow is defined in `function_app/flow.py`. Let's look at it:
```python
# function_app/flow.py (Simplified)
from pocketflow import Flow

# Import all node classes from nodes.py
from .nodes import (  # Note the '.' for relative import
    FetchRepo,
    IdentifyAbstractions,
    AnalyzeRelationships,
    OrderChapters,
    WriteChapters,
    CombineTutorial,
)

def create_tutorial_flow():
    """Creates and returns the codebase tutorial generation flow."""
    # Instantiate nodes
    fetch_repo = FetchRepo()
    # Nodes that interact with the LLM often have retries set
    identify_abstractions = IdentifyAbstractions(max_retries=5, wait=20)
    analyze_relationships = AnalyzeRelationships(max_retries=5, wait=20)
    order_chapters = OrderChapters(max_retries=5, wait=20)
    # WriteChapters is a BatchNode (explained below)
    write_chapters = WriteChapters(max_retries=5, wait=20)
    combine_tutorial = CombineTutorial()

    # Connect nodes in sequence
    fetch_repo >> identify_abstractions
    identify_abstractions >> analyze_relationships
    analyze_relationships >> order_chapters
    order_chapters >> write_chapters
    write_chapters >> combine_tutorial

    # Create the flow starting with FetchRepo
    tutorial_flow = Flow(start=fetch_repo)
    return tutorial_flow
```
This code defines our "recipe" for tutorial generation. It creates instances of each "Task Box" (Node) and connects them one after another using the `>>` operator. The whole process starts with the `fetch_repo` node.
Here's a diagram of this flow:
```mermaid
flowchart LR
    A[Fetch Repo] --> B[Identify Abstractions]
    B --> C[Analyze Relationships]
    C --> D[Order Chapters]
    D --> E[Write Chapters]
    E --> F[Combine Tutorial]
```
This visualizes the sequence: First fetch, then identify, then analyze, then order, then write, and finally combine and save the output.
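To tie this back to Chapter 2, here is a rough sketch of how the `generate` function might set up the shared store and run the flow. The setup details are illustrative assumptions; the real function builds the shared dictionary from the queued request:

```python
from flow import create_tutorial_flow  # adjust the import to your layout

# Hypothetical setup; the real generate function fills this from the queue message
shared = {
    "repo_url": "https://github.com/user/repo",  # repository to document
    "github_token": None,                        # optional, for private repos
    "project_name": "repo",                      # used for output paths
}

tutorial_flow = create_tutorial_flow()
tutorial_flow.run(shared)  # runs every Node in sequence, mutating `shared`

# After the run, CombineTutorial has recorded where the output went
print(shared["final_output_dir"])
```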
## The Nodes in Detail
Let's look at simplified examples of what each of these Nodes does (you can see the full code in `function_app/nodes.py`):
- `FetchRepo(Node)`:
  - Job: Get the code files from GitHub or a local directory.
  - `prep`: Reads the repo URL, GitHub token (if any), and file patterns from `shared`.
  - `exec`: Calls utility functions (`crawl_github_files` or `crawl_local_files`) to actually fetch the files. Returns a list of file paths and content.
  - `post`: Stores the list of files in `shared["files"]`.
```python
# function_app/nodes.py (Simplified FetchRepo)
from pocketflow import Node
# ... other imports ...

class FetchRepo(Node):
    def prep(self, shared):
        # Reads repo_url, github_token, patterns, etc. from shared
        return {"repo_url": shared["repo_url"], "token": shared.get("github_token")}  # Simplified

    def exec(self, prep_res):
        print(f"Crawling repository: {prep_res['repo_url']}...")
        # Calls actual fetching logic (omitted)
        result = {"files": {"path/to/file.py": "print('hello')"}, "info": {}}  # Dummy result
        files_list = list(result.get("files", {}).items())
        print(f"Fetched {len(files_list)} files.")
        return files_list  # List of (path, content)

    def post(self, shared, prep_res, exec_res):
        shared["files"] = exec_res  # Store list of files
        # returns "default" implicitly
```
- `IdentifyAbstractions(Node)`:
  - Job: Analyze the fetched code to find the main concepts or "abstractions" (like classes, modules, or key logic areas).
  - `prep`: Reads the `files` list from `shared` and formats the code content into a prompt for the AI.
  - `exec`: Calls the AI (`call_llm`) with a prompt asking it to identify abstractions, their names, descriptions, and relevant files (by index). Returns the AI's parsed response (a list of dictionaries). This node has `max_retries=5, wait=20` because AI calls can be flaky.
  - `post`: Stores the list of identified abstractions in `shared["abstractions"]`.
```python
# function_app/nodes.py (Simplified IdentifyAbstractions)
from pocketflow import Node
from utils.call_llm import call_llm  # Need call_llm

class IdentifyAbstractions(Node):
    def prep(self, shared):
        files_data = shared["files"]
        # Formats files_data into a prompt string (omitted)
        context_prompt = "...formatted file content..."
        return context_prompt  # Pass prompt to exec

    def exec(self, context_prompt):
        print("Identifying abstractions using LLM...")
        # Calls AI (omitted)
        # Returns parsed AI response, e.g., [{"name": "...", "description": "...", "files": [...]}]
        return [{"name": "Core Concept", "description": "...", "files": [0, 1]}]  # Dummy result

    def post(self, shared, prep_res, exec_res):
        shared["abstractions"] = exec_res  # Store list of abstractions
```
- `AnalyzeRelationships(Node)`:
  - Job: Figure out how the identified abstractions relate to each other.
  - `prep`: Reads `abstractions` and `files` from `shared` and formats them into a prompt for the AI.
  - `exec`: Calls the AI (`call_llm`) with a prompt asking for a project summary and a list of relationships between abstractions. Returns the parsed AI response. This node also has retries.
  - `post`: Stores the summary and relationship details in `shared["relationships"]`.
```python
# function_app/nodes.py (Simplified AnalyzeRelationships)
from pocketflow import Node
from utils.call_llm import call_llm

class AnalyzeRelationships(Node):
    def prep(self, shared):
        abstractions = shared["abstractions"]
        # Formats abstractions and files into a prompt string (omitted)
        context_prompt = "...formatted abstractions and relevant files..."
        return context_prompt

    def exec(self, context_prompt):
        print("Analyzing relationships using LLM...")
        # Calls AI (omitted)
        # Returns parsed AI response, e.g., {"summary": "...", "details": [...]}
        return {"summary": "...", "details": [{"from": 0, "to": 1, "label": "uses"}]}  # Dummy result

    def post(self, shared, prep_res, exec_res):
        shared["relationships"] = exec_res  # Store summary/relationships
```
- `OrderChapters(Node)`:
  - Job: Determine the best order to present the identified abstractions as tutorial chapters.
  - `prep`: Reads `abstractions` and `relationships` from `shared` and formats them into a prompt for the AI.
  - `exec`: Calls the AI (`call_llm`) with a prompt asking for an ordered list of abstraction indices. Returns the parsed list of indices. This node has retries.
  - `post`: Stores the ordered list of indices in `shared["chapter_order"]`.
```python
# function_app/nodes.py (Simplified OrderChapters)
from pocketflow import Node
from utils.call_llm import call_llm

class OrderChapters(Node):
    def prep(self, shared):
        abstractions = shared["abstractions"]
        relationships = shared["relationships"]
        # Formats abstractions and relationships into a prompt string (omitted)
        context_prompt = "...formatted data..."
        return context_prompt

    def exec(self, context_prompt):
        print("Determining chapter order using LLM...")
        # Calls AI (omitted)
        # Returns parsed AI response, e.g., [2, 0, 1]
        return [0, 1]  # Dummy result (indices of abstractions)

    def post(self, shared, prep_res, exec_res):
        shared["chapter_order"] = exec_res  # Store ordered indices
```
- `WriteChapters(BatchNode)`:
  - Job: Write the content for each tutorial chapter based on the ordered abstractions and relevant code.
  - This is a special type of Node called a BatchNode. Instead of `exec` running once, `exec` runs multiple times, once for each item returned by `prep`. This is perfect for tasks that need to process a list of things independently, like writing multiple chapters (a tiny standalone example follows the code below).
  - `prep`: Reads `chapter_order`, `abstractions`, and `files` from `shared`. It prepares a list of data packets, one for each chapter to be written.
  - `exec(item)`: Called once for each chapter data packet prepared by `prep`. It takes that chapter's data (abstraction details, relevant code), formats a detailed prompt for the AI to write a single chapter's Markdown content, calls the AI (`call_llm`), and returns the generated Markdown string. It also keeps track of previously written chapters so their summaries can be included in the prompt context.
  - `post(shared, prep_res, exec_res_list)`: Called after `exec` has finished for all items in the batch. It receives a list of all the results from the `exec` calls (the Markdown content for every chapter) and stores this list in `shared["chapters"]`.
```python
# function_app/nodes.py (Simplified WriteChapters)
from pocketflow import BatchNode  # Note BatchNode
from utils.call_llm import call_llm

class WriteChapters(BatchNode):  # This is a BatchNode
    def prep(self, shared):
        chapter_order = shared["chapter_order"]
        abstractions = shared["abstractions"]
        files_data = shared["files"]
        # Prepares a LIST of chapter-specific data items (omitted)
        items_to_process = [
            {"chapter_num": 1, "abstraction_details": {"name": "..."}},
            {"chapter_num": 2, "abstraction_details": {"name": "..."}},
        ]  # Dummy list
        print(f"Preparing to write {len(items_to_process)} chapters...")
        return items_to_process  # Returns an iterable list

    def exec(self, item):  # exec is called for EACH item
        chapter_num = item["chapter_num"]
        abstraction_name = item["abstraction_details"]["name"]
        print(f"Writing chapter {chapter_num} for: {abstraction_name} using LLM...")
        # Formats prompt for THIS chapter (omitted)
        # Calls AI (omitted)
        chapter_content = f"# Chapter {chapter_num}: {abstraction_name}\n..."  # Dummy content
        return chapter_content  # Returns content for ONE chapter

    def post(self, shared, prep_res, exec_res_list):  # exec_res_list is ALL results
        shared["chapters"] = exec_res_list  # Store list of ALL chapter contents
        print(f"Finished writing {len(exec_res_list)} chapters.")
```
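If the batch mechanics feel abstract, this tiny self-contained example (invented for illustration, not from the project) shows the `prep` → per-item `exec` → `post` pattern:

```python
from pocketflow import BatchNode, Flow

class SquareAll(BatchNode):
    def prep(self, shared):
        return shared["numbers"]           # an iterable: exec runs once per item

    def exec(self, item):
        return item * item                 # handles ONE item

    def post(self, shared, prep_res, exec_res_list):
        shared["squares"] = exec_res_list  # list of ALL exec results, in order

shared = {"numbers": [1, 2, 3]}
Flow(start=SquareAll()).run(shared)
print(shared["squares"])  # [1, 4, 9]
```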
- `CombineTutorial(Node)`:
  - Job: Take the individual chapter content, combine it with the project summary and relationship diagram, and save the final output.
  - `prep`: Reads `project_name`, `relationships` (for the summary/diagram), `chapter_order`, `abstractions`, and `chapters` content from `shared`. It formats the `index.md` file content (including the project summary and a Mermaid diagram of relationships) and creates a list of dictionaries containing each chapter's filename and content.
  - `exec`: Takes the prepared index content and chapter files list, then uploads these files to Azure Blob Storage using a helper function (`upload_to_blob_storage`), structuring them under a path named after the project. It also creates a minimal local `info.txt` file pointing to the blob storage URLs for convenience during development/debugging. Returns details about the uploaded files (including URLs). If blob storage fails, it falls back to saving the files locally.
  - `post`: Stores the path where files were saved (either local or a reference to blob storage) in `shared["final_output_dir"]` and information about the blob upload in `shared["blob_storage_info"]`. This makes the output path available for later steps or logging.
```python
# function_app/nodes.py (Simplified CombineTutorial)
import os
from pocketflow import Node
# Need upload_to_blob_storage helper

class CombineTutorial(Node):
    def prep(self, shared):
        project_name = shared["project_name"]
        relationships_data = shared["relationships"]
        chapters_content = shared["chapters"]
        # Formats index.md and list of chapter files (omitted)
        index_content = f"# Tutorial: {project_name}\n..."
        chapter_files = [{"filename": "01_chap.md", "content": "..."}]  # Dummy list
        return {
            "output_path": f"output/{project_name}",
            "index_content": index_content,
            "chapter_files": chapter_files,
        }

    def exec(self, prep_res):
        output_path = prep_res["output_path"]
        index_content = prep_res["index_content"]
        chapter_files = prep_res["chapter_files"]
        print("Combining tutorial and uploading...")
        # Calls upload_to_blob_storage or saves locally (omitted)
        # Returns details of saved/uploaded files
        return {"local_path": output_path, "blob_container": "tutorials", "blob_path": output_path}  # Dummy result

    def post(self, shared, prep_res, exec_res):
        # exec_res might be a dict (blob info) or string (local path)
        if isinstance(exec_res, dict):
            shared["final_output_dir"] = exec_res["local_path"]
            shared["blob_storage_info"] = exec_res  # Store blob info
        else:
            shared["final_output_dir"] = exec_res  # Store local path
        print("\nTutorial generation complete!")
```
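For reference, a helper like `upload_to_blob_storage` might look roughly like the sketch below, assuming the `azure-storage-blob` SDK and a connection string in the environment. This is an assumption about its shape, not the project's actual implementation:

```python
import os
from azure.storage.blob import BlobServiceClient  # pip install azure-storage-blob

def upload_to_blob_storage(container_name: str, blob_path: str, content: str) -> str:
    """Hypothetical helper: uploads text content to a blob and returns its URL."""
    # Assumes a connection string in the environment (our assumption, not project config)
    service = BlobServiceClient.from_connection_string(
        os.environ["AZURE_STORAGE_CONNECTION_STRING"]
    )
    blob_client = service.get_blob_client(container=container_name, blob=blob_path)
    blob_client.upload_blob(content, overwrite=True)  # replace if it already exists
    return blob_client.url

# Example: upload_to_blob_storage("tutorials", "my-project/index.md", index_content)
```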
## Key Features Used

- Task Decomposition: Breaking the large job into smaller Nodes.
- Workflow: Defining the sequence using `Flow` and `>>`.
- Shared Store: Passing data between Nodes using a central dictionary.
- Batch Processing: Using `BatchNode` (`WriteChapters`) to efficiently process multiple independent items (chapters).
- Fault Tolerance: Using `max_retries` and `wait` on Nodes that call external services (like the LLM) to automatically handle transient errors, as shown in the sketch below.
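Here is a minimal sketch of that retry behavior, assuming Pocket Flow's `exec_fallback` hook (called once all retries are exhausted); the flaky node is invented for illustration:

```python
import random
from pocketflow import Node, Flow

class FlakyLLMNode(Node):
    """Illustrative node whose exec fails randomly, like a flaky LLM call."""

    def exec(self, prep_res):
        if random.random() < 0.5:
            raise RuntimeError("Simulated transient LLM failure")
        return "LLM response"

    def exec_fallback(self, prep_res, exc):
        # Called only after all retries are exhausted
        return "Fallback response"

    def post(self, shared, prep_res, exec_res):
        shared["llm_result"] = exec_res

# Retry exec up to 3 times, waiting 2 seconds between attempts
flaky = FlakyLLMNode(max_retries=3, wait=2)
shared = {}
Flow(start=flaky).run(shared)
print(shared["llm_result"])
```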
Pocket Flow provides the structure and reliability needed to coordinate these complex operations, ensuring that the tutorial generation pipeline runs step-by-step, handles failures, and correctly passes data from code fetching all the way to saving the final output.
## Conclusion
The Workflow Engine, powered by Pocket Flow, is the brain that coordinates the entire tutorial generation process. By breaking the job into Nodes, defining the execution Flow, and managing data with a Shared Store, it ensures that each step is performed correctly and in the right order, turning raw code into a structured, beginner-friendly tutorial. The use of features like Batch Nodes and retries makes the process efficient and robust.
Now that we understand the overall orchestration, let's look at the first major step in the process: fetching the code itself.