Spark变量与文件共享方式

2021年2月23日 分类:文件共享 作者:网盘

分布式计算情景下难以避免的会碰到自变量/文件在好几个测算连接点中间的共享文件

共享文件的功效:

(1)降低数据传输:根据共享文件,同一个连接点只需复制传送一份数据信息

(2)降低运行内存耗费:根据共享文件,同一个连接点上的好几个task应用同一份数据信息,不用复制好几个团本

spark中包含下列几类共享文件方法:

1、一般自变量共享文件

下列编码中,“share_variable”自变量会拷到每一个测算连接点上的每一个task中,即:当一个Executor中另外运作了好几个task时,这一自变量会存有好几个团本,task中间不开展共享资源,当信息量很大时,运行内存耗费很大。

# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles

conf = SparkConf().setAppName(‘share-test’).setMaster(‘local[4]’)sc = SparkContext()

# driver中执行的代码share_variable = 0.5

# executor中执行的代码def handle(x): # 通过普通共享方式,获取driver中的变量 return x * share_variable

sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))

sc.stop()

2、广播变量(Broadcast)

广播变量在每个节点只拷贝一份,在Executor启动task之前进行反序列化,同一个Executor的所有task共享同一个广播对象。广播变量在Executor端是只读的,适用于对大数据集进行广播。

# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles

conf = SparkConf().setAppName(‘broadcast-test’).setMaster(‘local[4]’)sc = SparkContext()

# driver中执行的代码:定义广播变量bc_variable = sc.broadcast({‘item’: ‘food’, ‘price’: 5})

# executor中执行的代码def handle(x): # 获取广播变量的值 share_variable = bc_variable.value # 通过普通共享方式,获取driver中的变量 return x * share_variable[‘price’]

sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))

sc.stop()

3、累加器(Accumulator)

累加器是可以在多个task之间进行add操作的一种共享变量,在Executor中只允许修改,不能读取,通常用于指标的统计,可以在Spark的UI中显示。

(图片来自https://www.jianshu.com/p/9a32123af0a6)

# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles

conf = SparkConf().setAppName(‘broadcast-test’).setMaster(‘local[4]’)sc = SparkContext()

# driver中执行的代码:定义累加器acc=sc.accumulator(0)

# executor中执行的代码def handle(x): # 累加器进行add操作 acc.add(1) return x

sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))

sc.stop()

4、文件共享

Spark文件共享方式与广播变量共享类似,每个节点只会拷贝一份数据,供该节点上所有的Executor的所有任务共同使用。

# encoding utf-8from pyspark import SparkConf, SparkContext, SparkFiles

conf = SparkConf().setAppName(‘broadcast-test’).setMaster(‘local[4]’)sc = SparkContext()

# driver中执行的代码:共享文件sc.addFile(“hdfs://path/to/file.txt” )

# executor中执行的代码def handle(x):

# 读取文件 share_file = SparkFiles.get(‘file.txt’) with open(share_file,’r’) as f: print(f.read())

return x

sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))

sc.stop()

5、从hdfs拉取文件

这种方式不推荐用于文件共享,因为每个task需要主动的从分布式文件系统拉取文件,即每个task存在一个副本,网络带宽消耗和内存消耗都较大。但是它有自己的应用场景,当数据按任务切分成不同文件存放在hdfs上时,每个task只需要拉取属于自己的文件。

# encoding utf-8import subprocess

from pyspark import SparkConf, SparkContext, SparkFiles

conf = SparkConf().setAppName(‘broadcast-test’).setMaster(‘local[4]’)sc = SparkContext()

# executor中执行的代码def handle(x): # 从hdfs拉取数据 subprocess.call(“hdfs dfs -get /path/to/file.txt “, shell=True) with open(‘/path/to/file.txt’, ‘r’) as f: print(f.read())

return x

sc.parallelize(range(10), 2).map(lambda x: handle(x)).foreach(lambda x: print(x))

sc.stop()

阅读已结束,喜欢的话就点个赞吧
注册坚果云网盘
还有其他问题,可以咨询小坚果咨询小坚果
继续阅读