dlt version
1.19.1
Describe the problem
When a pipeline name contains double underscores (__), state files are written successfully but cannot be restored on subsequent runs. This causes incremental loading to silently fail, re-processing all data from the beginning.
This issue only manifests when state sync from destination is required (e.g., running on job runners like Databricks, Airflow, or CI systems that start with a clean filesystem). When running locally with a persistent pipeline working directory, the local state file is used and everything works fine - which made this bug quite hard to find.
Root Cause
The filesystem destination uses `__` as `FILENAME_SEPARATOR` when naming state files:

```py
FILENAME_SEPARATOR = "__"
```
From `dlt/dlt/destinations/impl/filesystem/filesystem.py`, lines 769 to 774 at `10cd908`:
```py
def _get_state_file_name(self, pipeline_name: str, version_hash: str, load_id: str) -> str:
    """gets full path for schema file for a given hash"""
    return self.pathlib.join(  # type: ignore[no-any-return]
        self.get_table_dir(self.schema.state_table_name),
        f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{self._to_path_safe_string(version_hash)}.jsonl",
    )
```
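For illustration, here is a simplified stand-in for `_get_state_file_name` (the helper name and example values are invented, and the directory join is omitted) showing what gets written:

```py
FILENAME_SEPARATOR = "__"

# Simplified stand-in for _get_state_file_name: same f-string, without the directory join
def state_file_name(pipeline_name: str, load_id: str, version_hash: str) -> str:
    return f"{pipeline_name}{FILENAME_SEPARATOR}{load_id}{FILENAME_SEPARATOR}{version_hash}.jsonl"

print(state_file_name("my_pipeline", "1234567890.123", "abc123hash"))
# my_pipeline__1234567890.123__abc123hash.jsonl -> splits into exactly 3 parts
print(state_file_name("my_pipeline__resource", "1234567890.123", "abc123hash"))
# my_pipeline__resource__1234567890.123__abc123hash.jsonl -> splits into 4 parts
```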
When parsing state files, dlt splits the filename by `__` and expects exactly 3 parts.

From `dlt/dlt/destinations/impl/filesystem/filesystem.py`, lines 727 to 741 at `10cd908`:
```py
def _list_dlt_table_files(
    self, table_name: str, pipeline_name: str = None
) -> Iterator[Tuple[str, List[str]]]:
    all_files = self.list_table_files(table_name)
    if len(all_files) == 0:
        if not self.is_storage_initialized():
            raise DestinationUndefinedEntity(table_name)
    for filepath in all_files:
        filename = os.path.splitext(os.path.basename(filepath))[0]
        fileparts = filename.split(FILENAME_SEPARATOR)
        if len(fileparts) != 3:
            continue
        # Filters only if pipeline_name provided
        if pipeline_name is None or fileparts[0] == pipeline_name:
            yield filepath, fileparts
```
Notice this bit:

```py
if len(fileparts) != 3:
    continue  # <-- silently skips files that don't have exactly 3 parts
```

If the pipeline name itself contains `__`, the split produces more than 3 parts, and the state file is silently skipped.
Example
With pipeline name `my_pipeline__resource`:
- State file written: `my_pipeline__resource__1234567890.123__abc123hash.jsonl`
- When split by `__`: `["my_pipeline", "resource", "1234567890.123", "abc123hash"]` → 4 parts
- Expected: 3 parts → file is skipped
- Result: "The state was not found in the destination" even though it exists
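A few lines of plain Python (using the illustrative filename above, not real dlt output) confirm the parse failure:

```py
import os

FILENAME_SEPARATOR = "__"

filepath = "my_pipeline__resource__1234567890.123__abc123hash.jsonl"
filename = os.path.splitext(os.path.basename(filepath))[0]

fileparts = filename.split(FILENAME_SEPARATOR)
print(fileparts)       # ['my_pipeline', 'resource', '1234567890.123', 'abc123hash']
print(len(fileparts))  # 4 -> fails the `len(fileparts) != 3` check, so the file is skipped
```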
Impact
- Incremental loading breaks silently: users may not notice until they see duplicate data or performance issues
- The log message "state was not found" is misleading since the state file does exist
- This is especially problematic when pipeline names are constructed dynamically (e.g., `f"{source}_{table}"` where components may contain underscores that get normalized to `__`); see the sketch after this list
- Debugging is difficult because the issue doesn't reproduce locally
Environment
- dlt version: 1.19.1 (likely affects all versions using the filesystem destination)
- Destination: filesystem
- State sync: enabled (restore_from_destination=True)
- Runtime: Databricks job runner (clean filesystem between runs)
Expected behavior
We need the run to fail instead of silently skipping the state file. Additionally, `dlt.pipeline()` should validate pipeline names and reject ones containing `__` (see the sketch below).
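A minimal sketch of the kind of guard meant here, assuming a plain `ValueError` (dlt's actual exception type and wording would differ):

```py
FILENAME_SEPARATOR = "__"

def validate_pipeline_name(pipeline_name: str) -> None:
    # Hypothetical guard; dlt would likely use its own configuration error type
    if FILENAME_SEPARATOR in pipeline_name:
        raise ValueError(
            f"Pipeline name {pipeline_name!r} contains {FILENAME_SEPARATOR!r}, which the "
            "filesystem destination reserves as its state file name separator; state "
            "written under this name could never be restored."
        )

validate_pipeline_name("my_pipeline")  # ok
validate_pipeline_name("my_pipeline__with__underscores")  # raises ValueError
```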
Steps to reproduce
This bash script reproduces the issue:

```sh
# Clean up from previous runs
rm -rf /tmp/test_dlt_destination
rm -rf ~/.dlt/pipelines/my_pipeline__with__underscores
# Create the Python reproduction script
cat > /tmp/test_dlt_double_underscore.py << 'EOF'
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "dlt[filesystem]==1.19.1",
# ]
# ///
import os
# Enable dlt logging before importing dlt
os.environ["RUNTIME__LOG_LEVEL"] = "INFO"
import dlt
from dlt.destinations import filesystem
# Pipeline name with double underscores (common when normalizing dots or other characters)
pipeline = dlt.pipeline(
pipeline_name="my_pipeline__with__underscores",
destination=filesystem(bucket_url="/tmp/test_dlt_destination"),
dataset_name="test_data"
)
@dlt.resource
def my_data():
yield {"id": 1, "value": "test"}
# Run the pipeline
pipeline.run(my_data())
print("Pipeline run completed")
EOF
echo "=== First run (writes state) ==="
uv run /tmp/test_dlt_double_underscore.py 2>&1 | grep -i "state" || true
echo ""
echo "=== Checking state file exists ==="
find /tmp/test_dlt_destination -name "*.jsonl" -path "*_dlt_pipeline_state*" | head -5
echo ""
echo "=== Simulating clean job runner (deleting local pipeline state) ==="
rm -rf ~/.dlt/pipelines/my_pipeline__with__underscores
echo ""
echo "=== Second run (should restore state but fails) ==="
uv run /tmp/test_dlt_double_underscore.py 2>&1 | grep -i "state"
echo ""
echo "=== Notice: 'state was not found' even though the state file exists! ==="This gives:
=== First run (writes state) ===
2025-12-03 15:53:02,844|[INFO]|43342|8393318720|dlt|pipeline.py|_restore_state_from_destination:1606|The state was not found in the destination filesystem (dlt.destinations.filesystem):test_data
=== Checking state file exists ===
/tmp/test_dlt_destination/test_data/_dlt_pipeline_state/my_pipeline__with__underscores__1764773497.998475__bb32ee1f4580d6fbb5aa3f8696c18dfd99e04f12f1507eb21b483bee81661dea.jsonl
/tmp/test_dlt_destination/test_data/_dlt_pipeline_state/my_pipeline__with__underscores__1764773520.456136__bb32ee1f4580d6fbb5aa3f8696c18dfd99e04f12f1507eb21b483bee81661dea.jsonl
=== Simulating clean job runner (deleting local pipeline state) ===
=== Second run (should restore state but fails) ===
2025-12-03 15:53:03,270|[INFO]|43348|8393318720|dlt|pipeline.py|_restore_state_from_destination:1606|The state was not found in the destination filesystem (dlt.destinations.filesystem):test_data
2025-12-03 15:53:03,314|[INFO]|43348|8393318720|dlt|worker.py|_get_items_normalizer:132|A file format for table _dlt_pipeline_state was specified to preferred in the resource so jsonl format being used.
2025-12-03 15:53:03,315|[INFO]|43348|8393318720|dlt|validate.py|validate_and_update_schema:26|Updating schema for table _dlt_pipeline_state with 1 deltas
2025-12-03 15:53:03,315|[INFO]|43348|8393318720|dlt|normalize.py|clean_x_normalizer:174|Table _dlt_pipeline_state has seen data for the first time with load id 1764773583.275845
2025-12-03 15:53:03,324|[INFO]|43348|8393318720|dlt|load.py|submit_job:172|Will load file 1764773583.275845/new_jobs/_dlt_pipeline_state.fdfa61a7f3.0.jsonl.gz with table name _dlt_pipeline_state
2025-12-03 15:53:03,324|[INFO]|43348|8393318720|dlt|load.py|complete_jobs:471|Job for _dlt_pipeline_state.fdfa61a7f3.jsonl.gz completed in load 1764773583.275845
=== Notice: 'state was not found' even though the state file exists! ===
```
Operating system
Linux, macOS
Runtime environment
Other
Python version
3.12
dlt data source
doesn't matter.
dlt destination
Filesystem & buckets
Other deployment details
No response
Additional information
No response