CDA数据分析师(CDA.cn)——真本事,心舒适
加快Python算法的四个方法(三)数据并行化
相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下Python的计算时间,减少大家在算法上的等待时间。以下给大家讲解关于数据并行化这方面的内容。
1.介绍
随着时间和处理器计算能力的增长,数据呈指数级增长,我们需要找到有效地处理数据的方法。那我们应该怎么办呢?
GPU是一种非常有效的解决方案。但是,GPU并不是为了机器学习而设计的,它是专门为复杂的图像处理和游戏而设计的。我们使算法能够在现有GPU上运行,并且确实取得了成果。现在,谷歌推出了一种名为TPU(张量处理单元)的新设备,该设备专门针对TensorFlow上的机器学习工作而量身定做的,其结果确实令人激动。同时英伟达在这方面也并没有退缩。 但是我们将来会在某个时候达到顶峰。即使我们我们现在拥有大量可用的数据集,但是单台机器或计算单元也不足以处理这样的负载。我们将不得不使用多台机器来完成我们的任务。我们将不得不并行化完成我们的任务。
接下来,我们将研究大多数情况下你将在Python中使用的一些方法。然后再介绍一下Dask和torch.multiprocessing。
CDA数据分析师(CDA.cn)——真本事,心舒适
2.池和进程
Python库的Pool和Process方法都来自于multiprocessing它为我们的任务启动了一个新的过程,但是方式有所不同。Process每次调用仅执行一个进程:
import multiprocessing as mp p = mp.Process(target= ##目标函数, args= ##参数到函数)
# 此调用将只生产一个进程,该进程将处理在后台使用给定的参数处理目标函数
但是这个过程还没有开始。要启动它,你必须执行以下操作:
p.start()
现在,你可以将其保留在此处,或者通过以下方式检查该过程是否完成:
p.join()
#现在它将等待进程完成。
不检查过程是否已完成有许多用途。例如,在客户端-服务器应用程序中,数据包丢失的可能性或无响应进程的可能性确实很低,我们可以忽略它,这可以使我们的速度大大提高。[取决于申请程序]
对于多个进程,你必须创建多个Process。你想做多少就可以做多少。当你调用.start()它们时,它们全部都将会启动。
processes =[mp.Process(target=func, args=(a, b)) for (a, b) in list] for p in processes: p.start() for p in processes: p.join()
另一方面, Pool启动固定数量的进程,然后我们可以为这些进程分配一些任务。因此,在特定的时间实例中,只有固定数量的进程将在运行,其余的将在等待状态中。进程的数量通常被选作设备的内核数,如果此参数为空,也是可以作为默认的状态的。
pool = mp.Pool(processes=2)
现在有许多方法可以应用在Pool。在Data Science中,我们可以避免使用的是Pool.apply和
Pool.map,因为它们会在任务完成后立即返回结果。Pool.apply仅采用一个参数,并且仅使用
一个过程,而Pool.map将接受许多参数,并将其放入我们Pool的过程中。
CDA数据分析师(CDA.cn)——真本事,心舒适
results = [pool.apply(func, (x)) for x in X]
# 或者
results = pool.map(func, (arg)) # 仅需要一个参数
考虑到我们前面的客户端-服务器应用程序的例子,此处预定义了要运行的最大进程数,因此,如果我们有很多请求/数据包,则n(仅在Pool中的最大进程)将运行一次,而其他将在等待其中一个进程插槽的队列中排队。
向量的所有元素的平方
# 我们如何使用数据框
# A: 你可以使用一些可以并行化的函数
df.shape
# (100, 100)
dfs = [df.iloc[i*25:i*25+25, 0] for i in range(4)] with Pool(4) as p:
res = p.map(np.exp, dfs)
for i in range(4): df.iloc[i*25:i*25+25, 0] = res[i]
# 它可以方便的对数据进行预处理
CDA数据分析师(CDA.cn)——真本事,心舒适
什么时候使用什么?
如果你有很多任务,但其中很少的任务是计算密集型的,则应使用Process。因为如果它们需要大量计算,它们可能会阻塞你的CPU,并且你的系统可能会崩溃。如果你的系统可以一次处理所有这些操作,那么他们就不必在队列中等待机会了。
并且当你的任务数量固定且它们的计算量很大时,应使用Pool。因为你同时释放他们,那么你的系统很可能会崩溃。
3.线程处理
什么!线程处理在python中进行?
python中的线程声誉。人们的这一点看法是对的。实际上,线程在大多数情况下是不起作用的。那么问题到底是什么呢?
问题就出在GIL(全局解释器锁定)上。GIL是在Python的开发初期就引入的,当时甚至在操作系统中都没有线程的概念。选择它是因为它的简单性。
GIL一次仅允许一个CPU进程。也就是说,它一次仅允许一个线程访问python解释器。因此,一个线程将整个解释器Lock,直到它完成。
对于单线程程序,它非常快,因为只有一个Lock要维护。随着python的流行,有效地推出GIL而不损害所有相关应用程序变得越来越困难。这就是为什么它仍然存在的原因。 但是,如果你的任务不受CPU限制,则仍然可以使用多线程并行(y)。也就是说,如果你的任务受I / O约束,则可以使用多个线程并获得加速。因为大多数时候这些任务都在等待其他代理(例如磁盘等)的响应,并且在这段时间内它们可以释放锁,而让其他任务同时获取它。?
NOTE: (来自于官方网页)
The GIL is controversial because it prevents multithreaded CPython programs from taking full advantage of multiprocessor systems in certain situations. Note that potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL. Therefore it is only in multithreaded programs that spend a lot of time inside the GIL, interpreting CPython bytecode, that the GIL becomes a bottleneck. 以下是对官方网页的解释:
GIL是有争议的,因为它阻止多线程CPython程序在某些情况下充分利用多处理器系统。注意,潜在的阻塞或长时间运行的操作,如I/O、图像处理和NumPy数字处理,都发生在GIL之外。
CDA数据分析师(CDA.cn)——真本事,心舒适
因此,只有在花费大量时间在GIL内部解释CPython字节码的多线程程序中,GIL才会成为瓶颈。
因此,如果你的任务受IO限制,例如从服务器下载一些数据,对磁盘进行读/写等操作,则可以使用多个线程并获得加速。
from threading import Thread as t import queue
q = queue.Queue() # 用于放置和获取线程的结果 func_ = lambda q, args: q.put(func(args))
threads = [t(target=func_, args=(q, args)) for args in args_array] for t in threads: t.start() for t in threads: t.join() res = []
for t in threads: res.append(q.get())
# 这些结果不一定是按顺序排列的
要保存线程的结果,可以使用类似于Queue 的方法。为此,你将必须如上所示定义函数,或者可以在函数内部使用Queue.put(),但是为此,你必须更改函数定义以Queue`做为参数。 现在,你在队列中的结果不一定是按顺序排列的。如果希望结果按顺序排列,则可以传入一些计数器作为参数,如id作为参数,然后使用这些id来标识结果的来源。
threads = [t(func_, args = (i, q, args)) for i, args in enumerate(args_array)] # 并相应地更新函数 NOTE:
在pandas中的多处理中由于某些原因 'read.csv' 的方法并没有提供太多的加速,你可以考虑使用Dask做为替代
线程还是进程?
一个进程是重量级的,因为它可能包含许多自己的线程(包含至少一个线程),并且分配了自己的内存空间,而线程是轻量级的,因为它在父进程的内存区域上工作,因此制作起来更快。 进程内的线程之间的通信比较容易,因为它们共享相同的内存空间。而进程间的通信(IPC-进程间通信)则比较慢。但是,共享相同数据的线程又可能进入竞争状态,应谨慎使用Locks或使用类似的解决方案。