1. Requirements
Install Hadoop and Spark clusters on CentOS 6 and run TensorFlow code through TensorFlowOnSpark in its YARN mode (ideally, the setup should be reproducible on a cluster without internet access).
2. System environment (topology)
OS: CentOS 6.5 Final; Hadoop: 2.7.4; Spark: 1.5.1 (Hadoop 2.6 build); TensorFlow: 1.3.0; TensorFlowOnSpark: latest from GitHub; Python: 2.7.12
s0.centos.com: 1.5 GB memory, 1 core, runs the namenode/resourcemanager. Relevant YARN resource settings (yarn-site.xml):

    <property>
      <name>yarn.scheduler.maximum-allocation-mb</name>
      <value>2048</value>
    </property>
    <property>
      <name>yarn.nodemanager.resource.memory-mb</name>
      <value>2048</value>
    </property>
    <property>
      <name>yarn.nodemanager.resource.cpu-vcores</name>
      <value>2</value>
    </property>
3. References
https://blog.abysm.org/2016/06/building-tensorflow-centos-6/ : building TensorFlow on CentOS 6
TensorFlowOnSpark GitHub wiki: https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_YARN : installing TensorFlowOnSpark on YARN
TensorFlowOnSpark GitHub wiki: https://github.com/yahoo/TensorFlowOnSpark/wiki/Conversion-Guide : converting TensorFlow code
4. Steps
1. Install devtoolset-6 and Python:
Install the SCL repository:
    yum install -y centos-release-scl
Install devtoolset:
    yum install -y devtoolset-6
Install Python:
    yum install -y python27 python27-numpy python27-python-devel python27-python-wheel
Install a few common utilities:
    yum install -y vim zip unzip openssh-clients
2. Download Bazel; 0.5.1 is used here (a 0.4.x release was also downloaded, but its dependency packages are hard to fetch).
First run:
    export CC=/opt/rh/devtoolset-6/root/usr/bin/gcc
Then enter the build environment:
    scl enable devtoolset-6 python27 bash
and execute, in order:
    unzip bazel-0.5.1-dist.zip -d bazel-0.5.1-dist
    cd bazel-0.5.1-dist
    # compile
    ./compile.sh
    # install
    mkdir -p ~/bin
    cp output/bazel ~/bin/
    exit    # leave the scl environment
The compile step takes quite a while.
3. Download the TensorFlow 1.3.0 source and unpack it.
4. Enter tensorflow-1.3.0 and change the tf_extension_linkopts function in tensorflow/tensorflow.bzl to the following form (adding -lrt; on CentOS 6's older glibc, librt provides functions such as clock_gettime that newer systems ship in libc):
    def tf_extension_linkopts():
        return ["-lrt"]  # No extension link opts
5. Compile and install TensorFlow:
Install the prerequisite software:
    yum install -y patch
Then enter the build environment and build:
    scl enable devtoolset-6 python27 bash
    cd tensorflow-1.3.0
    ./configure
    # build
    ~/bin/bazel build --config=opt //tensorflow/tools/pip_package:build_pip_package
    bazel-bin/tensorflow/tools/pip_package/build_pip_package /tmp/tensorflow_pkg
    exit    # leave the build environment
This also takes a long time; building TensorFlow 1.3 with Bazel 0.4.x fails with a version-too-low error.
After the build, a TensorFlow wheel is produced under /tmp/tensorflow_pkg; it is specific to the system it was built on, i.e. CentOS.
6. Build a self-contained Python installation (stay online for this step):
Since the goal is to use TensorFlow and TensorFlowOnSpark without internet access, we follow the TensorFlowOnSpark GitHub wiki: build a standalone Python distribution, install TensorFlow, TensorFlowOnSpark, and other common modules into it, then upload it to HDFS so that every worker node shares the same Python.zip environment.
Set an environment variable and download Python:
    export PYTHON_ROOT=~/Python
    curl -O https://www.python.org/ftp/python/2.7.12/Python-2.7.12.tgz
    tar -xvf Python-2.7.12.tgz
Compile and install Python:
    pushd Python-2.7.12
    ./configure --prefix="${PYTHON_ROOT}" --enable-unicode=ucs4
    make
    make install
    popd
Install pip:
pushd "${PYTHON_ROOT}" curl -O https://bootstrap.pypa.io/get-pip.py bin/python get-pip.py popd
Install TensorFlow:
pushd "${PYTHON_ROOT}" bin/pip install /tmp/tensorflow_pkg/tensorflow-1.3.0-cp27-none-linux_x86_64.whl popd
Installing TensorFlow automatically pulls in common Python packages such as numpy.
Install TensorFlowOnSpark:
    pushd "${PYTHON_ROOT}"
    bin/pip install tensorflowonspark
    popd
Zip up the fully equipped Python and upload it to HDFS:
pushd "${PYTHON_ROOT}" zip -r Python.zip * popd hadoop fs -put ${PYTHON_ROOT}/Python.zip
TensorFlow is now ready to use.
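As a quick sanity check, a minimal sketch (run it with the freshly built interpreter, e.g. ${PYTHON_ROOT}/bin/python); both packages should import cleanly:

    # Minimal smoke test for the packaged environment (illustrative only).
    import tensorflow as tf
    import tensorflowonspark

    print(tf.__version__)              # expect: 1.3.0
    print(tensorflowonspark.__file__)  # confirms where pip installed it

    # TF 1.x style: build a trivial graph and run it in a session.
    hello = tf.constant("TensorFlow on CentOS 6")
    with tf.Session() as sess:
        print(sess.run(hello))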
7. Adapt the TensorFlow code. For example, the following script runs in a plain TensorFlow environment:
    # from __future__ import absolute_import
    # from __future__ import division
    # from __future__ import print_function

    import numpy as np
    import tensorflow as tf

    X_FEATURE = 'x'  # Name of the input feature.
    train_percent = 0.8


    def load_data(data_file_name):
        data = np.loadtxt(open(data_file_name), delimiter=",", skiprows=0)
        return data


    def data_selection(iris, train_per):
        data, target = np.hsplit(iris[np.random.permutation(iris.shape[0])], np.array([-1]))
        row_split_index = int(data.shape[0] * train_per)
        x_train, x_test = (data[1:row_split_index], data[row_split_index:])
        y_train, y_test = (target[1:row_split_index], target[row_split_index:])
        return x_train, x_test, y_train.astype(int), y_test.astype(int)


    def run():
        # Load dataset.
        data_file = 'iris01.csv'
        iris = load_data(data_file)
        # x_train, x_test, y_train, y_test = model_selection.train_test_split(
        #     iris.data, iris.target, test_size=0.2, random_state=42)
        x_train, x_test, y_train, y_test = data_selection(iris, train_percent)
        # print(x_test)
        # print(y_test)

        # Build a DNN with two hidden layers of 10 units each.
        feature_columns = [
            tf.feature_column.numeric_column(
                X_FEATURE, shape=np.array(x_train).shape[1:])]
        classifier = tf.estimator.DNNClassifier(
            feature_columns=feature_columns, hidden_units=[10, 10], n_classes=3)

        # Train.
        train_input_fn = tf.estimator.inputs.numpy_input_fn(
            x={X_FEATURE: x_train}, y=y_train, num_epochs=None, shuffle=True)
        classifier.train(input_fn=train_input_fn, steps=200)

        # Predict.
        test_input_fn = tf.estimator.inputs.numpy_input_fn(
            x={X_FEATURE: x_test}, y=y_test, num_epochs=1, shuffle=False)
        predictions = classifier.predict(input_fn=test_input_fn)
        y_predicted = np.array(list(p['class_ids'] for p in predictions))
        y_predicted = y_predicted.reshape(np.array(y_test).shape)

        # # Score with sklearn.
        # score = metrics.accuracy_score(y_test, y_predicted)
        # print('Accuracy (sklearn): {0:f}'.format(score))
        print(np.concatenate((y_predicted, y_test), axis=1))

        # Score with tensorflow.
        scores = classifier.evaluate(input_fn=test_input_fn)
        print('Accuracy (tensorflow): {0:f}'.format(scores['accuracy']))
        print(classifier.params)


    if __name__ == '__main__':
        run()
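As a side note on the input pipeline: numpy_input_fn wraps the arrays in a queue-backed feed. A minimal standalone sketch (illustrative only; random toy data, TF 1.x API) of pulling one batch by hand:

    import numpy as np
    import tensorflow as tf

    # Toy data in the same layout as the iris script: 4 features, int labels.
    x = np.random.rand(10, 4).astype(np.float32)
    y = np.random.randint(0, 3, size=(10, 1))

    input_fn = tf.estimator.inputs.numpy_input_fn(
        x={'x': x}, y=y, batch_size=4, num_epochs=1, shuffle=False)
    features, labels = input_fn()  # tensors fed by background queue runners

    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        print(sess.run([features['x'], labels]))  # first batch of 4 rows
        coord.request_stop()
        coord.join(threads)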
The iris01.csv file referenced above holds the data:
iris01.csv is the classic 150-row Iris dataset: four comma-separated feature columns followed by an integer class label (0/1/2). The first rows:
    5.1,3.5,1.4,0.2,0
    4.9,3.0,1.4,0.2,0
    4.7,3.2,1.3,0.2,0
    4.6,3.1,1.5,0.2,0
    5.0,3.6,1.4,0.2,0
    ...
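If you need to recreate the file, here is a short sketch (assuming scikit-learn is available on the build machine; the file is just the standard Iris data in this layout):

    import numpy as np
    from sklearn import datasets

    # Rebuild iris01.csv in the layout load_data() expects:
    # four feature columns followed by an integer class label (0/1/2).
    iris = datasets.load_iris()
    rows = np.column_stack([iris.data, iris.target])
    np.savetxt("iris01.csv", rows, delimiter=",", fmt=["%.1f"] * 4 + ["%d"])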
1) Import the necessary packages:
    from pyspark.context import SparkContext
    from pyspark.conf import SparkConf
    from tensorflowonspark import TFCluster, TFNode
    # from com.yahoo.ml.tf import TFCluster, TFNode
    from datetime import datetime
Note: when importing TFCluster, do not follow the import path given in the older official docs; import from tensorflowonspark instead.
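If one script has to tolerate both names (purely defensive; the com.yahoo.ml.tf namespace only exists in old checkouts), a fallback import could look like this:

    try:
        from tensorflowonspark import TFCluster, TFNode  # current pip package
    except ImportError:
        # legacy namespace used by early Yahoo examples and docs
        from com.yahoo.ml.tf import TFCluster, TFNode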
2) Change the signature of the main function; here my run function only needs two parameters added: (argv, ctx).
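A minimal sketch of the new signature (job_name and task_index are attributes the TensorFlowOnSpark node context exposes; they are shown only for illustration and are unused in this simple example):

    def map_run(argv, ctx):
        # ctx is the per-executor context TensorFlowOnSpark passes in.
        # Distributed jobs typically branch on ctx.job_name ('ps' vs
        # 'worker') and ctx.task_index; here we just log them.
        print("job_name=%s task_index=%s" % (ctx.job_name, ctx.task_index))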
3) Replace the original main-function call with the following. Originally the main block simply called run; now it calls TFCluster.run and passes the run function as the second argument:
    sc = SparkContext(conf=SparkConf().setAppName("your_app_name"))
    num_executors = int(sc._conf.get("spark.executor.instances"))
    num_ps = 1
    tensorboard = True
    cluster = TFCluster.run(sc, run, sys.argv, num_executors, num_ps,
                            tensorboard, TFCluster.InputMode.TENSORFLOW)
    cluster.shutdown()
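One caveat: spark.executor.instances is only present when --num-executors (or the equivalent config) is set at submit time. A defensive variant might read it with a fallback (the default of "3" below is an arbitrary illustration):

    from pyspark.context import SparkContext
    from pyspark.conf import SparkConf

    sc = SparkContext(conf=SparkConf().setAppName("your_app_name"))
    # Fall back to a default when spark.executor.instances is unset,
    # e.g. under dynamic allocation; "3" is just an illustrative choice.
    num_executors = int(sc._conf.get("spark.executor.instances", "3"))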
With that, the job can be run. The converted code:
    # from __future__ import absolute_import
    # from __future__ import division
    # from __future__ import print_function

    from pyspark.context import SparkContext
    from pyspark.conf import SparkConf
    from tensorflowonspark import TFCluster, TFNode
    from datetime import datetime

    import numpy as np
    import sys
    # from sklearn import metrics
    # from sklearn import model_selection
    import tensorflow as tf

    X_FEATURE = 'x'  # Name of the input feature.
    train_percent = 0.8


    def load_data(data_file_name):
        data = np.loadtxt(open(data_file_name), delimiter=",", skiprows=0)
        return data


    def data_selection(iris, train_per):
        data, target = np.hsplit(iris[np.random.permutation(iris.shape[0])], np.array([-1]))
        row_split_index = int(data.shape[0] * train_per)
        x_train, x_test = (data[1:row_split_index], data[row_split_index:])
        y_train, y_test = (target[1:row_split_index], target[row_split_index:])
        return x_train, x_test, y_train.astype(int), y_test.astype(int)


    def map_run(argv, ctx):
        # Load dataset.
        data_file = 'iris01.csv'
        iris = load_data(data_file)
        x_train, x_test, y_train, y_test = data_selection(iris, train_percent)

        # Build a DNN with two hidden layers of 10 units each.
        feature_columns = [
            tf.feature_column.numeric_column(
                X_FEATURE, shape=np.array(x_train).shape[1:])]
        classifier = tf.estimator.DNNClassifier(
            feature_columns=feature_columns, hidden_units=[10, 10], n_classes=3)

        # Train.
        train_input_fn = tf.estimator.inputs.numpy_input_fn(
            x={X_FEATURE: x_train}, y=y_train, num_epochs=None, shuffle=True)
        classifier.train(input_fn=train_input_fn, steps=200)

        # Predict.
        test_input_fn = tf.estimator.inputs.numpy_input_fn(
            x={X_FEATURE: x_test}, y=y_test, num_epochs=1, shuffle=False)
        predictions = classifier.predict(input_fn=test_input_fn)
        y_predicted = np.array(list(p['class_ids'] for p in predictions))
        y_predicted = y_predicted.reshape(np.array(y_test).shape)
        print(np.concatenate((y_predicted, y_test), axis=1))

        # Score with tensorflow.
        scores = classifier.evaluate(input_fn=test_input_fn)
        print('Accuracy (tensorflow): {0:f}'.format(scores['accuracy']))
        print(classifier.params)


    if __name__ == '__main__':
        import tensorflow as tf
        import sys
        sc = SparkContext(conf=SparkConf().setAppName("your_app_name"))
        num_executors = int(sc._conf.get("spark.executor.instances"))
        num_ps = 1
        tensorboard = False
        cluster = TFCluster.run(sc, map_run, sys.argv, num_executors, num_ps,
                                tensorboard, TFCluster.InputMode.TENSORFLOW)
        cluster.shutdown()
8. Set environment variables and run:
1) Upload iris01.csv to HDFS:
    hdfs dfs -put iris01.csv
2) Set the environment variables:
    export PYTHON_ROOT=./Python
    export LD_LIBRARY_PATH=${PATH}
    export PYSPARK_PYTHON=${PYTHON_ROOT}/bin/python
    export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=Python/bin/python"
    export PATH=${PYTHON_ROOT}/bin/:$PATH
    #export QUEUE=gpu

    # set paths to libjvm.so, libhdfs.so, and libcuda*.so
    #export LIB_HDFS=/opt/cloudera/parcels/CDH/lib64    # for CDH (per @wangyum)
    export LIB_HDFS=$HADOOP_PREFIX/lib/native
    export LIB_JVM=$JAVA_HOME/jre/lib/amd64/server
    #export LIB_CUDA=/usr/local/cuda-7.5/lib64

    # for cpu mode:
    export QUEUE=default
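Before submitting, a tiny check run with the packaged interpreter can echo the variables back (illustrative only; an empty value points at a typo or a variable that was never exported in this shell):

    import os

    # Print the variables exported above.
    for var in ("PYTHON_ROOT", "PYSPARK_PYTHON", "LIB_HDFS", "LIB_JVM", "QUEUE"):
        print("%s=%s" % (var, os.environ.get(var, "")))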
3) Submit the job:
    /usr/local/spark-1.5.1-bin-hadoop2.6/bin/spark-submit \
        --master yarn \
        --deploy-mode cluster \
        --num-executors 3 \
        --executor-memory 1024m \
        --archives hdfs://s0:8020/user/root/Python.zip#Python,/root/iris01.csv \
        /root/iris_c.py
The #Python fragment unpacks the archive under the alias Python in each container's working directory, which is exactly what PYSPARK_PYTHON=Python/bin/python above points at.
4) Check the YARN logs; the job completes successfully.
5. Problems and solutions
File "iris_c.py",line 6,in <module>
from com.yahoo.ml.tf import TFCluster,TFNode
ImportError: No module named com.yahoo.ml.tf