Development environment: PyCharm 2018 + Python 3.6 + Spark 2.3 + PySpark 2.3 + Hadoop 2.6
1. Install JDK 1.8 or later
1. Download and install JDK 1.8, and note the installation path.
Download link:
https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
2. Configure the Java environment variables
On Windows 7 configure them as follows; on newer versions of Windows you can simply search for "environment variables".
Variable name: JAVA_HOME
Variable value: the Java installation path
Finally, add the %JAVA_HOME%\bin directory to Path, separated from the existing entries by a semicolon.
Note: run java -version in cmd; if the Java version number is printed, the Java environment variables are configured correctly.
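The same check can also be done from Python; the following is only a minimal sketch that inspects the variables this step sets up.
import os
import subprocess

# JAVA_HOME should point at the JDK installation directory
print('JAVA_HOME =', os.environ.get('JAVA_HOME'))

# java -version prints its output to stderr; an error here means java is not on Path
result = subprocess.run(['java', '-version'], stderr=subprocess.PIPE)
print(result.stderr.decode(errors='ignore'))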
2. Install Spark
Download link: https://archive.apache.org/dist/spark/
Download the .tgz package built against the matching Hadoop version and extract it to a directory of your choice.
Then configure the Spark environment variables the same way as JAVA_HOME: add a SPARK_HOME variable and append %SPARK_HOME%\bin to Path.
Finally, verify the setup by running spark-shell in cmd. If the Spark shell prompt appears, Spark is configured successfully; if it reports an error, see step 3 on installing Hadoop.
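A quick sanity check of this step from Python (a sketch only; the paths follow the layout described above, with the Windows spark-shell.cmd launcher under %SPARK_HOME%\bin):
import os

# SPARK_HOME should point at the extracted Spark directory
spark_home = os.environ.get('SPARK_HOME')
print('SPARK_HOME =', spark_home)

# On Windows the shell launcher is spark-shell.cmd under %SPARK_HOME%\bin
if spark_home:
    print(os.path.exists(os.path.join(spark_home, 'bin', 'spark-shell.cmd')))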
3. Install Hadoop
For details see: https://blog.csdn.net/u011513853/article/details/52865076
It walks through the download and installation in detail, including the environment variables. Once spark-shell runs successfully, move on to the next step.
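As a quick check on Windows (assuming the guide above has you set HADOOP_HOME and place winutils.exe under %HADOOP_HOME%\bin, which is the usual setup for running Spark on Windows), the sketch below verifies that those pieces are in place:
import os

# HADOOP_HOME is assumed to be set by the Hadoop installation guide above
hadoop_home = os.environ.get('HADOOP_HOME')
print('HADOOP_HOME =', hadoop_home)

# On Windows, Spark typically needs winutils.exe under %HADOOP_HOME%\bin
if hadoop_home:
    winutils = os.path.join(hadoop_home, 'bin', 'winutils.exe')
    print('winutils.exe found:', os.path.exists(winutils))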
4. Install the pyspark module
pip install pyspark==2.3.2
If your IDE is PyCharm, you can also search for pyspark in its package manager and install it there.
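To confirm the module was installed at the expected version, a minimal check:
import pyspark

# Should print 2.3.2 if the pinned version was installed
print(pyspark.__version__)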
5. Test that PySpark works with code
import numpy as np
from pyspark import SparkContext
from pyspark import SparkConf
import warnings
warnings.filterwarnings('ignore')
import os

# Path of the Python interpreter to use (adjust to your own installation)
os.environ['PYSPARK_PYTHON'] = r'D:\develop\python\anaconda\python.exe'
# Path of the Spark installation
os.environ['SPARK_HOME'] = r'D:\develop\spark\spark-2.3.2-bin-hadoop2.6'

# Set the configuration parameters
conf = SparkConf().setAppName('myfirstpyspark').setMaster('local[*]')
sc = SparkContext.getOrCreate(conf)

array1 = np.array([
    [1, 2, 1, 1, 1],
    [2, 2, 1, 2, 2],
    [3, 3, 3, 1, 3],
    [4, 4, 4, 4, 4],
    [5, 5, 5, 5, 5],
    [6, 6, 6, 6, 6]
])

# Parallelize the local array into an RDD
rdd = sc.parallelize(array1)
print(rdd.getNumPartitions())
print(rdd.glom().collect())

def square(x):
    # Square every element of a row
    y = x ** 2
    return y

def filter_(x):
    # Drop the elements equal to 1 from a row
    x = x[x != 1]
    return x

# Verify that lambda functions work
square_rdd = rdd.map(lambda x: square(x))
print(square_rdd.glom().collect())
filter_rdd = rdd.map(lambda x: filter_(x))
# Collect the RDD back to the driver, which triggers a Spark job
print(filter_rdd.glom().collect())
# If the arrays are printed, the PySpark environment is configured correctly
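For reference, in Spark 2.x the same kind of check can also be written against SparkSession, the newer entry point; below is a separate minimal sketch that can be run on its own, assuming the same PYSPARK_PYTHON / SPARK_HOME settings as above.
from pyspark.sql import SparkSession

# Build (or reuse) a local session; SparkSession wraps the SparkContext in Spark 2.x
spark = SparkSession.builder \
    .appName('myfirstpyspark') \
    .master('local[*]') \
    .getOrCreate()

# The underlying SparkContext supports the same RDD operations as above
rdd = spark.sparkContext.parallelize(range(10))
print(rdd.sum())

spark.stop()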
6. Test connecting to Hive locally with PySpark
Note: since we need to connect to Hive, the local Spark installation requires some Hive-related configuration.
1. Copy the hive-site.xml configuration file from the server to the local %SPARK_HOME%\conf directory so that Spark can reach the Hive metastore database.
2. Copy mysql-connector-java-5.1.40.jar to %SPARK_HOME%\jars (for Spark 1.x it is %SPARK_HOME%\lib). The author's Hive metastore database is MySQL, so the MySQL JDBC driver is required. (A quick check that these two files are in place is sketched after the hive-site.xml examples below.)
3. Example hive-site.xml
<configuration>
  <!-- Hive Configuration can either be stored in this file or in the hadoop configuration files -->
  <!-- that are implied by Hadoop setup variables. -->
  <!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive -->
  <!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
  <!-- resource). -->
  <!-- Hive Execution Parameters -->
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>123456</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://localhost:3306/hive?characterEncoding=utf-8</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>Driver class name for a JDBC metastore</description>
  </property>
</configuration>
Example 2
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive/warehouse</value>
  </property>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://dev04:9083</value>
  </property>
  <property>
    <name>hive.querylog.location</name>
    <value>/tmp/hive/querylog</value>
    <description>Location of Hive run time structured log file</description>
  </property>
  <property>
    <name>hive.exec.scratchdir</name>
    <value>/tmp/hive</value>
    <description>HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/&lt;username&gt; is created, with ${hive.scratch.dir.permission}.</description>
  </property>
</configuration>
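Before running the code in the next step, it can help to confirm that the files from steps 1 and 2 are actually in place. The sketch below uses the example SPARK_HOME path from earlier and the example driver jar name; adjust both to your own environment.
import glob
import os

# Adjust to your own SPARK_HOME
spark_home = r'D:\develop\spark\spark-2.3.2-bin-hadoop2.6'

# hive-site.xml copied in step 1
print(os.path.exists(os.path.join(spark_home, 'conf', 'hive-site.xml')))

# MySQL JDBC driver copied in step 2 (the name/version may differ)
print(glob.glob(os.path.join(spark_home, 'jars', 'mysql-connector-java-*.jar')))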
4. Example PySpark code for working with Hive
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import HiveContext
import warnings
warnings.filterwarnings('ignore')
import os

# Paths of the Python interpreter and the Spark installation (adjust to your own)
os.environ['PYSPARK_PYTHON'] = r'D:\develop\python\anaconda\python.exe'
os.environ['SPARK_HOME'] = r'D:\develop\spark\spark-2.3.2-bin-hadoop2.6'

# Set the configuration parameters
conf = SparkConf().setAppName('myfirstpyspark').setMaster('local[*]')
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# Read the table test.test_table, where test is the database name and test_table the table name
df = sqlContext.table(tableName='test.test_table')
# Register a temporary table
df.registerTempTable('test_table')
# Query the temporary table with SQL
my_dataframe = sqlContext.sql("select * from test_table limit 10")
my_dataframe.show()
sc.stop()
# If the table rows are printed, you have succeeded
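For reference, the same Hive access can also be written with the Spark 2.x SparkSession entry point (HiveContext is kept mainly for backward compatibility). A minimal sketch using the same example table name:
from pyspark.sql import SparkSession

# enableHiveSupport() makes the session use the hive-site.xml under %SPARK_HOME%\conf
spark = SparkSession.builder \
    .appName('pyspark_hive') \
    .master('local[*]') \
    .enableHiveSupport() \
    .getOrCreate()

# Same example table as above: database test, table test_table
spark.sql("select * from test.test_table limit 10").show()

spark.stop()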