我注意到,顺序n和参数x jv(n,x)的
scipy.special
贝塞尔函数在x中被矢量化:
在[14]中:将scipy.special导入sp
在[16]中:sp.jv(1,范围(3))#n = 1,[x = 0,1,2]
Out [16]:array([0.,0.44005059,0.57672481])
但是没有相应的矢量化形式的球形贝塞尔函数,sp.sph_jn:
- In [19]: sp.sph_jn(1,range(3))
- ---------------------------------------------------------------------------
- ValueError Traceback (most recent call last)
- <ipython-input-19-ea59d2f45497> in <module>()
- ----> 1 sp.sph_jn(1,range(3)) #n=1,3 value array
- /home/glue/anaconda/envs/fibersim/lib/python2.7/site-packages/scipy/special/basic.pyc in sph_jn(n,z)
- 262 """
- 263 if not (isscalar(n) and isscalar(z)):
- --> 264 raise ValueError("arguments must be scalars.")
- 265 if (n != floor(n)) or (n < 0):
- 266 raise ValueError("n must be a non-negative integer.")
- ValueError: arguments must be scalars.
此外,球形贝塞尔函数在一次通过中计算N的所有阶数.因此,如果我想要参数x = 10的n = 5 Bessel函数,则返回n = 1,2,3,4,5.它实际上在一次传递中返回jn及其衍生物:
- In [21]: sp.sph_jn(5,10)
- Out[21]:
- (array([-0.05440211,0.07846694,0.07794219,-0.03949584,-0.10558929,-0.05553451]),array([-0.07846694,-0.0700955,0.05508428,0.09374053,0.0132988,-0.07226858]))
为什么API中存在这种不对称性,并且有没有人知道一个库将返回矢量化的球形贝塞尔函数,或者至少更快(即在cython中)?
解决方法
你可以编写一个cython函数来加速计算,你要做的第一件事是获取fortran函数SPHJ的地址,这里是如何在Python中执行此操作:
- from scipy import special as sp
- sphj = sp.specfun.sphj
- import ctypes
- addr = ctypes.pythonapi.PyCObject_AsVoidPtr(ctypes.py_object(sphj._cpointer))
然后你可以直接在Cython中调用fortran函数,注意我使用prange()来使用multicore来加速计算:
- %%cython -c-Ofast -c-fopenmp --link-args=-fopenmp
- from cpython.mem cimport PyMem_Malloc,PyMem_Free
- from cython.parallel import prange
- import numpy as np
- import cython
- from cpython cimport PyCObject_AsVoidPtr
- from scipy import special
- ctypedef void (*sphj_ptr) (const int *n,const double *x,const int *nm,const double *sj,const double *dj) nogil
- cdef sphj_ptr _sphj=<sphj_ptr>PyCObject_AsVoidPtr(special.specfun.sphj._cpointer)
- @cython.wraparound(False)
- @cython.boundscheck(False)
- def cython_sphj2(int n,double[::1] x):
- cdef int count = x.shape[0]
- cdef double * sj = <double *>PyMem_Malloc(count * sizeof(double) * (n + 1))
- cdef double * dj = <double *>PyMem_Malloc(count * sizeof(double) * (n + 1))
- cdef int * mn = <int *>PyMem_Malloc(count * sizeof(int))
- cdef double[::1] res = np.empty(count)
- cdef int i
- if count < 100:
- for i in range(x.shape[0]):
- _sphj(&n,&x[i],mn + i,sj + i*(n+1),dj + i*(n+1))
- res[i] = sj[i*(n+1) + n] #choose the element you want here
- else:
- for i in prange(count,nogil=True):
- _sphj(&n,dj + i*(n+1))
- res[i] = sj[i*(n+1) + n] #choose the element you want here
- PyMem_Free(sj)
- PyMem_Free(dj)
- PyMem_Free(mn)
- return res.base
比较一下,这是在forloop中调用sphj()的Python函数:
- import numpy as np
- def python_sphj(n,x):
- sphj = special.specfun.sphj
- res = np.array([sphj(n,v)[1][n] for v in x])
- return res
以下是10个元素的%timit结果:
- x = np.linspace(1,10)
- r1 = cython_sphj2(4,x)
- r2 = python_sphj(4,x)
- assert np.allclose(r1,r2)
- %timeit cython_sphj2(4,x)
- %timeit python_sphj(4,x)
结果:
- 10000 loops,best of 3: 21.5 µs per loop
- 10000 loops,best of 3: 28.1 µs per loop
以下是100000个元素的结果:
- x = np.linspace(1,100000)
- r1 = cython_sphj2(4,x)
结果:
- 10 loops,best of 3: 44.7 ms per loop
- 1 loops,best of 3: 231 ms per loop