1. 写在前面
上个月做了一个SparkStreaming的小项目,数据量很小,功能也比较单一,从Kafka读取数据经过简单处理,将最终的数据落地到Mysql中。虽然项目小,但这是我第一个SparkStreaming项目,通过这个简单实践,让我对SparkStreaming应用和Scala编程都有了一个更深的认识。这个过程中,遇到一些问题,也引发了一些思考,记录于此,算是对这个项目的简单总结吧,保密起见,文中代码都是网上找的一些示例代码,不涉及项目实际代码。
2. SparkStreaming读取Kafka数据
2.1 Kafka High Level Consumer方式
从Kafka读取数据有两种方式,第一种方式是使用High Level Consumer API接口,该接口中除了有Kafka Topic的概念,还包含Consumer Group的概念,每个Topic的每个Group的Offset都保存在Zookeeper,由ZK负责管理Offset。用户通过访问Zookeeper来获得Offset,从而进行数据读取,该模式下可以实现数据的广播分发和单次分发等功能。该模式的优势在于Offset交由Zookeeper管理,用户代码实现简单,只需设置好Topic和Group(若Group不存在,Zookeeper会自动新建),与Zookeeper通讯即可,不需要关系Offset。缺点在于容易导致数据丢失或数据重复问题。同时需要注意的是,同一个Topic和Group的每个Partion中数据只能由一个Consumer消费,Kafka针对这个问题有一个Consumer Rebalance功能,可以根据Cousmer与Partion的数量关系,进行不同的数据分配机制,具体可以在网上查询相关资料。
2.2 Kafka Low Level Consumer方式
从Kafka读取数据的第二种方式为使用Low Level Consumer API,虽然是Low Level,但该API赋予了用户更多的权限,用户直接管理Topic的Offset,不存在Group的概念。由于Kafka不会删除数据,所有该方式下的优势即为读取效率更高,不会出现数据丢失和重复的问题,缺点是需要用户代码完成Topic Offset的管理与备份,实现较为麻烦。
2.3 SparkStreaming 读取Kafka数据
SparkStreaming读取Kafka数据也根据High Level和Low Level提供了两种读取方式。
Receiver-based Approach采用High Level,只需指定Zookeeper Quorum、Group以及Map(topic->consumerNum)即可,该方式优点是实现简单方便,可以实现At Least Once,SparkStreaming会有多个Receiver作为Consumer消费Kafka Topic中数据,并将数据放入Executor内存中,SparkStreaming的Partion与Kafka的Partion无关,不存在对应关系。该方式的缺点是为了保证数据不会在SparkStreaming异常时丢失,需要启动SparkStreaming的CheckoutPoint机制,即WAL机制,这样会造成数据在Kafka和Spark出现重复保存,浪费磁盘空间,同时读取效率也会降低。1
2
3
4import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
Direct Approach采用Low Level,该方式下SparkStreaming的Partion与Kafak的Partion一一对应,SparkStreaming读取数据效率会非常快,且SparkStreaming自主管理Kafka的Offset,可以实现Exactly-once。但为了保证SparkStreaming在异常时不会丢失内存中的Kafka Offset,需要用户自己实现Offset的容错,常见的是用户将Offset同步到Zookeeper中备份。1
2
3
4
5import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
本项目中,由于数据量较小,同时考虑到实现的简便性,我们采用了SparkStreaming Receiver-based Approach方式,并添加了CheckoutPoint机制,用户可以通过配置文件conf.properties选择是否开启CheckoutPoint机制。
3. SparkStreaming CheckoutPoint机制
SparkStreaming的CheckoutPoint机制是为了保证数据不丢失将Streaming中每批RDD的数据保存到HDFS中,当该批次RDD处理完成后再从HDFS中删除的机制,类似WAL机制。因此CheckoutPoint机制需要在创建StreamingContext时调用 StreamingContext.getOrCreate(path,creatingFunc)方法,该方法在Driver程序启动并创建StreamingContext时首先从指定的HDFS路径中判断是否存在备份数据,若存在则从备份数据中重启StreamingContext,若不存在则新建StreamingContext。因此再使用CheckoutPoint时务必设置好HDFS的文件路径,否则程序会报异常。
另一方面,Yarn集群中默认Application失败重启的次数为2,所以SparkStreaming再失败从CheckoutPoint重启时,只能重启一次,为了增加重启次数,需要修改Yarn的配置mapreduce.am.max-attempts
MR ApplicationMaster最大失败尝试次数,默认为2,我们可以根据实际情况增大该参数。
4. SparkStreaming 对象序列化问题
在SparkStreaming程序中我们使用到了JDBC连接池以及Logger日志打印等功能,Spark官网中建议对JDBC连接池实例进行lazy懒惰创建处理,而且实际应用中Logger的实例也需要加入@transient注解,确保Logger在序列化时不会被序列化。
针对这个问题,我的猜想是若程序中,未在Drvier中使用而是在分布式算子中使用了JDBC连接池,则为了保证JDBC不会在Drvier实例化,需要在分布式算子中对JDBC连接池实例化代码加入lazy关键字,保证每个Executor中新建一个JDBC连接池实例。
Logger由于我们在Driver和分布式算子中都使用到了,因此Drvier会将进程中的Driver的Logger对象进行序列化,并传输给各Executors,然而Logger是无法序列化的,程序会出错。解决方法就是在Logger字段中加入@transient注解,指明该字段在序列化时不进行序列化。这样即使在Driver中使用实例化Logger对象,在序列化时Logger对象也不会序列化进去,Executors中仍然会新建一个Logger对象。
序列化问题的根本原因是:在使用到分布式算子时,Diver会将进程内存中的对象序列化并传输到各Executors中,若有些类无法序列化,则这个序列化过程中会报序列化异常,为了避免该情况的发生,则需要lazy关键字和@transient注解的配合,若类只在Executors中使用,则加入lazy保证只在Executor中实例化即可。若类在Driver和Executors中均被使用,则需要加入@transient注解和lazy关键字,保证该类实例在Executor中新建,而不使用Driver中的实例。
分布式计算由于涉及到在各分布式节点中传输数据和程序,因此会出现一系列问题,在实际应用中尤其要注意这点。
5. Spark在Yarn集群的内存分配问题
当我们将程序编译成jar包使用spark-submit向Yarn集群提交作业后,在Yarn ResourceManager Web UI上发现,任务所占用的内存远比我们设定的要大,比如我们设定--drvier-memory 1G --executor-memory 1G --num-executors 2
正常来讲,Yarn只需要分配3G内存即可,但Yarn集群实际分配内存为6G,我一度怀疑是自己编写的代码存在Bug,但查询资料发现是Yarn资源分配机制的原因所造成的。
在设定完drvier-memory和executor-memory以后,Yarn还需要为driver和每个executor分配额外内存用于运行JVM进程,因此总内存较指定的分配内存要大,同时Yarn集群设置了Container最小分配内存,这就造成了上述明明只需3G,却分配了6G内存的问题。
spark.driver.memory:默认值512m
spark.executor.memory:默认值512m
spark.yarn.am.memory:默认值512m
spark.yarn.executor.memoryOverhead:值为executorMemory * 0.07, with minimum of 384
spark.yarn.driver.memoryOverhead:值为driverMemory * 0.07, with minimum of 384
spark.yarn.am.memoryOverhead:值为AM memory * 0.07, with minimum of 384
–executor-memory/spark.executor.memory 控制 executor 的堆的大小,但是 JVM 本身也会占用一定的堆空间,比如内部的 String 或者直接 byte buffer,spark.yarn.XXX.memoryOverhead属性决定向 YARN 请求的每个 executor 或dirver或am 的额外堆内存大小,默认值为 max(384, 0.07 * spark.executor.memory)yarn.app.mapreduce.am.resource.mb:AM能够申请的最大内存,默认值为1536MB
yarn.nodemanager.resource.memory-mb:nodemanager能够申请的最大内存,默认值为8192MB
yarn.scheduler.minimum-allocation-mb:调度时一个container能够申请的最小资源,默认值为1024MB
yarn.scheduler.maximum-allocation-mb:调度时一个container能够申请的最大资源,默认值为8192MB
关于这个话题的具体信息可以参考Spark on Yarn的内存分配问题和Spark配置参数6. c3p0 JDBC连接 8小时失效问题
该项目中需要在SparkStreaming程序中将数据写入Mysql,采用了c3p0 JDBC连接池的方式。一开始由于测试数据较少,一天都可能没有一条数据,所以出现了JDBC连接池中的Connection连接在第二天失效的问题,即使用前一天建立的Connection向Mysql写入数据时报出异常,提示连接失效。上网查询资料后发现这是Mysql的连接8小时自动失效导致的问题。只需对c3p0稍加配置即可。idleConnectionTestPeriod = 3600
每60秒检查所有连接池中的空闲连接testConnectionOnCheckout = false
因性能消耗大请只在需要的时候使用它。如果设为true那么在每个connection提交的时候都将校验其有效性。建议使用idleConnectionTestPeriod或automaticTestTable等方法来提升连接测试的性能。testConnectionOnCheckin = true
如果设为true那么在取得连接的同时将校验连接的有效性。Default: false
关于c3p0详细配置可以参考c3p0详细配置7.总结
通过这个项目,更加体会到了只看书学理论是远远不能掌握所有要点的,且容易忘记,只有结合项目实践,从实践中体会书中所提到的理论知识,才能真正的将知识点化为自身内在的东西。