You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
objectStreamingApp {
defmain(args: Array[String]):Unit= {
valparams=newParamsUtil(args)
require(params.hasParam("streaming.name"), "Application name should be set")
PlatformManager.getOrCreate.run(params)
}
}
原文:http://zqhxuyuan.github.io/2017/09/04/2017-09-04-StreamingPro/
StreamingPro支持Spark、SparkStreaming、SparkStruncture、Flink。入口类都是统一的
StreamingApp
。通过streaming.platform可以指定不同的运行平台。当然,不同的运行引擎的jar包也不同。
jar包会被用来加载不同的Runtime。Runtime运行的映射关系定义在
PlatformManager
的platformNameMapping
变量中。Runtime是一个接口,最主要的是startRuntime方法和params方法。后面我们把Runtime叫做执行引擎。
StreamingPro本质上还是通过spark-submit运行。框架的整体运行流程在
PlatformManager
的run
方法中。主要的步骤有:注意:StreamingPro的Runtime只是Spark作业的执行引擎,具体根据配置文件加载策略是ServiceframeworkDispatcher的工作。
假设我们定义了下面的一个配置文件,由于采用了shortName,需要定义一个ShortNameMapping
DefaultShortNameMapping的定义如下。这样配置文件中的spark就和ServiceframeworkDispatcher的加载过程对应起来了。
ServiceframeworkDispatcher的核心是StrategyDispatcher,这个类在创建的时候,会读取配置文件。
然后解析配置文件中的strategy、algorithm(processor)、ref、compositor、configParams等配置项,并构造对应的对象。
ServiceframeworkDispatcher是一个模块组合框架,它主要定义了Compositor、Processor、Strategy三个接口。
Strategy接口包含了processor、ref、compositor,以及初始化和result方法。
Strategy策略的初始化需要算法、引用、组合器,以及配置信息,对应的方法是StrategyDispatcher的createStrategy方法。
注意下面的initialize方法,createAlgorithms和createCompositors初始化时
会读取params配置,这是一个嵌套了Map的列表:
JList[JMap[String, Any]]
。ServiceframeworkDispatcher的核心是StrategyDispatcher,而StrategyDispatcher的核心是其dispatch方法。
不同执行引擎的启动方法实现不同:
但真正执行StreamingPro主流程在streamingpro-commons下的SparkStreamingStrategy类。
注意:如果是spark-1.6,则streamingpro-spark下也有一个SparkStreamingStrategy类。
注意:配置文件中每个Job都有一个
strategy
级别的configParams
,ref
也会使用这个全局的configParams
。它是一个
Map[String, Any]
的结构。每个Compositor和Processor内部也有一个params
配置,这是一个数组。接下来以读取多个数据源的Compositor实现类为例:
_configParams
是在创建Compositor时初始化调用的,这是一个List[Map[String, Any]]
的结构,对应了params
列表配置outputTable
为了支持配置的动态替换,
_cfg
参数会做一些处理,比如上面的s"streaming.sql.source.${name}.${f._1}"
如果需要被替换,则会被替换为f._2
。下表列举了StreamingPro支持的几种替换方式。
streaming.sql.source.[name].[参数]
streaming.sql.out.[name].[参数]
streaming.sql.params.[param-name]
假设有两个数据输入源和一个输出目标的配置如下:
Source的功能是:读取输入源形成DataFrame,然后创建临时表。其他组件比如SQL也是类似的。至此StreamingPro的大致流程就分析完了。
The text was updated successfully, but these errors were encountered: