|
| 1 | +--- |
| 2 | +title: "Pipeline Loops" |
| 3 | +id: pipeline-loops |
| 4 | +slug: "/pipeline-loops" |
| 5 | +description: "Understand how loops work in Haystack pipelines, how they terminate, and how to use them safely for feedback and self-correction." |
| 6 | +--- |
| 7 | + |
| 8 | +# Pipeline Loops |
| 9 | + |
| 10 | +Learn how loops work in Haystack pipelines, how they terminate, and how to use them for feedback and self-correction. |
| 11 | + |
| 12 | +Haystack pipelines support **loops**: cycles in the component graph where the output of a later component is fed back into an earlier one. |
| 13 | +This enables feedback flows such as self-correction, validation, or iterative refinement, as well as more advanced [agentic behavior](../pipelines.mdx#agentic-pipelines). |
| 14 | + |
| 15 | +At runtime, the pipeline re-runs a component whenever all of its required inputs are ready again. |
| 16 | +You control when loops stop either by designing your graph and routing logic carefully or by using built-in [safety limits](#loop-termination-and-safety-limits). |
| 17 | + |
| 18 | +## Multiple Runs of the Same Component |
| 19 | + |
| 20 | +If a component participates in a loop, it can be run multiple times within a single `Pipeline.run()` call. |
| 21 | +The pipeline keeps an internal visit counter for each component: |
| 22 | + |
| 23 | +- Each time the component runs, its visit count increases by 1. |
| 24 | +- You can use this visit count in debugging tools like [breakpoints](./pipeline-breakpoints.mdx) to inspect specific iterations of a loop. |
| 25 | + |
| 26 | +In the final pipeline result: |
| 27 | + |
| 28 | +- For each component that ran, the pipeline returns **only the last-produced output**. |
| 29 | +- To capture outputs from intermediate components (for example, a validator or a router) in the final result dictionary, use the `include_outputs_from` argument of `Pipeline.run()`. |
| 30 | + |
| 31 | +## Loop Termination and Safety Limits |
| 32 | + |
| 33 | +Loops must eventually stop so that a pipeline run can complete. |
| 34 | +There are two main ways a loop ends: |
| 35 | + |
| 36 | +1. **Natural completion**: No more components are runnable |
| 37 | + The pipeline finishes when the work queue is empty and no component can run again (for example, the router stops feeding inputs back into the loop). |
| 38 | + |
| 39 | +2. **Reaching the maximum run count** |
| 40 | + Every pipeline has a per-component run limit, controlled by the `max_runs_per_component` parameter of the `Pipeline` (or `AsyncPipeline`) constructor, which is `100` by default. If any component exceeds this limit, Haystack raises a `PipelineMaxComponentRuns` error. |
| 41 | + |
| 42 | + You can set this limit to a lower value: |
| 43 | + |
| 44 | + ```python |
| 45 | + from haystack import Pipeline |
| 46 | + |
| 47 | + pipe = Pipeline(max_runs_per_component=5) |
| 48 | + ``` |
| 49 | + |
| 50 | + The limit is checked before each execution, so a component with a limit of 3 will complete 3 runs successfully before the error is raised on the 4th attempt. |
| 51 | + |
| 52 | + This safeguard is especially important when experimenting with new loops or complex routing logic. |
| 53 | + If your loop condition is wrong or never satisfied, the error prevents the pipeline from running indefinitely. |
| 54 | + |
| 55 | +## Example: Feedback Loop for Self-Correction |
| 56 | + |
| 57 | +The following example shows a simple feedback loop where: |
| 58 | + |
| 59 | +- A `ChatPromptBuilder` creates a prompt that includes previous incorrect replies. |
| 60 | +- An `OpenAIChatGenerator` produces an answer. |
| 61 | +- A `ConditionalRouter` checks if the answer is correct: |
| 62 | + - If correct, it sends the answer to `final_answer` and the loop ends. |
| 63 | + - If incorrect, it sends the answer back to the `ChatPromptBuilder`, which triggers another iteration. |
| 64 | + |
| 65 | +```python |
| 66 | +from haystack import Pipeline |
| 67 | +from haystack.components.builders import ChatPromptBuilder |
| 68 | +from haystack.components.generators.chat import OpenAIChatGenerator |
| 69 | +from haystack.components.routers import ConditionalRouter |
| 70 | +from haystack.dataclasses import ChatMessage |
| 71 | + |
| 72 | +template = [ |
| 73 | + ChatMessage.from_system("Answer the following question concisely with just the answer, no punctuation."), |
| 74 | + ChatMessage.from_user( |
| 75 | + "{% if previous_replies %}" |
| 76 | + "Previously you replied incorrectly: {{ previous_replies[0].text }}\n" |
| 77 | + "{% endif %}" |
| 78 | + "Question: {{ query }}" |
| 79 | + ), |
| 80 | +] |
| 81 | + |
| 82 | +prompt_builder = ChatPromptBuilder(template=template, required_variables=["query"]) |
| 83 | +generator = OpenAIChatGenerator() |
| 84 | + |
| 85 | +router = ConditionalRouter( |
| 86 | + routes=[ |
| 87 | + { |
| 88 | + # End the loop when the answer is correct |
| 89 | + "condition": "{{ 'Rome' in replies[0].text }}", |
| 90 | + "output": "{{ replies }}", |
| 91 | + "output_name": "final_answer", |
| 92 | + "output_type": list[ChatMessage], |
| 93 | + }, |
| 94 | + { |
| 95 | + # Loop back when the answer is incorrect |
| 96 | + "condition": "{{ 'Rome' not in replies[0].text }}", |
| 97 | + "output": "{{ replies }}", |
| 98 | + "output_name": "previous_replies", |
| 99 | + "output_type": list[ChatMessage], |
| 100 | + }, |
| 101 | + ], |
| 102 | + unsafe=True, # Required to handle ChatMessage objects |
| 103 | +) |
| 104 | + |
| 105 | +pipe = Pipeline(max_runs_per_component=3) |
| 106 | + |
| 107 | +pipe.add_component("prompt_builder", prompt_builder) |
| 108 | +pipe.add_component("generator", generator) |
| 109 | +pipe.add_component("router", router) |
| 110 | + |
| 111 | +pipe.connect("prompt_builder.prompt", "generator.messages") |
| 112 | +pipe.connect("generator.replies", "router.replies") |
| 113 | +pipe.connect("router.previous_replies", "prompt_builder.previous_replies") |
| 114 | + |
| 115 | +result = pipe.run( |
| 116 | + { |
| 117 | + "prompt_builder": { |
| 118 | + "query": "What is the capital of Italy? If the statement 'Previously you replied incorrectly:' is missing " |
| 119 | + "above then answer with Milan.", |
| 120 | + } |
| 121 | + }, |
| 122 | + include_outputs_from={"router", "prompt_builder"}, |
| 123 | +) |
| 124 | + |
| 125 | +print(result["prompt_builder"]["prompt"][1].text) # Shows the last prompt used |
| 126 | +print(result["router"]["final_answer"][0].text) # Rome |
| 127 | +``` |
| 128 | + |
| 129 | +### What Happens During This Loop |
| 130 | + |
| 131 | +1. **First iteration** |
| 132 | + - `prompt_builder` runs with `query="What is the capital of Italy?"` and no previous replies. |
| 133 | + - `generator` returns a `ChatMessage` with the LLM's answer. |
| 134 | + - The router evaluates its conditions and checks if `"Rome"` is in the reply. |
| 135 | + - If the answer is incorrect, `previous_replies` is fed back into `prompt_builder.previous_replies`. |
| 136 | + |
| 137 | +2. **Subsequent iterations** (if needed) |
| 138 | + - `prompt_builder` runs again, now including the previous incorrect reply in the user message. |
| 139 | + - `generator` produces a new answer with the additional context. |
| 140 | + - The router checks again whether the answer contains `"Rome"`. |
| 141 | + |
| 142 | +3. **Termination** |
| 143 | + - When the router routes to `final_answer`, no more inputs are fed back into the loop. |
| 144 | + - The queue empties and the pipeline run finishes successfully. |
| 145 | + |
| 146 | +Because we used `max_runs_per_component=3`, any unexpected behavior that causes the loop to continue would raise a `PipelineMaxComponentRuns` error instead of looping forever. |
| 147 | + |
| 148 | +## Components for Building Loops |
| 149 | + |
| 150 | +Two components are particularly useful for building loops: |
| 151 | + |
| 152 | +- **[`ConditionalRouter`](../../pipeline-components/routers/conditionalrouter.mdx)**: Routes data to different outputs based on conditions. Use it to decide whether to exit the loop or continue iterating. The example above uses this pattern. |
| 153 | + |
| 154 | +- **[`BranchJoiner`](../../pipeline-components/joiners/branchjoiner.mdx)**: Merges inputs from multiple sources into a single output. Use it when a component inside the loop needs to receive both the initial input (on the first iteration) and looped-back values (on subsequent iterations). For example, you might use `BranchJoiner` to feed both user input and validation errors into the same Generator. See the [BranchJoiner documentation](../../pipeline-components/joiners/branchjoiner.mdx#enabling-loops) for a complete loop example. |
| 155 | + |
| 156 | +## Greedy vs. Lazy Variadic Sockets in Loops |
| 157 | + |
| 158 | +Some components support variadic inputs that can receive multiple values on a single socket. |
| 159 | +In loops, variadic behavior controls how inputs are consumed across iterations. |
| 160 | + |
| 161 | +- **Greedy variadic sockets** |
| 162 | + Consume exactly one value at a time and remove it after the component runs. |
| 163 | + This includes user-provided inputs, which prevents them from retriggering the component indefinitely. |
| 164 | + Most variadic sockets are greedy by default. |
| 165 | + |
| 166 | +- **Lazy variadic sockets** |
| 167 | + Accumulate all values received from predecessors across iterations. |
| 168 | + Useful when you need to collect multiple partial results over time (for example, gathering outputs from several loop iterations before proceeding). |
| 169 | + |
| 170 | +For most loop scenarios it's sufficient to just connect components as usual and use `max_runs_per_component` to protect against mistakes. |
| 171 | + |
| 172 | +## Troubleshooting Loops |
| 173 | + |
| 174 | +If your pipeline seems stuck or runs longer than expected, here are common causes and how to debug them. |
| 175 | + |
| 176 | +### Common Causes of Infinite Loops |
| 177 | + |
| 178 | +1. **Condition never satisfied**: Your exit condition (for example, `"Rome" in reply`) might never be true due to LLM behavior or data issues. Always set a reasonable `max_runs_per_component` as a safety net. |
| 179 | + |
| 180 | +2. **Relying on optional outputs**: When a component has multiple output sockets but only returns some of them, the unreturned outputs don't trigger their downstream connections. This can cause confusion in loops. |
| 181 | + |
| 182 | + For example, this pattern can be problematic: |
| 183 | + |
| 184 | + ```python |
| 185 | + @component |
| 186 | + class Validator: |
| 187 | + @component.output_types(valid=str, invalid=Optional[str]) |
| 188 | + def run(self, text: str): |
| 189 | + if is_valid(text): |
| 190 | + return {"valid": text} # "invalid" is never returned |
| 191 | + else: |
| 192 | + return {"invalid": text} |
| 193 | + ``` |
| 194 | + |
| 195 | + If you connect `invalid` back to an upstream component for retry, but also have other connections that keep the loop alive, you might get unexpected behavior. |
| 196 | + |
| 197 | + Instead, use a `ConditionalRouter` with explicit, mutually exclusive conditions: |
| 198 | + |
| 199 | + ```python |
| 200 | + router = ConditionalRouter( |
| 201 | + routes=[ |
| 202 | + {"condition": "{{ is_valid }}", "output": "{{ text }}", "output_name": "valid", ...}, |
| 203 | + {"condition": "{{ not is_valid }}", "output": "{{ text }}", "output_name": "invalid", ...}, |
| 204 | + ] |
| 205 | + ) |
| 206 | + ``` |
| 207 | + |
| 208 | +3. **User inputs retriggering the loop**: If a user-provided input is connected to a socket inside the loop, it might cause the loop to restart unexpectedly. |
| 209 | + |
| 210 | + ```python |
| 211 | + # Problematic: user input goes directly to a component inside the loop |
| 212 | + result = pipe.run({ |
| 213 | + "generator": {"prompt": query}, # This input persists and may retrigger the loop |
| 214 | + }) |
| 215 | + |
| 216 | + # Better: use an entry-point component outside the loop |
| 217 | + result = pipe.run({ |
| 218 | + "prompt_builder": {"query": query}, # Entry point feeds into the loop once |
| 219 | + }) |
| 220 | + ``` |
| 221 | + |
| 222 | + See [Greedy vs. Lazy Variadic Sockets](#greedy-vs-lazy-variadic-sockets-in-loops) for details on how inputs are consumed. |
| 223 | + |
| 224 | +4. **Multiple paths feeding the same component**: If a component inside the loop receives inputs from multiple sources, it runs whenever *any* path provides input. |
| 225 | + |
| 226 | + ```python |
| 227 | + # Component receives from two sources – runs when either provides input |
| 228 | + pipe.connect("source_a.output", "processor.input") |
| 229 | + pipe.connect("source_b.output", "processor.input") # Variadic input |
| 230 | + ``` |
| 231 | + |
| 232 | + Ensure you understand when each path produces output, or use `BranchJoiner` to explicitly control the merge point. |
| 233 | + |
| 234 | +### Debugging Tips |
| 235 | + |
| 236 | +1. **Start with a low limit**: When developing loops, set `max_runs_per_component=3` or similar. This helps you catch issues early with a clear error instead of waiting for a timeout. |
| 237 | + |
| 238 | +2. **Use `include_outputs_from`**: Add intermediate components (like your router) to see what's happening at each step: |
| 239 | + ```python |
| 240 | + result = pipe.run(data, include_outputs_from={"router", "validator"}) |
| 241 | + ``` |
| 242 | + |
| 243 | +3. **Enable tracing**: Use tracing to see every component execution, including inputs and outputs. This makes it easy to follow each iteration of the loop. For quick debugging, use `LoggingTracer` ([setup instructions](./debugging-pipelines.mdx#real-time-pipeline-logging)). For deeper analysis, integrate with tools like Langfuse or other [tracing backends](../../development/tracing.mdx). |
| 244 | + |
| 245 | +4. **Visualize the pipeline**: Use `pipe.draw()` or `pipe.show()` to see the graph structure and verify your connections are correct. See the [Pipeline Visualization](./visualizing-pipelines.mdx) documentation for details. |
| 246 | + |
| 247 | +5. **Use breakpoints**: Set a `Breakpoint` on a specific component and visit count to inspect the state at that iteration. See [Pipeline Breakpoints](./pipeline-breakpoints.mdx) for details. |
| 248 | + |
| 249 | +6. **Check for blocked pipelines**: If you see a `PipelineComponentsBlockedError`, it means no components can run. This typically indicates a missing connection or a circular dependency. Check that all required inputs are provided. |
| 250 | + |
| 251 | +By combining careful graph design, per-component run limits, and these debugging tools, you can build robust feedback loops in your Haystack pipelines. |
0 commit comments