分割parquet文件.py

"""
把 parquet 文件导出 n 个 csv 文件。

应用场景:
    分割大个 parquet 文件成小文件,方便分块下载。

"""
import math
import os
import shutil

import pandas as pd
import pyarrow.parquet as par


def parquet2dataFrame(parquet_path: str, split_pieces: int):
    """
    根据 batch_size 把大个的 parquet 文件分成若干份。
    采用生成器方式,节省内存(以时间换空间)。
    :param parquet_path: parquet 文件路径。
    :param split_pieces: 分割份数。
    :return:
    """
    parquet_file = par.ParquetFile(parquet_path, memory_map=True)
    num_rows = parquet_file.metadata.num_rows  # 记录条数。
    print("记录条数:", num_rows)

    # batch_size: Maximum number of records to yield per batch.
    #     Batches may be smaller if there aren't enough rows in the file.
    batch_size = math.ceil(num_rows / split_pieces)
    print("每份的最大记录数:", batch_size)

    for batch in parquet_file.iter_batches(batch_size=batch_size):
        batch_df = batch.to_pandas()
        yield batch_df


def dataFrame2file(df: pd.DataFrame, output_path: str, file_name: int, file_extension: str):
    """
    dataFrame 到文件。
    :param df: pandas.core.frame.DataFrame。
    :param output_path:输出目录,提前创建好。
    :param file_name: 文件名。
    :param file_extension:文件扩展名。
    :return:
    """
    basename = "%02d.%s" % (file_name, file_extension)
    print("basename:", basename)
    # print("df:", df)
    file_path = os.path.join(output_path, basename)
    if file_extension == "parquet":
        df.to_parquet(file_path)
    elif file_extension == "csv":
        df.to_csv(file_path)
    elif file_extension == "xls":
        df.to_excel(file_path)
    else:
        print(f"Please enter the correct file format, error file_data --> {file_extension}")


def main():
    file_name = 0  # 导出的 csv 文件名序号。

    # 删除目录。
    if os.path.exists(output_path):
        shutil.rmtree(output_path)
    os.makedirs(output_path, exist_ok=True)  # 创建目录。

    for df in parquet2dataFrame(parquet_path, split_pieces):
        dataFrame2file(df, output_path, file_name, "csv")
        file_name += 1

    print("done!")


if __name__ == '__main__':
    parquet_path = 'part-00039-5114fd87-297e-42b0-9d11-50f1df323dfa-c000.snappy.parquet'
    split_pieces = 12  # 分割份数。
    output_path = "39csv"

    main()

文章来源地址https://uudwc.com/A/LR1PV

原文地址:https://blog.csdn.net/weixin_42193179/article/details/131617690

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请联系站长进行投诉反馈,一经查实,立即删除!

上一篇 2023年07月09日 09:16
下一篇 2023年07月09日 09:17