分布式计算情景下难以避免的会碰到自变量/文件在好几个测算连接点中间的共享文件。
共享文件的功效:
(1)降低数据传输:根据共享文件,同一个连接点只需复制传送一份数据信息
(2)降低运行内存耗费:根据共享文件,同一个连接点上的好几个task应用同一份数据信息,不用复制好几个团本
spark中包含下列几类共享文件方法:
1、一般自变量共享文件
下列编码中,“share_variable”自变量会拷到每一个测算连接点上的每一个task中,即:当一个Executor中另外运作了好几个task时,这一自变量会存有好几个团本,task中间不开展共享资源,当信息量很大时,运行内存耗费很大。
# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles
conf = SparkConf().setAppName(‘share-test’).setMaster(‘local[4]’)sc = SparkContext()
# driver中执行的代码share_variable = 0.5
# executor中执行的代码def handle(x): # 通过普通共享方式,获取driver中的变量 return x * share_variable
sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))
sc.stop()
2、广播变量(Broadcast)
广播变量在每个节点只拷贝一份,在Executor启动task之前进行反序列化,同一个Executor的所有task共享同一个广播对象。广播变量在Executor端是只读的,适用于对大数据集进行广播。
# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles
conf = SparkConf().setAppName(‘broadcast-test’).setMaster(‘local[4]’)sc = SparkContext()
# driver中执行的代码:定义广播变量bc_variable = sc.broadcast({‘item’: ‘food’, ‘price’: 5})
# executor中执行的代码def handle(x): # 获取广播变量的值 share_variable = bc_variable.value # 通过普通共享方式,获取driver中的变量 return x * share_variable[‘price’]
sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))
sc.stop()
3、累加器(Accumulator)
累加器是可以在多个task之间进行add操作的一种共享变量,在Executor中只允许修改,不能读取,通常用于指标的统计,可以在Spark的UI中显示。
(图片来自https://www.jianshu.com/p/9a32123af0a6)
# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles
conf = SparkConf().setAppName(‘broadcast-test’).setMaster(‘local[4]’)sc = SparkContext()
# driver中执行的代码:定义累加器acc=sc.accumulator(0)
# executor中执行的代码def handle(x): # 累加器进行add操作 acc.add(1) return x
sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))
sc.stop()
4、文件共享
Spark文件共享方式与广播变量共享类似,每个节点只会拷贝一份数据,供该节点上所有的Executor的所有任务共同使用。
# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles
conf = SparkConf().setAppName(‘broadcast-test’).setMaster(‘local[4]’)sc = SparkContext()
# driver中执行的代码:共享文件sc.addFile(“hdfs://path/to/file.txt” )
# executor中执行的代码def handle(x):
# 读取文件 share_file = SparkFiles.get(‘file.txt’) with open(share_file,’r’) as f: print(f.read())
return x
sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))
sc.stop()
5、从hdfs拉取文件
这种方式不推荐用于文件共享,因为每个task需要主动的从分布式文件系统拉取文件,即每个task存在一个副本,网络带宽消耗和内存消耗都较大。但是它有自己的应用场景,当数据按任务切分成不同文件存放在hdfs上时,每个task只需要拉取属于自己的文件。
# encoding utf-8import subprocess
from pyspark import SparkConf, SparkContext, SparkFiles
conf = SparkConf().setAppName(‘broadcast-test’).setMaster(‘local[4]’)sc = SparkContext()
# executor中执行的代码def handle(x): # 从hdfs拉取数据 subprocess.call(“hdfs dfs -get /path/to/file.txt “, shell=True) with open(‘/path/to/file.txt’, ‘r’) as f: print(f.read())
return x
sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))
sc.stop()
评论前必须登录!
注册