python – Pandas和多处理内存管理:将DataFrame拆分为多个块

前端之家收集整理的这篇文章主要介绍了python – Pandas和多处理内存管理:将DataFrame拆分为多个块前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

我必须逐行处理一个庞大的pandas.DataFrame(几十GB),其中每行操作都很长(几十毫秒).所以我有了将框架拆分成块并使用多处理并行处理每个块的想法.这确实加快了任务,但内存消耗是一场噩梦.

虽然每个子进程原则上只占用一小部分数据,但它需要(几乎)与包含原始DataFrame的原始父进程一样多的内存.即使删除父进程中使用过的部分也无济于事.

我写了一个复制这种行为的最小例子.它唯一能做的就是创建一个带有随机数的大型DataFrame,将其分成最多100行的小块,并在多处理期间简单地打印一些有关DataFrame的信息(这里通过大小为4的mp.Pool).

并行执行的主要功能

  1. def just_wait_and_print_len_and_idx(df):
  2. """Waits for 5 seconds and prints df length and first and last index"""
  3. # Extract some info
  4. idx_values = df.index.values
  5. first_idx,last_idx = idx_values[0],idx_values[-1]
  6. length = len(df)
  7. pid = os.getpid()
  8. # Waste some cpu cycles
  9. time.sleep(1)
  10. # Print the info
  11. print('First idx {},last idx {} and len {} '
  12. 'from process {}'.format(first_idx,last_idx,length,pid))

帮助生成器将DataFrame分块为小块:

  1. def df_chunking(df,chunksize):
  2. """Splits df into chunks,drops data of original df inplace"""
  3. count = 0 # Counter for chunks
  4. while len(df):
  5. count += 1
  6. print('Preparing chunk {}'.format(count))
  7. # Return df chunk
  8. yield df.iloc[:chunksize].copy()
  9. # Delete data in place because it is no longer needed
  10. df.drop(df.index[:chunksize],inplace=True)

主要例程:

  1. def main():
  2. # Job parameters
  3. n_jobs = 4 # Poolsize
  4. size = (10000,1000) # Size of DataFrame
  5. chunksize = 100 # Maximum size of Frame Chunk
  6. # Preparation
  7. df = pd.DataFrame(np.random.rand(*size))
  8. pool = mp.Pool(n_jobs)
  9. print('Starting MP')
  10. # Execute the wait and print function in parallel
  11. pool.imap(just_wait_and_print_len_and_idx,df_chunking(df,chunksize))
  12. pool.close()
  13. pool.join()
  14. print('DONE')

标准输出如下所示:

  1. Starting MP
  2. Preparing chunk 1
  3. Preparing chunk 2
  4. First idx 0,last idx 99 and len 100 from process 9913
  5. First idx 100,last idx 199 and len 100 from process 9914
  6. Preparing chunk 3
  7. First idx 200,last idx 299 and len 100 from process 9915
  8. Preparing chunk 4
  9. ...
  10. DONE

问题:

主进程需要大约120MB的内存.但是,池的子进程需要相同的内存量,尽管它们只包含原始DataFame的1%(块大小为100,原始长度为10000).为什么?

我能做些什么呢?尽管我的分块,Python(3)是否将整个DataFrame发送到每个子进程?这是大熊猫内存管理的问题还是多处理和数据酸洗的错误?谢谢!

用于简单复制和粘贴的整个脚本,以防您想自己尝试:

  1. import multiprocessing as mp
  2. import pandas as pd
  3. import numpy as np
  4. import time
  5. import os
  6. def just_wait_and_print_len_and_idx(df):
  7. """Waits for 5 seconds and prints df length and first and last index"""
  8. # Extract some info
  9. idx_values = df.index.values
  10. first_idx,pid))
  11. def df_chunking(df,inplace=True)
  12. def main():
  13. # Job parameters
  14. n_jobs = 4 # Poolsize
  15. size = (10000,chunksize))
  16. pool.close()
  17. pool.join()
  18. print('DONE')
  19. if __name__ == '__main__':
  20. main()
最佳答案
好的,所以我在SebastianOpałczyński的评论中暗示了这一点.

问题是子进程是从父进程分叉的,所以它们都包含对原始DataFrame的引用.但是,帧在原始进程中被操作,因此写时复制行为会缓慢地终止并最终在达到物理内存限制时终止.

有一个简单的解决方案:我使用多处理的新上下文功能代替pool = mp.Pool(n_jobs):

  1. ctx = mp.get_context('spawn')
  2. pool = ctx.Pool(n_jobs)

这可以保证池进程只是生成而不是从父进程分叉.因此,他们都没有访问原始DataFrame,并且所有这些只需要父级内存的一小部分.

请注意,mp.get_context(‘spawn’)仅在Python 3.4及更高版本中可用.

猜你在找的Python相关文章