""" 把 parquet 文件导出 n 个 csv 文件。 应用场景: 分割大个 parquet 文件成小文件,方便分块下载。 """ import math import os import shutil import pandas as pd import pyarrow.parquet as par def parquet2dataFrame(parquet_path: str, split_pieces: int): """ 根据 batch_size 把大个的 parquet 文件分成若干份。 采用生成器方式,节省内存(以时间换空间)。 :param parquet_path: parquet 文件路径。 :param split_pieces: 分割份数。 :return: """ parquet_file = par.ParquetFile(parquet_path, memory_map=True) num_rows = parquet_file.metadata.num_rows # 记录条数。 print("记录条数:", num_rows) # batch_size: Maximum number of records to yield per batch. # Batches may be smaller if there aren't enough rows in the file. batch_size = math.ceil(num_rows / split_pieces) print("每份的最大记录数:", batch_size) for batch in parquet_file.iter_batches(batch_size=batch_size): batch_df = batch.to_pandas() yield batch_df def dataFrame2file(df: pd.DataFrame, output_path: str, file_name: int, file_extension: str): """ dataFrame 到文件。 :param df: pandas.core.frame.DataFrame。 :param output_path:输出目录,提前创建好。 :param file_name: 文件名。 :param file_extension:文件扩展名。 :return: """ basename = "%02d.%s" % (file_name, file_extension) print("basename:", basename) # print("df:", df) file_path = os.path.join(output_path, basename) if file_extension == "parquet": df.to_parquet(file_path) elif file_extension == "csv": df.to_csv(file_path) elif file_extension == "xls": df.to_excel(file_path) else: print(f"Please enter the correct file format, error file_data --> {file_extension}") def main(): file_name = 0 # 导出的 csv 文件名序号。 # 删除目录。 if os.path.exists(output_path): shutil.rmtree(output_path) os.makedirs(output_path, exist_ok=True) # 创建目录。 for df in parquet2dataFrame(parquet_path, split_pieces): dataFrame2file(df, output_path, file_name, "csv") file_name += 1 print("done!") if __name__ == '__main__': parquet_path = 'part-00039-5114fd87-297e-42b0-9d11-50f1df323dfa-c000.snappy.parquet' split_pieces = 12 # 分割份数。 output_path = "39csv" main()
文章来源地址https://uudwc.com/A/LR1PV
文章来源:https://uudwc.com/A/LR1PV