时事热点:spark 集群优化

admin 1个月前 (06-03) 科技 7 1

只有满怀自信的人,能在任何地方都怀有自信,沉醉在生活中,并熟悉自己的意志。

前言

最近公司有一个生产的小集群,专门用于运行spark作业。然则偶然会由于nn或dn压力过大而导致作业checkpoint操作失败进而导致spark 流义务失败。本篇纪录从应用层面临spark作业举行优化,进而到达优化集群的作用。

集群使用情况

有数据的目录以及使用情况如下:

目录 说明 巨细 文件数目 数据数目占比 数据巨细占比
/user/root/.sparkStaging/applicationIdxxx spark义务设置以及所需jar包 5G 约1k 约20% 约100%
/tmp/checkpoint/xxx/{comMits|metadata|offsets|sources} checkpoint文件,其中commits和offsets频仍更改 2M 约4k 约40% 约0%

对于.sparkStaging目录,不经常更改,只需要优化其巨细即可。

对于 checkpoint目录,频仍性增删,从天生周期和保留计谋两方面去思量。

&NBsp;.sparkStaging目录优化

对于/user/hadoop/.sparkStaging下文件,是spark义务依赖文件,可以将jar包上传到指定目录下,制止或削减了jar包的重复上传,进而削减义务的等待时间。

可以在spark的设置文件spark-defaults.conf设置如下内容:

spark.yarn.archive=hdfs://hdfscluster/user/hadoop/jars
spark.yarn.preserve.staging.files=false

参数说明

Property Name Default Meaning
spark.yarn.archive (none)
An archive containing needed Spark jars for distribution to the YARN cache. If set, this configuration replaces spark.yarn.jars and the archive is used in all the application's containers. The archive should contain jar files in its root directory. Like with the previous option, the archive can also be hosted on HDFS to speed up file distribution.
spark.yarn.preserve.staging.files false Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them.

checkpoint优化

首先领会一下 checkpoint文件代表的寄义。

checkpoint文件说明

  • offsets 目录 - 预先纪录日志,纪录每个批次中存在的偏移量。为了确保给定的批次将始终包罗相同的数据,我们在举行任何处置之前将其写入此日志。因此,该日志中的第N个纪录指示当前正在处置的数据,第N-1个条目指示哪些偏移已持久地提交给sink。

  • commits 目录 - 纪录已完成的批次ID的日志。这用于检查批处置是否已完全处置,而且其输出已提交给接收器,因此无需再次处置。(例如)在重新启动过程中使用,以辅助识别接下来要运行的批处置。

  • metadata 文件 - 与整个查询关联的元数据,只有一个 StreamingQuery 唯一ID

  • sources目录 - 保留起始offset信息

下面从两个方面来优化checkpoint。

第一,从触发checkpoint机制方面思量

trigger的机制

Trigger是用于指示 StreamingQuery 多久天生一次效果的计谋。

Trigger有三个实现类,分别为:

  • OneTimeTrigger - A Trigger that processes only one batch of data in a streaming query then terminates the query.

  • ProcessingTime - A trigger that runs a query periodically based on the processing time. If interval is 0, the query will run as fast as possiBLe.by default,trigger is ProcessingTime, and interval=0

  • ContinuousTrigger - A Trigger that continuously processes streaming data, asynchronously checkpointing at the specified interval.

可以为 ProcessingTime 指定一个时间 或者使用 指定时间的ContinuousTrigger ,牢固天生checkpoint的周期,制止checkpoint天生过于频仍,减轻多义务下小集群的nn的压力

 

第二,从checkpoint保留机制思量。

保留机制

spark.sql.streaming.miNBAtchesToRetain - 必须保留并使其可恢复的最小批次数,默以为 100

可以调小保留的batch的次数,好比调小到 20,这样 checkpoint 小文件数目整体可以削减到原来的 20%

checkpoint 参数验证

主要验证trigger机制保留机制

验证trigger机制

未设置trigger效果

未设置trigger前,spark structured streaming 的查询batch提交的周期截图如下:

每一个batch的query义务的提交是毫无周期纪律可寻。

设置trigger代码

trigger效果

设置trigger代码后效果截图如下:

每一个batch的query义务的提交是有纪律可寻的,即每隔5s提交一次代码,即trigger设置生效

注重,若是新闻不能马上被消费,新闻会有积压,structured streaming 现在并无与spark streaming效果等同的背压机制,为防止单批次query查询的数据源数据量过大,制止程序泛起数据倾斜或者无法挽回的OutOfMemory错误,可以通过 MAXOffsetsPerTrigger 参数来设置单个批次允许抓取的最大新闻条数。

使用案例如下:

spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "xxx:9092")
    .option("subscribe", "test-name")
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1)
    .option("group.id", "2")
    .option("auto.offset.reset", "earliest")
    .load()

验证保留机制

默认保留机制效果

spark义务提交参数

#!/bin/bash
spark-submit \
--class zd.Example \
--master yarn \
--deploy-mode client \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3,org.apache.kafka:kafka-clients:2.0.0 \
--repositories http://maven.aliyun.com/nexus/content/groups/public/ \
/root/spark-test-1.0-SNAPSHOT.jar

 

如下图,offsets和commits最终最少各保留100个文件。

时事热点:spark 集群优化 第1张

修改保留计谋

通过修改义务提交参数来进一步修改checkpoint的保留计谋。

添加 --conf spark.sql.streaming.minBatchesToRetain=2 ,完整剧本如下:

#!/bin/bash
spark-submit \
--class zd.Example \
--master yarn \
--deploy-mode client \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3,org.apache.kafka:kafka-clients:2.0.0 \
--repositories http://maven.aliyun.com/nexus/content/groups/public/ \
--conf spark.sql.streaming.minBatchesToRetain=2 \
/root/spark-test-1.0-SNAPSHOT.jar

修改后保留计谋效果

修改后保留计谋截图如下:

时事热点:spark 集群优化 第2张

即 checkpoint的保留计谋参数设置生效

总结

综上,可以通过设置 trigger 来控制每一个batch的query提交的时间距离,可以通过设置checkpoint文件最少保留batch的巨细来削减checkpoint小文件的保留个数。

参照

  1. https://github.com/apache/spark/blob/master/docs/running-on-yarn.md
  2. https://blog.csdn.net/lm709409753/article/details/85250859
  3. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
  4. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
  5. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
  6. https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
  7. https://github.com/apache/spark/blob/v2.4.3/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

 

,

申博Sunbet

申博Sunbet www.cdzmxslvs.com Sunbet是申博娱乐的官方网站。Sunbt官网有你喜欢的Sunbet、申博APP下载、申博娱乐最新网址、申博娱乐网最新网址等。

皇冠体育声明:该文看法仅代表作者自己,与本平台无关。转载请注明:时事热点:spark 集群优化

网友评论

  • (*)

最新评论

  • 欧博代理 2020-06-03 00:24:37 回复

    欧博注册网址www.chengxin11.cn欢迎进入欧博网址(Allbet Gaming),欧博网址开放会员注册、代理开户、电脑客户端下载、苹果安卓下载等业务。文笔太好了吧。

    1

标签列表

    文章归档

    站点信息

    • 文章总数:533
    • 页面总数:0
    • 分类总数:8
    • 标签总数:863
    • 评论总数:169
    • 浏览总数:4654