Description
Is your feature request related to a problem?
Currently, there are two approaches to writing data to object storage:
- Daft native IO
- pyarrow fs
When implementing a new native IO path, we also have to consider how it interacts with the other approach; the two are coupled in several places today (a rough sketch of both paths is given after the list below).
From the file format level
- For the Parquet format, native IO is used, except when the written schema cannot be converted to an Arrow schema (by the way, which schemas cannot be converted? Is there any unit test covering this case?).
- For the JSON format, native IO is used.
- For the CSV format, pyarrow fs is used, because we have not implemented a Rust CSV writer yet.
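Below is a rough sketch of what the two paths look like from Python, assuming an illustrative DataFrame and placeholder paths; Daft's internal routing is not reproduced here, this only contrasts the two entry points.

```python
import daft
import pyarrow.csv as pacsv
import pyarrow.fs as pafs

df = daft.from_pydict({"a": [1, 2, 3]})

# Path 1: Daft native IO -- e.g. a Parquet write goes through the Rust sink writer.
df.write_parquet("s3://bucket/native-out/")

# Path 2: pyarrow fs -- the fallback used for CSV (no Rust CSV writer yet):
# the data is converted to an Arrow table and written at the Python level.
fs, root = pafs.FileSystem.from_uri("s3://bucket/pyarrow-out/")
with fs.open_output_stream(root + "/part-0.csv") as out:
    pacsv.write_csv(df.to_arrow(), out)
```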
From the DataFrame level
If we write an empty DataFrame to a Parquet/CSV/JSON file, no data/MicroPartition ever reaches the native sink writer, so no file is generated there; the empty file is instead generated at the Python level via pyarrow fs.
Issue/Question 1:
There is an issue that we do not support writing a JSON file via pyarrow fs, so writing an empty DataFrame to JSON fails. Moreover, unlike the CSV or Parquet formats, we cannot write a useful empty JSON file even though we have the schema, and we also cannot read an empty JSON file, because the schema cannot be inferred from it.
So what is the better behavior for this case: raise an exception with a clearer message, or just log a warning and generate no file?
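For reference, a minimal reproduction of the failure described above might look like the following; the empty DataFrame construction and the output path are placeholders, and the exact error depends on the Daft version.

```python
import daft

# An empty DataFrame (no rows), built here just for illustration.
empty_df = daft.from_pydict({"a": [], "b": []})

# Parquet/CSV fall back to pyarrow fs and still produce an empty file,
# but there is no pyarrow-fs JSON fallback, so this write is expected to fail.
empty_df.write_json("s3://bucket/json-out/")
```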
Issue/Question 2:
There is a strange behavior when writing Parquet/JSON/CSV with partition columns. If the DataFrame is not empty, the generated files land under paths like {root_dir}/p1=a/p2=b/xxxxxxxx.parquet, but if the DataFrame is empty, the generated empty file lands at {root_dir}/xxxxx.parquet, i.e. at a different directory level. Presumably we cannot write the empty Parquet file into the partition layout because the partition values are missing?
So is this expected behavior? Is there any issue if we do not generate the empty file under root_dir at all? What I observed is that daft.read_parquet(path) may fail after empty_df.write_parquet(path).
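A sketch of the scenario, with placeholder paths and column names, and assuming the partition_cols argument of write_parquet:

```python
import daft

# Empty DataFrame with partition columns; names and path are placeholders.
empty_df = daft.from_pydict({"p1": [], "p2": [], "value": []})

# A non-empty DataFrame would produce files under {root_dir}/p1=.../p2=...,
# but the empty DataFrame produces a single empty file directly under root_dir.
empty_df.write_parquet("s3://bucket/table/", partition_cols=["p1", "p2"])

# Reading the same path back may then fail, as described above.
daft.read_parquet("s3://bucket/table/")
```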
From the overwrite level
If we request overwrite mode in write_parquet/write_json, then once the WriteSink finishes writing data, the CommitSink receives the list of written files, uses pyarrow fs to scan the root dir, filters out the files that need to be deleted, and deletes them one by one.
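Roughly, the cleanup amounts to something like the sketch below; the function name is illustrative and this is not the actual CommitSink code, only the kind of pyarrow fs calls it relies on.

```python
import pyarrow.fs as pafs

def overwrite_cleanup(root_dir: str, written_files: set[str]) -> None:
    """List everything under root_dir and delete whatever was not just written."""
    fs, root = pafs.FileSystem.from_uri(root_dir)
    infos = fs.get_file_info(pafs.FileSelector(root, recursive=True))
    for info in infos:
        if info.type == pafs.FileType.File and info.path not in written_files:
            # Deleted one by one: a failure partway through leaves a mix of
            # old and new files behind (see Issue/Question 4 below).
            fs.delete_file(info.path)
```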
Issue/Question 3:
I guess the reason pyarrow fs is used to delete files is that the current ObjectSource trait does not support a delete API; I think we could add one?
Issue/Question 4:
The consistency problem with overwrite. (To be honest, it is hard to stay consistent while handling multiple files on object storage.) The current implementation may run into the following problems:
- If deleting any file fails, dirty files are left behind.
- If multiple Daft jobs write data to the same folder (which is common in the big data processing domain), files may be deleted incorrectly, e.g. Job A scans the files generated by a still-running Job B and deletes them.
From MapReduce/Spark experience, a running job should write data into a staging dir and then rename the staging dir to the target path via a job committer. Unfortunately, most object storage does not support renaming a dir/object (the directory bucket of TOS supports it).
Another approach is to write data to the target files with multipart upload (MPU) but not complete the MPU before commit; the MPU is then completed during the commit phase to make all written files visible at once. However, this approach only helps the create/append case; the overwrite case still needs rename abilities.
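For illustration, here is a minimal sketch of the deferred-completion MPU idea against S3-compatible storage using boto3; this is not how Daft writes today, and the bucket/key handling is simplified.

```python
import boto3

s3 = boto3.client("s3")  # any S3-compatible endpoint

def upload_without_commit(bucket: str, key: str, parts_data: list[bytes]) -> dict:
    """Upload all parts of one file but do NOT complete the MPU yet,
    so the object stays invisible to readers until the commit phase.
    Note: S3 requires every part except the last to be at least 5 MiB."""
    upload = s3.create_multipart_upload(Bucket=bucket, Key=key)
    parts = []
    for i, body in enumerate(parts_data, start=1):
        resp = s3.upload_part(
            Bucket=bucket, Key=key, UploadId=upload["UploadId"],
            PartNumber=i, Body=body,
        )
        parts.append({"PartNumber": i, "ETag": resp["ETag"]})
    return {"Key": key, "UploadId": upload["UploadId"], "Parts": parts}

def commit(bucket: str, pending: list[dict]) -> None:
    """Commit phase: completing each MPU makes all written files visible."""
    for p in pending:
        s3.complete_multipart_upload(
            Bucket=bucket, Key=p["Key"], UploadId=p["UploadId"],
            MultipartUpload={"Parts": p["Parts"]},
        )
```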
Describe the solution you'd like
TODO
Describe alternatives you've considered
No response
Additional Context
No response
Would you like to implement a fix?
Yes